Cameteo/raspberry/python/mqtt2sqlal.py

130 lines
3.7 KiB
Python

# Pérenisation de données arrivant sur un MQTT
# vers une base de données MariaDb/MySQL
# Auteur : Arofarn
# v0.2
#################
# Configuration #
#################
###########
# IMPORTS #
###########
from cameteo import *
import paho.mqtt.client as mqtt
import json
from math import isnan
mqtt_client_id = "sql_persistance"
#############
# CALLBACKS #
#############
#Callback pour la connection au MQTT : souscriptions aux topics
def on_connect(client, userdata, flags, rc):
print(mqtt.connack_string(rc))
if rc == 0:
print("Subscribing to %s ..." % mqtt_topic)
client.subscribe([(mqtt_topic, 0),
(camera_mqtt_topic + "/last_photo", 0)
])
print("OK")
#Callback de gestion des messages arrivant au MQTT :
# affichage et enregistrement en base de données
def on_message(client, userdata, msg):
top=msg.topic[len(mqtt_topic)-1:].strip()
subtopics = top.split("/")
payload = msg.payload.decode()
#print(payload)
val = json.loads(payload)
#Test présence et cohérence de la valeur
try:
val['value'] = float(val['value'] )
except:
print("Value error: {}".format(val['value']))
val['value'] = float('nan')
#Remplacement de la valeur en cas de nullité
if isnan(val['value']) or val['value'] == -99.9:
val['value'] = 'NULL'
#Gestion du symbole des degrés parfois difficile pour certaines sources
val['unit'] = val['unit'].replace('deg', '°')
#print("%s : %s %s (%s)" % (val['type'], val['value'], val['unit'], subtopics[0]))
#Enregistrement des données en base
data= Data(valdate = datetime.strptime(val['date'], "%H:%M:%S %d/%m/%Y"),
dbdate = datetime.now(),
value = val['value'],
unit = val['unit'],
type_id = val['type'],
sensor_id = subtopics[0] )
print(data)
try:
db.session.add(data)
db.session.commit()
except:
print("Erreur lors de l'enregistrement en base de données")
finally:
db.session.close()
#Callback particulier pour gérer les date/heure arrivent sur le broker MQTT
def on_message_date(client, userdata, msg):
payload = msg.payload.decode()
try:
d = datetime.strptime(payload, "%H:%M:%S %d/%m/%Y")
print("Date : %s" % d)
except:
print("Date mal formatée : %s" % payload)
#Update display with info from camera (camera shooting new photo or name of the
# last photo)
def on_message_camera(client, userdata, msg):
pl = msg.payload.decode()
print("New photo : " + pl)
# New photo to add to database
photo = Photo(file_name = pl,
file_date = datetime.strptime(os.path.splitext(pl)[0], photo_name),
image_type = photo_format)
try:
db.session.add(photo)
db.session.commit()
except:
print("Erreur lors de l'enregistrement en base de données")
finally:
db.session.close()
#Callback de déconnexion au broker MQTT
def on_disconnect(client, userdata, msg):
if msg != 0:
print("Déconnexion imprévu : %s" % msg)
exit()
########
# Main #
########
#Connect to MQTT broker and loop...
mqtt_client = mqtt.Client(mqtt_client_id, clean_session=False)
mqtt_client.username_pw_set(mqtt_user, mqtt_pass)
mqtt_client.on_connect = on_connect
mqtt_client.on_message = on_message
mqtt_client.on_disconnect = on_disconnect
mqtt_client.message_callback_add(camera_mqtt_topic + "/last_photo", on_message_camera)
mqtt_client.message_callback_add("huzzah0/NTP/date", on_message_date)
mqtt_client.connect(mqtt_host, int(mqtt_port), 60)
mqtt_client.loop_forever()