|
- #!/usr/bin/python3
-
- from pixels import TermLEDs
- import board
- import neopixel
- import json
- import os
- import sys
- import time
- from pathlib import Path
- import uuid
- from threading import Lock, Condition
-
- import pyinotify, threading
- import paho.mqtt.client as mqtt
-
- from pattern import Pattern
- from gradients import Gradients
-
-
- ### Parameters ###
-
- if len(sys.argv) > 1:
- config_file = sys.argv[1]
- else:
- config_file = "blinky.json"
-
- if not Path(config_file).is_file():
- print("WARNING: Config file does not exist:", config_file)
-
-
- # Monitor config from MQTT
-
- class Device:
- id = "blinky" #f"{uuid.getnode():0>12x}"
-
- state_topic = f"light/{id}/state"
- command_topic = f"light/{id}/cmd"
- fx_state_topic = f"light/{id}/fx"
- ha_topic = 'homeassistant/status'
-
- ready = False
- changed = False
- should_publish = False
-
- power_on = True
- effect = None
-
- def advertise_locked(client, device):
- client.publish(
- f"homeassistant/light/{device.id}/config",
- json.dumps({
- 'unique_id': device.id,
- 'device': {
- 'name': "blinky",
- 'identifiers': [device.id],
- },
-
- 'state_topic': device.state_topic,
- 'command_topic': device.command_topic,
- 'effect_state_topic': device.fx_state_topic,
-
- 'schema': "json",
-
- 'effect': True,
- 'effect_list': device.effects,
- })
- )
-
- device.publish_state_locked()
-
- def on_mqtt_connect(client, device, flags, rc):
- if rc == 0:
- with device.lock:
- print("Connected to MQTT")
-
- device.ready = True
- Device.advertise_locked(client, device)
-
- client.subscribe(device.command_topic)
- client.subscribe(device.ha_topic)
-
- device.cond.notify_all()
- else:
- print(f"MQTT connection failed: {rc}")
-
- def set_mqtt_config(self, data):
- with self.lock:
- changed = False
- if 'state' in data:
- device.power_on = data['state'] == "ON"
- changed = True
- if 'effect' in data:
- device.effect = data['effect']
- changed = True
- if changed:
- self.changed = True
- self.update_state_locked()
-
- def on_mqtt_message(client, device, msg):
- if msg.topic == device.command_topic:
- device.set_mqtt_config(json.loads(msg.payload))
- elif msg.topic == device.ha_topic:
- with device.lock:
- Device.advertise_locked(client, device)
-
- def __init__(self, effects=[]):
- print(f"Creating device with ID {self.id}")
-
- self.effects = effects
-
- self.lock = Lock()
- self.cond = Condition(self.lock)
-
- self.client = mqtt.Client(userdata=self)
- self.client.on_connect = Device.on_mqtt_connect
- self.client.on_message = Device.on_mqtt_message
- self.client.tls_set("/etc/ssl/certs/ca-certificates.crt")
- self.client.username_pw_set("blinky", "rainbow")
- self.client.connect_async("mqtt.jrhoffa.com", 8883)
- self.client.loop_start();
-
- def is_ready(self): return self.ready
- def is_changed(self): return self.changed
-
- def wait_ready(self, timeout=None):
- with self.lock: self.cond.wait_for(self.is_ready, timeout=timeout)
- def wait_changed(self, timeout=None):
- with self.lock: self.cond.wait_for(self.is_changed, timeout=timeout)
-
- def publish_state_locked(self):
- self.should_publish = not self.ready
- if self.ready:
- self.client.publish(
- device.state_topic,
- json.dumps({
- 'state': "ON" if self.power_on else "OFF",
- 'effect': self.effect
- })
- )
-
- def update_state_locked(self):
- self.changed = True
- self.publish_state_locked()
- self.cond.notify_all()
-
- def set_power_state(self, power_on):
- with self.lock:
- self.power_on = power_on
- self.update_state_locked()
-
- def set_effect(self, effect):
- with self.lock:
- self.effect = effect
- self.update_state_locked()
-
- device = Device([gradient for gradient in dir(Gradients) if not gradient.startswith('__')])
-
-
- # Monitor config from file
-
- file_changed = True
- ignore_file = False
-
- config = {}
- new_config = None
-
- def file_thread(file, device):
- class EventHandler(pyinotify.ProcessEvent):
- def process_IN_CLOSE_WRITE(self, event):
- global file_changed, new_config, ignore_file
- with device.lock:
- if ignore_file:
- print("Ignoring file change")
- ignore_file = False
- else:
- print("Triggering file reload")
- new_config = None
- file_changed = True
- device.cond.notify_all()
-
- wm = pyinotify.WatchManager()
- notifier = pyinotify.Notifier(wm, EventHandler())
- wdd = wm.add_watch(file, pyinotify.IN_CLOSE_WRITE)
-
- notifier.loop()
-
- threading.Thread(target=file_thread, args=[config_file, device]).start()
-
-
- ### Entry Point ##
-
- framerate = 30
-
- power_on = True
- length = None
- patterns = None
- pixels = None
-
- with device.lock:
- while True:
- try:
- if file_changed:
- print("Loading config file")
- new_config = json.load(open(config_file, 'r'))
- file_changed = False
- elif device.changed:
- print("Using new config from MQTT")
- new_config = {'power_on': device.power_on}
- if device.effect:
- new_config['patterns'] = [{
- 'gradient': {'preset': device.effect},
- 'cycle_length': 20
- }]
- device.changed = False
-
- if 'length' in new_config:
- new_length = new_config['length']
- if length != new_length:
- config['length'] = new_length
- length = new_length
- pixels = neopixel.NeoPixel(board.D18, length, pixel_order=neopixel.RGB, auto_write=False)
- #pixels = TermLEDs(length)
- print("String configured")
-
- if 'power_on' in new_config:
- config['power_on'] = new_config['power_on']
- power_on = config['power_on']
-
- if 'patterns' in new_config:
- config['patterns'] = new_config['patterns']
-
- # Don't regenerate the patterns if this is just a power command
- if 'length' in new_config or 'patterns' in new_config:
- patterns = [(pattern.get('length', length), Pattern.from_dict(pattern)) for pattern in config['patterns']]
- if file_changed and len(patterns) > 0 and 'gradient' in config['patterns'][0]:
- device.set_effect(config['patterns'][0]['gradient'].get('preset', None))
- print("Pattern configured")
-
- except Exception as e:
- print("Failed to set config:", e)
-
- finally:
- print("Saving config")
- ignore_file = True
- json.dump(config, open(config_file, 'w'))
-
- if not device.power_on or not length or not pixels or not patterns:
- if not device.power_on:
- pixels[:] = [0, 0, 0] * length
- pixels.show()
- print("Power off, waiting ...")
- else:
- print("Waiting for valid config")
- while not file_changed and not device.is_changed():
- device.cond.wait()
-
- else:
- print("Starting animation")
- last = target = time.time()
- while not file_changed and not device.is_changed():
- device.lock.release()
-
- # Generate and show new light pattern
- pixels[:] = [x for y in (pattern.step(length) for (length, pattern) in patterns) for x in y]
- pixels.show()
-
- # Go ahead and sleep, maintaining maximum framerate
- target += (1 / framerate)
- now = time.time()
- sleeptime = target - now
-
- # Release before sleep because we're using the condition variable
- # to wake up early in case there's an important message
- #device.lock.acquire()
-
- if sleeptime > 0:
- time.sleep(sleeptime)
- #while not file_changed and not device.is_changed():
- # device.cond.wait(sleeptime)
-
- device.lock.acquire()
|