363 lines
12 KiB
Python
363 lines
12 KiB
Python
|
#!/usr/bin/python3
|
||
|
# -*- coding: UTF8 -*-
|
||
|
|
||
|
"""datacam
|
||
|
v0.3
|
||
|
|
||
|
datacam is design to take a picture with the PiCamera and collect environnemental
|
||
|
data from the PiSense HAT, (almost) at the same time.
|
||
|
|
||
|
"""
|
||
|
|
||
|
#################
|
||
|
# Configuration #
|
||
|
#################
|
||
|
|
||
|
#General
|
||
|
version = "v0.3"
|
||
|
#time_lapse = 20 # Time between photos et data captures
|
||
|
verbose = True
|
||
|
|
||
|
# Pictures
|
||
|
ajust_time = 1 #Time waiting for the camera to ajust before taking the photo (in seconds)
|
||
|
x_res = 2592 #X resolution (max = 2592)
|
||
|
y_res = 1944 #Y resolution (max = 1944)
|
||
|
photo_dir = "pictures" #Photo directory
|
||
|
photo_file = "Test_%Y-%m-%d_%H%M%S" # Picture files name (strftime() compatible)
|
||
|
photo_rotation = "180" # Image rotation in degree
|
||
|
#camera_LED = True # Set to False to disable red camera LED during capture, need root privileges
|
||
|
sense_LED_flash = True
|
||
|
|
||
|
# Data
|
||
|
|
||
|
#Data to collect :
|
||
|
# - 'temperature_h' : temperature from humidity sensor
|
||
|
# - 'temperature_p' : temperature from pressure sensor
|
||
|
# - 'temperature_cpu' : temperature from CPU
|
||
|
# - 'pressure' : pressure
|
||
|
# - 'humidity' : relative humidity
|
||
|
data_collection = ['temperature_p',
|
||
|
'pressure',
|
||
|
'temperature_h',
|
||
|
'humidity',
|
||
|
'temperature_cpu',
|
||
|
'blabla',
|
||
|
]
|
||
|
data_display = ['pressure',
|
||
|
'temperature_h',
|
||
|
'humidity',
|
||
|
'blabla',
|
||
|
]
|
||
|
data_dir = 'data'
|
||
|
data_log = ['date', 'value', 'quality']
|
||
|
|
||
|
# Fonts
|
||
|
fonts_dir = '/usr/share/fonts/truetype/freefont/'
|
||
|
font_bold = 'FreeMonoBold.ttf'
|
||
|
font_basic = 'FreeMono.ttf'
|
||
|
font_italic = 'FreeMonoOblique.ttf'
|
||
|
font_bold_italic = 'FreeMonoBoldOblique.ttf'
|
||
|
font_default = font_basic
|
||
|
|
||
|
|
||
|
#################
|
||
|
# Imports #
|
||
|
#################
|
||
|
|
||
|
from os import path, popen
|
||
|
from time import sleep, strftime
|
||
|
import csv
|
||
|
|
||
|
# Picture management with the Pi Cam
|
||
|
from picamera import PiCamera
|
||
|
|
||
|
#Pi Sense HAT module (sensors)
|
||
|
from sense_hat import SenseHat
|
||
|
|
||
|
# Traitement d'images
|
||
|
import PIL
|
||
|
from PIL import ImageFont
|
||
|
from PIL import Image
|
||
|
from PIL import ImageDraw
|
||
|
|
||
|
# For picture metadata (EXIF, IPTC...)
|
||
|
import piexif
|
||
|
|
||
|
#Divers
|
||
|
from pprint import pprint
|
||
|
|
||
|
#########################
|
||
|
# Déclarations globales #
|
||
|
#########################
|
||
|
|
||
|
sense = SenseHat()
|
||
|
data = {}
|
||
|
data_dir = path.join('/home/pi/datacam', version, data_dir)
|
||
|
photo_dir = path.join('/home/pi/datacam', version, photo_dir)
|
||
|
|
||
|
#############
|
||
|
# Fonctions #
|
||
|
#############
|
||
|
|
||
|
#Get one data from one sensors and return the value in a dict object with some metadata
|
||
|
# like time/date, description, unit...
|
||
|
|
||
|
class Raw_Data:
|
||
|
"""Data class.
|
||
|
"""
|
||
|
dtype = ''
|
||
|
date = ''
|
||
|
value = ''
|
||
|
quality = -1 # -1 : default value/data type unknown, 0 : OK, 1 : non-available
|
||
|
metadata = {'desc' : 'Unknow',
|
||
|
'unit' : '',
|
||
|
}
|
||
|
|
||
|
def __init__(self, data_type):
|
||
|
"Initialize new data object of the defined type."
|
||
|
self.dtype = data_type
|
||
|
if self.dtype == 'temperature_p':
|
||
|
self.metadata = {'desc' : 'Air temperature (pressure sensors)',
|
||
|
'unit' : '°C',
|
||
|
'short' : 'Temperature',
|
||
|
'category' : 'Environnement/Temperature',
|
||
|
}
|
||
|
elif self.dtype == 'temperature_h':
|
||
|
self.metadata = {'desc' : 'Air temperature (pressure sensors)',
|
||
|
'unit' : '°C',
|
||
|
'short' : 'Temperature',
|
||
|
'category' : 'Environnement/Temperature',
|
||
|
}
|
||
|
elif self.dtype == 'pressure':
|
||
|
self.metadata = {'desc' : 'Atmospheric Pressure',
|
||
|
'unit' : 'mbar',
|
||
|
'short' : 'Pression',
|
||
|
'category' : 'Environnement/Pressure',
|
||
|
}
|
||
|
elif self.dtype == 'humidity':
|
||
|
self.metadata = {'desc' : 'Relative air humidity',
|
||
|
'unit' : '%',
|
||
|
'short' : 'Humidity',
|
||
|
'category' : 'Environnement/Humidity',
|
||
|
}
|
||
|
elif self.dtype == 'temperature_cpu':
|
||
|
self.metadata = {'desc' : 'CPU temperature',
|
||
|
'unit' : '°C',
|
||
|
'short' : 'Temperature',
|
||
|
'category' : 'System/Temperature',
|
||
|
}
|
||
|
else:
|
||
|
print(data_type + " is unknown.")
|
||
|
|
||
|
def get_raw(self):
|
||
|
"Get raw data from sensors and update information about this data."
|
||
|
|
||
|
self.date = strftime('%Y-%m-%d_%H:%M:%S')
|
||
|
|
||
|
if self.dtype == 'temperature_p':
|
||
|
try:
|
||
|
self.value = float(sense.get_temperature_from_pressure())
|
||
|
self.quality = 0
|
||
|
except:
|
||
|
self.value = None
|
||
|
self.quality = 1
|
||
|
elif self.dtype == 'temperature_h':
|
||
|
try:
|
||
|
self.value = float(sense.get_temperature_from_humidity())
|
||
|
self.quality = 0
|
||
|
except:
|
||
|
self.value = None
|
||
|
self.quality = 1
|
||
|
elif self.dtype == 'pressure':
|
||
|
try:
|
||
|
self.value = float(sense.get_pressure())
|
||
|
self.quality = 0
|
||
|
except:
|
||
|
self.value = None
|
||
|
self.quality = 1
|
||
|
elif self.dtype == 'humidity':
|
||
|
try:
|
||
|
self.value = float(sense.get_humidity())
|
||
|
self.quality = 0
|
||
|
except:
|
||
|
self.value = None
|
||
|
self.quality = 1
|
||
|
elif self.dtype == 'temperature_cpu':
|
||
|
try:
|
||
|
temp = popen('/opt/vc/bin/vcgencmd measure_temp').readline()
|
||
|
self.value = float(temp.replace('temp=','').replace("'C\n",""))
|
||
|
self.quality = 0
|
||
|
except:
|
||
|
self.value = None
|
||
|
self.quality = 1
|
||
|
else:
|
||
|
print(data_type + " is unknown.")
|
||
|
self.metadata.update({'desc' : 'Unknown'})
|
||
|
self.value = None
|
||
|
self.quality = -1
|
||
|
|
||
|
def write_csv(self, file_path):
|
||
|
"Write data in CSV file. Creates CSV file and metadata text file if they do not exist."
|
||
|
|
||
|
if path.isfile(file_path):
|
||
|
with open(file_path, 'a') as csvfile:
|
||
|
datawriter = csv.DictWriter(csvfile, fieldnames=data_log, delimiter=';')
|
||
|
datawriter.writerow({n : v for n, v in vars(data[data_type]).items() if n in data_log })
|
||
|
|
||
|
else:
|
||
|
#Creation of metadata file
|
||
|
if data[data_type]['desc'] != 'Unknown':
|
||
|
with open(path.join(data_dir, data_type + '_desc.txt'), 'w') as metadata_file:
|
||
|
for desc, value in data[data_type].items():
|
||
|
if desc not in data_log:
|
||
|
metadata_file.write(desc + " : " + str(value) + '\n')
|
||
|
#Creation of data file
|
||
|
with open(csvfile_path, 'w') as csvfile:
|
||
|
datawriter = csv.DictWriter(csvfile, fieldnames=data_log, delimiter=';')
|
||
|
datawriter.writeheader()
|
||
|
datawriter.writerow({n : v for n, v in vars(data[data_type]).items() if n in data_log})
|
||
|
|
||
|
def __repr__(self):
|
||
|
"For debug"
|
||
|
if self.metadata['desc'] == 'Unknown':
|
||
|
return '<Data {t:} = {d:} at {dt}>'.format(d=self.metadata['desc'],
|
||
|
t=self.dtype,
|
||
|
dt=self.date,
|
||
|
)
|
||
|
elif type(self.value) == float:
|
||
|
return '<Data {d:} = {v:.1f}{u} ({q}) at {dt}>'.format(d=self.metadata['desc'],
|
||
|
v=self.value,
|
||
|
u=self.metadata['unit'],
|
||
|
q=self.quality,
|
||
|
dt=self.date,
|
||
|
)
|
||
|
elif self.value == None:
|
||
|
return '<Data {d:} = {v} ({q}) at {dt}>'.format(d=self.metadata['desc'],
|
||
|
v=self.value,
|
||
|
q=self.quality,
|
||
|
dt=self.date,
|
||
|
)
|
||
|
else:
|
||
|
return '<Data {d:} = {v} {u} ({q}) at {dt}>'.format(d=self.metadata['desc'],
|
||
|
v=self.value,
|
||
|
u=self.metadata['unit'],
|
||
|
q=self.quality,
|
||
|
dt=self.date,
|
||
|
)
|
||
|
return
|
||
|
|
||
|
def __str__(self):
|
||
|
"For print (to user)"
|
||
|
if self.metadata['desc'] == 'Unknown':
|
||
|
return '{data_type:} = {desc:}'.format(desc=self.metadata['desc'],
|
||
|
data_type=self.dtype,
|
||
|
)
|
||
|
elif type(self.value) == float:
|
||
|
return '{d:} = {v:.1f}{u}'.format(d=self.metadata['desc'],
|
||
|
v=self.value,
|
||
|
u=self.metadata['unit'],
|
||
|
)
|
||
|
elif self.value == None:
|
||
|
return '{d:} = {v}'.format(d=self.metadata['desc'],
|
||
|
v='NA',
|
||
|
)
|
||
|
else:
|
||
|
return '{d:} = {v} {u}'.format(d=self.metadata['desc'],
|
||
|
v=self.value,
|
||
|
u=self.metadata['unit'],
|
||
|
)
|
||
|
|
||
|
def __call__(self):
|
||
|
return self.value
|
||
|
|
||
|
# Take a picture
|
||
|
def get_pict():
|
||
|
with PiCamera() as camera:
|
||
|
camera.resolution = (x_res, y_res)
|
||
|
camera.rotation = photo_rotation
|
||
|
#camera.led = camera_LED # !!! need root privileges !!!
|
||
|
if sense_LED_flash:
|
||
|
sense.show_message(text_string="",
|
||
|
back_colour=[255, 255, 255],
|
||
|
)
|
||
|
camera.start_preview()
|
||
|
sleep(ajust_time)
|
||
|
sense.set_rotation(r=180, redraw=True)
|
||
|
#sense.show_message(text_string=":-)",
|
||
|
#scroll_speed=0.1 ,
|
||
|
#text_colour=[255, 0, 0],
|
||
|
#back_colour=[0, 0, 255])
|
||
|
|
||
|
pict_path = path.join(photo_dir, strftime(photo_file) + '.jpg')
|
||
|
camera.capture(pict_path)
|
||
|
sense.clear()
|
||
|
#camera.led = True # !!! need root privileges !!!
|
||
|
|
||
|
return pict_path
|
||
|
|
||
|
########
|
||
|
# MAIN #
|
||
|
########
|
||
|
|
||
|
# Collect asked data from sensors
|
||
|
for data_type in data_collection:
|
||
|
data[data_type] = Raw_Data(data_type)
|
||
|
|
||
|
data[data_type].get_raw()
|
||
|
|
||
|
if verbose :
|
||
|
print(data[data_type].__repr__())
|
||
|
print(data[data_type]())
|
||
|
|
||
|
#Save data in CSV file
|
||
|
|
||
|
csvfile_path = path.join(data_dir, data_type + '.csv')
|
||
|
|
||
|
data[data_type].write_csv(csvfile_path)
|
||
|
|
||
|
#Take a picture
|
||
|
picture_path = get_pict()
|
||
|
|
||
|
# Write data on picture in new picture file
|
||
|
pict = Image.open(picture_path)
|
||
|
|
||
|
draw = ImageDraw.Draw(pict)
|
||
|
# Text line height (=font size) and line space calculation
|
||
|
line_height = int(y_res / 40)
|
||
|
line_space = int(line_height / 10)
|
||
|
|
||
|
text = ''
|
||
|
l = line_space
|
||
|
|
||
|
# Add a line with data for each data to display
|
||
|
for d in data_display:
|
||
|
text = str(data[d]) #.__str__()
|
||
|
# Select color and font to match data quality
|
||
|
if data[d].quality == 0:
|
||
|
text_color = (0, 255, 0) # Quality OK : text in green
|
||
|
font = font_default
|
||
|
elif data[d].quality < 0:
|
||
|
text_color = (100, 100, 100) # Unknown in grey
|
||
|
font = font_italic
|
||
|
elif data[d].quality == 1:
|
||
|
text_color = (255, 255, 0) # Sensor unavailable : yellow
|
||
|
font = font_italic
|
||
|
else:
|
||
|
text_color = (255, 0, 0) # Other = error in red
|
||
|
font = font_bold
|
||
|
|
||
|
img_font = ImageFont.truetype(path.join(fonts_dir, font), line_height)
|
||
|
draw.text((5,l), text, text_color, img_font)
|
||
|
draw = ImageDraw.Draw(pict)
|
||
|
l = l + line_height + line_space
|
||
|
|
||
|
#Get EXIF metadata and delete embedded thumbnail
|
||
|
pict_exif = piexif.load(pict.info['exif'])
|
||
|
del(pict_exif['thumbnail'])
|
||
|
|
||
|
pict.save(picture_path, 'jpeg', exif=piexif.dump(pict_exif) )
|
||
|
|
||
|
if verbose :
|
||
|
print('Picture = ' + picture_path)
|
||
|
|