Module to send commands to an Arduino based on a time table.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

344 lines
11 KiB

''' Controlling an Arduino with a time table '''
import asyncio
import serial
import serial.tools.list_ports
from collections import namedtuple
from datetime import datetime, timedelta
from openpyxl import load_workbook
# example time table for the TrapControl
example = '''
00:00.0 close
00:06.0 open
00:08.0 close
00:10.0 open
00:12.0 close
00:14.0 open
00:16.0 close
00:18.0 open
00:20.0 close
00:22.0 open
'''
# datetime formats for timetable parsing
TIMETABLE_FORMATS = ['%M:%S', '%M:%S.%f', '%H:%M:%S', '%H:%M:%S.%f']
# named tuple for a single scheduled command
ScheduledCommand = namedtuple('ScheduledCommand', 'delta command cmd')
def parse_time_table(timetable, available_commands=None):
''' parses a (text) time table into a list of ScheduledCommands
timetable:
a textual representaion of time and commands like
00:12.0 close
00:14.0 open
00:16.0 close
00:18.0 open
available_commands:
a dictionary containing the available human readable commands as
keys and the commands to send as values
{'open': 0, 'close': 1}
if None is provided, the check is skipped and the first lower case
letter used
returns a list consisting of ScheduledCommands
'''
# split the time table text by line break
raw_lines = timetable.split('\n')
# remove surrounding white space
content_lines = (line.strip() for line in raw_lines)
# remove empty lines
lines = (line for line in content_lines if line)
timed_commands =[]
for line in lines:
# split the lines into time and command
try:
raw_time, raw_command = line.split(None, 1)
except ValueError:
msg = "error in line '{}'".format(line)
raise ValueError(msg)
# parse time and commands
delta = parse_time(raw_time)
cmd = parse_command(raw_command, available_commands)
# add a ScheduledCommand to the resulting list
tc = ScheduledCommand(delta, raw_command, cmd)
timed_commands.append(tc)
return timed_commands
def parse_excel_file(path, available_commands=None):
''' parses an time table in an excel file into a list of ScheduledCommands
path:
path to excel file
the time must be in the first column of the first sheet, the command in
the second column
available_commands:
a dictionary containing the available human readable commands as
keys and the commands to send as values
{'open': 0, 'close': 1}
if None is provided, the check is skipped and the first lower case
letter used
returns a list consisting of ScheduledCommands
'''
workbook = load_workbook(path, read_only=True)
sheets = workbook.get_sheet_names()
sheet_name = sheets[0]
sheet = workbook[sheet_name]
is_header = True
timed_commands = []
for i, row in enumerate(sheet.rows):
delta = parse_time_cell(row[0])
if delta is None:
# no time in the cell
if is_header:
# this is still a header
continue
else:
# not the header and not a time cell
# premature end of list
break
else:
# a time in the cell, this is not a header any more
is_header = False
raw_command = row[1].value
cmd = parse_command(raw_command, available_commands)
# add a ScheduledCommand to the resulting list
tc = ScheduledCommand(delta, raw_command, cmd)
timed_commands.append(tc)
return timed_commands
def parse_time_cell(excel_cell):
''' parse an excel cell that contains a time value
if no time value is found in the excel cell None will be returned
'''
if excel_cell.is_date:
# the excel value is a datetime object
return time_as_timedelta(excel_cell.value)
if isinstance(excel_cell.value, float):
# sometimes excel stores times as a float value
return timedelta(days=excel_cell.value)
try:
# if it is not a common date fomat, try to parse it
delta = parse_time(excel_cell.value)
return delta
except ValueError:
return None
def parse_time(time_str):
''' parses a string to extract the time information
time_str:
string representaion of a time like '00:03.0'
returns a timedelta object, e.g. timedelta(seconds=3)
'''
# try the available time formats
time_str = str(time_str)
for format_str in TIMETABLE_FORMATS:
try:
time_obj = datetime.strptime(time_str, format_str)
break
except ValueError:
pass
else:
msg = "time data '{}' does not match any format".format(time_str)
raise ValueError(msg)
return time_as_timedelta(time_obj)
def time_as_timedelta(time_object):
''' converts a time object to a timedelta '''
return timedelta(
hours=time_object.hour,
minutes=time_object.minute,
seconds=time_object.second,
microseconds=time_object.microsecond
)
def parse_command(command, available_commands=None):
''' parses a string and checks if it is a valid command
raw_command:
human readable representation of the command
available_commands:
a dictionary containing the available human readable commands as
keys and the commands to send as values
{'open': 0, 'close': 1}
if None is provided, the check is skipped and the first lower case
letter used
returns the command to send over serial
'''
if available_commands is None:
cmd = str(command)[0]
else:
try:
cmd = available_commands[command.lower()]
except KeyError:
msg = "unknown command '{}'".format(command)
raise ValueError(msg)
if isinstance(cmd, str):
cmd = cmd.encode('utf-8')
return cmd
def find_arduino_port():
''' returns the port where an arduino is connected '''
# some regular expressions matching arduino and genuino
tmp = serial.tools.list_ports.grep('.+uino.+')
port_list = list(tmp)
# rais an error, if no or more than one arduinos are found
if len(port_list) == 0:
raise IOError('no arduino port found')
elif len(port_list) > 1:
raise IOError('{} arduino ports found'.format(len(port_list)))
# return only the device name
port_info = port_list[0]
return port_info.device
def send_command(scheduled_command, serial_connection):
''' sends a command over a serial connection '''
serial_connection.write(scheduled_command.cmd)
print(str(scheduled_command.delta), scheduled_command.command)
def run(scheduled_commands, serial_connection):
''' run scheduled commands
scheduled_commands:
list of ScheduledCommands
serial_connection:
serial connection object to send commands
'''
# use a new event loop every time
# if only one eventloop is used, the interactive use of run()
# does not work since the loop is closed in the end
loop = asyncio.new_event_loop()
for scheduled_command in scheduled_commands:
loop.call_later(
scheduled_command.delta.total_seconds(),
send_command,
scheduled_command,
serial_connection
)
# loop should stop one second after last command in list
timed_loop_stop = scheduled_command.delta + timedelta(seconds=1)
loop.call_later(timed_loop_stop.total_seconds(), loop.stop)
try:
loop.run_forever()
except KeyboardInterrupt:
loop.stop()
loop.close()
class TimedCommands(object):
''' lightweight encapsulation of the functions in the module '''
def __init__(self,
timetable,
available_commands,
port=None,
baudrate=9600,
**serial_kwargs):
''' parses a time table and establishes a serial connection
timetable:
a file path to an excel file or
a textual representaion of time and commands like
00:12.0 close
00:14.0 open
00:16.0 close
00:18.0 open
available_commands:
a dictionary containing the available human readable commands
as keys and the commands to send as values.
If an other iterable with strings is provided, the lowercase
version of the strings will be used as human readable commands
and the first letter of the strings as commands to send over the
wire.
port [None]
port where the Arduino is connected
if port is None, the ports are scanned for an Arduino
baudrate [9600]
speed of the serial connection
serial_kwargs
dictionary with further arguments for the serial connection
'''
# make sure the available commands are a dictionary
cmd_dict = self._ensure_command_dict(available_commands)
# parse the time table into something suitable
if '.xls' in timetable:
self.commands = parse_excel_file(timetable, cmd_dict)
else:
self.commands = parse_time_table(timetable, cmd_dict)
# establish the serial connection
port = port or find_arduino_port()
self.serial = serial.Serial(port, baudrate, **serial_kwargs)
def _ensure_command_dict(self, iterable):
''' ensures, that the available commands are a dictionary
iterable:
if the itarable is a dictionary, the keys will be
transformed to lowercase
if it is not a dictionary, the lowercase version of the
items is used as a human readable command and the first
character of this command will be sent over the wire
'''
try:
if isinstance(iterable, dict):
pairs = list(iterable.items())
keys = [key.lower() for key, value in pairs]
values = [value for key, value in pairs]
else:
tmp = (str(item) for item in iterable)
keys = [item.lower() for item in tmp]
values = [item[0] for item in keys]
pairs = zip(keys, values)
return dict(pairs)
except:
msg = 'available commands should be a list or dict of strings'
raise TypeError(msg)
def run(self):
''' run the scheduled commands '''
run(self.commands, self.serial)
def close(self):
''' closes an open serial connection '''
self.serial.close()