Cameteo/circuitpython/code/main.py

363 lines
13 KiB
Python

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
##########################
# Weather and GPS logger #
##########################
Author : Pierrick "Arofarn" Couturier
Use with:
* Adafruit Feather M4 Express (CircuitPython firmware)
* Adafruit Ultimate GPS FeatherWing
* Bosch BME280 sensor (air temperature, humidity, atmospheric pressure) on I2C
Pins used:
* TX/RX : Ultimate GPS FeatherWing (data)
* A5 : Ultimate GPS FeatherWing EN (enable/disable GPS power)
* I2C SDA/SCL : BME280 sensor
* D5 : RW / RO jumper
* A2, A3 : UART TX & RX to Raspberry pi (115200 baudrate)
* D9 : to EN pin on Raspberry Lipo SHIM
* D13 : onboard LED (as RW/RO indicator)
Back pins diagram:
D05 A2 A3 D09 NC_
GND GND SCL SDA 3V3
Git repository :
https://framagit.org/arofarn/Cameteo
TODO for v1 :
* write data on flash drive (work-in-progress)
* send data through UART (work-in-progress)
"""
__version__ = 0.2
##########
# config #
##########
# Feature switches and timing constants read by the rest of the script.
# NOTE(review): const() is used below but no import of it is visible in this
# file; presumably it relies on the MicroPython/CircuitPython compiler
# recognising const() without an import -- confirm on the target firmware.
print_data = True # Print data on USB UART / REPL output?
send_json_data = True # Send data as JSON on the second UART?
backup_data = True # Write data as CSV files on the onboard SPI flash?
data_to_neopixel = True # Display atmospheric data as a color on the onboard NeoPixel?
gps_enable = True # Use the GPS module?
update_interval = const(10) # Interval between data acquisitions (in seconds)
write_interval = const(60) # Interval between writes to flash memory (in seconds)
send_interval = const(60) # Interval between data packets sent to the RPi (in seconds)
datetime_format = "{:04}/{:02}/{:02}_{:02}:{:02}:{:02}" # Date/time format (year, month, day, hour, minute, second)
neopixel_max_value =const(70) # Per-channel maximum, used instead of brightness to spare some memory
#######################
import microcontroller, board
import gc, os
# import micropython
import time, rtc
from busio import I2C, UART
from analogio import AnalogIn
from digitalio import DigitalInOut, Direction
from adafruit_bme280 import Adafruit_BME280_I2C
from adafruit_gps import GPS
import neopixel
###########
# Classes #
###########
class Data:
    """Hold the latest sensor readings and expose them in several formats.

    ``self.data`` maps a source name ('SYS', 'BME', 'GPS') to a dict of
    readings. ``update()`` replaces the 'GPS' entry with ``None`` when the
    module-level ``gps_enable`` flag is false, so every consumer of
    ``self.data`` has to cope with a ``None`` source.
    """

    def __init__(self):
        """Create the data dict with placeholder values for every field."""
        self.data = {'SYS': {'time': "2000/01/01_00:00:00",
                             'vbat': int(),
                             'cput': float()},
                     'BME': {'temp': float(),
                             'hum': int(),
                             'press': float()},
                     'GPS': {'time': "2000/01/01_00:00:00",
                             'lat': float(),
                             'lon': float(),
                             'alt': float(),
                             'qual': int(),
                             'age': int()}
                    }
        # time.monotonic() stamps (whole seconds) used to compute the fix age
        self._gps_last_fix = int()
        self._gps_current_fix = int()

    def update(self):
        """Read the data from various sensors and update the data dict."""
        # Data from Feather board
        self.data['SYS']['time'] = datetime_format.format(*clock.datetime[0:6])
        self.data['SYS']['vbat'] = round(measure_vbat(), 3)
        self.data['SYS']['cput'] = round(microcontroller.cpu.temperature, 2)

        # Data from BME280
        self.data['BME']['temp'] = round(bme280.temperature, 1)
        self.data['BME']['hum'] = int(bme280.humidity)
        self.data['BME']['press'] = round(bme280.pressure, 2)

        if not gps_enable:
            # No GPS in use: flag the whole source as absent
            self.data['GPS'] = None
            return

        self._gps_current_fix = int(time.monotonic())
        if gps.has_fix:
            self._gps_last_fix = self._gps_current_fix
            stamp = gps.timestamp_utc
            self.data['GPS']['time'] = datetime_format.format(stamp.tm_year,
                                                              stamp.tm_mon,
                                                              stamp.tm_mday,
                                                              stamp.tm_hour,
                                                              stamp.tm_min,
                                                              stamp.tm_sec)
            self.data['GPS']['lat'] = gps.latitude
            self.data['GPS']['lon'] = gps.longitude
            self.data['GPS']['alt'] = gps.altitude_m
            self.data['GPS']['qual'] = gps.fix_quality
            self.data['GPS']['age'] = 0
        else:
            # Keep the previous position and only report how old it is
            self.data['GPS']['age'] = int(self._gps_current_fix - self._gps_last_fix)

    def show(self):
        """Print every source and its readings on the serial console."""
        for source, values in self.data.items():
            print(source + ": ")
            if values is not None:
                for key, value in values.items():
                    print("\t{0}: {1}".format(key, value))

    @property
    def json(self):
        """Return the data as a compact JSON-formatted string.

        Every reading is emitted as a quoted string. A source whose value is
        ``None`` (GPS disabled) is emitted as ``null`` so that the result is
        always valid JSON.
        """
        sources = []
        for source, values in self.data.items():
            if values is None:
                body = 'null'
            else:
                fields = ['"{}":"{}"'.format(key, value)
                          for key, value in values.items()]
                body = '{' + ','.join(fields) + '}'
            sources.append('"{}":{}'.format(source, body))
        return '{' + ','.join(sources) + '}'

    @staticmethod
    def _to_channel(value, minimum, span):
        """Scale value from [minimum, minimum + span] to [0, neopixel_max_value]."""
        level = int((value - minimum) * neopixel_max_value / span)
        return max(0, min(neopixel_max_value, level))

    @property
    def rgb(self):
        """Convert atmospheric data from the BME280 into a NeoPixel color.

        Returns a tuple (RED, GREEN, BLUE), each in 0..neopixel_max_value:
        * RED   => temperature : min = 10degC, max = 35degC (range 25degC)
        * GREEN => pressure    : min = 960hPa, max = 1030hPa (range 70hPa)
        * BLUE  => humidity    : min = 0%, max = 100%

        Also prints the color when ``print_data`` is set.
        """
        red = self._to_channel(self.data['BME']['temp'], 10, 25)
        green = self._to_channel(self.data['BME']['press'], 960, 70)
        blue = self._to_channel(self.data['BME']['hum'], 0, 100)

        if print_data:
            print("Col:{}".format((red, green, blue)))

        return (red, green, blue)

    def write_on_flash(self):
        """Append the current time and BME280 readings to data/data.csv.

        On OSError the module-level ``backup_data`` flag is cleared so the
        main loop stops retrying until the next reset, and the onboard LED is
        switched on as an error indicator.
        """
        # Without this declaration the assignment below would only create a
        # local variable and the main loop would keep retrying.
        global backup_data
        try:
            with open("data/data.csv", "a") as csv_file:
                csv_file.write("{};{};{};{}\n".format(self.data['SYS']['time'],
                                                      self.data['BME']['temp'],
                                                      self.data['BME']['hum'],
                                                      self.data['BME']['press']))
        except OSError as e:
            print("Err {}: readonly".format(e))
            backup_data = False  # to avoid trying again till next reset
            # Turn onboard led on to indicate read-only error
            led13.value = True
#############
# Functions #
#############
def check_data_dir():
    """Make sure 'data', 'data/hourly' and 'data/daily' exist.

    Each directory is checked on its own, so any missing one is created.
    On OSError the module-level ``backup_data`` flag is cleared so no backup
    is attempted until the next reset, and the onboard LED is switched on as
    an error indicator.
    """
    # Without this declaration the assignment below would only create a
    # local variable and leave the module-level flag untouched.
    global backup_data
    try:
        if 'data' not in os.listdir():
            os.mkdir('data')
        present = os.listdir('data')
        for subdir in ('hourly', 'daily'):
            if subdir not in present:
                os.mkdir('data/' + subdir)
    except OSError as e:
        print("Err {}: readonly".format(e))
        backup_data = False  # to avoid trying again till next reset
        # Turn onboard led on to indicate read-only error
        led13.value = True
def set_clock_from_GPS(treshold=5.0):
    """Align the internal RTC with the GPS date-time when they disagree.

    Does nothing unless the GPS is enabled and currently has a fix. When the
    absolute difference between the GPS time and the RTC time is greater than
    or equal to ``treshold`` (in seconds), the RTC is overwritten with the
    GPS date-time.
    """
    if not (gps_enable and gps.has_fix):
        return

    # Convert the GPS timestamp into a struct_time
    stamp = gps.timestamp_utc
    gps_datetime = time.struct_time((stamp.tm_year, stamp.tm_mon, stamp.tm_mday,
                                     stamp.tm_hour, stamp.tm_min, stamp.tm_sec,
                                     0, 0, 0))

    # Difference between GPS and internal RTC (in seconds)
    drift = time.mktime(gps_datetime) - time.mktime(clock.datetime)
    if abs(drift) >= treshold:
        clock.datetime = gps_datetime  # Trust GPS if there is a bias
        print("Clocks synced !")
def measure_vbat(samples=10, timestep=0.01):
    """Return the battery voltage averaged over several ADC readings.

    Takes ``samples`` readings of ``vbat.value``, sleeping ``timestep``
    seconds after each one, and converts the mean reading to volts.
    """
    # Volts per ADC count: 0.000100708 = 2*3.3/65536 with
    #   2     : voltage is divided by 2
    #   3.3   : Vref = 3.3V
    #   65536 : 16bit ADC
    volts_per_count = 0.000100708

    total = 0
    for _ in range(samples):
        total += vbat.value
        time.sleep(timestep)

    return total / samples * volts_per_count
#########
# Setup #
#########
# Hardware initialisation: creates the module-level objects (clock, bme280,
# vbat, led13, gps, rpi_uart, pixel) that the Data class, the helper
# functions and the main loop refer to by name.
gc.collect()
#micropython.mem_info()

# On-board RTC of the Feather board
clock = rtc.RTC()
#clock.datetime = time.struct_time((2018, 7, 29, 15, 31, 30, 0, 0, 0))

# BME280 sensor (I2C)
i2c = I2C(board.SCL, board.SDA)
# I2C addresses for BME280 breakout:
# 0x77 = Adafruit breakout board
# 0x76 = tiny Chinese board
bme280 = Adafruit_BME280_I2C(i2c, address=0x76)

# Battery voltage (analog input read by measure_vbat())
vbat = AnalogIn(board.VOLTAGE_MONITOR)

# Set the pin that controls the power to the GPS module
gps_en_pin = DigitalInOut(board.A5)
gps_en_pin.direction = Direction.OUTPUT

# Set pin 13 to use the onboard LED as read-only/read-write indicator;
# it starts off and is switched on by the flash-error handlers
led13 = DigitalInOut(board.D13)
led13.direction = Direction.OUTPUT
led13.value = False

# Set GPS module on FeatherWing board
gps_en_pin.value = not gps_enable # Set enable pin high to disable GPS module
if gps_enable:
    gps_uart = UART(board.TX, board.RX,
    baudrate=9600, timeout=3000)
    gps = GPS(gps_uart)
    # Turn on the basic GGA and RMC info
    gps.send_command('PMTK314,0,1,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0')
    gps.send_command('PMTK220,1000') # 1000 ms refresh rate

# Second UART to communicate with the Raspberry Pi (through GPIO)
if send_json_data:
    rpi_uart = UART(board.A2, board.A3,
    baudrate=115200, timeout=2000)

# Set onboard NeoPixel: used as atmospheric data output.
# brightness is fixed to 1 to spare some memory. Use neopixel_max_value instead
if data_to_neopixel:
    pixel = neopixel.NeoPixel(board.NEOPIXEL, 1, brightness=1)
else:
    # If the NeoPixel is disabled: turn off the LED, then drop the object
    pixel = neopixel.NeoPixel(board.NEOPIXEL, 1, brightness=1)
    pixel[0] = (0,0,0)
    pixel = None

# Finally check that the data directories exist
check_data_dir()
#############
# Main loop #
#############
# Create the Data object
data = Data()

# Init timers: acquisition, UART packet and flash backup each run on their
# own schedule, all counted from the same starting instant
last_update = last_sent_packet = last_written_data = time.monotonic()

while True:
    # The GPS is polled on every pass, not only when an acquisition is due
    if gps_enable:
        gps.update()

    current = time.monotonic()

    # Periodic acquisition (every update_interval seconds)
    if current - last_update >= update_interval:
        last_update = current
        set_clock_from_GPS()
        data.update()
        if print_data:
            data.show()
        if data_to_neopixel:
            pixel[0] = data.rgb

    # Periodic JSON packet to the Raspberry Pi (every send_interval seconds)
    if send_json_data and current - last_sent_packet >= send_interval:
        last_sent_packet = current
        # Build the packet once: the json property re-serialises the whole
        # data dict on every access
        packet = data.json + '\n'
        # UART.write expects a bytes-like buffer, not a str
        rpi_uart.write(bytes(packet, 'utf-8'))
        print(packet)

    # Periodic CSV backup on flash (every write_interval seconds)
    if backup_data and current - last_written_data >= write_interval:
        last_written_data = current
        print("Backup data...")
        data.write_on_flash()

    gc.collect()
    # micropython.mem_info(1)
    # print('Memory free: {} allocated: {}'.format(gc.mem_free(), gc.mem_alloc()))