Updated directory arch
This commit is contained in:
17
circuitpython/code/boot.py
Normal file
17
circuitpython/code/boot.py
Normal file
@ -0,0 +1,17 @@
|
||||
# Selectively setting readonly to False on boot
|
||||
|
||||
import board
|
||||
import digitalio
|
||||
import storage
|
||||
|
||||
switch = digitalio.DigitalInOut(board.D5)
|
||||
switch.direction = digitalio.Direction.INPUT
|
||||
switch.pull = digitalio.Pull.UP
|
||||
led = digitalio.DigitalInOut(board.D13)
|
||||
led.direction = digitalio.Direction.OUTPUT
|
||||
|
||||
# If the D0 is connected to ground with a wire
|
||||
# CircuitPython can write to the drive
|
||||
storage.remount("/", switch.value)
|
||||
print("Readonly : {}".format(switch.value))
|
||||
led.value = switch.value
|
279
circuitpython/code/main.py
Normal file
279
circuitpython/code/main.py
Normal file
@ -0,0 +1,279 @@
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
"""
|
||||
##########################
|
||||
# Weather and GPS logger #
|
||||
##########################
|
||||
Author : Pierrick Couturier
|
||||
|
||||
Use with:
|
||||
* Adafruit Feather M0 Express (CircuitPython firmware 3.0.0)
|
||||
* Adafruit Ultimate GPS FeatherWing
|
||||
* Bosch BME280 sensor (air temperature, humidity, atmospheric pressure) on I2C
|
||||
|
||||
TODO for v1 :
|
||||
* write data on flash drive (work-in-progress)
|
||||
* send data through UART (one more !)
|
||||
"""
|
||||
__version__ = 0.1
|
||||
__repo__ = "https://framagit.org/arofarn/Cameteo"
|
||||
|
||||
##########
|
||||
# config #
|
||||
##########
|
||||
print_data = True
|
||||
backup_data = True
|
||||
data_to_neopixel = True
|
||||
gps_enable = True
|
||||
update_interval = const(10) # in seconds
|
||||
send_json_data = True
|
||||
datetime_format = "{:04}/{:02}/{:02}_{:02}:{:02}:{:02}"
|
||||
neopixel_max_value =const(70) #max value instead of brightness to spare some mem
|
||||
|
||||
#######################
|
||||
|
||||
import board, microcontroller
|
||||
import gc, os
|
||||
# import micropython
|
||||
import time, rtc
|
||||
from busio import I2C, UART
|
||||
from analogio import AnalogIn
|
||||
|
||||
from adafruit_bme280 import Adafruit_BME280_I2C
|
||||
from adafruit_gps import GPS
|
||||
import neopixel
|
||||
|
||||
###########
|
||||
# Classes #
|
||||
###########
|
||||
|
||||
class Data:
|
||||
"""Class for handling data"""
|
||||
def __init__(self):
|
||||
self.data = {'SYS': {},
|
||||
'BME280': {},
|
||||
'GPS': {}
|
||||
}
|
||||
|
||||
def update(self):
|
||||
"""Read the data from various sensors and update the data dict variable"""
|
||||
#Data from Feather board
|
||||
self.data['SYS'] = {'time': {'val': datetime_format.format(*clock.datetime[0:6]),
|
||||
'unit': '' },
|
||||
'v_bat': {'val': measure_vbat(),
|
||||
'unit': 'V' },
|
||||
'CPU_temp': {'val': microcontroller.cpu.temperature,
|
||||
'unit': '°C' }}
|
||||
|
||||
#Data from BME280
|
||||
self.data['BME280'] = {'temp': { 'val': round(bme280.temperature, 1), 'unit': '°C' },
|
||||
'hum': { 'val': int(bme280.humidity), 'unit': '%' },
|
||||
'press': { 'val': round(bme280.pressure, 2), 'unit': 'hPa' }}
|
||||
if gps_enable:
|
||||
if gps.has_fix:
|
||||
self.data['GPS'] = {'timestamp': {'val': datetime_format.format(gps.timestamp_utc.tm_year,
|
||||
gps.timestamp_utc.tm_mon,
|
||||
gps.timestamp_utc.tm_mday,
|
||||
gps.timestamp_utc.tm_hour,
|
||||
gps.timestamp_utc.tm_min,
|
||||
gps.timestamp_utc.tm_sec),
|
||||
'unit': ''},
|
||||
'lat': {'val': gps.latitude, 'unit': 'deg'},
|
||||
'lon': {'val': gps.longitude, 'unit': 'deg'},
|
||||
'alt': {'val': gps.altitude_m, 'unit': 'm'},
|
||||
'qual': {'val': gps.fix_quality, 'unit': ''}}
|
||||
else:
|
||||
self.data['GPS'] = {'lat': {'val': None, 'unit': 'deg'},
|
||||
'lon': {'val': None, 'unit': 'deg'},
|
||||
'alt': {'val': None, 'unit': 'm'}}
|
||||
else:
|
||||
self.data['GPS'] = None
|
||||
|
||||
def show(self):
|
||||
"""Serialize data to json-formatted string for visualization on
|
||||
serial console
|
||||
"""
|
||||
for source in self.data.keys():
|
||||
print(source + ": ")
|
||||
if not self.data[source] == None:
|
||||
for d in self.data[source].items():
|
||||
print("\t{0}: {val} {unit}".format(d[0], **d[1]))
|
||||
|
||||
def json(self):
|
||||
"""Serialize data to json-formatted string"""
|
||||
output = "{"
|
||||
for source in self.data.keys():
|
||||
output = "".join((output, "'", source, "': \n"))
|
||||
if not self.data[source] == None:
|
||||
for d in self.data[source].items():
|
||||
output = "".join((output, "{",
|
||||
"'{}': ".format(d[0]),
|
||||
"{",
|
||||
"'val': {val},'unit': {unit}".format(**d[1]),
|
||||
"}}\n"))
|
||||
output = output + "}, \n"
|
||||
output = output + "}"
|
||||
return output
|
||||
|
||||
def write_on_flash(self):
|
||||
"""Save the current data as csv file on SPI flash"""
|
||||
try:
|
||||
with open("data/data.csv", "a") as csv_file:
|
||||
csv_file.write("{};{};{};{}\n".format(self.data['SYS']['time']['val'],
|
||||
self.data['BME280']['temp']['val'],
|
||||
self.data['BME280']['hum']['val'],
|
||||
self.data['BME280']['press']['val']))
|
||||
except OSError as e:
|
||||
print("Err. {} : R-O FS".format(e))
|
||||
backup_data = False #to avoid trying again till next reset
|
||||
|
||||
|
||||
#############
|
||||
# Functions #
|
||||
#############
|
||||
|
||||
def check_data_dir():
|
||||
"""Check if data directories exists"""
|
||||
if 'data' not in os.listdir():
|
||||
os.mkdir('data')
|
||||
os.mkdir('data/hourly')
|
||||
os.mkdir('data/daily')
|
||||
elif 'hourly' not in os.listdir('data'):
|
||||
os.mkdir('data/hourly')
|
||||
elif 'daily' not in os.listdir('data'):
|
||||
os.mkdir('data/daily')
|
||||
|
||||
def update_neopixel(data):
|
||||
"""Convert atmospheric data from BME280 sensor into NeoPixel color
|
||||
* RED => temperature : max = 35degC, min =10degC (range 25°C)
|
||||
* BLUE => humidity : max= 100%, mini=0%
|
||||
* GREEN => pression : mini=960hPa, maxi = 1030hPa (range 70hPa)
|
||||
"""
|
||||
|
||||
rouge = int((data['BME280']['temp']['val']-10)*neopixel_max_value/25)
|
||||
if rouge > neopixel_max_value:
|
||||
rouge = neopixel_max_value
|
||||
if rouge < 0:
|
||||
rouge = 0
|
||||
|
||||
bleu = int(data['BME280']['hum']['val']*neopixel_max_value/100)
|
||||
|
||||
vert = int((data['BME280']['press']['val']-960)*neopixel_max_value/70)
|
||||
if vert > neopixel_max_value:
|
||||
vert = neopixel_max_value
|
||||
if vert < 0:
|
||||
vert = 0
|
||||
|
||||
if print_data:
|
||||
print("Col:{}".format((rouge, vert, bleu)))
|
||||
|
||||
return (rouge, vert, bleu)
|
||||
|
||||
def set_clock_from_GPS():
|
||||
if gps_enable and gps.has_fix:
|
||||
#Convert GPS timestamp into struct_time
|
||||
gps_datetime = time.struct_time((gps.timestamp_utc.tm_year,
|
||||
gps.timestamp_utc.tm_mon,
|
||||
gps.timestamp_utc.tm_mday,
|
||||
gps.timestamp_utc.tm_hour,
|
||||
gps.timestamp_utc.tm_min,
|
||||
gps.timestamp_utc.tm_sec, 0, 0, 0))
|
||||
#Max difference between GPS and internal RTC (in seconds):
|
||||
if abs(time.mktime(gps_datetime) - time.mktime(clock.datetime)) >= 5.0:
|
||||
print("Clock difference with GPS!")
|
||||
print("Previous date/time : " + datetime_format.format(*clock.datetime[0:6]))
|
||||
clock.datetime = gps_datetime #Trust GPS if there is a bias
|
||||
print("New date/time : " + datetime_format.format(*clock.datetime[0:6]))
|
||||
|
||||
def measure_vbat(samples=10, timestep=0.01):
|
||||
"""Measure Vbattery as the mean of n samples with timestep second between
|
||||
each measurement"""
|
||||
# Note about v_bat calculation :
|
||||
# 0.000100708 = 2*3.3/65536 with
|
||||
# 2 : voltage is divided by 2
|
||||
# 3.3 : Vref = 3.3V
|
||||
# 65536 : 16bit ADC
|
||||
v = 0
|
||||
for i in range(samples):
|
||||
v = v + vbat.value
|
||||
time.sleep(timestep)
|
||||
return v/samples*0.000100708
|
||||
|
||||
#########
|
||||
# Setup #
|
||||
#########
|
||||
|
||||
gc.collect()
|
||||
#micropython.mem_info()
|
||||
|
||||
#Enable RTC of the feather M0 board
|
||||
clock = rtc.RTC()
|
||||
#clock.datetime = time.struct_time((2018, 7, 29, 15, 31, 30, 0, 0, 0))
|
||||
|
||||
# BME280 sensors (I2C)
|
||||
i2c = I2C(board.SCL, board.SDA)
|
||||
# i2c addresses for BME280 breakout :
|
||||
# 0x77 = adafruit breakout board
|
||||
# 0x76 = tiny chinese board
|
||||
bme280 = Adafruit_BME280_I2C(i2c, address=0x76)
|
||||
|
||||
# Battery voltage
|
||||
vbat = AnalogIn(board.D9, )
|
||||
|
||||
# GPS on FeatherWing board
|
||||
if gps_enable:
|
||||
gps_uart = UART(board.TX, board.RX, baudrate=9600, timeout=3000)
|
||||
gps = GPS(gps_uart)
|
||||
# Turn on the basic GGA and RMC info
|
||||
gps.send_command('PMTK314,0,1,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0')
|
||||
gps.send_command('PMTK220,1000') # 1000 ms refresh rate
|
||||
|
||||
# Integrated Neopixel
|
||||
if data_to_neopixel:
|
||||
pixel = neopixel.NeoPixel(board.NEOPIXEL, 1, brightness=1)
|
||||
else:
|
||||
#if neopixel is disable : turn off the LED
|
||||
pixel = neopixel.NeoPixel(board.NEOPIXEL, 1, brightness=1)
|
||||
pixel[0] = (0,0,0)
|
||||
pixel = None
|
||||
|
||||
check_data_dir()
|
||||
|
||||
#############
|
||||
# Main loop #
|
||||
#############
|
||||
|
||||
data = Data()
|
||||
last_update = time.monotonic()
|
||||
|
||||
while True:
|
||||
|
||||
if gps_enable:
|
||||
gps.update()
|
||||
|
||||
current = time.monotonic()
|
||||
if current - last_update >= update_interval:
|
||||
last_update = current
|
||||
set_clock_from_GPS()
|
||||
data.update()
|
||||
if print_data:
|
||||
data.show()
|
||||
# print(data.json())
|
||||
if backup_data:
|
||||
data.write_on_flash()
|
||||
if data_to_neopixel:
|
||||
pixel[0] = update_neopixel(data.data)
|
||||
gc.collect()
|
||||
# micropython.mem_info(1)
|
||||
# print('Memory free: {} allocated: {}'.format(gc.mem_free(), gc.mem_alloc()))
|
Reference in New Issue
Block a user