pylint and PEP 8 checks
Only styles changes
This commit is contained in:
parent
b4036b8092
commit
67b7fdce68
@ -15,7 +15,6 @@
|
||||
##########################
|
||||
# Weather and GPS logger #
|
||||
##########################
|
||||
Author : Pierrick "Arofarn" Couturier
|
||||
|
||||
Use with:
|
||||
* Adafruit Feather M4 Express (CircuitPython firmware)
|
||||
@ -43,41 +42,46 @@ TODO for v1 :
|
||||
* send data through UART (work-in-progress)
|
||||
"""
|
||||
__version__ = 0.2
|
||||
__author__ = "arofarn"
|
||||
|
||||
#######################
|
||||
import time
|
||||
import gc
|
||||
import os
|
||||
import rtc
|
||||
import microcontroller
|
||||
import board
|
||||
# import micropython
|
||||
from busio import I2C, UART
|
||||
from analogio import AnalogIn
|
||||
from digitalio import DigitalInOut, Direction
|
||||
from adafruit_bme280 import Adafruit_BME280_I2C
|
||||
from adafruit_gps import GPS
|
||||
import neopixel
|
||||
|
||||
|
||||
##########
|
||||
# config #
|
||||
##########
|
||||
print_data = True # Print data on USB UART / REPL output ?
|
||||
send_json_data = True # Send data as JSON on second UART ?
|
||||
backup_data = True # Write data as CSV files on onboard SPI Flash ?
|
||||
data_to_neopixel = True # Display atmospheric data as color on onboard neopixel ?
|
||||
gps_enable = True # Use GPS module ?
|
||||
update_interval = const(10) # Interval between data acquisition (in seconds)
|
||||
write_interval = const(60) # Interval between data written on flash Memory
|
||||
send_interval = const(60) # Interval between packet of data sent to Rpi
|
||||
datetime_format = "{:04}/{:02}/{:02}_{:02}:{:02}:{:02}" # Date/time format
|
||||
neopixel_max_value =const(70) #max value instead of brightness to spare some mem
|
||||
|
||||
#######################
|
||||
|
||||
import microcontroller, board
|
||||
import gc, os
|
||||
# import micropython
|
||||
import time, rtc
|
||||
from busio import I2C, UART
|
||||
from analogio import AnalogIn
|
||||
from digitalio import DigitalInOut, Direction
|
||||
|
||||
from adafruit_bme280 import Adafruit_BME280_I2C
|
||||
from adafruit_gps import GPS
|
||||
import neopixel
|
||||
print_data_flag = True # Print data on USB UART / REPL output ?
|
||||
send_json_flag = True # Send data as JSON on second UART ?
|
||||
backup_data_flag = True # Write data as CSV files on onboard SPI Flash ?
|
||||
neopixel_flag = True # Display atmospheric data as color on onboard neopixel ?
|
||||
gps_enable_flag = True # Use GPS module ?
|
||||
UPDATE_INTERVAL = 10 # Interval between data acquisition (in seconds)
|
||||
WRITE_INTERVAL = 60 # Interval between data written on flash Memory
|
||||
SEND_INTERVAL = 60 # Interval between packet of data sent to Rpi
|
||||
TIME_FORMAT = "{:04}/{:02}/{:02}_{:02}:{:02}:{:02}" # Date/time format
|
||||
NEOPIXEL_MAX_VALUE = 70 # max value instead of brightness to spare some mem
|
||||
|
||||
###########
|
||||
# Classes #
|
||||
###########
|
||||
|
||||
|
||||
class Data:
|
||||
"""Class for handling data"""
|
||||
|
||||
def __init__(self):
|
||||
self.data = {'SYS': {'time': "2000/01/01_00:00:00",
|
||||
'vbat': int(),
|
||||
@ -90,33 +94,32 @@ class Data:
|
||||
'lon': float(),
|
||||
'alt': float(),
|
||||
'qual': int(),
|
||||
'age': int() }
|
||||
}
|
||||
'age': int()}}
|
||||
self._gps_last_fix = int()
|
||||
self._gps_current_fix = int()
|
||||
|
||||
def update(self):
|
||||
"""Read the data from various sensors and update the data dict variable"""
|
||||
#Data from Feather board
|
||||
self.data['SYS']['time'] = datetime_format.format(*clock.datetime[0:6])
|
||||
# Data from Feather board
|
||||
self.data['SYS']['time'] = TIME_FORMAT.format(*clock.datetime[0:6])
|
||||
self.data['SYS']['vbat'] = round(measure_vbat(), 3)
|
||||
self.data['SYS']['cput'] = round(microcontroller.cpu.temperature, 2)
|
||||
|
||||
#Data from BME280
|
||||
# Data from BME280
|
||||
self.data['BME']['temp'] = round(bme280.temperature, 1)
|
||||
self.data['BME']['hum'] = int(bme280.humidity)
|
||||
self.data['BME']['press'] = round(bme280.pressure, 2)
|
||||
|
||||
if gps_enable:
|
||||
if gps_enable_flag:
|
||||
self._gps_current_fix = int(time.monotonic())
|
||||
if gps.has_fix:
|
||||
self._gps_last_fix = self._gps_current_fix
|
||||
self.data['GPS']['time'] = datetime_format.format(gps.timestamp_utc.tm_year,
|
||||
gps.timestamp_utc.tm_mon,
|
||||
gps.timestamp_utc.tm_mday,
|
||||
gps.timestamp_utc.tm_hour,
|
||||
gps.timestamp_utc.tm_min,
|
||||
gps.timestamp_utc.tm_sec)
|
||||
self.data['GPS']['time'] = TIME_FORMAT.format(gps.timestamp_utc.tm_year,
|
||||
gps.timestamp_utc.tm_mon,
|
||||
gps.timestamp_utc.tm_mday,
|
||||
gps.timestamp_utc.tm_hour,
|
||||
gps.timestamp_utc.tm_min,
|
||||
gps.timestamp_utc.tm_sec)
|
||||
self.data['GPS']['lat'] = gps.latitude
|
||||
self.data['GPS']['lon'] = gps.longitude
|
||||
self.data['GPS']['alt'] = gps.altitude_m
|
||||
@ -128,12 +131,12 @@ class Data:
|
||||
self.data['GPS'] = None
|
||||
|
||||
def show(self):
|
||||
"""Serialize data for visualization on serial console"""
|
||||
for source in self.data.keys():
|
||||
print(source + ": ")
|
||||
if not self.data[source] == None:
|
||||
for d in self.data[source].items():
|
||||
print("\t{0}: {1}".format(d[0], d[1]))
|
||||
"""Serialize data for visualization on serial console"""
|
||||
for source in self.data.keys():
|
||||
print(source + ": ")
|
||||
if not self.data[source] is None:
|
||||
for d in self.data[source].items():
|
||||
print("\t{0}: {1}".format(d[0], d[1]))
|
||||
|
||||
@property
|
||||
def json(self):
|
||||
@ -147,7 +150,7 @@ class Data:
|
||||
else:
|
||||
comma_src = ","
|
||||
output = '{}{}"{}":'.format(output, comma_src, source)
|
||||
if not self.data[source] == None:
|
||||
if not self.data[source] is None:
|
||||
output = output + '{'
|
||||
first_data = True
|
||||
for d in self.data[source].items():
|
||||
@ -172,55 +175,60 @@ class Data:
|
||||
# RED componant calculation from temperature data
|
||||
# 10 is the min temperature, 25 is the range
|
||||
# (10+25=35°C = max temperature)
|
||||
red = int((self.data['BME']['temp']-10)*neopixel_max_value/25)
|
||||
if red > neopixel_max_value:
|
||||
red = neopixel_max_value
|
||||
red = int((self.data['BME']['temp']-10)*NEOPIXEL_MAX_VALUE/25)
|
||||
if red > NEOPIXEL_MAX_VALUE:
|
||||
red = NEOPIXEL_MAX_VALUE
|
||||
if red < 0:
|
||||
red = 0
|
||||
|
||||
# BLUE componant calculation: very simple! By definition relative
|
||||
# humidity cannot be more than 100 or less than 0, physically
|
||||
blue = int(self.data['BME']['hum']*neopixel_max_value/100)
|
||||
blue = int(self.data['BME']['hum']*NEOPIXEL_MAX_VALUE/100)
|
||||
|
||||
# GREEN component calculation : 960 is the minimum pressure and 70 is
|
||||
# the range (960+70 = 1030hPa = max pressure)
|
||||
green = int((self.data['BME']['press']-960)*neopixel_max_value/70)
|
||||
if green > neopixel_max_value:
|
||||
green = neopixel_max_value
|
||||
green = int((self.data['BME']['press']-960)*NEOPIXEL_MAX_VALUE/70)
|
||||
if green > NEOPIXEL_MAX_VALUE:
|
||||
green = NEOPIXEL_MAX_VALUE
|
||||
if green < 0:
|
||||
green = 0
|
||||
|
||||
if print_data:
|
||||
if print_data_flag:
|
||||
print("Col:{}".format((red, green, blue)))
|
||||
|
||||
return (red, green, blue)
|
||||
|
||||
def write_on_flash(self):
|
||||
"""Save the current data as csv file on SPI flash"""
|
||||
global backup_data_flag
|
||||
try:
|
||||
with open("data/data.csv", "a") as csv_file:
|
||||
if gps_enable:
|
||||
csv_file.write("{};{};{};{};{};{};{};{};{};{}\n".format(self.data['SYS']['time'],
|
||||
self.data['BME']['temp'],
|
||||
self.data['BME']['hum'],
|
||||
self.data['BME']['press'],
|
||||
self.data['SYS']['vbat'],
|
||||
self.data['GPS']['time'],
|
||||
self.data['GPS']['lon'],
|
||||
self.data['GPS']['lat'],
|
||||
self.data['GPS']['alt'],
|
||||
self.data['GPS']['qual'],
|
||||
))
|
||||
if gps_enable_flag:
|
||||
csv_file.write("{};{};{};{};{};{};{};{};{};{}\n".format(
|
||||
self.data['SYS']['time'],
|
||||
self.data['BME']['temp'],
|
||||
self.data['BME']['hum'],
|
||||
self.data['BME']['press'],
|
||||
self.data['SYS']['vbat'],
|
||||
self.data['GPS']['time'],
|
||||
self.data['GPS']['lon'],
|
||||
self.data['GPS']['lat'],
|
||||
self.data['GPS']['alt'],
|
||||
self.data['GPS']['qual'],
|
||||
))
|
||||
else:
|
||||
csv_file.write("{};{};{};{};{};;;;;\n".format(self.data['SYS']['time'],
|
||||
self.data['BME']['temp'],
|
||||
self.data['BME']['hum'],
|
||||
self.data['BME']['press'],
|
||||
self.data['SYS']['vbat']
|
||||
))
|
||||
except OSError as e:
|
||||
print("Err {}: readonly".format(e))
|
||||
backup_data = False #to avoid trying again till next reset
|
||||
csv_file.write("{};{};{};{};{};;;;;\n".format(
|
||||
self.data['SYS']['time'],
|
||||
self.data['BME']['temp'],
|
||||
self.data['BME']['hum'],
|
||||
self.data['BME']['press'],
|
||||
self.data['SYS']['vbat'],
|
||||
))
|
||||
|
||||
except OSError as err:
|
||||
print("Err {}: readonly".format(err))
|
||||
# to avoid trying again till next reset
|
||||
backup_data_flag = False
|
||||
# Turn onboard led on to indicate read-only error
|
||||
led13.value = True
|
||||
|
||||
@ -228,8 +236,10 @@ class Data:
|
||||
# Functions #
|
||||
#############
|
||||
|
||||
|
||||
def check_data_dir():
|
||||
"""Check if data directories exists"""
|
||||
global backup_data_flag
|
||||
try:
|
||||
if 'data' not in os.listdir():
|
||||
os.mkdir('data')
|
||||
@ -239,60 +249,60 @@ def check_data_dir():
|
||||
os.mkdir('data/hourly')
|
||||
elif 'daily' not in os.listdir('data'):
|
||||
os.mkdir('data/daily')
|
||||
except OSError as e:
|
||||
print("Err {}: readonly".format(e))
|
||||
backup_data = False #to avoid trying again till next reset
|
||||
except OSError as err:
|
||||
print("Err {}: readonly".format(err))
|
||||
backup_data_flag = False # to avoid trying again till next reset
|
||||
# Turn onboard led on to indicate read-only error
|
||||
led13.value = True
|
||||
|
||||
def rotate_files():
|
||||
|
||||
def rotate_files(last_time):
|
||||
"""Check if files need to rotate
|
||||
=> every new hour
|
||||
=> every new day
|
||||
"""
|
||||
global last_time
|
||||
current_time = clock.datetime
|
||||
|
||||
#If the hour changed : copy current data.csv to hourly directory
|
||||
if current_time[3] != last_time[3] and backup_data:
|
||||
# If the hour changed : copy current data.csv to hourly directory
|
||||
if current_time[3] != last_time[3] and backup_data_flag:
|
||||
print("Time to move hourly data !")
|
||||
os.rename("data/data.csv", "data/hourly/{:02}.csv".format(last_time[3]))
|
||||
|
||||
#If the day changed : copy content of hourly to daily directories
|
||||
if current_time[2] != last_time[2] and backup_data:
|
||||
# If the day changed : copy content of hourly to daily directories
|
||||
if current_time[2] != last_time[2] and backup_data_flag:
|
||||
print("Time to move daily data !")
|
||||
#Create new dir for the date of yesterday
|
||||
# Create new dir for the date of yesterday
|
||||
newdir = "data/daily/{}{:02}{:02}".format(*last_time[0:3])
|
||||
os.mkdir(newdir)
|
||||
#Move each "hourly file" to the new directory
|
||||
# Move each "hourly file" to the new directory
|
||||
for file in os.listdir('data/hourly'):
|
||||
print("Move {} to {}".format(file, newdir))
|
||||
os.rename("data/hourly/{}".format(file), "{}/{}".format(newdir, file))
|
||||
|
||||
#Finally update last_time, each time
|
||||
last_time = current_time
|
||||
# Finally update last_time, each time
|
||||
return current_time
|
||||
|
||||
|
||||
def set_clock_from_GPS(treshold=5.0):
|
||||
def set_clock_from_gps(treshold=5.0):
|
||||
"""Compare internal RTC and date-time from GPS (if enable and fixed) and set
|
||||
the RTC date time to GPS date-time if the difference is more or equal to
|
||||
threshold (in seconds)"""
|
||||
|
||||
if gps_enable and gps.has_fix:
|
||||
#Congreen GPS timestamp into struct_time
|
||||
gps_datetime = time.struct_time((gps.timestamp_utc.tm_year,
|
||||
gps.timestamp_utc.tm_mon,
|
||||
gps.timestamp_utc.tm_mday,
|
||||
gps.timestamp_utc.tm_hour,
|
||||
gps.timestamp_utc.tm_min,
|
||||
gps.timestamp_utc.tm_sec, 0, 0, 0))
|
||||
#Max difference between GPS and internal RTC (in seconds):
|
||||
if gps_enable_flag and gps.has_fix:
|
||||
gps_datetime = time.struct_time((gps.timestamp_utc.tm_year,
|
||||
gps.timestamp_utc.tm_mon,
|
||||
gps.timestamp_utc.tm_mday,
|
||||
gps.timestamp_utc.tm_hour,
|
||||
gps.timestamp_utc.tm_min,
|
||||
gps.timestamp_utc.tm_sec, 0, 0, 0))
|
||||
# Max difference between GPS and internal RTC (in seconds):
|
||||
if abs(time.mktime(gps_datetime) - time.mktime(clock.datetime)) >= treshold:
|
||||
# print("Clock difference with GPS!")
|
||||
# print("Previous date/time : " + datetime_format.format(*clock.datetime[0:6]))
|
||||
clock.datetime = gps_datetime #Trust GPS if there is a bias
|
||||
# print("Previous date/time : " + TIME_FORMAT.format(*clock.datetime[0:6]))
|
||||
clock.datetime = gps_datetime # Trust GPS if there is a bias
|
||||
print("Clocks synced !")
|
||||
|
||||
|
||||
def measure_vbat(samples=10, timestep=0.01):
|
||||
"""Measure Vbattery as the mean of n samples with timestep second between
|
||||
each measurement"""
|
||||
@ -301,22 +311,23 @@ def measure_vbat(samples=10, timestep=0.01):
|
||||
# 2 : voltage is divided by 2
|
||||
# 3.3 : Vref = 3.3V
|
||||
# 65536 : 16bit ADC
|
||||
v = 0
|
||||
val = 0
|
||||
for i in range(samples):
|
||||
v = v + vbat.value
|
||||
val = val + vbat.value
|
||||
time.sleep(timestep)
|
||||
return v/samples*0.000100708
|
||||
return val/samples*0.000100708
|
||||
|
||||
#########
|
||||
# Setup #
|
||||
#########
|
||||
|
||||
gc.collect()
|
||||
#micropython.mem_info()
|
||||
|
||||
#Enable RTC of the feather M0 board
|
||||
gc.collect()
|
||||
# micropython.mem_info()
|
||||
|
||||
# Enable RTC of the feather M0 board
|
||||
clock = rtc.RTC()
|
||||
#clock.datetime = time.struct_time((2018, 7, 29, 15, 31, 30, 0, 0, 0))
|
||||
# clock.datetime = time.struct_time((2018, 7, 29, 15, 31, 30, 0, 0, 0))
|
||||
|
||||
# BME280 sensors (I2C)
|
||||
i2c = I2C(board.SCL, board.SDA)
|
||||
@ -328,18 +339,18 @@ bme280 = Adafruit_BME280_I2C(i2c, address=0x76)
|
||||
# Battery voltage
|
||||
vbat = AnalogIn(board.VOLTAGE_MONITOR)
|
||||
|
||||
#Set the pin to control the power to GPS module
|
||||
# Set the pin to control the power to GPS module
|
||||
gps_en_pin = DigitalInOut(board.A5)
|
||||
gps_en_pin.direction = Direction.OUTPUT
|
||||
|
||||
#Set the pin N°13 to use the onboard LED as read-only/read-write indicator
|
||||
# Set the pin N°13 to use the onboard LED as read-only/read-write indicator
|
||||
led13 = DigitalInOut(board.D13)
|
||||
led13.direction = Direction.OUTPUT
|
||||
led13.value = False
|
||||
|
||||
# Set GPS module on FeatherWing board
|
||||
gps_en_pin.value = not gps_enable #Set enable pin high to disable GPS module
|
||||
if gps_enable:
|
||||
gps_en_pin.value = not gps_enable_flag # Set enable pin high to disable GPS module
|
||||
if gps_enable_flag:
|
||||
gps_uart = UART(board.TX, board.RX,
|
||||
baudrate=9600, timeout=3000)
|
||||
gps = GPS(gps_uart)
|
||||
@ -348,57 +359,57 @@ if gps_enable:
|
||||
gps.send_command('PMTK220,1000') # 1000 ms refresh rate
|
||||
|
||||
# Second UART to communicate with raspberry pi (throught GPIO)
|
||||
if send_json_data:
|
||||
if send_json_flag:
|
||||
rpi_uart = UART(board.A2, board.A3,
|
||||
baudrate=115200, timeout=2000)
|
||||
|
||||
# Set onboard Neopixel : used as atmo data output
|
||||
# brightness is fixed to 1 to spare some memory. Use neopixel_max_value instead
|
||||
if data_to_neopixel:
|
||||
# brightness is fixed to 1 to spare some memory. Use NEOPIXEL_MAX_VALUE instead
|
||||
if neopixel_flag:
|
||||
pixel = neopixel.NeoPixel(board.NEOPIXEL, 1, brightness=1)
|
||||
else:
|
||||
#if neopixel is disable : turn off the LED
|
||||
# if neopixel is disable : turn off the LED
|
||||
pixel = neopixel.NeoPixel(board.NEOPIXEL, 1, brightness=1)
|
||||
pixel[0] = (0,0,0)
|
||||
pixel[0] = (0, 0, 0)
|
||||
pixel = None
|
||||
|
||||
#Finally check if data directories exist
|
||||
# Finally check if data directories exist
|
||||
check_data_dir()
|
||||
|
||||
#############
|
||||
# Main loop #
|
||||
#############
|
||||
|
||||
#Create the Data object
|
||||
data = Data()
|
||||
|
||||
#Init timer
|
||||
last_update = last_sent_packet = last_written_data = time.monotonic()
|
||||
last_time = clock.datetime
|
||||
# Init timers
|
||||
last_update = last_sent_packet = last_written_data = time.monotonic()
|
||||
last_rotation = clock.datetime
|
||||
|
||||
while True:
|
||||
|
||||
if gps_enable:
|
||||
if gps_enable_flag:
|
||||
gps.update()
|
||||
|
||||
current = time.monotonic()
|
||||
if current - last_update >= update_interval:
|
||||
if current - last_update >= UPDATE_INTERVAL:
|
||||
last_update = current
|
||||
set_clock_from_GPS()
|
||||
set_clock_from_gps()
|
||||
data.update()
|
||||
if print_data:
|
||||
if print_data_flag:
|
||||
data.show()
|
||||
if data_to_neopixel:
|
||||
if neopixel_flag:
|
||||
pixel[0] = data.rgb
|
||||
|
||||
if send_json_data and current - last_sent_packet >= send_interval :
|
||||
if send_json_flag and current - last_sent_packet >= SEND_INTERVAL:
|
||||
last_sent_packet = current
|
||||
rpi_uart.write(data.json + '\n')
|
||||
print(data.json + '\n')
|
||||
|
||||
if backup_data and current - last_written_data >= write_interval :
|
||||
if backup_data_flag and current - last_written_data >= WRITE_INTERVAL:
|
||||
last_written_data = current
|
||||
rotate_files() #First check if files need to rotate
|
||||
# First check if files need to rotate
|
||||
last_rotation = rotate_files(last_rotation)
|
||||
print("Backup data...")
|
||||
data.write_on_flash()
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user