# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

"""Neopixel LED lighting controlled by a rotary encoder (and MQTT)."""

__author__ = "arofarn"
__version__ = 0.4

# Imports
import time

import machine
import esp
import neopixel
import ntptime

from encoder import Encoder
from umqtt.robust import MQTTClient

import light_modes
import config

# NOTE(review): WLAN is used throughout but never defined in this file;
# presumably it is created by boot.py and shares this namespace - confirm.

# Object declarations and variable initialisation
NEOPIX_PIN = 2
NEOPIX2_PIN = 16

# The next two pins must be interrupt-capable!
# On ESP8266 that means: 4, 5, 12, 13 and 14
ENC_PIN_A = 14  # First encoder pin
ENC_PIN_B = 12  # Second encoder pin
ENC_PIN_C = 13  # Encoder centre-click pin
WIFI_LED_PIN = 0

# With ESP8266 (no timing parameter)
NPXL_STRIP = neopixel.NeoPixel(machine.Pin(NEOPIX_PIN), config.NB_PIX)
# Second strip, currently disabled:
# NPXL_STRIP2 = neopixel.NeoPixel(machine.Pin(NEOPIX2_PIN), config.NB_PIX2)

# Switch everything off at start-up
NPXL_STRIP.fill([0, 0, 0])
NPXL_STRIP.write()

# Rotary encoder
ENCODER = Encoder(ENC_PIN_A, ENC_PIN_B,
                  min_val=0, max_val=config.MAX_BRIGHT,
                  clicks=1)

# Push button (reads 0 while pressed, see power_cycle)
ENC_BUT = machine.Pin(ENC_PIN_C, machine.Pin.IN)

# Wifi status LED
WIFI_LED = machine.Pin(WIFI_LED_PIN, machine.Pin.OUT)

# RTC setup
rtc = machine.RTC()

# State variables
BRIGHTN = 50        # Brightness (0 - 100)
PWR = True          # Is the light switched on?
BUTTN_STATE = 1     # Button level seen on the previous loop pass
CURRENT_MODE = 0    # Index into light_modes.MODES_LST
PWROFF_DELAY = 0    # ticks_ms deadline of the current long press
MQTT_DELAY = 2      # Delay (s) before sending brightness data to MQTT

# Align the encoder with the initial brightness so that the first pass of
# the main loop does not overwrite BRIGHTN with the encoder's start value.
ENCODER.reset(BRIGHTN)


#############
# FUNCTIONS #
#############

def MqttConnect(config, cb):
    """Create an MQTT client, try to connect it and subscribe to our topics.

    config -- object providing CLIENT_ID, MQTT_HOST, MQTT_USERNAME,
              MQTT_PASSWD, MQTT_PORT and LOCATION
    cb     -- callback(topic, msg) installed for incoming messages

    Returns the client object, whether or not the connection succeeded.
    """
    client = MQTTClient(client_id=config.CLIENT_ID,
                        server=config.MQTT_HOST,
                        user=config.MQTT_USERNAME,
                        password=config.MQTT_PASSWD,
                        port=config.MQTT_PORT)
    client.DEBUG = True

    # Best effort: a failed connection is reported, never raised
    try:
        if WLAN.isconnected():
            client.reconnect()
    except Exception as err:
        print(err)

    # NOTE(review): `is_connected` is read as an attribute here and in the
    # main loop; verify that the MQTTClient in use really provides it.
    if client.is_connected:
        # MQTT subscription
        print("MqttConnect: Set callback...")
        client.set_callback(cb)
        print("OK")
        print("MqttConnect: Subcribe...")
        # bytes.format() is a MicroPython extension (not available in CPython)
        client.subscribe(b"{location}/{device}/#".format(
            location=config.LOCATION,
            device=client.client_id))
        print("OK")

    return client


def set_rtc(retry=3):
    """Set the RTC from NTP, trying at most `retry` times (at least once).

    Errors are printed, never raised. Always returns None.
    """
    attempts = 0
    while True:
        attempts += 1
        try:
            return ntptime.settime()
        except OSError as err:
            print("set_rtc() : OS error: {0}".format(err))
            # 110 (ETIMEDOUT): wait a little before the next attempt
            if err.args and err.args[0] == 110:
                time.sleep(0.5)
        except OverflowError as err:
            print("set_rtc() : OverflowError: {0}".format(err))
            time.sleep(0.2)
        except Exception as err:
            # Unexpected error: report it and give up immediately
            print("set_rtc() : ", err)
            return None

        if attempts >= retry:
            print("set_rtc() : retry limit reached : ", attempts)
            break


def now(clock):
    """Return a string representing date/time now.

    clock -- object whose datetime() returns a tuple; fields 0-2 and 4-6
             are used (field 3 is skipped).
    """
    return "{0:04d}/{1:02d}/{2:02d}_{4:02d}:{5:02d}:{6:02d}".format(*clock.datetime())


def mqtt_publish(topic, message, retain=False, qos=0, sleep=0):
    """Publish `message` on <location>/<device>/<topic>.

    sleep -- pause in milliseconds after publishing
    """
    CLIENT.publish("{location}/{device}/{topic}".format(location=config.LOCATION,
                                                        device=CLIENT.client_id,
                                                        topic=topic),
                   message,
                   retain=retain,
                   qos=qos)
    time.sleep_ms(sleep)


def mqtt_multipub(payload, sleep=0):
    """Publish several messages.

    payload -- iterable of [topic, message, retain, qos] items
    sleep   -- pause in milliseconds after each message
    """
    for topic, message, retain, qos in payload:
        mqtt_publish(topic, message, retain=retain, qos=qos, sleep=sleep)


def _set_power(state):
    """Apply the power state `state` (bool); do nothing if already in it."""
    global PWR
    global BRIGHTN

    if state == PWR:
        return

    PWR = state
    if not PWR:
        # Switch the LEDs off
        NPXL_STRIP.fill((0, 0, 0))
        NPXL_STRIP.write()
    else:
        # Use a low brightness when switching on from a brightness of 0
        if BRIGHTN == 0:
            BRIGHTN = 10
        # Put the encoder back on the last known brightness so that
        # movements made while the light was off are ignored
        ENCODER.reset(BRIGHTN)


def mqtt_callback(topic, msg):
    """Handle an incoming MQTT message (mode, brightness or power change).

    Only topics ending in <field>/value with an all-digit payload are used;
    <field> is one of mode, brightness or power.
    """
    global CURRENT_MODE
    global BRIGHTN

    parts = topic.split(b'/')
    if parts[-1] != b'value' or not msg.isdigit():
        print("Pas une valeur. Ignore...")
        return

    value = int(msg)
    field = parts[-2] if len(parts) >= 2 else b''

    if field == b'mode' and 0 <= value < len(light_modes.MODES_LST):
        CURRENT_MODE = value
    elif field == b'brightness' and 0 <= value <= 100:
        BRIGHTN = value
        ENCODER.reset(BRIGHTN)
    elif field == b'power' and value in (0, 1):
        # Goes through the same path as the button so that switching off
        # over MQTT also blanks the strip
        _set_power(bool(value))


def power_cycle():
    """Toggle the light on/off, wait for button release, publish the state."""
    _set_power(not PWR)
    print("Power :", PWR)

    # Wait until the button is released
    while ENC_BUT.value() == 0:
        time.sleep_ms(10)

    # Send the new state to MQTT
    mqtt_multipub([['power/date', now(rtc), True, 1],
                   ['power/value', "{:d}".format(PWR), True, 0]])


def update_button_c(button, button_state, pwr, mode):
    """Watch button C (centre click).

    A press held for 2 seconds toggles the power; a shorter click, while the
    light is on, selects the next mode when the button is released.

    button       -- pin object, value() == 0 while pressed
    button_state -- button level seen on the previous call
    pwr          -- current power state
    mode         -- current mode index

    Returns the (possibly updated) mode index.
    """
    global PWROFF_DELAY

    if button.value() == 0:
        if button_state == 1:
            # Press just started: deadline before toggling the power state
            PWROFF_DELAY = time.ticks_add(time.ticks_ms(), 2000)
        elif time.ticks_diff(time.ticks_ms(), PWROFF_DELAY) >= 0:
            # Still pressed and deadline reached: toggle the power state
            PWROFF_DELAY = 0
            power_cycle()
    elif pwr and button_state == 0:
        # Button just released after a short click: next mode, wrapping
        # around at the end of the list
        nb_modes = len(light_modes.MODES_LST)
        mode = mode + 1
        if mode >= nb_modes:
            mode = 0
        print("Mode : {}/{}".format(mode, nb_modes))
        mqtt_multipub([['mode/date', now(rtc), True, 1],
                       ['mode/value', "{}".format(mode), True, 1]])

    return mode


def update_conn_led():
    """Refresh Wifi status LED (pin driven low while Wifi is connected)."""
    WIFI_LED.value(not WLAN.isconnected())


#############
# MAIN LOOP #
#############

if WLAN.isconnected():
    set_rtc()

CLIENT = MqttConnect(config, mqtt_callback)

# Send the start-up state to the broker
data = [["last_boot", now(rtc), True, 1],
        ["location", config.LOCATION, True, 1],
        ["sys/wifi_ssid", WLAN.config('essid'), True, 0],
        ["sys/ip", WLAN.ifconfig()[0], True, 1],
        ["sys/esp8266_device_id", "{:d}".format(esp.flash_id()), True, 1],
        ['mode/date', now(rtc), True, 1],
        ["mode/value", "{}".format(CURRENT_MODE), True, 1],
        ['brightness/date', now(rtc), True, 1],
        ["brightness/value", "{}".format(BRIGHTN), True, 1],
        ['power/date', now(rtc), True, 1],
        ["power/value", "{:d}".format(PWR), True, 1],
        ]
mqtt_multipub(data)

BRIGHTN_DATA = None             # Pending brightness update, if any
MQTT_TIMER = time.ticks_ms()    # Earliest time BRIGHTN_DATA may be sent

while True:
    if PWR:
        # When the encoder value changed, update the brightness
        encoder_value = ENCODER.value
        if BRIGHTN != encoder_value:
            print(encoder_value)
            BRIGHTN = encoder_value
            MQTT_TIMER = time.ticks_add(time.ticks_ms(), MQTT_DELAY * 1000)
            BRIGHTN_DATA = [['brightness/date', now(rtc), True, 1],
                            ['brightness/value', "{}".format(BRIGHTN), True, 0]]

        # Refresh the LEDs
        light_modes.update_neopixel(CURRENT_MODE,
                                    NPXL_STRIP,
                                    config.USR_COLOR,
                                    BRIGHTN)

    # Short click: next mode; 2-second press: power on/off
    CURRENT_MODE = update_button_c(ENC_BUT, BUTTN_STATE, PWR, CURRENT_MODE)
    BUTTN_STATE = ENC_BUT.value()

    update_conn_led()

    if WLAN.isconnected() and CLIENT.is_connected:
        CLIENT.check_msg()

    # Brightness is only published once it has been stable for MQTT_DELAY;
    # the update stays pending while the link is down
    if BRIGHTN_DATA is not None and time.ticks_diff(time.ticks_ms(), MQTT_TIMER) >= 0:
        if WLAN.isconnected() and CLIENT.is_connected:
            mqtt_multipub(BRIGHTN_DATA)
            BRIGHTN_DATA = None

    time_to_test = rtc.datetime()

    # Every hour (minutes and seconds both 0): resynchronise the RTC
    if time_to_test[5] + time_to_test[6] == 0:
        print("Old Time: {0:04d}/{1:02d}/{2:02d}_{4:02d}:{5:02d}:{6:02d}.{7:03d}".format(*time_to_test))
        set_rtc()
        print("Time set: {0:04d}/{1:02d}/{2:02d}_{4:02d}:{5:02d}:{6:02d}.{7:03d}".format(*rtc.datetime()))
        # Leave second 0 so the resync is not triggered again straight away
        time.sleep(1)

    # Every minute (seconds == 0): try to reconnect a lost MQTT session
    if time_to_test[6] == 0 and not CLIENT.is_connected:
        CLIENT = MqttConnect(config, mqtt_callback)