# Cameteo/circuitpython/main.py
# (Page residue from the web code viewer: 230 lines, 8.1 KiB, Python.)

# Weather and GPS logger
#################
# Configuration #
#################
# Output switches.
# print_data gates the console dump of the readings and of the pixel color.
print_data = True
# data_to_neopixel gates the update of the on-board NeoPixel.
data_to_neopixel = True
# NOTE(review): send_json_data is not referenced in the visible part of the
# file - confirm whether it is still used.
send_json_data = True

# Set to True when the GPS FeatherWing should be initialised and polled.
gps_enable = False

# Minimum delay between two sensor readings, in seconds.
update_interval = 5.0
#################
import board, time, microcontroller
import gc, micropython, os
from busio import I2C, UART
from analogio import AnalogIn
from adafruit_bme280 import Adafruit_BME280_I2C
from adafruit_gps import GPS
import neopixel
###########
# Classes #
###########
class Data:
    """Collect sensor readings and render them for output.

    Readings are stored in ``self.data``, a dict keyed by source name
    ('SYS', 'BME280', 'GPS'). Each source maps either to ``None`` (source
    disabled) or to a dict of entries. An entry is normally a
    ``{'val': ..., 'unit': ...}`` dict; the GPS 'timestamp' entry is a plain
    string.
    """

    def __init__(self):
        # Filled by update(); empty until the first reading.
        self.data = {}

    def update(self):
        """Read the data from the sensors and refresh ``self.data``.

        Relies on the module-level ``vbat``, ``bme280`` and (when
        ``gps_enable`` is set) ``gps`` objects.
        """
        # Data from the Feather board.
        # Note about the v_bat calculation:
        #   0.000100708 = 2*3.3/65536 with
        #     2     : voltage is divided by 2
        #     3.3   : Vref = 3.3V
        #     65536 : 16bit ADC
        self.data['SYS'] = {
            'v_bat': {'val': vbat.value*0.000100708, 'unit': 'V'},
            'CPU_temp': {'val': microcontroller.cpu.temperature, 'unit': '°C'},
        }

        # Data from the BME280.
        self.data['BME280'] = {
            'temp': {'val': bme280.temperature, 'unit': '°C'},
            'hum': {'val': bme280.humidity, 'unit': '%'},
            'press': {'val': bme280.pressure, 'unit': 'hPa'},
        }

        if not gps_enable:
            self.data['GPS'] = None
            return

        if gps.has_fix:
            stamp = gps.timestamp_utc
            self.data['GPS'] = {
                # Plain string, not a {'val', 'unit'} dict: show() handles
                # both shapes.
                'timestamp': '{:04}/{:02}/{:02} {:02}:{:02}:{:02}'.format(
                    stamp.tm_year, stamp.tm_mon, stamp.tm_mday,
                    stamp.tm_hour, stamp.tm_min, stamp.tm_sec),
                'lat': {'val': gps.latitude, 'unit': 'deg'},
                'lon': {'val': gps.longitude, 'unit': 'deg'},
                'alt': {'val': gps.altitude_m, 'unit': 'm'},
                'qual': {'val': gps.fix_quality, 'unit': None},
            }
        else:
            # No fix yet: keep the position keys, with empty values.
            self.data['GPS'] = {
                'lat': {'val': None, 'unit': 'deg'},
                'lon': {'val': None, 'unit': 'deg'},
                'alt': {'val': None, 'unit': 'm'},
            }

    def show(self):
        """Print the readings on the serial console, one source per section.

        Each source name is printed on its own line, followed by one
        tab-indented line per entry. Sources set to ``None`` print only
        their name.
        """
        for source, entries in self.data.items():
            print(source + ": ")
            if entries is None:
                continue
            for name, entry in entries.items():
                if isinstance(entry, dict):
                    print("\t{0}: {val} {unit}".format(name, **entry))
                else:
                    # Bare values (e.g. the GPS timestamp string) cannot be
                    # expanded with ** and used to raise TypeError here.
                    print("\t{0}: {1}".format(name, entry))

    def json(self):
        """Serialize the readings to a JSON-formatted string.

        Returns:
            A JSON document mirroring ``self.data`` (``None`` becomes
            ``null``).
        """
        # Imported locally so the module is only loaded when JSON output is
        # actually requested. Inside this method the name refers to the
        # module, not to this method.
        import json
        return json.dumps(self.data)
#############
# Functions #
#############
def check_data_dir():
    """Create the ``data``, ``data/hourly`` and ``data/daily`` directories.

    Paths are relative to the current working directory. Directories that
    already exist are left untouched. Each subdirectory is checked on its
    own, so a ``data`` directory missing both subdirectories gets both.
    """
    if 'data' not in os.listdir():
        os.mkdir('data')
    existing = os.listdir('data')
    for subdir in ('hourly', 'daily'):
        if subdir not in existing:
            os.mkdir('data/' + subdir)
def update_gps():
    """Print the current state of the GPS module on the serial console.

    Without a fix, a waiting message showing the fix flag and the satellite
    count is printed. With a fix, the UTC time, the position (including the
    altitude when one is reported) and the fix quality are printed.
    """
    if not gps.has_fix:
        # No fix yet: report and let the caller try again later.
        print('Waiting for fix...{}-{}'.format(gps.has_fix, gps.satellites))
        return

    # GPS fix OK: print the UTC time taken from the struct_time object that
    # holds the fix time. Note that some parts (year, day, month) might be
    # missing.
    stamp = gps.timestamp_utc
    print('{}/{}/{} {:02}:{:02}:{:02}'.format(
        stamp.tm_mon, stamp.tm_mday, stamp.tm_year,
        stamp.tm_hour, stamp.tm_min, stamp.tm_sec))

    # The altitude is optional, so only print it when it is available.
    if gps.altitude_m is None:
        print('Lat: {}deg | Lon: {}deg'.format(gps.latitude, gps.longitude))
    else:
        print('Lat: {}deg | Lon: {}deg | Alt: {}m'.format(
            gps.latitude, gps.longitude, gps.altitude_m))
    print('Qual: {}'.format(gps.fix_quality))

    # Attributes beyond latitude, longitude and timestamp are optional and
    # might be None (satellites, speed_knots, track_angle_deg,
    # horizontal_dilution, height_geoid): check them before use.
def _scale_to_byte(value, low, span):
    """Map ``value`` from the range [low, low + span] to an int in 0..255."""
    return max(0, min(255, int((value - low) * 255 / span)))


def update_neopixel(data, verbose=None):
    """Convert the BME280 readings into a color for the NeoPixel.

    * RED   => temperature: min = 10°C, max = 35°C (25°C span)
    * BLUE  => humidity: min = 0%, max = 100%
    * GREEN => pressure: min = 960hPa, max = 1030hPa (70hPa span)

    Every channel is clamped to 0..255, including blue, which was
    previously passed through unclamped.

    Args:
        data: dict holding a 'BME280' entry with 'temp', 'hum' and 'press'
            readings, each exposing its value under 'val'.
        verbose: print the computed color when true. Defaults to the
            module-level ``print_data`` setting when left as ``None``.

    Returns:
        The color as a ``(red, green, blue)`` tuple of ints.
    """
    if verbose is None:
        verbose = print_data

    readings = data['BME280']
    rouge = _scale_to_byte(readings['temp']['val'], 10, 25)
    bleu = _scale_to_byte(readings['hum']['val'], 0, 100)
    vert = _scale_to_byte(readings['press']['val'], 960, 70)

    color = (rouge, vert, bleu)
    if verbose:
        print("Col:{}".format(color))
    return color
#########
# Setup #
#########
# Free memory and print the memory report before creating the drivers.
gc.collect()
micropython.mem_info()

# BME280 sensor (I2C). I2C addresses for the BME280 breakout:
#   0x77 = Adafruit board
#   0x76 = Chinese board
i2c = I2C(board.SCL, board.SDA)
bme280 = Adafruit_BME280_I2C(i2c, address=0x76)

# Battery voltage, read on pin D9.
vbat = AnalogIn(board.D9)

# GPS on FeatherWing board.
if gps_enable:
    # NOTE(review): the unit of `timeout` (ms vs s) depends on the
    # CircuitPython release - confirm 3000 is right for the target firmware.
    gps_uart = UART(board.TX, board.RX, baudrate=9600, timeout=3000)
    gps = GPS(gps_uart)
    # Turn on the basic GGA and RMC info.
    gps.send_command('PMTK314,0,1,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0')
    # 1000 ms refresh rate.
    gps.send_command('PMTK220,1000')

# Integrated NeoPixel: always created so that it can be switched off when
# the color display is disabled.
pixel = neopixel.NeoPixel(board.NEOPIXEL, 1, brightness=0.2)
if not data_to_neopixel:
    pixel[0] = (0,0,0)
    pixel = None

check_data_dir()
#############
# Main loop #
#############
data = Data()
last_update = time.monotonic()
while True:
    # The GPS is polled on every pass, whatever the reporting interval.
    if gps_enable:
        gps.update()

    current = time.monotonic()
    if current - last_update < update_interval:
        # Too early for the next reading.
        continue

    last_update = current
    data.update()
    if print_data:
        data.show()
    if data_to_neopixel:
        pixel[0] = update_neopixel(data.data)
    gc.collect()
    # Debugging aids, disabled: print(data.json()), micropython.mem_info(1),
    # print('Memory free: {} allocated: {}'.format(gc.mem_free(), gc.mem_alloc()))