''' Controlling an Arduino with a time table ''' import asyncio import serial import serial.tools.list_ports from collections import namedtuple from datetime import datetime, timedelta __version__ = '0.0.1' # example time table for the TrapControl example = ''' 00:00.0 close 00:06.0 open 00:08.0 close 00:10.0 open 00:12.0 close 00:14.0 open 00:16.0 close 00:18.0 open 00:20.0 close 00:22.0 open ''' # datetime formats for timetable parsing TIMETABLE_FORMATS = ['%M:%S', '%M:%S.%f', '%H:%M:%S', '%H:%M:%S.%f'] # named tuple for a single scheduled command ScheduledCommand = namedtuple('ScheduledCommand', 'delta command cmd') def parse_time_table(timetable, available_commands): ''' parses a (text) time table into a list of ScheduledCommands timetable: a textual representaion of time and commands like 00:12.0 close 00:14.0 open 00:16.0 close 00:18.0 open available_commands: a dictionary containing the available human readable commands as keys and the commands to send as values {'open': 0, 'close': 1} returns a list consisting of ScheduledCommands ''' # split the time table text by line break raw_lines = timetable.split('\n') # remove surrounding white space content_lines = (line.strip() for line in raw_lines) # remove empty lines lines = (line for line in content_lines if line) timed_commands =[] for line in lines: # split the lines into time and command try: raw_time, raw_command = line.split(None, 1) except ValueError: msg = "error in line '{}'".format(line) raise ValueError(msg) # parse time and commands delta = parse_time(raw_time) cmd = parse_command(raw_command, available_commands) # add a ScheduledCommand to the resulting list tc = ScheduledCommand(delta, raw_command, cmd) timed_commands.append(tc) return timed_commands def parse_time(time_str): ''' parses a string to extract the time information time_str: string representaion of a time like '00:03.0' returns a timedelta object, e.g. timedelta(seconds=3) ''' # try the available time formats for format_str in TIMETABLE_FORMATS: try: time_obj = datetime.strptime(time_str, format_str) break except ValueError: pass else: msg = "time data '{}' does not match any format".format(time_str) raise ValueError(msg) return timedelta( hours=time_obj.hour, minutes=time_obj.minute, seconds=time_obj.second, microseconds=time_obj.microsecond ) def parse_command(command, available_commands): ''' parses a string and checks if it is a valid command raw_command: human readable representation of the command available_commands: a dictionary containing the available human readable commands as keys and the commands to send as values {'open': 0, 'close': 1} returns the command to send over serial ''' try: cmd = available_commands[command.lower()] except KeyError: msg = "unknown command '{}'".format(command) raise ValueError(msg) if isinstance(cmd, str): cmd = cmd.encode('utf-8') return cmd def find_arduino_port(): ''' returns the port where an arduino is connected ''' # some regular expressions matching arduino and genuino tmp = serial.tools.list_ports.grep('.+uino.+') port_list = list(tmp) # rais an error, if no or more than one arduinos are found if len(port_list) == 0: raise IOError('no arduino port found') elif len(port_list) > 1: raise IOError('{} arduino ports found'.format(len(port_list))) # return only the device name port_info = port_list[0] return port_info.device def send_command(scheduled_command, serial_connection): ''' sends a command over a serial connection ''' serial_connection.write(scheduled_command.cmd) print(str(scheduled_command.delta), scheduled_command.command) def run(scheduled_commands, serial_connection): ''' run scheduled commands scheduled_commands: list of ScheduledCommands serial_connection: serial connection object to send commands ''' # use a new event loop every time # if only one eventloop is used, the interactive use of run() # does not work since the loop is closed in the end loop = asyncio.new_event_loop() for scheduled_command in scheduled_commands: loop.call_later( scheduled_command.delta.total_seconds(), send_command, scheduled_command, serial_connection ) # loop should stop one second after last command in list timed_loop_stop = scheduled_command.delta + timedelta(seconds=1) loop.call_later(timed_loop_stop.total_seconds(), loop.stop) try: loop.run_forever() except KeyboardInterrupt: loop.stop() loop.close() class TimedCommands(object): ''' lightweight encapsulation of the functions in the module ''' def __init__(self, timetable, available_commands, port=None, baudrate=9600, **serial_kwargs): ''' parses a time table and establishes a serial connection timetable: a file path to an excel file or a textual representaion of time and commands like 00:12.0 close 00:14.0 open 00:16.0 close 00:18.0 open available_commands: a dictionary containing the available human readable commands as keys and the commands to send as values. If an other iterable with strings is provided, the lowercase version of the strings will be used as human readable commands and the first letter of the strings as commands to send over the wire. port [None] port where the Arduino is connected if port is None, the ports are scanned for an Arduino baudrate [9600] speed of the serial connection serial_kwargs dictionary with further arguments for the serial connection ''' # make sure the available commands are a dictionary cmd_dict = self._ensure_command_dict(available_commands) # parse the time table into something suitable if '.xls' in timetable: self.commands = parse_excel_file(timetable, cmd_dict) else: self.commands = parse_time_table(timetable, cmd_dict) # establish the serial connection port = port or find_arduino_port() self.serial = serial.Serial(port, baudrate, **serial_kwargs) def _ensure_command_dict(self, iterable): ''' ensures, that the available commands are a dictionary iterable: if the itarable is a dictionary, the keys will be transformed to lowercase if it is not a dictionary, the lowercase version of the items is used as a human readable command and the first character of this command will be sent over the wire ''' try: if isinstance(iterable, dict): pairs = list(iterable.items()) keys = [key.lower() for key, value in pairs] values = [value for key, value in pairs] else: tmp = (str(item) for item in iterable) keys = [item.lower() for item in tmp] values = [item[0] for item in keys] pairs = zip(keys, values) return dict(pairs) except: msg = 'available commands should be a list or dict of strings' raise TypeError(msg) def run(self): ''' run the scheduled commands ''' run(self.commands, self.serial) class TrapControl(TimedCommands): ''' A simple, stripped down version for the magnetic plug trap ''' def __init__(self, timetable): super().__init__(timetable, ['open', 'close'])