diff --git a/arduino_timetable/__init__.py b/arduino_timetable/__init__.py new file mode 100644 index 0000000..53a7d09 --- /dev/null +++ b/arduino_timetable/__init__.py @@ -0,0 +1,263 @@ +''' Controlling an Arduino with a time table ''' + +import asyncio +import serial +import serial.tools.list_ports + +from collections import namedtuple +from datetime import datetime, timedelta + + +# example time table for the TrapControl +example = ''' + 00:00.0 close + 00:06.0 open + 00:08.0 close + 00:10.0 open + 00:12.0 close + 00:14.0 open + 00:16.0 close + 00:18.0 open + 00:20.0 close + 00:22.0 open + ''' + + +# datetime formats for timetable parsing +TIMETABLE_FORMATS = ['%M:%S', '%M:%S.%f', '%H:%M:%S', '%H:%M:%S.%f'] + + +# named tuple for a single scheduled command +ScheduledCommand = namedtuple('ScheduledCommand', 'delta command cmd') + + +def parse_time_table(timetable, available_commands): + ''' parses a (text) time table into a list of ScheduledCommands + + timetable: + a textual representaion of time and commands like + + 00:12.0 close + 00:14.0 open + 00:16.0 close + 00:18.0 open + + available_commands: + a dictionary containing the available human readable commands as + keys and the commands to send as values + + {'open': 0, 'close': 1} + + returns a list consisting of ScheduledCommands + ''' + # split the time table text by line break + raw_lines = timetable.split('\n') + # remove surrounding white space + content_lines = (line.strip() for line in raw_lines) + # remove empty lines + lines = (line for line in content_lines if line) + + timed_commands =[] + for line in lines: + # split the lines into time and command + try: + raw_time, raw_command = line.split(None, 1) + except ValueError: + msg = "error in line '{}'".format(line) + raise ValueError(msg) + # parse time and commands + delta = parse_time(raw_time) + cmd = parse_command(raw_command, available_commands) + # add a ScheduledCommand to the resulting list + tc = ScheduledCommand(delta, raw_command, cmd) + timed_commands.append(tc) + return timed_commands + + +def parse_time(time_str): + ''' parses a string to extract the time information + + time_str: + string representaion of a time like '00:03.0' + + returns a timedelta object, e.g. timedelta(seconds=3) + ''' + # try the available time formats + for format_str in TIMETABLE_FORMATS: + try: + time_obj = datetime.strptime(time_str, format_str) + break + except ValueError: + pass + else: + msg = "time data '{}' does not match any format".format(time_str) + raise ValueError(msg) + return timedelta( + hours=time_obj.hour, + minutes=time_obj.minute, + seconds=time_obj.second, + microseconds=time_obj.microsecond + ) + + +def parse_command(command, available_commands): + ''' parses a string and checks if it is a valid command + + raw_command: + human readable representation of the command + + available_commands: + a dictionary containing the available human readable commands as + keys and the commands to send as values + + {'open': 0, 'close': 1} + + returns the command to send over serial + ''' + try: + cmd = available_commands[command.lower()] + except KeyError: + msg = "unknown command '{}'".format(command) + raise ValueError(msg) + if isinstance(cmd, str): + cmd = cmd.encode('utf-8') + return cmd + + +def find_arduino_port(): + ''' returns the port where an arduino is connected ''' + # some regular expressions matching arduino and genuino + tmp = serial.tools.list_ports.grep('.+uino.+') + port_list = list(tmp) + # rais an error, if no or more than one arduinos are found + if len(port_list) == 0: + raise IOError('no arduino port found') + elif len(port_list) > 1: + raise IOError('{} arduino ports found'.format(len(port_list))) + # return only the device name + port_info = port_list[0] + return port_info.device + + +def send_command(scheduled_command, serial_connection): + ''' sends a command over a serial connection ''' + serial_connection.write(scheduled_command.cmd) + print(str(scheduled_command.delta), scheduled_command.command) + + +def run(scheduled_commands, serial_connection): + ''' run scheduled commands + + scheduled_commands: + list of ScheduledCommands + + serial_connection: + serial connection object to send commands + ''' + # use a new event loop every time + # if only one eventloop is used, the interactive use of run() + # does not work since the loop is closed in the end + loop = asyncio.new_event_loop() + for scheduled_command in scheduled_commands: + loop.call_later( + scheduled_command.delta.total_seconds(), + send_command, + scheduled_command, + serial_connection + ) + # loop should stop one second after last command in list + timed_loop_stop = scheduled_command.delta + timedelta(seconds=1) + loop.call_later(timed_loop_stop.total_seconds(), loop.stop) + try: + loop.run_forever() + except KeyboardInterrupt: + loop.stop() + loop.close() + + +class TimedCommands(object): + ''' lightweight encapsulation of the functions in the module ''' + + def __init__(self, + timetable, + available_commands, + port=None, + baudrate=9600, + **serial_kwargs): + ''' parses a time table and establishes a serial connection + + timetable: + a textual representaion of time and commands like + + 00:12.0 close + 00:14.0 open + 00:16.0 close + 00:18.0 open + + available_commands: + a dictionary containing the available human readable commands + as keys and the commands to send as values. + If an other iterable with strings is provided, the lowercase + version of the strings will be used as human readable commands + and the first letter of the strings as commands to send over the + wire. + + port [None] + port where the Arduino is connected + if port is None, the ports are scanned for an Arduino + + baudrate [9600] + speed of the serial connection + + serial_kwargs + dictionary with further arguments for the serial connection + ''' + # make sure the available commands are a dictionary + cmd_dict = self._ensure_command_dict(available_commands) + # parse the time table into something suitable + self.commands = parse_time_table(timetable, cmd_dict) + + # establish the serial connection + port = port or find_arduino_port() + self.serial = serial.Serial(port, baudrate, **serial_kwargs) + + + def _ensure_command_dict(self, iterable): + ''' ensures, that the available commands are a dictionary + + iterable: + if the itarable is a dictionary, the keys will be + transformed to lowercase + if it is not a dictionary, the lowercase version of the + items is used as a human readable command and the first + character of this command will be sent over the wire + ''' + + try: + if isinstance(iterable, dict): + pairs = list(iterable.items()) + keys = [key.lower() for key, value in pairs] + values = [value for key, value in pairs] + else: + tmp = (str(item) for item in iterable) + keys = [item.lower() for item in tmp] + values = [item[0] for item in keys] + pairs = zip(keys, values) + return dict(pairs) + except: + msg = 'available commands should be a list or dict of strings' + raise TypeError(msg) + + + def run(self): + ''' run the scheduled commands ''' + run(self.commands, self.serial) + + +class TrapControl(TimedCommands): + ''' A simple, stripped down version for the magnetic plug trap ''' + + def __init__(self, timetable): + super().__init__(timetable, ['open', 'close']) + + diff --git a/arduino_timetable/arduino_timetable.py b/arduino_timetable/arduino_timetable.py deleted file mode 100644 index e69de29..0000000