|
- #!/usr/bin/python3
-
- from pixels import TermLEDs
- import board
- import neopixel
- import json
- import os
- import time
-
- import pyinotify, threading
- import paho.mqtt.subscribe as subscribe
-
- from pattern import Pattern
-
-
- lock_reload = threading.Lock()
- cond_reload = threading.Condition(lock_reload)
- reload = False
- config = None
-
-
- # Monitor config from file
-
- config_file = "blinky.json"
-
- def file_thread(file):
- class EventHandler(pyinotify.ProcessEvent):
- def process_IN_CLOSE_WRITE(self, event):
- global reload, config
- with lock_reload:
- print("Triggering file reload")
- config = None
- reload = True
- cond_reload.notify_all()
-
- wm = pyinotify.WatchManager()
- notifier = pyinotify.Notifier(wm, EventHandler())
- wdd = wm.add_watch(file, pyinotify.IN_CLOSE_WRITE)
-
- notifier.loop()
-
- threading.Thread(target=file_thread, args=[config_file]).start()
-
-
- # Monitor config from MQTT
-
- class mqtt:
- topic = "blinky/config"
- hostname = "mqtt.jrhoffa.com"
- auth = {'username':"blinky", 'password':"rainbow"}
- tls = {'ca_certs':"/etc/ssl/certs/ca-certificates.crt"}
-
- def mqtt_thread():
- def on_message(client, userdata, message):
- global reload, config
- with lock_reload:
- print("Triggering MQTT reload")
- try:
- config = json.loads(message.payload)
- reload = True
- cond_reload.notify_all()
- except:
- print("Invalid MQTT config")
-
- subscribe.callback(on_message, mqtt.topic, hostname=mqtt.hostname, auth=mqtt.auth, port=8883, tls=mqtt.tls)
-
- threading.Thread(target=mqtt_thread).start()
-
-
- ### Entry Point ##
-
- framerate = 30
-
- length = None
- patterns = None
- pixels = None
-
- with lock_reload:
- while True:
- print("Loading config")
- reload = False
-
- try:
- config = config or json.load(open(config_file))
-
- new_length = config['length']
-
- if length != new_length:
- print("New string config")
- length = new_length
- pixels = neopixel.NeoPixel(board.D18, length, pixel_order=neopixel.RGB, auto_write=False)
- #pixels = TermLEDs(length)
-
- patterns = [(pattern['length'], Pattern.from_dict(pattern)) for pattern in config['patterns']]
-
- except Exception as e:
- print("Failed to load config:", e)
-
- if not pixels or not patterns:
- print("Waiting for valid config")
- while not reload:
- cond_reload.wait()
- else:
- target = time.time()
- while not reload:
- lock_reload.release()
-
- # Generate and show new light pattern
- pixels[:] = [x for y in (pattern.step(length) for (length, pattern) in patterns) for x in y]
- pixels.show()
-
- # Go ahead and sleep, maintaining maximum framerate
- target += (1 / framerate)
- now = time.time()
- if target > now:
- time.sleep(target - now)
-
- lock_reload.acquire()
|