# Module to send commands to an Arduino based on a time table.

''' Controlling an Arduino with a time table '''
import asyncio
import serial
import serial.tools.list_ports
from collections import namedtuple
from datetime import datetime, timedelta
# Example time table for the TrapControl: one entry per line, each made of
# a time offset followed by a human readable command.
example = '''
00:00.0 close
00:06.0 open
00:08.0 close
00:10.0 open
00:12.0 close
00:14.0 open
00:16.0 close
00:18.0 open
00:20.0 close
00:22.0 open
'''
# datetime.strptime formats that parse_time() tries, in this order, when
# parsing the time column of a time table
TIMETABLE_FORMATS = ['%M:%S', '%M:%S.%f', '%H:%M:%S', '%H:%M:%S.%f']
# A single scheduled command:
#   delta   - timedelta parsed from the time column
#   command - the human readable command text as written in the time table
#   cmd     - the value that is written to the serial connection
ScheduledCommand = namedtuple('ScheduledCommand', 'delta command cmd')
def parse_time_table(timetable, available_commands):
    ''' turns a textual time table into a list of ScheduledCommands

    timetable:
        text with one entry per line, each entry being a time followed
        by a command, separated by white space, e.g.
            00:12.0 close
            00:14.0 open
        surrounding white space and empty lines are ignored
    available_commands:
        a dictionary containing the available human readable commands as
        keys and the commands to send as values
            {'open': 0, 'close': 1}

    returns a list of ScheduledCommands in the order of the text lines

    raises a ValueError if a line can not be split into a time and a
    command; errors from parse_time() and parse_command() are passed on
    '''
    schedule = []
    for raw_line in timetable.split('\n'):
        entry = raw_line.strip()
        if not entry:
            # nothing but white space on this line
            continue
        # the first white space separates the time from the command
        try:
            time_text, command_text = entry.split(None, 1)
        except ValueError:
            raise ValueError("error in line '{}'".format(entry))
        schedule.append(
            ScheduledCommand(
                parse_time(time_text),
                command_text,
                parse_command(command_text, available_commands),
            )
        )
    return schedule
def parse_time(time_str):
    ''' converts the time column of a time table into a timedelta

    time_str:
        string representation of a time like '00:03.0'

    returns a timedelta object, e.g. timedelta(seconds=3)

    raises a ValueError if none of the TIMETABLE_FORMATS matches
    '''
    parsed = None
    # the first format that matches wins
    for candidate in TIMETABLE_FORMATS:
        try:
            parsed = datetime.strptime(time_str, candidate)
        except ValueError:
            continue
        break
    if parsed is None:
        raise ValueError(
            "time data '{}' does not match any format".format(time_str)
        )
    # only the time of day part of the parsed datetime is of interest
    return timedelta(
        hours=parsed.hour,
        minutes=parsed.minute,
        seconds=parsed.second,
        microseconds=parsed.microsecond,
    )
def parse_command(command, available_commands):
    ''' looks up the command to send for a human readable command

    command:
        human readable representation of the command; the lookup is
        done with its lowercase version
    available_commands:
        a dictionary containing the available human readable commands as
        keys and the commands to send as values
            {'open': 0, 'close': 1}

    returns the command to send over serial; string values are encoded
    as utf-8 bytes, all other values are returned unchanged

    raises a ValueError if the command is not available
    '''
    lookup_key = command.lower()
    try:
        payload = available_commands[lookup_key]
    except KeyError:
        raise ValueError("unknown command '{}'".format(command))
    return payload.encode('utf-8') if isinstance(payload, str) else payload
def find_arduino_port():
    ''' returns the device name of the only port matching an arduino

    raises an IOError if no port or more than one port matches
    '''
    # the pattern is meant to match arduino as well as genuino
    candidates = list(serial.tools.list_ports.grep('.+uino.+'))
    found = len(candidates)
    if found == 1:
        # exactly one match: hand back only the device name
        return candidates[0].device
    if found == 0:
        raise IOError('no arduino port found')
    raise IOError('{} arduino ports found'.format(found))
def send_command(scheduled_command, serial_connection):
    ''' writes a scheduled command to the serial connection and prints it

    scheduled_command:
        ScheduledCommand whose cmd value is written to the connection
    serial_connection:
        object with a write() method, e.g. an open serial connection
    '''
    payload = scheduled_command.cmd
    serial_connection.write(payload)
    # report the time offset and the human readable command
    time_text = str(scheduled_command.delta)
    print(time_text, scheduled_command.command)
def run(scheduled_commands, serial_connection):
    ''' run scheduled commands

    Every command is sent at its time offset, counted from the moment
    this function is called. The function blocks until one second after
    the latest command and returns immediately if there is nothing to
    run. A KeyboardInterrupt aborts the remaining schedule without
    raising.

    scheduled_commands:
        iterable of ScheduledCommands; the entries do not need to be
        sorted by time
    serial_connection:
        serial connection object to send commands
    '''
    commands = list(scheduled_commands)
    if not commands:
        # nothing to schedule, so there is no point in starting a loop
        return
    # use a new event loop every time
    # if only one eventloop is used, the interactive use of run()
    # does not work since the loop is closed in the end
    loop = asyncio.new_event_loop()
    try:
        for scheduled_command in commands:
            loop.call_later(
                scheduled_command.delta.total_seconds(),
                send_command,
                scheduled_command,
                serial_connection
            )
        # the loop should stop one second after the latest command; use
        # the largest offset instead of the last list entry, otherwise an
        # unordered time table would stop the loop too early
        latest = max(command.delta for command in commands)
        timed_loop_stop = latest + timedelta(seconds=1)
        loop.call_later(timed_loop_stop.total_seconds(), loop.stop)
        try:
            loop.run_forever()
        except KeyboardInterrupt:
            loop.stop()
    finally:
        # always release the loop, even if scheduling or a command failed
        loop.close()
class TimedCommands:
    ''' lightweight encapsulation of the functions in the module

    An instance parses a time table, opens the serial connection and
    sends the commands with run(). The connection can be released with
    close() or by using the instance as a context manager:

        with TimedCommands(timetable, ['open', 'close']) as timed:
            timed.run()
    '''

    def __init__(self,
                 timetable,
                 available_commands,
                 port=None,
                 baudrate=9600,
                 **serial_kwargs):
        ''' parses a time table and establishes a serial connection

        timetable:
            a textual representation of time and commands like
                00:12.0 close
                00:14.0 open
                00:16.0 close
                00:18.0 open
        available_commands:
            a dictionary containing the available human readable commands
            as keys and the commands to send as values.
            If an other iterable with strings is provided, the lowercase
            version of the strings will be used as human readable commands
            and the first letter of the lowercase strings as commands to
            send over the wire.
        port [None]
            port where the Arduino is connected
            if port is None, the ports are scanned for an Arduino
        baudrate [9600]
            speed of the serial connection
        serial_kwargs
            further keyword arguments for the serial connection
        '''
        # make sure the available commands are a dictionary
        cmd_dict = self._ensure_command_dict(available_commands)
        # parse the time table first, so that a faulty time table does
        # not leave an open serial connection behind
        self.commands = parse_time_table(timetable, cmd_dict)
        # establish the serial connection
        port = port or find_arduino_port()
        self.serial = serial.Serial(port, baudrate, **serial_kwargs)

    def __enter__(self):
        ''' returns the instance itself for use in a with statement '''
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        ''' closes the serial connection when the with block is left '''
        self.close()

    def _ensure_command_dict(self, iterable):
        ''' ensures, that the available commands are a dictionary

        iterable:
            if the iterable is a dictionary, the keys will be
            transformed to lowercase
            if it is not a dictionary, the lowercase version of the
            items is used as a human readable command and the first
            character of this command will be sent over the wire

        returns a dictionary with lowercase keys

        raises a TypeError if the commands can not be processed, e.g. for
        a value that is not iterable, non-string keys or an empty string
        '''
        try:
            if isinstance(iterable, dict):
                return {key.lower(): value for key, value in iterable.items()}
            names = [str(item).lower() for item in iterable]
            return {name: name[0] for name in names}
        except (AttributeError, IndexError, TypeError) as err:
            # only errors caused by unsuitable input are translated; a
            # KeyboardInterrupt or SystemExit must not be swallowed here
            msg = 'available commands should be a list or dict of strings'
            raise TypeError(msg) from err

    def run(self):
        ''' run the scheduled commands '''
        # refers to the module level function run(), not to this method
        run(self.commands, self.serial)

    def close(self):
        ''' closes the serial connection '''
        self.serial.close()
class TrapControl(TimedCommands):
    ''' A simple, stripped down version for the magnetic plug trap

    The only available commands are 'open' and 'close'.
    '''

    def __init__(self, timetable, port=None, baudrate=9600, **serial_kwargs):
        ''' parses a time table and establishes a serial connection

        timetable:
            a textual representation of time and commands like
                00:12.0 close
                00:14.0 open
        port [None]
            port where the Arduino is connected
            if port is None, the ports are scanned for an Arduino
        baudrate [9600]
            speed of the serial connection
        serial_kwargs
            further keyword arguments for the serial connection
        '''
        # forward the connection settings, so that the trap can also be
        # used when the automatic port detection is not applicable
        super().__init__(
            timetable,
            ['open', 'close'],
            port=port,
            baudrate=baudrate,
            **serial_kwargs
        )