Update from simple test script to more complete script
This commit is contained in:
parent
71310249c3
commit
db92dbe984
@ -1,126 +1,229 @@
|
|||||||
# Simple weather and GPS logger
|
# Weather and GPS logger
|
||||||
|
|
||||||
import board
|
##########
|
||||||
|
# Config #
|
||||||
|
##########
|
||||||
|
print_data = True
|
||||||
|
data_to_neopixel = True
|
||||||
|
gps_enable = False
|
||||||
|
update_interval = 5.0 # in seconds
|
||||||
|
send_json_data = True
|
||||||
|
|
||||||
|
#######################
|
||||||
|
|
||||||
|
import board, time, microcontroller
|
||||||
|
import gc, micropython, os
|
||||||
from busio import I2C, UART
|
from busio import I2C, UART
|
||||||
from time import sleep
|
|
||||||
from analogio import AnalogIn
|
from analogio import AnalogIn
|
||||||
import microcontroller
|
|
||||||
import gc
|
|
||||||
import micropython
|
|
||||||
import os
|
|
||||||
|
|
||||||
from adafruit_bme280 import Adafruit_BME280_I2C
|
from adafruit_bme280 import Adafruit_BME280_I2C
|
||||||
from adafruit_gps import GPS
|
from adafruit_gps import GPS
|
||||||
import neopixel
|
import neopixel
|
||||||
|
|
||||||
#########
|
###########
|
||||||
# Setup #
|
# Classes #
|
||||||
#########
|
###########
|
||||||
|
|
||||||
# BME280 sensors (I2C)
|
class Data:
|
||||||
i2c = I2C(board.SCL, board.SDA)
|
"""Class for handling data format and transmission"""
|
||||||
bme280 = Adafruit_BME280_I2C(i2c)
|
def __init__(self):
|
||||||
|
self.data = {}
|
||||||
|
|
||||||
# Integrated Neopixel
|
def update(self):
|
||||||
pixel = neopixel.NeoPixel(board.NEOPIXEL, 1, brightness=0.2)
|
"""Read the data from various sensors and update the data dict variable"""
|
||||||
|
#Data from Feather board
|
||||||
|
self.data['SYS'] = {'v_bat': { 'val': vbat.value*0.000100708, 'unit': 'V' },
|
||||||
|
'CPU_temp': { 'val': microcontroller.cpu.temperature, 'unit': '°C' }}
|
||||||
|
# Note about v_bat calculation :
|
||||||
|
# 0.000100708 = 2*3.3/65536 with
|
||||||
|
# 2 : voltage is divided by 2
|
||||||
|
# 3.3 : Vref = 3.3V
|
||||||
|
# 65536 : 16bit ADC
|
||||||
|
|
||||||
# Battery voltage
|
#Data from BME280
|
||||||
vbat = AnalogIn(board.D9, )
|
self.data['BME280'] = {'temp': { 'val': bme280.temperature, 'unit': '°C' },
|
||||||
|
'hum': { 'val': bme280.humidity, 'unit': '%' },
|
||||||
|
'press': { 'val': bme280.pressure, 'unit': 'hPa' }}
|
||||||
|
if gps_enable:
|
||||||
|
if gps.has_fix:
|
||||||
|
self.data['GPS'] = {'timestamp': '{:04}/{:02}/{:02} {:02}:{:02}:{:02}'.format(gps.timestamp_utc.tm_year,
|
||||||
|
gps.timestamp_utc.tm_mon,
|
||||||
|
gps.timestamp_utc.tm_mday,
|
||||||
|
gps.timestamp_utc.tm_hour,
|
||||||
|
gps.timestamp_utc.tm_min,
|
||||||
|
gps.timestamp_utc.tm_sec),
|
||||||
|
'lat': {'val': gps.latitude, 'unit': 'deg'},
|
||||||
|
'lon': {'val': gps.longitude, 'unit': 'deg'},
|
||||||
|
'alt': {'val': gps.altitude_m, 'unit': 'm'},
|
||||||
|
'qual': {'val': gps.fix_quality, 'unit': None}}
|
||||||
|
else:
|
||||||
|
self.data['GPS'] = {'lat': {'val': None, 'unit': 'deg'},
|
||||||
|
'lon': {'val': None, 'unit': 'deg'},
|
||||||
|
'alt': {'val': None, 'unit': 'm'}}
|
||||||
|
else:
|
||||||
|
self.data['GPS'] = None
|
||||||
|
|
||||||
# GPS on Feather board
|
def show(self):
|
||||||
gps_uart = UART(board.TX, board.RX, baudrate=9600, timeout=3000)
|
"""Serialize data to json-formatted string for
|
||||||
gps = GPS(gps_uart)
|
visualization on serial console
|
||||||
# Turn on the basic GGA and RMC info
|
"""
|
||||||
gps.send_command('PMTK314,0,1,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0')
|
for source in self.data.keys():
|
||||||
gps.send_command('PMTK220,2000') # 1000 ms refresh
|
print(source + ": ")
|
||||||
|
if not self.data[source] == None:
|
||||||
|
for d in self.data[source].items():
|
||||||
|
print("\t{0}: {val} {unit}".format(d[0], **d[1]))
|
||||||
|
|
||||||
gc.collect()
|
def json(self):
|
||||||
micropython.mem_info()
|
"""Serialize data to json-formatted string"""
|
||||||
|
output = "{"
|
||||||
|
for source in self.data.keys():
|
||||||
|
output = "".join((output, "'", source, "': \n"))
|
||||||
|
if not self.data[source] == None:
|
||||||
|
for d in self.data[source].items():
|
||||||
|
output = "".join((output, "{",
|
||||||
|
"'{}': ".format(d[0]),
|
||||||
|
"{",
|
||||||
|
"'val': {val},'unit': {unit}".format(**d[1]),
|
||||||
|
"}}\n"))
|
||||||
|
output = output + "}, \n"
|
||||||
|
output = output + "}"
|
||||||
|
return output
|
||||||
|
|
||||||
temp, hum, press = 0.0, 0.0, 0.0
|
#############
|
||||||
rouge, vert, bleu = 0, 0, 0
|
# Functions #
|
||||||
|
#############
|
||||||
|
|
||||||
# Check if data directory exists
|
def check_data_dir():
|
||||||
if 'data' not in os.listdir():
|
"""Check if data directories exists"""
|
||||||
|
if 'data' not in os.listdir():
|
||||||
os.mkdir('data')
|
os.mkdir('data')
|
||||||
os.mkdir('data/hourly')
|
os.mkdir('data/hourly')
|
||||||
os.mkdir('data/daily')
|
os.mkdir('data/daily')
|
||||||
|
elif 'hourly' not in os.listdir('data'):
|
||||||
|
os.mkdir('data/hourly')
|
||||||
|
elif 'daily' not in os.listdir('data'):
|
||||||
|
os.mkdir('data/daily')
|
||||||
|
|
||||||
#############
|
def update_gps():
|
||||||
# Main loop #
|
"""Update and print data from GPS module"""
|
||||||
#############
|
|
||||||
while True:
|
|
||||||
sleep(5)
|
|
||||||
|
|
||||||
gc.collect()
|
|
||||||
# micropython.mem_info(1)
|
|
||||||
# print('Memory free: {} allocated: {}'.format(gc.mem_free(), gc.mem_alloc()))
|
|
||||||
|
|
||||||
temp = bme280.temperature
|
|
||||||
hum = bme280.humidity
|
|
||||||
press = bme280.pressure
|
|
||||||
|
|
||||||
print("Temperature: {:>+.1f} degC | Humidite: {:>.1f} % | Pression: {:>.1f} hPa".format(temp, hum, press))
|
|
||||||
print("Tension batterie : {:>.2f} V | CPU Temp: {:>+.1f} degC".format((vbat.value*2*3.3/65536), microcontroller.cpu.temperature))
|
|
||||||
# 0.00644531 = 2*3.3/1024 :
|
|
||||||
# 2 : voltage is divided by 2
|
|
||||||
# 3.3 : Vref = 3.3V
|
|
||||||
# 1024 : 10bit ADC
|
|
||||||
|
|
||||||
# Conversion des donn?es en couleur
|
|
||||||
# ROUGE => temp?rature : max = 35?C, min =10?C soit une amplitude de 25?C
|
|
||||||
rouge = int((temp-10)*255/25)
|
|
||||||
if rouge > 255:
|
|
||||||
rouge = 255
|
|
||||||
if rouge < 0:
|
|
||||||
rouge = 0
|
|
||||||
|
|
||||||
# BLEU => humidit? : max= 100%, mini=0%
|
|
||||||
bleu = int(hum*255/100)
|
|
||||||
|
|
||||||
# VERT => Pression : mini=960hPa, maxi = 1030hPa soit une amplitude 70hPa
|
|
||||||
vert = int((press-960)*255/70)
|
|
||||||
if vert > 255:
|
|
||||||
vert = 255
|
|
||||||
if vert < 0:
|
|
||||||
vert = 0
|
|
||||||
|
|
||||||
rvb = (rouge, vert, bleu)
|
|
||||||
print("Couleur : {}".format(rvb))
|
|
||||||
pixel[0] = rvb
|
|
||||||
|
|
||||||
gps.update()
|
|
||||||
if not gps.has_fix:
|
if not gps.has_fix:
|
||||||
# Try again if we don't have a fix yet.
|
# Try again if we don't have a fix yet.
|
||||||
print('Waiting for fix... {} - {}'.format(gps.has_fix, gps.satellites))
|
print('Waiting for fix...{}-{}'.format(gps.has_fix, gps.satellites))
|
||||||
continue
|
|
||||||
|
|
||||||
print('Fix timestamp: {}/{}/{} {:02}:{:02}:{:02}'.format(
|
else: # GPS fix OK !!!
|
||||||
|
#UTC Time
|
||||||
|
print('{}/{}/{} {:02}:{:02}:{:02}'.format(
|
||||||
gps.timestamp_utc.tm_mon, # Grab parts of the time from the
|
gps.timestamp_utc.tm_mon, # Grab parts of the time from the
|
||||||
gps.timestamp_utc.tm_mday, # struct_time object that holds
|
gps.timestamp_utc.tm_mday, # struct_time object that holds
|
||||||
gps.timestamp_utc.tm_year, # the fix time. Note you might
|
gps.timestamp_utc.tm_year, # the fix time. Note you might
|
||||||
gps.timestamp_utc.tm_hour, # not get all data like year, day,
|
gps.timestamp_utc.tm_hour, # not get all data like year, day,
|
||||||
gps.timestamp_utc.tm_min, # month!
|
gps.timestamp_utc.tm_min, # month!
|
||||||
gps.timestamp_utc.tm_sec))
|
gps.timestamp_utc.tm_sec))
|
||||||
|
|
||||||
if gps.altitude_m is not None:
|
if gps.altitude_m is not None:
|
||||||
print('Latitude: {} deg | Longitude: {} deg | Altitude: {} m'.format(gps.latitude,
|
print('Lat: {}deg | Lon: {}deg | Alt: {}m'.format(gps.latitude,
|
||||||
gps.longitude,
|
gps.longitude,
|
||||||
gps.altitude_m))
|
gps.altitude_m))
|
||||||
else:
|
else:
|
||||||
print('Latitude: {} deg | Longitude: {} deg'.format(gps.latitude,
|
print('Lat: {}deg | Lon: {}deg'.format(gps.latitude,
|
||||||
gps.longitude))
|
gps.longitude))
|
||||||
|
|
||||||
print('Fix quality: {}'.format(gps.fix_quality))
|
print('Qual: {}'.format(gps.fix_quality))
|
||||||
# Some attributes beyond latitude, longitude and timestamp are optional
|
# Some attributes beyond latitude, longitude and timestamp are optional
|
||||||
# and might not be present. Check if they're None before trying to use!
|
# and might not be present. Check if they're None before trying to use!
|
||||||
if gps.satellites is not None:
|
# if gps.satellites is not None:
|
||||||
print('# satellites: {}'.format(gps.satellites))
|
# print('# sat: {}'.format(gps.satellites))
|
||||||
if gps.track_angle_deg is not None:
|
# if gps.track_angle_deg is not None:
|
||||||
print('Speed: {} knots'.format(gps.speed_knots))
|
# print('Speed: {} knots'.format(gps.speed_knots))
|
||||||
if gps.track_angle_deg is not None:
|
# if gps.track_angle_deg is not None:
|
||||||
print('Track angle: {} degrees'.format(gps.track_angle_deg))
|
# print('Track angle: {} degrees'.format(gps.track_angle_deg))
|
||||||
if gps.horizontal_dilution is not None:
|
# if gps.horizontal_dilution is not None:
|
||||||
print('Horizontal dilution: {}'.format(gps.horizontal_dilution))
|
# print('Horizontal dilution: {}'.format(gps.horizontal_dilution))
|
||||||
if gps.height_geoid is not None:
|
# if gps.height_geoid is not None:
|
||||||
print('Height geo ID: {} meters'.format(gps.height_geoid))
|
# print('Height geo ID: {} meters'.format(gps.height_geoid))
|
||||||
|
|
||||||
|
def update_neopixel(data):
|
||||||
|
"""Conversion des données en couleur pour affichage sur NeoPixel
|
||||||
|
* ROUGE => temp?rature : max = 35?C, min =10?C soit une amplitude de 25?C
|
||||||
|
* BLEU => humidit? : max= 100%, mini=0%
|
||||||
|
* VERT => Pression : mini=960hPa, maxi = 1030hPa soit une amplitude 70hPa
|
||||||
|
"""
|
||||||
|
|
||||||
|
rouge = int((data['BME280']['temp']['val']-10)*255/25)
|
||||||
|
if rouge > 255:
|
||||||
|
rouge = 255
|
||||||
|
if rouge < 0:
|
||||||
|
rouge = 0
|
||||||
|
|
||||||
|
bleu = int(data['BME280']['hum']['val']*255/100)
|
||||||
|
|
||||||
|
vert = int((data['BME280']['press']['val']-960)*255/70)
|
||||||
|
if vert > 255:
|
||||||
|
vert = 255
|
||||||
|
if vert < 0:
|
||||||
|
vert = 0
|
||||||
|
|
||||||
|
if print_data:
|
||||||
|
print("Col:{}".format((rouge, vert, bleu)))
|
||||||
|
|
||||||
|
return (rouge, vert, bleu)
|
||||||
|
|
||||||
|
#########
|
||||||
|
# Setup #
|
||||||
|
#########
|
||||||
|
|
||||||
|
gc.collect()
|
||||||
|
micropython.mem_info()
|
||||||
|
|
||||||
|
# BME280 sensors (I2C)
|
||||||
|
i2c = I2C(board.SCL, board.SDA)
|
||||||
|
# i2c addresses for BME280 breakout :
|
||||||
|
# 0x77 = adafruit board
|
||||||
|
# 0x76 = chinese board
|
||||||
|
bme280 = Adafruit_BME280_I2C(i2c, address=0x76)
|
||||||
|
|
||||||
|
# Battery voltage
|
||||||
|
vbat = AnalogIn(board.D9, )
|
||||||
|
|
||||||
|
# GPS on FeatherWing board
|
||||||
|
if gps_enable:
|
||||||
|
gps_uart = UART(board.TX, board.RX, baudrate=9600, timeout=3000)
|
||||||
|
gps = GPS(gps_uart)
|
||||||
|
# Turn on the basic GGA and RMC info
|
||||||
|
gps.send_command('PMTK314,0,1,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0')
|
||||||
|
gps.send_command('PMTK220,1000') # 1000 ms refresh rate
|
||||||
|
|
||||||
|
# Integrated Neopixel
|
||||||
|
if data_to_neopixel:
|
||||||
|
pixel = neopixel.NeoPixel(board.NEOPIXEL, 1, brightness=0.2)
|
||||||
|
else:
|
||||||
|
#if neopixel is disable : turn off the LED
|
||||||
|
pixel = neopixel.NeoPixel(board.NEOPIXEL, 1, brightness=0.2)
|
||||||
|
pixel[0] = (0,0,0)
|
||||||
|
pixel = None
|
||||||
|
|
||||||
|
check_data_dir()
|
||||||
|
|
||||||
|
#############
|
||||||
|
# Main loop #
|
||||||
|
#############
|
||||||
|
|
||||||
|
data = Data()
|
||||||
|
last_update = time.monotonic()
|
||||||
|
while True:
|
||||||
|
|
||||||
|
if gps_enable:
|
||||||
|
gps.update()
|
||||||
|
|
||||||
|
current = time.monotonic()
|
||||||
|
if current - last_update >= update_interval:
|
||||||
|
last_update = current
|
||||||
|
data.update()
|
||||||
|
if print_data:
|
||||||
|
data.show()
|
||||||
|
# print(data.json())
|
||||||
|
if data_to_neopixel:
|
||||||
|
pixel[0] = update_neopixel(data.data)
|
||||||
|
gc.collect()
|
||||||
|
# micropython.mem_info(1)
|
||||||
|
# print('Memory free: {} allocated: {}'.format(gc.mem_free(), gc.mem_alloc()))
|
||||||
|
Loading…
Reference in New Issue
Block a user