# Pérenisation de données arrivant sur un MQTT # vers une base de données MariaDb/MySQL # Auteur : Arofarn # v0.2 ################# # Configuration # ################# ########### # IMPORTS # ########### from cameteo import * import paho.mqtt.client as mqtt import json from math import isnan mqtt_client_id = "sql_persistance" ############# # CALLBACKS # ############# #Callback pour la connection au MQTT : souscriptions aux topics def on_connect(client, userdata, flags, rc): print(mqtt.connack_string(rc)) if rc == 0: print("Subscribing to %s ..." % mqtt_topic) client.subscribe([(mqtt_topic, 0), (camera_mqtt_topic + "/last_photo", 0) ]) print("OK") #Callback de gestion des messages arrivant au MQTT : # affichage et enregistrement en base de données def on_message(client, userdata, msg): top=msg.topic[len(mqtt_topic)-1:].strip() subtopics = top.split("/") payload = msg.payload.decode() #print(payload) val = json.loads(payload) #Test présence et cohérence de la valeur try: val['value'] = float(val['value'] ) except: print("Value error: {}".format(val['value'])) val['value'] = float('nan') #Remplacement de la valeur en cas de nullité if isnan(val['value']) or val['value'] == -99.9: val['value'] = 'NULL' #Gestion du symbole des degrés parfois difficile pour certaines sources val['unit'] = val['unit'].replace('deg', '°') #print("%s : %s %s (%s)" % (val['type'], val['value'], val['unit'], subtopics[0])) #Enregistrement des données en base data= Data(valdate = datetime.strptime(val['date'], "%H:%M:%S %d/%m/%Y"), dbdate = datetime.now(), value = val['value'], unit = val['unit'], type_id = val['type'], sensor_id = subtopics[0] ) print(data) try: db.session.add(data) db.session.commit() except: print("Erreur lors de l'enregistrement en base de données") finally: db.session.close() #Callback particulier pour gérer les date/heure arrivent sur le broker MQTT def on_message_date(client, userdata, msg): payload = msg.payload.decode() try: d = datetime.strptime(payload, "%H:%M:%S %d/%m/%Y") print("Date : %s" % d) except: print("Date mal formatée : %s" % payload) #Update display with info from camera (camera shooting new photo or name of the # last photo) def on_message_camera(client, userdata, msg): pl = msg.payload.decode() print("New photo : " + pl) # New photo to add to database photo = Photo(file_name = pl, file_date = datetime.strptime(os.path.splitext(pl)[0], photo_name), image_type = photo_format) try: db.session.add(photo) db.session.commit() except: print("Erreur lors de l'enregistrement en base de données") finally: db.session.close() #Callback de déconnexion au broker MQTT def on_disconnect(client, userdata, msg): if msg != 0: print("Déconnexion imprévu : %s" % msg) exit() ######## # Main # ######## #Connect to MQTT broker and loop... mqtt_client = mqtt.Client(mqtt_client_id, clean_session=False) mqtt_client.username_pw_set(mqtt_user, mqtt_pass) mqtt_client.on_connect = on_connect mqtt_client.on_message = on_message mqtt_client.on_disconnect = on_disconnect mqtt_client.message_callback_add(camera_mqtt_topic + "/last_photo", on_message_camera) mqtt_client.message_callback_add("huzzah0/NTP/date", on_message_date) mqtt_client.connect(mqtt_host, int(mqtt_port), 60) mqtt_client.loop_forever()