| @@ -0,0 +1 @@ | |||
| __pycache__ | |||
| @@ -0,0 +1,21 @@ | |||
| MIT License | |||
| Copyright (c) 2022 Nathaniel Walizer | |||
| Permission is hereby granted, free of charge, to any person obtaining a copy | |||
| of this software and associated documentation files (the "Software"), to deal | |||
| in the Software without restriction, including without limitation the rights | |||
| to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |||
| copies of the Software, and to permit persons to whom the Software is | |||
| furnished to do so, subject to the following conditions: | |||
| The above copyright notice and this permission notice shall be included in all | |||
| copies or substantial portions of the Software. | |||
| THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |||
| IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |||
| FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |||
| AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |||
| LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |||
| OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | |||
| SOFTWARE. | |||
| @@ -0,0 +1,6 @@ | |||
| Featureful Python controller code for WS2811/WS2812/NeoPixels | |||
| - Based on Adafruit CircuitPython NeoPixel library | |||
| - Dozens of holiday and other animation presets | |||
| - Fully configurable animations | |||
| - Multiple animations in different sections | |||
| - Service-ready with dynamic MQTT and config file updates | |||
| @@ -0,0 +1,27 @@ | |||
| { | |||
| "length": 50, | |||
| "patterns": [ | |||
| { | |||
| "length": 50, | |||
| "gradient": { | |||
| "preset": "peacock" | |||
| }, | |||
| "cycle_length": 20 | |||
| }, { | |||
| "length": 0, | |||
| "gradient": { | |||
| "colors": [ | |||
| "red", | |||
| "orange", | |||
| "yellow", | |||
| "green", | |||
| "blue", | |||
| "purple", | |||
| "black" | |||
| ] | |||
| }, | |||
| "cycle_length": 7, | |||
| "march": true | |||
| } | |||
| ] | |||
| } | |||
| @@ -0,0 +1,114 @@ | |||
| #!/usr/bin/python3 | |||
| from pixels import TermLEDs | |||
| import board | |||
| import neopixel | |||
| import json | |||
| import os | |||
| import time | |||
| import pyinotify, threading | |||
| import paho.mqtt.subscribe as subscribe | |||
| from pattern import Pattern | |||
| lock_reload = threading.Lock() | |||
| cond_reload = threading.Condition(lock_reload) | |||
| reload = False | |||
| config = None | |||
| # Monitor config from file | |||
| config_file = "blinky.json" | |||
| def file_thread(file): | |||
| class EventHandler(pyinotify.ProcessEvent): | |||
| def process_IN_CLOSE_WRITE(self, event): | |||
| global reload, config | |||
| with lock_reload: | |||
| print("Triggering reload") | |||
| config = None | |||
| reload = True | |||
| cond_reload.notify_all() | |||
| wm = pyinotify.WatchManager() | |||
| notifier = pyinotify.Notifier(wm, EventHandler()) | |||
| wdd = wm.add_watch(file, pyinotify.IN_CLOSE_WRITE) | |||
| notifier.loop() | |||
| threading.Thread(target=file_thread, args=[config_file]).start() | |||
| # Monitor config from MQTT | |||
| class mqtt: | |||
| topic = "blinky/config" | |||
| hostname = "mqtt.jrhoffa.com" | |||
| auth = {'username':"blinky", 'password':"rainbow"} | |||
| tls = {'ca_certs':"/etc/ssl/certs/ca-certificates.crt"} | |||
| def mqtt_thread(): | |||
| def on_message(client, userdata, message): | |||
| global reload, config | |||
| with lock_reload: | |||
| config = json.loads(message.payload) | |||
| reload = True | |||
| cond_reload.notify_all() | |||
| subscribe.callback(on_message, mqtt.topic, hostname=mqtt.hostname, auth=mqtt.auth, port=8883, tls=mqtt.tls) | |||
| threading.Thread(target=mqtt_thread).start() | |||
| ### Entry Point ## | |||
| framerate = 30 | |||
| length = None | |||
| patterns = None | |||
| pixels = None | |||
| with lock_reload: | |||
| while True: | |||
| print("Loading config") | |||
| reload = False | |||
| try: | |||
| config = config or json.load(open(config_file)) | |||
| new_length = config['length'] | |||
| if length != new_length: | |||
| print("New string config") | |||
| length = new_length | |||
| pixels = neopixel.NeoPixel(board.D18, length, pixel_order=neopixel.RGB, auto_write=False) | |||
| #pixels = TermLEDs(length) | |||
| patterns = [(pattern['length'], Pattern.from_dict(pattern)) for pattern in config['patterns']] | |||
| except Exception as e: | |||
| print("Failed to load config:", e) | |||
| if not pixels or not patterns: | |||
| print("Waiting for valid config") | |||
| while not reload: | |||
| cond_reload.wait() | |||
| else: | |||
| target = time.time() | |||
| while not reload: | |||
| lock_reload.release() | |||
| # Generate and show new light pattern | |||
| pixels[:] = [x for y in (pattern.step(length) for (length, pattern) in patterns) for x in y] | |||
| pixels.show() | |||
| # Go ahead and sleep, maintaining maximum framerate | |||
| target += (1 / framerate) | |||
| now = time.time() | |||
| if target > now: | |||
| time.sleep(target - now) | |||
| lock_reload.acquire() | |||
| @@ -0,0 +1,30 @@ | |||
| class Colors: | |||
| red = (255, 0, 0) | |||
| orange = (78, 25, 0) | |||
| yellow = (255, 160, 0) | |||
| green = ( 0, 255, 0) | |||
| blue = ( 0, 0, 255) | |||
| purple = ( 38, 10, 42) | |||
| dark_red = ( 32, 0, 0) | |||
| dark_orange = ( 20, 6, 0) | |||
| dark_yellow = ( 64, 40, 0) | |||
| dark_green = ( 0, 32, 0) | |||
| dark_blue = ( 0, 0, 32) | |||
| dark_purple = ( 8, 3, 10) | |||
| pale_red = (255, 64, 64) | |||
| pale_orange = ( 78, 25, 10) | |||
| pale_yellow = (255, 160, 10) | |||
| pale_green = ( 64, 255, 25) | |||
| pale_blue = ( 70, 70, 255) | |||
| pale_purple = ( 42, 20, 42) | |||
| black = ( 0, 0, 0) | |||
| white = (255, 255, 255) | |||
| pink = pale_red | |||
| peach = pale_orange | |||
| teal = ( 0, 85, 35) | |||
| magenta = (255, 0, 255) | |||
| brown = ( 6, 2, 0) | |||
| @@ -0,0 +1,153 @@ | |||
| from random import random | |||
| from colors import Colors | |||
| import colors | |||
| def clamp8(v): | |||
| return 255 if v >= 255 else 0 if v <= 0 else round(v) | |||
| class Gradient: | |||
| def __init__(self, rgb_colors): | |||
| self.colors = rgb_colors[::-1] | |||
| def at(self, x): | |||
| i_left = int(x * len(self.colors)) | |||
| i_right = i_left + 1 if i_left < len(self.colors) - 1 else 0 | |||
| ratio = (x * len(self.colors)) - i_left | |||
| return tuple(clamp8(left + ratio * (right - left)) for (left, right) in zip(self.colors[i_left], self.colors[i_right])) | |||
| def from_dict(dict): | |||
| if 'preset' in dict: | |||
| return getattr(Gradients, dict['preset']) | |||
| return Gradient([getattr(Colors, color) if isinstance(color, str) else color for color in dict['colors']]) | |||
| rainbow = Gradient([ | |||
| Colors.red, | |||
| Colors.orange, | |||
| Colors.yellow, | |||
| Colors.green, | |||
| Colors.blue, | |||
| Colors.purple | |||
| ]) | |||
| class Gradients: | |||
| # Gradient Presets | |||
| # Fun | |||
| rainbow = rainbow | |||
| dark_rainbow = Gradient([ | |||
| Colors.dark_red, | |||
| Colors.dark_orange, | |||
| Colors.dark_yellow, | |||
| Colors.dark_green, | |||
| Colors.dark_blue, | |||
| Colors.dark_purple | |||
| ]) | |||
| pale_rainbow = Gradient([ | |||
| Colors.pale_red, | |||
| Colors.pale_orange, | |||
| Colors.pale_yellow, | |||
| Colors.pale_green, | |||
| Colors.pale_blue, | |||
| Colors.pale_purple | |||
| ]) | |||
| random = Gradient([Colors.black if i % 2 else rainbow.at(random()) for i in range(0, 100)]) | |||
| pusheen = Gradient([ | |||
| Colors.teal, | |||
| Colors.pink | |||
| ]) | |||
| peacock = Gradient([ | |||
| Colors.teal, | |||
| Colors.black, | |||
| Colors.purple, | |||
| Colors.blue, | |||
| ]) | |||
| # Sports | |||
| kraken = Gradient([ | |||
| ( 0, 1, 3), # Navy blue | |||
| ( 24, 60, 48), # Light blue | |||
| ( 0, 1, 3), # Navy blue (again) | |||
| ( 10, 25, 25), # Blue (greenish) | |||
| ]) | |||
| # Holiday | |||
| love = Gradient([ | |||
| Colors.pink, | |||
| Colors.black, | |||
| Colors.red, | |||
| Colors.black | |||
| ]) | |||
| irish = Gradient([ | |||
| Colors.dark_green, | |||
| Colors.black, | |||
| Colors.green, | |||
| Colors.black, | |||
| ]) | |||
| bunny = Gradient([ | |||
| Colors.pink, | |||
| Colors.pale_purple, | |||
| Colors.black, | |||
| Colors.pale_yellow, | |||
| Colors.pale_green, | |||
| Colors.black, | |||
| ]) | |||
| patriot = Gradient([ | |||
| Colors.red, | |||
| Colors.black, | |||
| Colors.black, | |||
| Colors.white, | |||
| Colors.black, | |||
| Colors.black, | |||
| Colors.blue, | |||
| Colors.black, | |||
| Colors.black, | |||
| ]) | |||
| pumpkin = Gradient([ | |||
| Colors.orange, | |||
| Colors.black, | |||
| Colors.purple, | |||
| Colors.black, | |||
| ]) | |||
| turkey = Gradient([ | |||
| Colors.orange, | |||
| Colors.black, | |||
| Colors.dark_green, | |||
| Colors.black, | |||
| Colors.brown, | |||
| Colors.black, | |||
| Colors.dark_yellow, | |||
| Colors.black, | |||
| ]) | |||
| mazel = Gradient([ | |||
| Colors.blue, | |||
| Colors.black, | |||
| Colors.white, | |||
| Colors.black, | |||
| ]) | |||
| santa = Gradient([ | |||
| Colors.red, | |||
| Colors.white, | |||
| Colors.green, | |||
| Colors.black, | |||
| ]) | |||
| @@ -0,0 +1,33 @@ | |||
| import time | |||
| from gradients import Gradient | |||
| class Pattern: | |||
| def __init__(self, gradient, cycle_length, cycle_time_s=None, reverse=False, march=False): | |||
| self.gradient = gradient | |||
| self.cycle_length = cycle_length | |||
| self.cycle_time_s = cycle_time_s or (cycle_length / 2 if march else cycle_length) | |||
| self.reverse = reverse | |||
| self.march = march | |||
| self.last = None | |||
| self.offset = 0 | |||
| def step(self, length): | |||
| now = time.time() | |||
| duration = 0 if self.last is None else now - self.last | |||
| self.last = now | |||
| offset_delta = self.cycle_length * duration / self.cycle_time_s | |||
| self.offset = (self.offset + (offset_delta if self.reverse else -offset_delta)) % self.cycle_length | |||
| offset = self.offset if not self.march else int(self.offset) | |||
| return (self.gradient.at(((i + offset) / self.cycle_length + 1) % 1) for i in range(0, length)) | |||
| def from_dict(dict): | |||
| return Pattern( | |||
| Gradient.from_dict(dict['gradient']), | |||
| dict['cycle_length'], | |||
| cycle_time_s = dict.get('cycle_time_s'), | |||
| reverse = not not dict.get('reverse', False), | |||
| march = not not dict.get('march', False) | |||
| ) | |||
| @@ -0,0 +1,18 @@ | |||
| from colors import Colors | |||
| def termcolor(r, g, b): | |||
| return f"\x1b[48;2;{r};{g};{b}m" | |||
| class TermLEDs: | |||
| def __init__(self, length): | |||
| self.leds = [Colors.black] * length | |||
| def __getitem__(self, key): | |||
| return self.leds[key] | |||
| def __setitem__(self, key, value): | |||
| self.leds[key] = value | |||
| def show(self): | |||
| print("\r" + " ".join(termcolor(*led) for led in self.leds) + " \x1b[0m", end="") | |||