Cameteo/circuitpython/code/cameteo.py

439 lines
16 KiB
Python

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""Cameteo module
Data classes for GPS, BME280 sensors and some system stuff
Function to handle data files rotation and set internal RTC date from other
(more reliable) sources.
"""
import os
import time
import rtc
import microcontroller
import board
from busio import I2C, UART
from analogio import AnalogIn
from digitalio import DigitalInOut, Direction
from adafruit_bme280 import Adafruit_BME280_I2C
from adafruit_gps import GPS
# Date/time format "YYYY/MM/DD_hh:mm:ss"; it is filled with the first six
# fields (year, month, day, hour, minute, second) of a time tuple.
TIME_FORMAT = "{:04}/{:02}/{:02}_{:02}:{:02}:{:02}"
# Set pin 13 as an output to use the onboard LED as a read-only/read-write
# indicator (RED = read-only, no data recorded). The data classes below switch
# it on when a filesystem operation raises OSError.
LED13 = DigitalInOut(board.D13)
LED13.direction = Direction.OUTPUT
# LED off at start-up: no write error has been seen yet.
LED13.value = False
###########
# Classes #
###########
class Data:
    """Base class that collects, serializes, stores and sends a set of values.

    Subclasses fill ``self.data`` (a dict mapping a field name to its latest
    value) by overriding ``_update()``. The three public actions (``update``,
    ``write_on_flash`` and ``send_json``) are rate limited: each one receives
    the caller's current timestamp and only does its work when at least the
    matching interval has elapsed since it last ran.

    CSV files are written to ``data/<name>.csv`` and rotated to
    ``data/hourly/<name>_<HH>.csv`` on every new hour, then to
    ``data/daily/<YYYYMMDD>/`` on every new day.
    """

    def __init__(self,
                 name,
                 update_interval=10,
                 write_interval=300,
                 send_interval=60,
                 debug=False):
        """Store the settings and make sure the data directories exist.

        name            -- label used in JSON output and in CSV file names
        update_interval -- minimum delay between two ``_update()`` calls
        write_interval  -- minimum delay between two CSV writes; a negative
                           value disables writing altogether
        send_interval   -- minimum delay between two JSON transmissions
        debug           -- print scheduling details when True

        The intervals are compared with the ``current`` value given to the
        public methods, so they are expressed in the same unit.
        """
        self.name = name
        self.data = {}
        self.debug = debug
        self.update_interval = update_interval
        self.write_interval = write_interval
        self.send_interval = send_interval
        self._last_update = 0
        self._last_backup = 0
        self._last_send = 0
        self._last_file_rotation = time.localtime()
        # Check if data directories exist if we need to use them
        if self.write_interval >= 0:
            try:
                if 'data' not in os.listdir():
                    os.mkdir('data')
                # Check each sub-directory on its own: one of them may be
                # missing while the other one is already there.
                existing = os.listdir('data')
                for subdir in ('hourly', 'daily'):
                    if subdir not in existing:
                        os.mkdir('data/' + subdir)
            except OSError as err:
                self._set_read_only(err)

    def __str__(self):
        """Serialize data for visualization on serial console"""
        lines = [self.name + ":\n"]
        for key, value in self.data.items():
            lines.append("\t{0}: {1}\n".format(key, value))
        return "".join(lines)

    def __repr__(self):
        return self.json

    def update(self, current, verbose=True):
        """Refresh ``self.data`` through ``_update()`` when the interval elapsed.

        current -- timestamp of the call, compared with the last update
        verbose -- print the refreshed data on the console when True
        """
        if self.debug:
            print("Update {} : Current : {} | Last : {} | Interval : {}".format(
                self.name,
                current,
                self._last_update,
                self.update_interval))
        if current - self._last_update >= self.update_interval:
            self._last_update = current
            self._update()
            if verbose:
                print(self)

    def _update(self):
        """Here comes specific update code (overridden by subclasses)"""
        pass

    @property
    def json(self):
        """Serialized data as a compact JSON-formatted string.

        The result looks like ``{"<name>":{"<key>":"<value>",...}}``. Every
        value is emitted as a quoted string and no character is escaped.
        """
        fields = ",".join(['"{}":"{}"'.format(key, value)
                           for key, value in self.data.items()])
        return '{"' + self.name + '":{' + fields + '}}'

    def csv(self, header=False):
        """Serialized data values (or keys if header=True) as CSV line

        Fields are separated by ``;`` and the line has no trailing newline.
        """
        cells = self.data.keys() if header else self.data.values()
        return ";".join([str(cell) for cell in cells])

    def write_on_flash(self, current, verbose=True):
        """Append the current data as a CSV line to ``data/<name>.csv``.

        Nothing happens when writing is disabled (negative ``write_interval``)
        or when the interval has not elapsed yet. The file is created with a
        header line when missing. Any OSError (typically a read-only
        filesystem) disables further writes and turns the onboard LED on.

        current -- timestamp of the call, compared with the last write
        verbose -- print what has been written when True
        """
        if self.write_interval < 0:
            return
        if self.debug:
            print("Write {} : Current : {} | Last : {} | Interval : {}".format(
                self.name,
                current,
                self._last_backup,
                self.write_interval))
        if current - self._last_backup < self.write_interval:
            return
        self._last_backup = current
        file_name = "{}.csv".format(self.name)
        file_dir = "data"
        file_path = "/".join((file_dir, file_name))
        try:
            # First check if a rotation is needed. It renames files, so it
            # lives inside the try block: a read-only filesystem must end in
            # the error handler below and not in an uncaught exception.
            self._file_rotation()
            # Check if the file exists. If not, create it with CSV header
            if file_name not in os.listdir(file_dir):
                with open(file_path, "w") as csv_file:
                    csv_file.write(self.csv(header=True))
                    csv_file.write("\n")
                if verbose:
                    print("File created : {}".format(file_name))
            with open(file_path, "a") as csv_file:
                csv_file.write(self.csv())
                csv_file.write("\n")
            if verbose:
                print("Written data : \n{}".format(self.csv()))
        except OSError as err:
            self._set_read_only(err)

    def send_json(self, current, uart, verbose=True):
        """Send the JSON string followed by a newline over UART.

        current -- timestamp of the call, compared with the last transmission
        uart    -- object whose ``write()`` method receives the payload
        verbose -- echo the JSON string on the console when True
        """
        if self.debug:
            print("Send {} : Current : {} | Last : {} | Interval : {}".format(
                self.name,
                current,
                self._last_send,
                self.send_interval))
        if current - self._last_send >= self.send_interval:
            self._last_send = current
            uart.write(self.json)
            uart.write("\n")
            if verbose:
                print(self.json)

    def _set_read_only(self, err):
        """Report a filesystem error and stop writing until the next reset."""
        print("Err {}: readonly".format(err))
        # Negative interval: write_on_flash() will not try again
        self.write_interval = -1
        # Turn onboard led on to indicate read-only filesystem
        LED13.value = True

    def _file_rotation(self):
        """Check if files need to rotate
        => every new hour : data/<name>.csv goes to data/hourly/
        => every new day  : data/hourly/<name>* go to data/daily/<YYYYMMDD>/

        May raise OSError when a directory or file operation fails.
        """
        current_time = time.localtime()
        previous = self._last_file_rotation
        if self.debug:
            print(previous[3])
            print(current_time[3])
        # If the hour changed : move current csv to hourly directory.
        # The date is compared too, so that the same hour on another day
        # still counts as a new hour.
        if tuple(current_time[0:4]) != tuple(previous[0:4]):
            print("Time to move hourly data !")
            current_csv = "{}.csv".format(self.name)
            # Nothing to move if no line was written during the last hour
            if current_csv in os.listdir("data"):
                os.rename("data/{}".format(current_csv),
                          "data/hourly/{}_{:02}.csv".format(
                              self.name,
                              previous[3]))
        # If the day changed : move content of hourly to daily directories.
        # The whole date is compared, not only the day of the month.
        if tuple(current_time[0:3]) != tuple(previous[0:3]):
            print("Time to move daily data !")
            # Create new dir for the date of "yesterday"
            newdir = "{}{:02}{:02}".format(*previous[0:3])
            if newdir not in os.listdir("data/daily/"):
                os.mkdir("data/daily/" + newdir)
            # Move each "hourly file" to the new directory
            for file_name in os.listdir('data/hourly'):
                # Move only files whose name begins with self.name
                if file_name.startswith(self.name):
                    print("Move {} to {}".format(file_name, newdir))
                    os.rename("data/hourly/{}".format(file_name),
                              "data/daily/{}/{}".format(newdir, file_name))
        # Finally : remember last rotation check
        self._last_file_rotation = current_time
class SysData(Data):
    """Data of the Feather board itself: RTC time, battery voltage and CPU
    temperature.
    """

    def __init__(self, name="SYS", update_interval=10, write_interval=300,
                 send_interval=60, debug=False):
        """Set up the base class, the battery ADC input and the RTC handle."""
        super().__init__(name=name,
                         update_interval=update_interval,
                         write_interval=write_interval,
                         send_interval=send_interval,
                         debug=debug)
        # Placeholder values until the first _update() call
        self.data = {'time': "2000/01/01_00:00:00",
                     'vbat': 0,
                     'cput': 0.0}
        # Analog input wired to the battery voltage monitor
        self.vbat = AnalogIn(board.VOLTAGE_MONITOR)
        self._rtc = rtc.RTC()

    def _update(self):
        """Read the RTC, the CPU temperature and the averaged battery voltage"""
        stamp = self._rtc.datetime
        self.data['time'] = TIME_FORMAT.format(*stamp[0:6])
        self.data['cput'] = round(microcontroller.cpu.temperature, 2)
        # Battery voltage: average of several ADC readings taken 10 ms apart.
        # Conversion factor 0.000100708 = 2*3.3/65536 with
        #   2     : the voltage is divided by 2 before reaching the ADC
        #   3.3   : Vref = 3.3V
        #   65536 : 16bit ADC
        samples = 10
        total = 0
        for _ in range(samples):
            total += self.vbat.value
            time.sleep(0.01)
        self.data['vbat'] = round(total/samples*0.000100708, 3)
class BME280Data(Data):
    """Subclass for BME280 sensor (temperature, humidity, pressure)"""

    def __init__(self, name="BME", update_interval=10, write_interval=300,
                 send_interval=60, debug=False, address=0x76):
        """Set up the base class and open the BME280 on the I2C bus.

        address -- I2C address of the BME280 breakout:
                   0x77 = adafruit breakout board
                   0x76 = tiny cheap chinese board (default)
        The other parameters are handed to ``Data.__init__`` unchanged.
        """
        Data.__init__(self, name=name,
                      update_interval=update_interval,
                      write_interval=write_interval,
                      send_interval=send_interval,
                      debug=debug)
        # Placeholder values until the first _update() call
        self.data = {'time': "2000/01/01_00:00:00",
                     'temp': float(),
                     'hum': int(),
                     'press': float()}
        # BME280 sensor (I2C)
        self.bme280 = Adafruit_BME280_I2C(I2C(board.SCL, board.SDA),
                                          address=address)

    def _update(self):
        """Update data from BME280"""
        self.data['time'] = TIME_FORMAT.format(*time.localtime()[0:6])
        self.data['temp'] = round(self.bme280.temperature, 1)
        self.data['hum'] = int(self.bme280.humidity)
        self.data['press'] = round(self.bme280.pressure, 2)

    @staticmethod
    def _scale(value, minimum, span, neopixel_max):
        """Map ``value`` from [minimum, minimum+span] to [0, neopixel_max].

        The result is an int; values outside the range are clamped.
        """
        level = int((value - minimum) * neopixel_max / span)
        if level > neopixel_max:
            level = neopixel_max
        if level < 0:
            level = 0
        return level

    def rgb(self, neopixel_max=70):
        """Convert atmospheric data from BME280 sensor into NeoPixel color as
        a tuple (RED, GREEN, BLUE):
            * RED => temperature : min = 10degC, max = 35degC (range 25degC)
            * GREEN => pressure : min = 960hPa, max = 1030hPa (range 70hPa)
            * BLUE => humidity : min = 0%, max = 100%

        neopixel_max -- highest value a color component may take
        """
        red = self._scale(self.data['temp'], 10, 25, neopixel_max)
        green = self._scale(self.data['press'], 960, 70, neopixel_max)
        # Relative humidity cannot physically leave 0..100, so clamping does
        # not change valid readings; it only keeps the three channels alike.
        blue = self._scale(self.data['hum'], 0, 100, neopixel_max)
        if self.debug:
            print("Col:{}".format((red, green, blue)))
        return (red, green, blue)
class GPSData(Data):
    """Sub class for GPS"""

    def __init__(self, name="GPS",
                 update_interval=0, write_interval=60, send_interval=60,
                 enable=True, enable_pin=None,
                 debug=False):
        """Set up the base class, the power-control pin and the GPS parser.

        enable     -- power the GPS module on (True) or off (False) at start
        enable_pin -- DigitalInOut driving the module's enable line. When
                      None, ``DigitalInOut(board.A5)`` is created here. It is
                      created on demand because a default built in the
                      signature would claim pin A5 as soon as the module is
                      imported, even for callers using another pin.
        The other parameters are handed to ``Data.__init__`` unchanged.
        """
        Data.__init__(self, name=name,
                      update_interval=update_interval,
                      write_interval=write_interval,
                      send_interval=send_interval,
                      debug=debug)
        # Placeholder values until the first _update() call
        self.data = {'time': "2000/01/01_00:00:00",
                     'lat': float(),
                     'lon': float(),
                     'alt': float(),
                     'qual': int(),
                     'age': int()}
        self._enable = enable
        self._gps_last_fix = int()
        self._gps_current_fix = int()
        # Set the pin to control the power to GPS module
        if enable_pin is None:
            enable_pin = DigitalInOut(board.A5)
        self._gps_en_pin = enable_pin
        self._gps_en_pin.direction = Direction.OUTPUT
        self._gps = GPS(UART(board.TX, board.RX, baudrate=9600, timeout=3000))
        if self._enable:
            self.enable()
        else:
            self.disable()

    def _update(self):
        """Parse pending GPS sentences and refresh position, time and fix age.

        While the module is disabled, ``self.data`` is replaced by an empty
        dict.
        """
        self._gps.update()
        if self._enable:
            self._gps_current_fix = int(time.monotonic())
            if self._gps.timestamp_utc is not None:
                self.data['time'] = TIME_FORMAT.format(*self._gps.timestamp_utc[0:6])
            else:
                # No timestamp received yet: fall back to a fixed date
                self.data['time'] = TIME_FORMAT.format(2018, 9, 10, 10, 0, 0)
            if self._gps.has_fix:
                self._gps_last_fix = self._gps_current_fix
                self.data['lat'] = self._gps.latitude
                self.data['lon'] = self._gps.longitude
                self.data['alt'] = self._gps.altitude_m
                self.data['qual'] = self._gps.fix_quality
                self.data['age'] = 0
            else:
                # Seconds elapsed since the last update that had a fix
                self.data['age'] = int(self._gps_current_fix - self._gps_last_fix)
        else:
            self.data = {}

    def enable(self):
        """Enable GPS module"""
        if not self._enable:
            self._enable = True
        # The enable pin is the inverse of the enabled state: drive it low to
        # switch the GPS module on (high disables it, see disable()).
        self._gps_en_pin.value = not self._enable
        time.sleep(0.1)
        # Turn on the basic GGA and RMC info
        self._gps.send_command('PMTK314,0,1,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0')
        self._gps.send_command('PMTK220,1000')  # 1000 ms refresh rate

    def disable(self):
        """Disable GPS module"""
        if self._enable:
            self._enable = False
        # Set enable pin high to disable GPS module
        self._gps_en_pin.value = not self._enable

    @property
    def datetime(self):
        """Wrapper to feed rtc.set_time_source()"""
        return self._gps.timestamp_utc
#############
# Functions #
#############
def set_clock_from_localtime(clock, threshold=2.0, debug=False):
    """Align ``clock`` with ``time.localtime()`` when both have drifted apart.

    The date/time of ``clock`` (an object exposing a ``datetime`` attribute,
    such as the internal RTC) is overwritten with ``time.localtime()`` when
    the two differ by ``threshold`` seconds or more. The source behind
    ``time.localtime()`` can be chosen with ``rtc.set_time_source()``.

    Returns 1 when ``time.localtime()`` cannot be converted to seconds yet
    (OverflowError), 0 in every other case.
    """
    if debug:
        print("Clock date/time : {}".format(
            TIME_FORMAT.format(*clock.datetime[0:6])))
        print("Local date/time : {}".format(
            TIME_FORMAT.format(*time.localtime()[0:6])))
    # The local time is only usable if it converts to a number of seconds
    try:
        local_seconds = time.mktime(time.localtime())
    except OverflowError as err:
        print(err)
        print("Local time source : {}".format(
            TIME_FORMAT.format(*time.localtime()[0:6])))
        print("Wait some time to have proper time from time source...")
        return 1
    # Gap between the local time source and the clock, in seconds
    drift = abs(local_seconds - time.mktime(clock.datetime))
    if drift < threshold:
        return 0
    print("Clock difference with GPS!")
    print("Previous date/time : {}".format(
        TIME_FORMAT.format(*clock.datetime[0:6])))
    # Trust localtime if there is a bias
    clock.datetime = time.localtime()
    print("Clocks synced !")
    return 0