Featureful Python controller code for WS2811/WS2812/NeoPixels
您最多选择25个主题 主题必须以字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符

119 行
3.1KB

  1. #!/usr/bin/python3
  2. from pixels import TermLEDs
  3. import board
  4. import neopixel
  5. import json
  6. import os
  7. import time
  8. import pyinotify, threading
  9. import paho.mqtt.subscribe as subscribe
  10. from pattern import Pattern
  11. lock_reload = threading.Lock()
  12. cond_reload = threading.Condition(lock_reload)
  13. reload = False
  14. config = None
  15. # Monitor config from file
  16. config_file = "blinky.json"
  17. def file_thread(file):
  18. class EventHandler(pyinotify.ProcessEvent):
  19. def process_IN_CLOSE_WRITE(self, event):
  20. global reload, config
  21. with lock_reload:
  22. print("Triggering file reload")
  23. config = None
  24. reload = True
  25. cond_reload.notify_all()
  26. wm = pyinotify.WatchManager()
  27. notifier = pyinotify.Notifier(wm, EventHandler())
  28. wdd = wm.add_watch(file, pyinotify.IN_CLOSE_WRITE)
  29. notifier.loop()
  30. threading.Thread(target=file_thread, args=[config_file]).start()
  31. # Monitor config from MQTT
  32. class mqtt:
  33. topic = "blinky/config"
  34. hostname = "mqtt.jrhoffa.com"
  35. auth = {'username':"blinky", 'password':"rainbow"}
  36. tls = {'ca_certs':"/etc/ssl/certs/ca-certificates.crt"}
  37. def mqtt_thread():
  38. def on_message(client, userdata, message):
  39. global reload, config
  40. with lock_reload:
  41. print("Triggering MQTT reload")
  42. try:
  43. config = json.loads(message.payload)
  44. reload = True
  45. cond_reload.notify_all()
  46. except:
  47. print("Invalid MQTT config")
  48. subscribe.callback(on_message, mqtt.topic, hostname=mqtt.hostname, auth=mqtt.auth, port=8883, tls=mqtt.tls)
  49. threading.Thread(target=mqtt_thread).start()
  50. ### Entry Point ##
  51. framerate = 30
  52. length = None
  53. patterns = None
  54. pixels = None
  55. with lock_reload:
  56. while True:
  57. print("Loading config")
  58. reload = False
  59. try:
  60. config = config or json.load(open(config_file))
  61. new_length = config['length']
  62. if length != new_length:
  63. print("New string config")
  64. length = new_length
  65. pixels = neopixel.NeoPixel(board.D18, length, pixel_order=neopixel.RGB, auto_write=False)
  66. #pixels = TermLEDs(length)
  67. patterns = [(pattern['length'], Pattern.from_dict(pattern)) for pattern in config['patterns']]
  68. except Exception as e:
  69. print("Failed to load config:", e)
  70. if not pixels or not patterns:
  71. print("Waiting for valid config")
  72. while not reload:
  73. cond_reload.wait()
  74. else:
  75. target = time.time()
  76. while not reload:
  77. lock_reload.release()
  78. # Generate and show new light pattern
  79. pixels[:] = [x for y in (pattern.step(length) for (length, pattern) in patterns) for x in y]
  80. pixels.show()
  81. # Go ahead and sleep, maintaining maximum framerate
  82. target += (1 / framerate)
  83. now = time.time()
  84. if target > now:
  85. time.sleep(target - now)
  86. lock_reload.acquire()