130 lines
3.7 KiB
Python
Executable File
130 lines
3.7 KiB
Python
Executable File
# Persist data arriving on an MQTT broker
# into a MariaDB/MySQL database
# Author: Arofarn
# v0.2
|
|
|
|
#################
|
|
# Configuration #
|
|
#################
|
|
|
|
###########
|
|
# IMPORTS #
|
|
###########
|
|
|
|
from cameteo import *
|
|
import paho.mqtt.client as mqtt
|
|
import json
|
|
from math import isnan
|
|
|
|
# Client identifier passed to mqtt.Client() when the MQTT client is created
# in the main section of this script.
mqtt_client_id = "sql_persistance"
|
|
|
|
#############
|
|
# CALLBACKS #
|
|
#############
|
|
|
|
# Callback for the connection to the MQTT broker: subscribes to the topics
|
|
def on_connect(client, userdata, flags, rc):
    """Print the connection result and subscribe when the result code is 0.

    The text returned by ``mqtt.connack_string(rc)`` is always printed.
    Only when ``rc`` equals 0 does the function subscribe, at QoS 0, to
    ``mqtt_topic`` and to ``camera_mqtt_topic + "/last_photo"``.
    """
    print(mqtt.connack_string(rc))

    # Any non-zero result code: nothing to subscribe to.
    if rc != 0:
        return

    print("Subscribing to %s ..." % mqtt_topic)
    last_photo_topic = camera_mqtt_topic + "/last_photo"
    subscriptions = [(mqtt_topic, 0), (last_photo_topic, 0)]
    client.subscribe(subscriptions)
    print("OK")
|
|
|
|
|
|
# Callback handling the messages arriving on the MQTT broker:
# display and storage in the database
|
|
def on_message(client, userdata, msg):
    """Store one measurement received over MQTT in the database.

    The payload must be a JSON object providing ``value``, ``unit``,
    ``type`` and ``date`` (formatted ``%H:%M:%S %d/%m/%Y``).  The sensor
    identifier is the first topic level that remains once the leading part
    of the topic has been removed.

    A message that cannot be decoded, or that lacks ``unit``, ``type`` or a
    valid ``date``, is reported on stdout and skipped instead of raising
    out of the callback.  A missing or non-numeric ``value`` is reported
    and stored as an absent value.
    """
    # Strip the leading len(mqtt_topic) - 1 characters of the topic.
    # NOTE(review): presumably mqtt_topic ends with a one-character wildcard
    # such as '#', hence the "- 1" -- confirm against the configuration.
    sensor_path = msg.topic[len(mqtt_topic) - 1:].strip()
    subtopics = sensor_path.split("/")

    # UnicodeDecodeError and json.JSONDecodeError are both ValueError.
    try:
        val = json.loads(msg.payload.decode())
    except ValueError as err:
        print("Invalid payload on %s: %s" % (msg.topic, err))
        return
    if not isinstance(val, dict):
        print("Invalid payload on %s: JSON object expected" % msg.topic)
        return

    # Check presence and consistency of the value.  Using .get() keeps the
    # error report itself from raising when the key is missing.
    raw_value = val.get('value')
    try:
        value = float(raw_value)
    except (TypeError, ValueError, OverflowError):
        print("Value error: {}".format(raw_value))
        value = float('nan')

    # NaN and the -99.9 sentinel both mean "no measurement".  None is used
    # rather than the string 'NULL', which a database driver would bind as
    # text instead of SQL NULL.
    # NOTE(review): assumes Data.value is a nullable column -- confirm.
    if isnan(value) or value == -99.9:
        value = None

    try:
        # Some sources cannot send the degree sign and send 'deg' instead.
        unit = val['unit'].replace('deg', '°')
        measured_at = datetime.strptime(val['date'], "%H:%M:%S %d/%m/%Y")
        type_id = val['type']
    except (KeyError, AttributeError, TypeError, ValueError) as err:
        print("Malformed message on %s: %r" % (msg.topic, err))
        return

    # Record the measurement in the database.
    data = Data(valdate=measured_at,
                dbdate=datetime.now(),
                value=value,
                unit=unit,
                type_id=type_id,
                sensor_id=subtopics[0])

    print(data)

    try:
        db.session.add(data)
        db.session.commit()
    except Exception as err:
        # Keep the callback alive on database errors, but show the cause.
        print("Erreur lors de l'enregistrement en base de données", repr(err))
    finally:
        db.session.close()
|
|
|
|
# Dedicated callback for the date/time messages arriving on the MQTT broker
|
|
def on_message_date(client, userdata, msg):
    """Print the date/time carried by a message, or report a bad format.

    The payload is decoded as text and parsed with the format
    ``%H:%M:%S %d/%m/%Y``.  Undecodable bytes are replaced rather than
    raising, so such a payload is reported as badly formatted too.
    """
    payload = msg.payload.decode(errors="replace")
    try:
        d = datetime.strptime(payload, "%H:%M:%S %d/%m/%Y")
    except ValueError:
        # Only a parsing failure is expected here; anything else
        # (KeyboardInterrupt, SystemExit, ...) must not be swallowed.
        print("Date mal formatée : %s" % payload)
    else:
        print("Date : %s" % d)
|
|
|
|
# Record in the database the photo announced by the camera (file name of the
# last photo)
|
|
def on_message_camera(client, userdata, msg):
    """Record in the database the photo whose file name is in the payload.

    The photo date is parsed from the file name without its extension,
    using the ``photo_name`` format; the image type is ``photo_format``.
    A file name that does not match the format is reported on stdout and
    skipped instead of raising out of the callback.
    """
    # Replace undecodable bytes so the name can still be printed/reported.
    pl = msg.payload.decode(errors="replace")
    print("New photo : " + pl)

    try:
        shot_at = datetime.strptime(os.path.splitext(pl)[0], photo_name)
    except ValueError as err:
        print("Photo name does not match the expected format: %s" % err)
        return

    # New photo to add to the database.
    photo = Photo(file_name=pl,
                  file_date=shot_at,
                  image_type=photo_format)
    try:
        db.session.add(photo)
        db.session.commit()
    except Exception as err:
        # Keep the callback alive on database errors, but show the cause.
        print("Erreur lors de l'enregistrement en base de données", repr(err))
    finally:
        db.session.close()
|
|
|
|
# Callback for disconnection from the MQTT broker
|
|
def on_disconnect(client, userdata, msg):
    """Report an unexpected disconnection and terminate the program.

    Despite its name, ``msg`` is compared with 0 like a result code: any
    other value is printed and the process exits.
    """
    # Local import: sys.exit() replaces the exit() helper, which is
    # installed by the site module and meant for interactive sessions.
    import sys

    if msg != 0:
        print("Déconnexion imprévu : %s" % msg)
        # NOTE(review): indentation was lost in the source; the exit is
        # assumed to belong to the unexpected-disconnect branch -- confirm.
        sys.exit()
|
|
|
|
########
|
|
# Main #
|
|
########
|
|
|
|
#Connect to MQTT broker and loop...
|
|
# Create the MQTT client and set its credentials.
mqtt_client = mqtt.Client(mqtt_client_id, clean_session=False)
mqtt_client.username_pw_set(mqtt_user, mqtt_pass)

# General-purpose callbacks.
mqtt_client.on_connect = on_connect
mqtt_client.on_message = on_message
mqtt_client.on_disconnect = on_disconnect

# Callbacks registered for specific topics.
for topic_filter, handler in (
        (camera_mqtt_topic + "/last_photo", on_message_camera),
        ("huzzah0/NTP/date", on_message_date),
):
    mqtt_client.message_callback_add(topic_filter, handler)

# Connect to the broker, then hand control to the client's loop.
mqtt_client.connect(mqtt_host, int(mqtt_port), 60)
mqtt_client.loop_forever()
|