#!/usr/bin/python3 from pixels import TermLEDs import board import neopixel import json import os import sys import time from pathlib import Path import pyinotify, threading import paho.mqtt.subscribe as subscribe from pattern import Pattern lock_reload = threading.Lock() cond_reload = threading.Condition(lock_reload) reload = False ignore_file = False config = {} new_config = None # Monitor config from file if len(sys.argv) > 1: config_file = sys.argv[1] else: config_file = "blinky.json" if not Path(config_file).is_file(): print("WARNING: Config file does not exist:", config_file) def file_thread(file): class EventHandler(pyinotify.ProcessEvent): def process_IN_CLOSE_WRITE(self, event): global reload, new_config, ignore_file with lock_reload: if ignore_file: print("Ignoring file change") ignore_file = False else: print("Triggering file reload") new_config = None reload = True cond_reload.notify_all() wm = pyinotify.WatchManager() notifier = pyinotify.Notifier(wm, EventHandler()) wdd = wm.add_watch(file, pyinotify.IN_CLOSE_WRITE) notifier.loop() threading.Thread(target=file_thread, args=[config_file]).start() # Monitor config from MQTT class mqtt: topic = "blinky/config" hostname = "mqtt.jrhoffa.com" auth = {'username':"blinky", 'password':"rainbow"} tls = {'ca_certs':"/etc/ssl/certs/ca-certificates.crt"} def mqtt_thread(): def on_message(client, userdata, message): global reload, new_config with lock_reload: print("Triggering MQTT reload") try: new_config = json.loads(message.payload) reload = True cond_reload.notify_all() except: print("Invalid MQTT config") global reload, new_config while True: print(f"Subscribing to MQTT topic {mqtt.topic}") try: message = subscribe.simple(mqtt.topic, hostname=mqtt.hostname, auth=mqtt.auth, port=8883, tls=mqtt.tls) except Exception as e: print("MQTT subscription error:", e) message = None if not message: print("Subscription failed - retrying") time.sleep(1) else: with lock_reload: print("Triggering MQTT reload") try: new_config = json.loads(message.payload) reload = True cond_reload.notify_all() except: print("Invalid MQTT config") threading.Thread(target=mqtt_thread).start() ### Entry Point ## framerate = 30 length = None patterns = None pixels = None with lock_reload: while True: print("Loading config") reload = False try: # If it wasn't delivered, load from file new_config = new_config or json.load(open(config_file, 'r')) if 'length' in new_config: new_length = new_config['length'] if length != new_length: length = new_length config['length'] = length pixels = neopixel.NeoPixel(board.D18, length, pixel_order=neopixel.RGB, auto_write=False) #pixels = TermLEDs(length) print("String configured") patterns = [(pattern.get('length', length), Pattern.from_dict(pattern)) for pattern in new_config['patterns']] config['patterns'] = new_config['patterns'] print("Pattern configured") except Exception as e: print("Failed to load config:", e) finally: print("Saving config") ignore_file = True json.dump(config, open(config_file, 'w')) if not length or not pixels or not patterns: print("Waiting for valid config") while not reload: cond_reload.wait() else: print("Starting animation") last = target = time.time() while not reload: lock_reload.release() # Generate and show new light pattern pixels[:] = [x for y in (pattern.step(length) for (length, pattern) in patterns) for x in y] pixels.show() # Go ahead and sleep, maintaining maximum framerate target += (1 / framerate) now = time.time() sleeptime = target - now if sleeptime > 0: time.sleep(sleeptime) lock_reload.acquire()