''' Controlling an Arduino with a time table ''' import asyncio import serial import serial.tools.list_ports from collections import namedtuple from datetime import datetime, timedelta from openpyxl import load_workbook # example time table for the TrapControl example = ''' 00:00.0 close 00:06.0 open 00:08.0 close 00:10.0 open 00:12.0 close 00:14.0 open 00:16.0 close 00:18.0 open 00:20.0 close 00:22.0 open ''' # datetime formats for timetable parsing TIMETABLE_FORMATS = ['%M:%S', '%M:%S.%f', '%H:%M:%S', '%H:%M:%S.%f'] # named tuple for a single scheduled command ScheduledCommand = namedtuple('ScheduledCommand', 'delta command cmd') def parse_time_table(timetable, available_commands=None): ''' parses a (text) time table into a list of ScheduledCommands timetable: a textual representaion of time and commands like 00:12.0 close 00:14.0 open 00:16.0 close 00:18.0 open available_commands: a dictionary containing the available human readable commands as keys and the commands to send as values {'open': 0, 'close': 1} if None is provided, the check is skipped and the first lower case letter used returns a list consisting of ScheduledCommands ''' # split the time table text by line break raw_lines = timetable.split('\n') # remove surrounding white space content_lines = (line.strip() for line in raw_lines) # remove empty lines lines = (line for line in content_lines if line) timed_commands = [] for line in lines: # split the lines into time and command try: raw_time, raw_command = line.split(None, 1) except ValueError: msg = "error in line '{}'".format(line) raise ValueError(msg) # parse time and commands delta = parse_time(raw_time) cmd = parse_command(raw_command, available_commands) # add a ScheduledCommand to the resulting list tc = ScheduledCommand(delta, raw_command, cmd) timed_commands.append(tc) return timed_commands def parse_excel_file(path, available_commands=None): ''' parses an time table in an excel file into a list of ScheduledCommands path: path to excel file the time must be in the first column of the first sheet, the command in the second column available_commands: a dictionary containing the available human readable commands as keys and the commands to send as values {'open': 0, 'close': 1} if None is provided, the check is skipped and the first lower case letter used returns a list consisting of ScheduledCommands ''' workbook = load_workbook(path, read_only=True) sheets = workbook.get_sheet_names() sheet_name = sheets[0] sheet = workbook[sheet_name] is_header = True timed_commands = [] for i, row in enumerate(sheet.rows): delta = parse_time_cell(row[0]) if delta is None: # no time in the cell if is_header: # this is still a header continue else: # not the header and not a time cell # premature end of list break else: # a time in the cell, this is not a header any more is_header = False raw_command = row[1].value cmd = parse_command(raw_command, available_commands) # add a ScheduledCommand to the resulting list tc = ScheduledCommand(delta, raw_command, cmd) timed_commands.append(tc) return timed_commands def parse_time_cell(excel_cell): ''' parse an excel cell that contains a time value if no time value is found in the excel cell None will be returned ''' if excel_cell.is_date: # the excel value is a datetime object return time_as_timedelta(excel_cell.value) if isinstance(excel_cell.value, float): # sometimes excel stores times as a float value return timedelta(days=excel_cell.value) try: # if it is not a common date fomat, try to parse it delta = parse_time(excel_cell.value) return delta except ValueError: return None def parse_time(time_str): ''' parses a string to extract the time information time_str: string representaion of a time like '00:03.0' returns a timedelta object, e.g. timedelta(seconds=3) ''' # try the available time formats time_str = str(time_str) for format_str in TIMETABLE_FORMATS: try: time_obj = datetime.strptime(time_str, format_str) break except ValueError: pass else: msg = "time data '{}' does not match any format".format(time_str) raise ValueError(msg) return time_as_timedelta(time_obj) def time_as_timedelta(time_object): ''' converts a time object to a timedelta ''' return timedelta( hours=time_object.hour, minutes=time_object.minute, seconds=time_object.second, microseconds=time_object.microsecond ) def parse_command(command, available_commands=None): ''' parses a string and checks if it is a valid command raw_command: human readable representation of the command available_commands: a dictionary containing the available human readable commands as keys and the commands to send as values {'open': 0, 'close': 1} if None is provided, the check is skipped and the first lower case letter used returns the command to send over serial ''' if available_commands is None: cmd = str(command)[0] else: try: cmd = available_commands[command.lower()] except KeyError: msg = "unknown command '{}'".format(command) raise ValueError(msg) if isinstance(cmd, str): cmd = cmd.encode('utf-8') return cmd def find_arduino_port(): ''' returns the port where an arduino is connected ''' # some regular expressions matching arduino and genuino tmp = serial.tools.list_ports.grep('.+uino.+') port_list = list(tmp) # rais an error, if no or more than one arduinos are found if len(port_list) == 0: raise IOError('no arduino port found') elif len(port_list) > 1: raise IOError('{} arduino ports found'.format(len(port_list))) # return only the device name port_info = port_list[0] return port_info.device def send_command(scheduled_command, serial_connection): ''' sends a command over a serial connection ''' serial_connection.write(scheduled_command.cmd) print(str(scheduled_command.delta), scheduled_command.command) def run(scheduled_commands, serial_connection): ''' run scheduled commands scheduled_commands: list of ScheduledCommands serial_connection: serial connection object to send commands ''' # use a new event loop every time # if only one eventloop is used, the interactive use of run() # does not work since the loop is closed in the end loop = asyncio.new_event_loop() for scheduled_command in scheduled_commands: loop.call_later( scheduled_command.delta.total_seconds(), send_command, scheduled_command, serial_connection ) # loop should stop one second after last command in list timed_loop_stop = scheduled_command.delta + timedelta(seconds=1) loop.call_later(timed_loop_stop.total_seconds(), loop.stop) try: loop.run_forever() except KeyboardInterrupt: loop.stop() loop.close() class TimedCommands(object): ''' lightweight encapsulation of the functions in the module ''' def __init__(self, timetable, available_commands, port=None, baudrate=9600, **serial_kwargs): ''' parses a time table and establishes a serial connection timetable: a file path to an excel file or a textual representaion of time and commands like 00:12.0 close 00:14.0 open 00:16.0 close 00:18.0 open available_commands: a dictionary containing the available human readable commands as keys and the commands to send as values. If an other iterable with strings is provided, the lowercase version of the strings will be used as human readable commands and the first letter of the strings as commands to send over the wire. port [None] port where the Arduino is connected if port is None, the ports are scanned for an Arduino baudrate [9600] speed of the serial connection serial_kwargs dictionary with further arguments for the serial connection ''' # make sure the available commands are a dictionary cmd_dict = self._ensure_command_dict(available_commands) # parse the time table into something suitable if '.xls' in timetable: self.commands = parse_excel_file(timetable, cmd_dict) else: self.commands = parse_time_table(timetable, cmd_dict) # establish the serial connection port = port or find_arduino_port() self.serial = serial.Serial(port, baudrate, **serial_kwargs) def _ensure_command_dict(self, iterable): ''' ensures, that the available commands are a dictionary iterable: if the itarable is a dictionary, the keys will be transformed to lowercase if it is not a dictionary, the lowercase version of the items is used as a human readable command and the first character of this command will be sent over the wire ''' try: if isinstance(iterable, dict): pairs = list(iterable.items()) keys = [key.lower() for key, value in pairs] values = [value for key, value in pairs] else: tmp = (str(item) for item in iterable) keys = [item.lower() for item in tmp] values = [item[0] for item in keys] pairs = zip(keys, values) return dict(pairs) except: msg = 'available commands should be a list or dict of strings' raise TypeError(msg) def run(self): ''' run the scheduled commands ''' run(self.commands, self.serial) def close(self): ''' closes an open serial connection ''' self.serial.close()