# This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . """Cameteo module Data classes for GPS, BME280 sensors and some system stuff Function to handle data files rotation and set internal RTC date from other (more reliable) sources. """ import os import time import rtc import microcontroller import board from busio import I2C, UART from analogio import AnalogIn from digitalio import DigitalInOut, Direction from adafruit_bme280 import Adafruit_BME280_I2C from adafruit_gps import GPS TIME_FORMAT = "{:04}/{:02}/{:02}_{:02}:{:02}:{:02}" # Date/time format # Set the pin N°13 to use the onboard LED as read-only/read-write indicator # (RED = read-only, no data recorded) LED13 = DigitalInOut(board.D13) LED13.direction = Direction.OUTPUT LED13.value = False ########### # Classes # ########### class Data: """Class for handling data""" def __init__(self, name, update_interval=10, write_interval=300, send_interval=60, debug=False): self.name = name self.data = {} self.debug = debug self.update_interval = update_interval self.write_interval = write_interval self.send_interval = send_interval self._last_update = 0 self._last_backup = 0 self._last_send = 0 self._last_file_rotation = time.localtime() # Check if data directories exists if we need to use them if self.write_interval >= 0: try: if 'data' not in os.listdir(): os.mkdir('data') os.mkdir('data/hourly') os.mkdir('data/daily') elif 'hourly' not in os.listdir('data'): os.mkdir('data/hourly') elif 'daily' not in os.listdir('data'): os.mkdir('data/daily') except OSError as err: print("Err {}: readonly".format(err)) self.write_interval = -1 # to avoid trying again till next reset # Turn onboard led on to indicate read-only error LED13.value = True def __str__(self): """Serialize data for visualization on serial console""" output = self.name + ":\n" for item in self.data.items(): output += "\t{0}: {1}\n".format(*item) return output def __repr__(self): return self.json def update(self, current, verbose=True): """Read the data from sensors and update the data dict variable""" if self.debug: print("Update {} : Current : {} | Last : {} | Interval : {}".format( self.name, current, self._last_update, self.update_interval)) if current - self._last_update >= self.update_interval: self._last_update = current self._update() if verbose: print(self) def _update(self): """Here comes specific update code""" pass @property def json(self): """Serialized data to compact json-formatted string""" output = '{"' + self.name + '":{' first_data = True for name, value in self.data.items(): if first_data: comma = "" first_data = False else: comma = "," output = '{}{}"{}":"{}"'.format(output, comma, name, value) output = output + '}}' return output def csv(self, header=False): """Serialized data values (or keys if header=True) as CSV line""" if header: return ";".join([str(x) for x in self.data.keys()]) return ";".join([str(x) for x in self.data.values()]) def write_on_flash(self, current, verbose=True): """Save the current data as csv file on SPI flash""" if self.write_interval >= 0: if self.debug: print("Write {} : Current : {} | Last : {} | Interval : {}".format( self.name, current, self._last_backup, self.write_interval)) if current - self._last_backup >= self.write_interval: self._last_backup = current #First check if a rotation is needed self._file_rotation() file_name = "{}.csv".format(self.name) file_path = "data" try: #Check if the file exists. If not, creates it with CSV header if file_name not in os.listdir(file_path): with open("/".join((file_path, file_name)), "w") as csv_file: csv_file.write(self.csv(header=True)) csv_file.write("\n") if verbose: print("File created : {}".format(file_name)) with open("/".join((file_path, file_name)), "a") as csv_file: csv_file.write(self.csv()) csv_file.write("\n") if verbose: print("Written data : \n{}".format(self.csv())) except OSError as err: print("Err {}: readonly".format(err)) # to avoid trying again till next reset self.write_interval = -1 # Turn onboard led on to indicate read-only filesystem LED13.value = True def send_json(self, current, uart, verbose=True): """Send JSON string over UART""" if self.debug: print("Send {} : Current : {} | Last : {} | Interval : {}".format( self.name, current, self._last_send, self.send_interval)) if current - self._last_send >= self.send_interval: self._last_send = current uart.write(self.json) uart.write("\n") if verbose: print(self.json) #print("\n") def _file_rotation(self): """Check if files need to rotate => every new hour => every new day """ current_time = time.localtime() if self.debug: print(self._last_file_rotation[3]) print(current_time[3]) # If the hour changed : copy current csv to hourly directory if current_time[3] != self._last_file_rotation[3]: print("Time to move hourly data !") os.rename("data/{}.csv".format(self.name), "data/hourly/{}_{:02}.csv".format( self.name, self._last_file_rotation[3])) # If the day changed : copy content of hourly to daily directories if current_time[2] != self._last_file_rotation[2]: print("Time to move daily data !") # Create new dir for the date of "yesterday" newdir = "{}{:02}{:02}".format(*self._last_file_rotation[0:3]) if newdir not in os.listdir("data/daily/"): os.mkdir("data/daily/" + newdir) # Move each "hourly file" to the new directory for file in os.listdir('data/hourly'): # Move only file which name is beginning with self.name if file.find(self.name) == 0: print("Move {} to {}".format(file, newdir)) os.rename("data/hourly/{}".format(file), "{}/{}".format(newdir, file)) #Finally : remember last rotation check self._last_file_rotation = current_time return class SysData(Data): """Subclass for Feather board data""" def __init__(self, name="SYS", update_interval=10, write_interval=300, send_interval=60, debug=False): Data.__init__(self, name=name, update_interval=update_interval, write_interval=write_interval, send_interval=send_interval, debug=debug) self.data = {'time': "2000/01/01_00:00:00", 'vbat': int(), 'cput': float()} # Battery voltage self.vbat = AnalogIn(board.VOLTAGE_MONITOR) self._rtc = rtc.RTC() def _update(self): """ Update data from Feather board""" self.data['time'] = TIME_FORMAT.format(*self._rtc.datetime[0:6]) self.data['cput'] = round(microcontroller.cpu.temperature, 2) # Note about v_bat calculation : # 0.000100708 = 2*3.3/65536 with # 2 : voltage is divided by 2 # 3.3 : Vref = 3.3V # 65536 : 16bit ADC val = 0 samples = 10 for sample in range(samples): val += self.vbat.value time.sleep(0.01) self.data['vbat'] = round(val/samples*0.000100708, 3) class BME280Data(Data): """Subclass for BME280 sensor""" def __init__(self, name="BME", update_interval=10, write_interval=300, send_interval=60, debug=False): Data.__init__(self, name=name, update_interval=update_interval, write_interval=write_interval, send_interval=send_interval, debug=debug) self.data = {'time' : "2000/01/01_00:00:00", 'temp': float(), 'hum': int(), 'press': float()} # BME280 sensors (I2C) # i2c addresses for BME280 breakout : # 0x77 = adafruit breakout board # 0x76 = tiny cheap chinese board self.bme280 = Adafruit_BME280_I2C(I2C(board.SCL, board.SDA), address=0x76) def _update(self): """Update data from BME280""" self.data['time'] = TIME_FORMAT.format(*time.localtime()[0:6]) self.data['temp'] = round(self.bme280.temperature, 1) self.data['hum'] = int(self.bme280.humidity) self.data['press'] = round(self.bme280.pressure, 2) def rgb(self, neopixel_max=70): """Convert atmospheric data from BME280 sensor into NeoPixel color as a tuple (RED, BLUE, GREEN): * RED => temperature : max = 35degC, min =10degC (range 25°C) * BLUE => humidity : max= 100%, mini=0% * GREEN => pression : mini=960hPa, maxi = 1030hPa (range 70hPa) """ # RED componant calculation from temperature data # 10 is the min temperature, 25 is the range # (10+25=35°C = max temperature) red = int((self.data['temp']-10)*neopixel_max/25) if red > neopixel_max: red = neopixel_max if red < 0: red = 0 # BLUE componant calculation: very simple! By definition relative # humidity cannot be more than 100 or less than 0, physically blue = int(self.data['hum']*neopixel_max/100) # GREEN component calculation : 960 is the minimum pressure and 70 is # the range (960+70 = 1030hPa = max pressure) green = int((self.data['press']-960)*neopixel_max/70) if green > neopixel_max: green = neopixel_max if green < 0: green = 0 if self.debug: print("Col:{}".format((red, green, blue))) return (red, green, blue) class GPSData(Data): """Sub class for GPS""" def __init__(self, name="GPS", update_interval=0, write_interval=60, send_interval=60, enable=True, enable_pin=DigitalInOut(board.A5), debug=False): Data.__init__(self, name=name, update_interval=update_interval, write_interval=write_interval, send_interval=send_interval, debug=debug) self.data = {'time': "2000/01/01_00:00:00", 'lat': float(), 'lon': float(), 'alt': float(), 'qual': int(), 'age': int()} self._enable = enable self._gps_last_fix = int() self._gps_current_fix = int() # Set the pin to control the power to GPS module self._gps_en_pin = enable_pin self._gps_en_pin.direction = Direction.OUTPUT self._gps = GPS(UART(board.TX, board.RX, baudrate=9600, timeout=3000)) #, debug=self.debug) if self._enable: self.enable() else: self.disable() def _update(self): self._gps.update() if self._enable: self._gps_current_fix = int(time.monotonic()) self.data['time'] = TIME_FORMAT.format( self._gps.timestamp_utc.tm_year, self._gps.timestamp_utc.tm_mon, self._gps.timestamp_utc.tm_mday, self._gps.timestamp_utc.tm_hour, self._gps.timestamp_utc.tm_min, self._gps.timestamp_utc.tm_sec) if self._gps.has_fix: self._gps_last_fix = self._gps_current_fix self.data['lat'] = self._gps.latitude self.data['lon'] = self._gps.longitude self.data['alt'] = self._gps.altitude_m self.data['qual'] = self._gps.fix_quality self.data['age'] = 0 else: self.data['age'] = int(self._gps_current_fix - self._gps_last_fix) else: self.data = {} def enable(self): """Enable GPS module""" # Set GPS module on FeatherWing board if not self._enable: self._enable = True self._gps_en_pin.value = not self._enable # Set enable pin high to disable GPS module time.sleep(0.1) # Turn on the basic GGA and RMC info self._gps.send_command('PMTK314,0,1,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0') self._gps.send_command('PMTK220,1000') # 1000 ms refresh rate def disable(self): """Disable GPS module""" if self._enable: self._enable = False self._gps_en_pin.value = not self._enable # Set enable pin high to disable GPS module @property def datetime(self): """Wrapper to feed rtc.set_time_source()""" return self._gps.timestamp_utc ############# # Functions # ############# def set_clock_from_localtime(clock, threshold=2.0, debug=False): """ Compare internal RTC and date-time from time.localtime() and set the RTC date/time to this value if the difference is more or equal to threshold (in seconds). The source of time.localtime() can be set with rtc.set_time_source() """ if debug: print("Clock date/time : " + TIME_FORMAT.format(*clock.datetime[0:6])) print("Local date/time : " + TIME_FORMAT.format(*time.localtime()[0:6])) # Check if time.localtime() is valid and can be converted in seconds try: local_seconds = time.mktime(time.localtime()) except OverflowError as err: print(err) print("Local time source : " + TIME_FORMAT.format(*time.localtime()[0:6])) print("Wait some time to have proper time from time source...") return 1 # Max difference between GPS and internal RTC (in seconds): if abs(local_seconds - time.mktime(clock.datetime)) >= threshold: print("Clock difference with GPS!") print("Previous date/time : " + TIME_FORMAT.format(*clock.datetime[0:6])) clock.datetime = time.localtime() # Trust localtime if there is a bias print("Clocks synced !") return 0