# This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . """ ########################## # Weather and GPS logger # ########################## Use with: * Adafruit Feather M4 Express (CircuitPython firmware) * Adafruit Ultimate GPS FeatherWing * Bosch BME280 sensor (air temperature, humidity, atmospheric pressure) on I2C Pins used: * TX/RX : Ultimate GPS FeatherWing (data) * A5 : Ultimate GPS FeatherWing EN (enabe/disable GPS power) * I2C SDA/SCL : BME280 sensor * D5 : RW / RO jumper * A2, A3 : UART TX & RX to Raspberry pi (115200 baudrate) * D9 : to EN pin on Raspberry Lipo SHIM * D13 : onboard LED (as RW/RO indicator) Back pins diagramm: D05 A2 A3 D09 NC_ GND GND SCL SDA 3V3 Git repository : https://framagit.org/arofarn/Cameteo TODO for v1 : * write data on flash drive (work-in-progress) * send data through UART (work-in-progress) """ __author__ = "arofarn" __version__ = 0.2 ####################### import time import gc import os import rtc import microcontroller import board # import micropython from busio import I2C, UART from analogio import AnalogIn from digitalio import DigitalInOut, Direction from adafruit_bme280 import Adafruit_BME280_I2C from adafruit_gps import GPS import neopixel ########## # config # ########## print_data_flag = True # Print data on USB UART / REPL output ? send_json_flag = True # Send data as JSON on second UART ? backup_data_flag = True # Write data as CSV files on onboard SPI Flash ? 
neopixel_flag = True     # Display atmospheric data as color on onboard neopixel ?
gps_enable_flag = True   # Use GPS module ?

UPDATE_INTERVAL = 10     # Interval between data acquisition (in seconds)
WRITE_INTERVAL = 60      # Interval between data written on flash Memory
SEND_INTERVAL = 60       # Interval between packet of data sent to Rpi
TIME_FORMAT = "{:04}/{:02}/{:02}_{:02}:{:02}:{:02}"  # Date/time format
NEOPIXEL_MAX_VALUE = 70  # max value instead of brightness to spare some mem

# Battery voltage per ADC count: 2 * 3.3 / 65536, where
#   2     : the battery voltage is divided by 2 before the ADC
#   3.3   : Vref = 3.3V
#   65536 : 16-bit ADC range
_VBAT_VOLTS_PER_COUNT = 0.000100708


###########
# Helpers #
###########

def _format_time(stamp):
    """Format the first six fields (year..second) of a struct_time."""
    return TIME_FORMAT.format(*stamp[0:6])


def _to_pixel(value, minimum, span):
    """Map value from [minimum, minimum + span] to [0, NEOPIXEL_MAX_VALUE].

    Values outside the range are clamped to the nearest bound.
    """
    level = int((value - minimum) * NEOPIXEL_MAX_VALUE / span)
    return max(0, min(NEOPIXEL_MAX_VALUE, level))


def _move_file(source, target):
    """Rename source to target; report the error instead of raising.

    Returns True when the file was moved, False otherwise.
    """
    try:
        os.rename(source, target)
    except OSError as err:
        print("Err {}: cannot move {} to {}".format(err, source, target))
        return False
    return True


###########
# Classes #
###########

class Data:
    """Container for the latest measurements of every data source.

    ``self.data`` maps a source name ('SYS', 'BME', 'GPS') to a dict of
    named values. The 'GPS' entry is replaced by None by ``update()`` when
    the GPS module is disabled (``gps_enable_flag`` is False).
    """

    def __init__(self):
        self.data = {'SYS': {'time': "2000/01/01_00:00:00",
                             'vbat': 0,
                             'cput': 0.0},
                     'BME': {'temp': 0.0,
                             'hum': 0,
                             'press': 0.0},
                     'GPS': {'time': "2000/01/01_00:00:00",
                             'lat': 0.0,
                             'lon': 0.0,
                             'alt': 0.0,
                             'qual': 0,
                             'age': 0}}
        # time.monotonic() values (whole seconds) of the last update made
        # with a GPS fix and of the most recent update.
        self._gps_last_fix = 0
        self._gps_current_fix = 0

    def update(self):
        """Read every sensor and refresh the ``data`` dict.

        Relies on the module-level objects ``clock``, ``bme280`` and, when
        the GPS is enabled, ``gps``.
        """
        # Data from Feather board
        system = self.data['SYS']
        system['time'] = _format_time(clock.datetime)
        system['vbat'] = round(measure_vbat(), 3)
        system['cput'] = round(microcontroller.cpu.temperature, 2)

        # Data from BME280
        atmo = self.data['BME']
        atmo['temp'] = round(bme280.temperature, 1)
        atmo['hum'] = int(bme280.humidity)
        atmo['press'] = round(bme280.pressure, 2)

        if not gps_enable_flag:
            self.data['GPS'] = None
            return

        position = self.data['GPS']
        self._gps_current_fix = int(time.monotonic())
        if gps.has_fix:
            self._gps_last_fix = self._gps_current_fix
            # The GPS date/time was never refreshed before, so the CSV
            # always logged the placeholder value.
            if gps.timestamp_utc is not None:
                position['time'] = _format_time(gps.timestamp_utc)
            position['lat'] = gps.latitude
            position['lon'] = gps.longitude
            position['alt'] = gps.altitude_m
            position['qual'] = gps.fix_quality
            position['age'] = 0
        else:
            # No fix: keep the last known position and report its age (s)
            position['age'] = int(self._gps_current_fix - self._gps_last_fix)

    def show(self):
        """Print every source and its values on the serial console."""
        for source, values in self.data.items():
            print(source + ": ")
            if values is not None:
                for name, value in values.items():
                    print("\t{0}: {1}".format(name, value))

    @property
    def json(self):
        """Return the data as a compact JSON-formatted string.

        Every value is emitted as a JSON string (quoted), as before.
        A source whose value is None (GPS disabled) is emitted as ``null``;
        the previous code produced invalid JSON (``"GPS":}``) in that case.
        """
        sources = []
        for source, values in self.data.items():
            if values is None:
                body = 'null'
            else:
                body = '{' + ','.join(
                    ['"{}":"{}"'.format(name, value)
                     for name, value in values.items()]) + '}'
            sources.append('"{}":{}'.format(source, body))
        return '{' + ','.join(sources) + '}'

    @property
    def rgb(self):
        """Convert BME280 data into a NeoPixel color.

        Returns a tuple (RED, GREEN, BLUE), each in 0..NEOPIXEL_MAX_VALUE:
         * RED   => temperature : min = 10degC, max = 35degC (range 25degC)
         * GREEN => pressure    : min = 960hPa, max = 1030hPa (range 70hPa)
         * BLUE  => humidity    : min = 0%, max = 100%

        The color is also printed when ``print_data_flag`` is set.
        """
        atmo = self.data['BME']
        red = _to_pixel(atmo['temp'], 10, 25)
        green = _to_pixel(atmo['press'], 960, 70)
        blue = _to_pixel(atmo['hum'], 0, 100)

        if print_data_flag:
            print("Col:{}".format((red, green, blue)))

        return (red, green, blue)

    def write_on_flash(self):
        """Append the current data as one CSV line to data/data.csv.

        Columns (';' separated): system time, temperature, humidity,
        pressure, battery voltage, GPS time, longitude, latitude, altitude,
        fix quality. The five GPS columns are left empty when the GPS is
        disabled.

        On OSError the backup is disabled until the next reset
        (``backup_data_flag`` set to False) and the onboard LED is lit.
        """
        global backup_data_flag

        line = "{};{};{};{};{}".format(self.data['SYS']['time'],
                                       self.data['BME']['temp'],
                                       self.data['BME']['hum'],
                                       self.data['BME']['press'],
                                       self.data['SYS']['vbat'])
        if gps_enable_flag:
            line += ";{};{};{};{};{}\n".format(self.data['GPS']['time'],
                                               self.data['GPS']['lon'],
                                               self.data['GPS']['lat'],
                                               self.data['GPS']['alt'],
                                               self.data['GPS']['qual'])
        else:
            line += ";;;;;\n"

        try:
            with open("data/data.csv", "a") as csv_file:
                csv_file.write(line)
        except OSError as err:
            print("Err {}: readonly".format(err))
            # to avoid trying again till next reset
            backup_data_flag = False
            # Turn onboard led on to indicate read-only error
            led13.value = True


#############
# Functions #
#############

def check_data_dir():
    """Create data/, data/hourly and data/daily when they are missing.

    Each sub-directory is checked independently: the previous ``elif``
    chain skipped 'daily' whenever 'hourly' had to be created.

    On OSError the backup is disabled until the next reset
    (``backup_data_flag`` set to False) and the onboard LED is lit.
    """
    global backup_data_flag
    try:
        if 'data' not in os.listdir():
            os.mkdir('data')
        existing = os.listdir('data')
        for subdir in ('hourly', 'daily'):
            if subdir not in existing:
                os.mkdir('data/' + subdir)
    except OSError as err:
        print("Err {}: readonly".format(err))
        backup_data_flag = False  # to avoid trying again till next reset
        # Turn onboard led on to indicate read-only error
        led13.value = True


def rotate_files(last_time):
    """Rotate the CSV files when the hour or the date changed.

    * new hour => data/data.csv is moved to data/hourly/<HH>.csv
    * new date => every file of data/hourly is moved to
                  data/daily/<YYYYMMDD> (date taken from last_time)

    The whole date (year, month, day) is compared, so a clock jump landing
    on the same day-of-month or the same hour still triggers a rotation.
    File-system errors (missing data.csv, existing target, read-only flash)
    are printed and skipped so they cannot stop the main loop.

    last_time: struct_time of the previous call.
    Returns the current RTC date/time, to be passed to the next call.
    """
    current_time = clock.datetime
    if not backup_data_flag:
        return current_time

    day_changed = tuple(current_time[0:3]) != tuple(last_time[0:3])
    hour_changed = day_changed or current_time[3] != last_time[3]

    # If the hour changed : move current data.csv to hourly directory
    if hour_changed:
        print("Time to move hourly data !")
        _move_file("data/data.csv",
                   "data/hourly/{:02}.csv".format(last_time[3]))

    # If the day changed : move content of hourly to daily directories
    if day_changed:
        print("Time to move daily data !")
        # Directory named after the date of the previous rotation
        dirname = "{}{:02}{:02}".format(*last_time[0:3])
        newdir = "data/daily/" + dirname
        try:
            # The directory may already exist, e.g. after a reboot during
            # which the RTC restarted from the same date.
            if dirname not in os.listdir('data/daily'):
                os.mkdir(newdir)
            hourly_files = os.listdir('data/hourly')
        except OSError as err:
            print("Err {}: cannot prepare {}".format(err, newdir))
            return current_time
        # Move each "hourly file" to the new directory
        for name in hourly_files:
            print("Move {} to {}".format(name, newdir))
            _move_file("data/hourly/{}".format(name),
                       "{}/{}".format(newdir, name))

    # Finally hand back the new reference time, each time
    return current_time


def set_clock_from_gps(treshold=5.0):
    """Set the internal RTC from the GPS date/time when they disagree.

    Does nothing unless the GPS is enabled, has a fix and has provided a
    plausible timestamp. The RTC is overwritten when the absolute
    difference with the GPS time is greater than or equal to ``treshold``
    seconds (parameter name kept as-is for backward compatibility).
    """
    if not (gps_enable_flag and gps.has_fix):
        return
    stamp = gps.timestamp_utc
    # NOTE(review): the GPS library may report no timestamp, or a dateless
    # one (year 0), until a full date sentence is parsed -- confirm against
    # the adafruit_gps version in use. Such stamps are ignored.
    if stamp is None or stamp.tm_year < 2000:
        return

    gps_datetime = time.struct_time((stamp.tm_year,
                                     stamp.tm_mon,
                                     stamp.tm_mday,
                                     stamp.tm_hour,
                                     stamp.tm_min,
                                     stamp.tm_sec,
                                     0, 0, 0))
    # Max difference between GPS and internal RTC (in seconds):
    drift = abs(time.mktime(gps_datetime) - time.mktime(clock.datetime))
    if drift >= treshold:
        clock.datetime = gps_datetime  # Trust GPS if there is a bias
        print("Clocks synced !")


def measure_vbat(samples=10, timestep=0.01):
    """Return the battery voltage (V) as the mean of several ADC readings.

    samples: number of readings to average (must be >= 1).
    timestep: pause in seconds after each reading.
    Raises ValueError when samples is lower than 1.
    """
    if samples < 1:
        raise ValueError("samples must be >= 1")
    total = 0
    for _ in range(samples):
        total += vbat.value
        time.sleep(timestep)
    return total / samples * _VBAT_VOLTS_PER_COUNT
#########
# Setup #
#########

gc.collect()
# micropython.mem_info()

# Real-time clock of the Feather board
clock = rtc.RTC()
# clock.datetime = time.struct_time((2018, 7, 29, 15, 31, 30, 0, 0, 0))

# BME280 sensor on the I2C bus
# I2C addresses seen on BME280 breakouts:
#   0x77 = Adafruit breakout board
#   0x76 = tiny generic board
i2c = I2C(board.SCL, board.SDA)
bme280 = Adafruit_BME280_I2C(i2c, address=0x76)

# Analog input used to read the battery voltage
vbat = AnalogIn(board.VOLTAGE_MONITOR)

# Output pin driving the power (EN) of the GPS module
gps_en_pin = DigitalInOut(board.A5)
gps_en_pin.direction = Direction.OUTPUT

# Onboard LED (pin 13), used as read-only / read-write indicator
led13 = DigitalInOut(board.D13)
led13.direction = Direction.OUTPUT
led13.value = False

# GPS module on the FeatherWing board: EN pin high disables the module
gps_en_pin.value = not gps_enable_flag
if gps_enable_flag:
    gps_uart = UART(board.TX, board.RX, baudrate=9600, timeout=3000)
    gps = GPS(gps_uart)
    # Turn on the basic GGA and RMC info
    gps.send_command('PMTK314,0,1,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0')
    gps.send_command('PMTK220,1000')  # 1000 ms refresh rate

# Second UART, towards the Raspberry Pi (through GPIO)
if send_json_flag:
    rpi_uart = UART(board.A2, board.A3, baudrate=115200, timeout=2000)

# Onboard NeoPixel: displays the atmospheric data as a color.
# Brightness stays at 1 to spare some memory; NEOPIXEL_MAX_VALUE limits
# the intensity instead.
pixel = neopixel.NeoPixel(board.NEOPIXEL, 1, brightness=1)
if not neopixel_flag:
    # NeoPixel disabled: switch the LED off and drop the driver
    pixel[0] = (0, 0, 0)
    pixel = None

# Last setup step: make sure the data directories exist
check_data_dir()

#############
# Main loop #
#############

data = Data()

# Every timer starts from the same instant
last_update = last_sent_packet = last_written_data = time.monotonic()
last_rotation = clock.datetime

while True:

    if gps_enable_flag:
        gps.update()

    now = time.monotonic()

    # Sensor acquisition (plus console and NeoPixel output)
    if now - last_update >= UPDATE_INTERVAL:
        last_update = now
        set_clock_from_gps()
        data.update()
        if print_data_flag:
            data.show()
        if neopixel_flag:
            pixel[0] = data.rgb

    # JSON packet to the Raspberry Pi, echoed on the console
    if send_json_flag and now - last_sent_packet >= SEND_INTERVAL:
        last_sent_packet = now
        packet = data.json + '\n'
        rpi_uart.write(packet)
        print(packet)

    # CSV backup on the onboard flash
    if backup_data_flag and now - last_written_data >= WRITE_INTERVAL:
        last_written_data = now
        # Rotate the files first, when a new hour or day started
        last_rotation = rotate_files(last_rotation)
        print("Backup data...")
        data.write_on_flash()

    gc.collect()
    # micropython.mem_info(1)
    # print('Memory free: {} allocated: {}'.format(gc.mem_free(), gc.mem_alloc()))