From 67b7fdce689c3c186b519a0bf819b698ec96e038 Mon Sep 17 00:00:00 2001 From: Pierrick C Date: Sat, 25 Aug 2018 11:38:05 +0200 Subject: [PATCH] pylint and PEP 8 checks Only styles changes --- circuitpython/code/main.py | 265 +++++++++++++++++++------------------ 1 file changed, 138 insertions(+), 127 deletions(-) diff --git a/circuitpython/code/main.py b/circuitpython/code/main.py index 6a3932f..dd49c4f 100644 --- a/circuitpython/code/main.py +++ b/circuitpython/code/main.py @@ -15,7 +15,6 @@ ########################## # Weather and GPS logger # ########################## -Author : Pierrick "Arofarn" Couturier Use with: * Adafruit Feather M4 Express (CircuitPython firmware) @@ -43,41 +42,46 @@ TODO for v1 : * send data through UART (work-in-progress) """ __version__ = 0.2 +__author__ = "arofarn" + +####################### +import time +import gc +import os +import rtc +import microcontroller +import board +# import micropython +from busio import I2C, UART +from analogio import AnalogIn +from digitalio import DigitalInOut, Direction +from adafruit_bme280 import Adafruit_BME280_I2C +from adafruit_gps import GPS +import neopixel + ########## # config # ########## -print_data = True # Print data on USB UART / REPL output ? -send_json_data = True # Send data as JSON on second UART ? -backup_data = True # Write data as CSV files on onboard SPI Flash ? -data_to_neopixel = True # Display atmospheric data as color on onboard neopixel ? -gps_enable = True # Use GPS module ? -update_interval = const(10) # Interval between data acquisition (in seconds) -write_interval = const(60) # Interval between data written on flash Memory -send_interval = const(60) # Interval between packet of data sent to Rpi -datetime_format = "{:04}/{:02}/{:02}_{:02}:{:02}:{:02}" # Date/time format -neopixel_max_value =const(70) #max value instead of brightness to spare some mem - -####################### - -import microcontroller, board -import gc, os -# import micropython -import time, rtc -from busio import I2C, UART -from analogio import AnalogIn -from digitalio import DigitalInOut, Direction - -from adafruit_bme280 import Adafruit_BME280_I2C -from adafruit_gps import GPS -import neopixel +print_data_flag = True # Print data on USB UART / REPL output ? +send_json_flag = True # Send data as JSON on second UART ? +backup_data_flag = True # Write data as CSV files on onboard SPI Flash ? +neopixel_flag = True # Display atmospheric data as color on onboard neopixel ? +gps_enable_flag = True # Use GPS module ? +UPDATE_INTERVAL = 10 # Interval between data acquisition (in seconds) +WRITE_INTERVAL = 60 # Interval between data written on flash Memory +SEND_INTERVAL = 60 # Interval between packet of data sent to Rpi +TIME_FORMAT = "{:04}/{:02}/{:02}_{:02}:{:02}:{:02}" # Date/time format +NEOPIXEL_MAX_VALUE = 70 # max value instead of brightness to spare some mem ########### # Classes # ########### + class Data: """Class for handling data""" + def __init__(self): self.data = {'SYS': {'time': "2000/01/01_00:00:00", 'vbat': int(), @@ -90,33 +94,32 @@ class Data: 'lon': float(), 'alt': float(), 'qual': int(), - 'age': int() } - } + 'age': int()}} self._gps_last_fix = int() self._gps_current_fix = int() def update(self): """Read the data from various sensors and update the data dict variable""" - #Data from Feather board - self.data['SYS']['time'] = datetime_format.format(*clock.datetime[0:6]) + # Data from Feather board + self.data['SYS']['time'] = TIME_FORMAT.format(*clock.datetime[0:6]) self.data['SYS']['vbat'] = round(measure_vbat(), 3) self.data['SYS']['cput'] = round(microcontroller.cpu.temperature, 2) - #Data from BME280 + # Data from BME280 self.data['BME']['temp'] = round(bme280.temperature, 1) self.data['BME']['hum'] = int(bme280.humidity) self.data['BME']['press'] = round(bme280.pressure, 2) - if gps_enable: + if gps_enable_flag: self._gps_current_fix = int(time.monotonic()) if gps.has_fix: self._gps_last_fix = self._gps_current_fix - self.data['GPS']['time'] = datetime_format.format(gps.timestamp_utc.tm_year, - gps.timestamp_utc.tm_mon, - gps.timestamp_utc.tm_mday, - gps.timestamp_utc.tm_hour, - gps.timestamp_utc.tm_min, - gps.timestamp_utc.tm_sec) + self.data['GPS']['time'] = TIME_FORMAT.format(gps.timestamp_utc.tm_year, + gps.timestamp_utc.tm_mon, + gps.timestamp_utc.tm_mday, + gps.timestamp_utc.tm_hour, + gps.timestamp_utc.tm_min, + gps.timestamp_utc.tm_sec) self.data['GPS']['lat'] = gps.latitude self.data['GPS']['lon'] = gps.longitude self.data['GPS']['alt'] = gps.altitude_m @@ -128,12 +131,12 @@ class Data: self.data['GPS'] = None def show(self): - """Serialize data for visualization on serial console""" - for source in self.data.keys(): - print(source + ": ") - if not self.data[source] == None: - for d in self.data[source].items(): - print("\t{0}: {1}".format(d[0], d[1])) + """Serialize data for visualization on serial console""" + for source in self.data.keys(): + print(source + ": ") + if not self.data[source] is None: + for d in self.data[source].items(): + print("\t{0}: {1}".format(d[0], d[1])) @property def json(self): @@ -147,7 +150,7 @@ class Data: else: comma_src = "," output = '{}{}"{}":'.format(output, comma_src, source) - if not self.data[source] == None: + if not self.data[source] is None: output = output + '{' first_data = True for d in self.data[source].items(): @@ -172,55 +175,60 @@ class Data: # RED componant calculation from temperature data # 10 is the min temperature, 25 is the range # (10+25=35°C = max temperature) - red = int((self.data['BME']['temp']-10)*neopixel_max_value/25) - if red > neopixel_max_value: - red = neopixel_max_value + red = int((self.data['BME']['temp']-10)*NEOPIXEL_MAX_VALUE/25) + if red > NEOPIXEL_MAX_VALUE: + red = NEOPIXEL_MAX_VALUE if red < 0: red = 0 # BLUE componant calculation: very simple! By definition relative # humidity cannot be more than 100 or less than 0, physically - blue = int(self.data['BME']['hum']*neopixel_max_value/100) + blue = int(self.data['BME']['hum']*NEOPIXEL_MAX_VALUE/100) # GREEN component calculation : 960 is the minimum pressure and 70 is # the range (960+70 = 1030hPa = max pressure) - green = int((self.data['BME']['press']-960)*neopixel_max_value/70) - if green > neopixel_max_value: - green = neopixel_max_value + green = int((self.data['BME']['press']-960)*NEOPIXEL_MAX_VALUE/70) + if green > NEOPIXEL_MAX_VALUE: + green = NEOPIXEL_MAX_VALUE if green < 0: green = 0 - if print_data: + if print_data_flag: print("Col:{}".format((red, green, blue))) return (red, green, blue) def write_on_flash(self): """Save the current data as csv file on SPI flash""" + global backup_data_flag try: with open("data/data.csv", "a") as csv_file: - if gps_enable: - csv_file.write("{};{};{};{};{};{};{};{};{};{}\n".format(self.data['SYS']['time'], - self.data['BME']['temp'], - self.data['BME']['hum'], - self.data['BME']['press'], - self.data['SYS']['vbat'], - self.data['GPS']['time'], - self.data['GPS']['lon'], - self.data['GPS']['lat'], - self.data['GPS']['alt'], - self.data['GPS']['qual'], - )) + if gps_enable_flag: + csv_file.write("{};{};{};{};{};{};{};{};{};{}\n".format( + self.data['SYS']['time'], + self.data['BME']['temp'], + self.data['BME']['hum'], + self.data['BME']['press'], + self.data['SYS']['vbat'], + self.data['GPS']['time'], + self.data['GPS']['lon'], + self.data['GPS']['lat'], + self.data['GPS']['alt'], + self.data['GPS']['qual'], + )) else: - csv_file.write("{};{};{};{};{};;;;;\n".format(self.data['SYS']['time'], - self.data['BME']['temp'], - self.data['BME']['hum'], - self.data['BME']['press'], - self.data['SYS']['vbat'] - )) - except OSError as e: - print("Err {}: readonly".format(e)) - backup_data = False #to avoid trying again till next reset + csv_file.write("{};{};{};{};{};;;;;\n".format( + self.data['SYS']['time'], + self.data['BME']['temp'], + self.data['BME']['hum'], + self.data['BME']['press'], + self.data['SYS']['vbat'], + )) + + except OSError as err: + print("Err {}: readonly".format(err)) + # to avoid trying again till next reset + backup_data_flag = False # Turn onboard led on to indicate read-only error led13.value = True @@ -228,8 +236,10 @@ class Data: # Functions # ############# + def check_data_dir(): """Check if data directories exists""" + global backup_data_flag try: if 'data' not in os.listdir(): os.mkdir('data') @@ -239,60 +249,60 @@ def check_data_dir(): os.mkdir('data/hourly') elif 'daily' not in os.listdir('data'): os.mkdir('data/daily') - except OSError as e: - print("Err {}: readonly".format(e)) - backup_data = False #to avoid trying again till next reset + except OSError as err: + print("Err {}: readonly".format(err)) + backup_data_flag = False # to avoid trying again till next reset # Turn onboard led on to indicate read-only error led13.value = True -def rotate_files(): + +def rotate_files(last_time): """Check if files need to rotate => every new hour => every new day """ - global last_time current_time = clock.datetime - #If the hour changed : copy current data.csv to hourly directory - if current_time[3] != last_time[3] and backup_data: + # If the hour changed : copy current data.csv to hourly directory + if current_time[3] != last_time[3] and backup_data_flag: print("Time to move hourly data !") os.rename("data/data.csv", "data/hourly/{:02}.csv".format(last_time[3])) - #If the day changed : copy content of hourly to daily directories - if current_time[2] != last_time[2] and backup_data: + # If the day changed : copy content of hourly to daily directories + if current_time[2] != last_time[2] and backup_data_flag: print("Time to move daily data !") - #Create new dir for the date of yesterday + # Create new dir for the date of yesterday newdir = "data/daily/{}{:02}{:02}".format(*last_time[0:3]) os.mkdir(newdir) - #Move each "hourly file" to the new directory + # Move each "hourly file" to the new directory for file in os.listdir('data/hourly'): print("Move {} to {}".format(file, newdir)) os.rename("data/hourly/{}".format(file), "{}/{}".format(newdir, file)) - #Finally update last_time, each time - last_time = current_time + # Finally update last_time, each time + return current_time -def set_clock_from_GPS(treshold=5.0): +def set_clock_from_gps(treshold=5.0): """Compare internal RTC and date-time from GPS (if enable and fixed) and set the RTC date time to GPS date-time if the difference is more or equal to threshold (in seconds)""" - if gps_enable and gps.has_fix: - #Congreen GPS timestamp into struct_time - gps_datetime = time.struct_time((gps.timestamp_utc.tm_year, - gps.timestamp_utc.tm_mon, - gps.timestamp_utc.tm_mday, - gps.timestamp_utc.tm_hour, - gps.timestamp_utc.tm_min, - gps.timestamp_utc.tm_sec, 0, 0, 0)) - #Max difference between GPS and internal RTC (in seconds): + if gps_enable_flag and gps.has_fix: + gps_datetime = time.struct_time((gps.timestamp_utc.tm_year, + gps.timestamp_utc.tm_mon, + gps.timestamp_utc.tm_mday, + gps.timestamp_utc.tm_hour, + gps.timestamp_utc.tm_min, + gps.timestamp_utc.tm_sec, 0, 0, 0)) + # Max difference between GPS and internal RTC (in seconds): if abs(time.mktime(gps_datetime) - time.mktime(clock.datetime)) >= treshold: # print("Clock difference with GPS!") - # print("Previous date/time : " + datetime_format.format(*clock.datetime[0:6])) - clock.datetime = gps_datetime #Trust GPS if there is a bias + # print("Previous date/time : " + TIME_FORMAT.format(*clock.datetime[0:6])) + clock.datetime = gps_datetime # Trust GPS if there is a bias print("Clocks synced !") + def measure_vbat(samples=10, timestep=0.01): """Measure Vbattery as the mean of n samples with timestep second between each measurement""" @@ -301,22 +311,23 @@ def measure_vbat(samples=10, timestep=0.01): # 2 : voltage is divided by 2 # 3.3 : Vref = 3.3V # 65536 : 16bit ADC - v = 0 + val = 0 for i in range(samples): - v = v + vbat.value + val = val + vbat.value time.sleep(timestep) - return v/samples*0.000100708 + return val/samples*0.000100708 ######### # Setup # ######### -gc.collect() -#micropython.mem_info() -#Enable RTC of the feather M0 board +gc.collect() +# micropython.mem_info() + +# Enable RTC of the feather M0 board clock = rtc.RTC() -#clock.datetime = time.struct_time((2018, 7, 29, 15, 31, 30, 0, 0, 0)) +# clock.datetime = time.struct_time((2018, 7, 29, 15, 31, 30, 0, 0, 0)) # BME280 sensors (I2C) i2c = I2C(board.SCL, board.SDA) @@ -328,18 +339,18 @@ bme280 = Adafruit_BME280_I2C(i2c, address=0x76) # Battery voltage vbat = AnalogIn(board.VOLTAGE_MONITOR) -#Set the pin to control the power to GPS module +# Set the pin to control the power to GPS module gps_en_pin = DigitalInOut(board.A5) gps_en_pin.direction = Direction.OUTPUT -#Set the pin N°13 to use the onboard LED as read-only/read-write indicator +# Set the pin N°13 to use the onboard LED as read-only/read-write indicator led13 = DigitalInOut(board.D13) led13.direction = Direction.OUTPUT led13.value = False # Set GPS module on FeatherWing board -gps_en_pin.value = not gps_enable #Set enable pin high to disable GPS module -if gps_enable: +gps_en_pin.value = not gps_enable_flag # Set enable pin high to disable GPS module +if gps_enable_flag: gps_uart = UART(board.TX, board.RX, baudrate=9600, timeout=3000) gps = GPS(gps_uart) @@ -348,57 +359,57 @@ if gps_enable: gps.send_command('PMTK220,1000') # 1000 ms refresh rate # Second UART to communicate with raspberry pi (throught GPIO) -if send_json_data: +if send_json_flag: rpi_uart = UART(board.A2, board.A3, baudrate=115200, timeout=2000) # Set onboard Neopixel : used as atmo data output -# brightness is fixed to 1 to spare some memory. Use neopixel_max_value instead -if data_to_neopixel: +# brightness is fixed to 1 to spare some memory. Use NEOPIXEL_MAX_VALUE instead +if neopixel_flag: pixel = neopixel.NeoPixel(board.NEOPIXEL, 1, brightness=1) else: - #if neopixel is disable : turn off the LED + # if neopixel is disable : turn off the LED pixel = neopixel.NeoPixel(board.NEOPIXEL, 1, brightness=1) - pixel[0] = (0,0,0) + pixel[0] = (0, 0, 0) pixel = None -#Finally check if data directories exist +# Finally check if data directories exist check_data_dir() ############# # Main loop # ############# -#Create the Data object data = Data() -#Init timer -last_update = last_sent_packet = last_written_data = time.monotonic() -last_time = clock.datetime +# Init timers +last_update = last_sent_packet = last_written_data = time.monotonic() +last_rotation = clock.datetime while True: - if gps_enable: + if gps_enable_flag: gps.update() current = time.monotonic() - if current - last_update >= update_interval: + if current - last_update >= UPDATE_INTERVAL: last_update = current - set_clock_from_GPS() + set_clock_from_gps() data.update() - if print_data: + if print_data_flag: data.show() - if data_to_neopixel: + if neopixel_flag: pixel[0] = data.rgb - if send_json_data and current - last_sent_packet >= send_interval : + if send_json_flag and current - last_sent_packet >= SEND_INTERVAL: last_sent_packet = current rpi_uart.write(data.json + '\n') print(data.json + '\n') - if backup_data and current - last_written_data >= write_interval : + if backup_data_flag and current - last_written_data >= WRITE_INTERVAL: last_written_data = current - rotate_files() #First check if files need to rotate + # First check if files need to rotate + last_rotation = rotate_files(last_rotation) print("Backup data...") data.write_on_flash()