Holger Frey
7 years ago
2 changed files with 263 additions and 0 deletions
@ -0,0 +1,263 @@ |
|||||||
|
''' Controlling an Arduino with a time table ''' |
||||||
|
|
||||||
|
import asyncio |
||||||
|
import serial |
||||||
|
import serial.tools.list_ports |
||||||
|
|
||||||
|
from collections import namedtuple |
||||||
|
from datetime import datetime, timedelta |
||||||
|
|
||||||
|
|
||||||
|
# example time table for the TrapControl |
||||||
|
example = ''' |
||||||
|
00:00.0 close |
||||||
|
00:06.0 open |
||||||
|
00:08.0 close |
||||||
|
00:10.0 open |
||||||
|
00:12.0 close |
||||||
|
00:14.0 open |
||||||
|
00:16.0 close |
||||||
|
00:18.0 open |
||||||
|
00:20.0 close |
||||||
|
00:22.0 open |
||||||
|
''' |
||||||
|
|
||||||
|
|
||||||
|
# datetime formats for timetable parsing |
||||||
|
TIMETABLE_FORMATS = ['%M:%S', '%M:%S.%f', '%H:%M:%S', '%H:%M:%S.%f'] |
||||||
|
|
||||||
|
|
||||||
|
# named tuple for a single scheduled command |
||||||
|
ScheduledCommand = namedtuple('ScheduledCommand', 'delta command cmd') |
||||||
|
|
||||||
|
|
||||||
|
def parse_time_table(timetable, available_commands): |
||||||
|
''' parses a (text) time table into a list of ScheduledCommands |
||||||
|
|
||||||
|
timetable: |
||||||
|
a textual representaion of time and commands like |
||||||
|
|
||||||
|
00:12.0 close |
||||||
|
00:14.0 open |
||||||
|
00:16.0 close |
||||||
|
00:18.0 open |
||||||
|
|
||||||
|
available_commands: |
||||||
|
a dictionary containing the available human readable commands as |
||||||
|
keys and the commands to send as values |
||||||
|
|
||||||
|
{'open': 0, 'close': 1} |
||||||
|
|
||||||
|
returns a list consisting of ScheduledCommands |
||||||
|
''' |
||||||
|
# split the time table text by line break |
||||||
|
raw_lines = timetable.split('\n') |
||||||
|
# remove surrounding white space |
||||||
|
content_lines = (line.strip() for line in raw_lines) |
||||||
|
# remove empty lines |
||||||
|
lines = (line for line in content_lines if line) |
||||||
|
|
||||||
|
timed_commands =[] |
||||||
|
for line in lines: |
||||||
|
# split the lines into time and command |
||||||
|
try: |
||||||
|
raw_time, raw_command = line.split(None, 1) |
||||||
|
except ValueError: |
||||||
|
msg = "error in line '{}'".format(line) |
||||||
|
raise ValueError(msg) |
||||||
|
# parse time and commands |
||||||
|
delta = parse_time(raw_time) |
||||||
|
cmd = parse_command(raw_command, available_commands) |
||||||
|
# add a ScheduledCommand to the resulting list |
||||||
|
tc = ScheduledCommand(delta, raw_command, cmd) |
||||||
|
timed_commands.append(tc) |
||||||
|
return timed_commands |
||||||
|
|
||||||
|
|
||||||
|
def parse_time(time_str): |
||||||
|
''' parses a string to extract the time information |
||||||
|
|
||||||
|
time_str: |
||||||
|
string representaion of a time like '00:03.0' |
||||||
|
|
||||||
|
returns a timedelta object, e.g. timedelta(seconds=3) |
||||||
|
''' |
||||||
|
# try the available time formats |
||||||
|
for format_str in TIMETABLE_FORMATS: |
||||||
|
try: |
||||||
|
time_obj = datetime.strptime(time_str, format_str) |
||||||
|
break |
||||||
|
except ValueError: |
||||||
|
pass |
||||||
|
else: |
||||||
|
msg = "time data '{}' does not match any format".format(time_str) |
||||||
|
raise ValueError(msg) |
||||||
|
return timedelta( |
||||||
|
hours=time_obj.hour, |
||||||
|
minutes=time_obj.minute, |
||||||
|
seconds=time_obj.second, |
||||||
|
microseconds=time_obj.microsecond |
||||||
|
) |
||||||
|
|
||||||
|
|
||||||
|
def parse_command(command, available_commands): |
||||||
|
''' parses a string and checks if it is a valid command |
||||||
|
|
||||||
|
raw_command: |
||||||
|
human readable representation of the command |
||||||
|
|
||||||
|
available_commands: |
||||||
|
a dictionary containing the available human readable commands as |
||||||
|
keys and the commands to send as values |
||||||
|
|
||||||
|
{'open': 0, 'close': 1} |
||||||
|
|
||||||
|
returns the command to send over serial |
||||||
|
''' |
||||||
|
try: |
||||||
|
cmd = available_commands[command.lower()] |
||||||
|
except KeyError: |
||||||
|
msg = "unknown command '{}'".format(command) |
||||||
|
raise ValueError(msg) |
||||||
|
if isinstance(cmd, str): |
||||||
|
cmd = cmd.encode('utf-8') |
||||||
|
return cmd |
||||||
|
|
||||||
|
|
||||||
|
def find_arduino_port(): |
||||||
|
''' returns the port where an arduino is connected ''' |
||||||
|
# some regular expressions matching arduino and genuino |
||||||
|
tmp = serial.tools.list_ports.grep('.+uino.+') |
||||||
|
port_list = list(tmp) |
||||||
|
# rais an error, if no or more than one arduinos are found |
||||||
|
if len(port_list) == 0: |
||||||
|
raise IOError('no arduino port found') |
||||||
|
elif len(port_list) > 1: |
||||||
|
raise IOError('{} arduino ports found'.format(len(port_list))) |
||||||
|
# return only the device name |
||||||
|
port_info = port_list[0] |
||||||
|
return port_info.device |
||||||
|
|
||||||
|
|
||||||
|
def send_command(scheduled_command, serial_connection): |
||||||
|
''' sends a command over a serial connection ''' |
||||||
|
serial_connection.write(scheduled_command.cmd) |
||||||
|
print(str(scheduled_command.delta), scheduled_command.command) |
||||||
|
|
||||||
|
|
||||||
|
def run(scheduled_commands, serial_connection): |
||||||
|
''' run scheduled commands |
||||||
|
|
||||||
|
scheduled_commands: |
||||||
|
list of ScheduledCommands |
||||||
|
|
||||||
|
serial_connection: |
||||||
|
serial connection object to send commands |
||||||
|
''' |
||||||
|
# use a new event loop every time |
||||||
|
# if only one eventloop is used, the interactive use of run() |
||||||
|
# does not work since the loop is closed in the end |
||||||
|
loop = asyncio.new_event_loop() |
||||||
|
for scheduled_command in scheduled_commands: |
||||||
|
loop.call_later( |
||||||
|
scheduled_command.delta.total_seconds(), |
||||||
|
send_command, |
||||||
|
scheduled_command, |
||||||
|
serial_connection |
||||||
|
) |
||||||
|
# loop should stop one second after last command in list |
||||||
|
timed_loop_stop = scheduled_command.delta + timedelta(seconds=1) |
||||||
|
loop.call_later(timed_loop_stop.total_seconds(), loop.stop) |
||||||
|
try: |
||||||
|
loop.run_forever() |
||||||
|
except KeyboardInterrupt: |
||||||
|
loop.stop() |
||||||
|
loop.close() |
||||||
|
|
||||||
|
|
||||||
|
class TimedCommands(object): |
||||||
|
''' lightweight encapsulation of the functions in the module ''' |
||||||
|
|
||||||
|
def __init__(self, |
||||||
|
timetable, |
||||||
|
available_commands, |
||||||
|
port=None, |
||||||
|
baudrate=9600, |
||||||
|
**serial_kwargs): |
||||||
|
''' parses a time table and establishes a serial connection |
||||||
|
|
||||||
|
timetable: |
||||||
|
a textual representaion of time and commands like |
||||||
|
|
||||||
|
00:12.0 close |
||||||
|
00:14.0 open |
||||||
|
00:16.0 close |
||||||
|
00:18.0 open |
||||||
|
|
||||||
|
available_commands: |
||||||
|
a dictionary containing the available human readable commands |
||||||
|
as keys and the commands to send as values. |
||||||
|
If an other iterable with strings is provided, the lowercase |
||||||
|
version of the strings will be used as human readable commands |
||||||
|
and the first letter of the strings as commands to send over the |
||||||
|
wire. |
||||||
|
|
||||||
|
port [None] |
||||||
|
port where the Arduino is connected |
||||||
|
if port is None, the ports are scanned for an Arduino |
||||||
|
|
||||||
|
baudrate [9600] |
||||||
|
speed of the serial connection |
||||||
|
|
||||||
|
serial_kwargs |
||||||
|
dictionary with further arguments for the serial connection |
||||||
|
''' |
||||||
|
# make sure the available commands are a dictionary |
||||||
|
cmd_dict = self._ensure_command_dict(available_commands) |
||||||
|
# parse the time table into something suitable |
||||||
|
self.commands = parse_time_table(timetable, cmd_dict) |
||||||
|
|
||||||
|
# establish the serial connection |
||||||
|
port = port or find_arduino_port() |
||||||
|
self.serial = serial.Serial(port, baudrate, **serial_kwargs) |
||||||
|
|
||||||
|
|
||||||
|
def _ensure_command_dict(self, iterable): |
||||||
|
''' ensures, that the available commands are a dictionary |
||||||
|
|
||||||
|
iterable: |
||||||
|
if the itarable is a dictionary, the keys will be |
||||||
|
transformed to lowercase |
||||||
|
if it is not a dictionary, the lowercase version of the |
||||||
|
items is used as a human readable command and the first |
||||||
|
character of this command will be sent over the wire |
||||||
|
''' |
||||||
|
|
||||||
|
try: |
||||||
|
if isinstance(iterable, dict): |
||||||
|
pairs = list(iterable.items()) |
||||||
|
keys = [key.lower() for key, value in pairs] |
||||||
|
values = [value for key, value in pairs] |
||||||
|
else: |
||||||
|
tmp = (str(item) for item in iterable) |
||||||
|
keys = [item.lower() for item in tmp] |
||||||
|
values = [item[0] for item in keys] |
||||||
|
pairs = zip(keys, values) |
||||||
|
return dict(pairs) |
||||||
|
except: |
||||||
|
msg = 'available commands should be a list or dict of strings' |
||||||
|
raise TypeError(msg) |
||||||
|
|
||||||
|
|
||||||
|
def run(self): |
||||||
|
''' run the scheduled commands ''' |
||||||
|
run(self.commands, self.serial) |
||||||
|
|
||||||
|
|
||||||
|
class TrapControl(TimedCommands): |
||||||
|
''' A simple, stripped down version for the magnetic plug trap ''' |
||||||
|
|
||||||
|
def __init__(self, timetable): |
||||||
|
super().__init__(timetable, ['open', 'close']) |
||||||
|
|
||||||
|
|
Loading…
Reference in new issue