From dd6e3493b9641365f09ba25dd2143f65d28859df Mon Sep 17 00:00:00 2001 From: Holger Frey Date: Thu, 14 Sep 2017 14:54:20 +0200 Subject: [PATCH] some cleanup --- arduino_timetable/__init__.py | 342 ++------------------------------ arduino_timetable/timetable.py | 344 +++++++++++++++++++++++++++++++++ run_magnetic_trap.py | 5 + 3 files changed, 360 insertions(+), 331 deletions(-) create mode 100644 arduino_timetable/timetable.py create mode 100644 run_magnetic_trap.py diff --git a/arduino_timetable/__init__.py b/arduino_timetable/__init__.py index 50becf4..c0e8e82 100644 --- a/arduino_timetable/__init__.py +++ b/arduino_timetable/__init__.py @@ -1,333 +1,13 @@ -''' Controlling an Arduino with a time table ''' - -import asyncio -import serial -import serial.tools.list_ports - -from collections import namedtuple -from datetime import datetime, timedelta -from openpyxl import load_workbook +''' Controlling an Arduino with commands in a time table ''' + +from .gui import run_application +from .timetable import ( + parse_time_table, + parse_excel_file, + find_arduino_port, + send_command, + run, + TimedCommands + ) __version__ = '0.0.1' - - -# example time table for the TrapControl -example = ''' - 00:00.0 close - 00:06.0 open - 00:08.0 close - 00:10.0 open - 00:12.0 close - 00:14.0 open - 00:16.0 close - 00:18.0 open - 00:20.0 close - 00:22.0 open - ''' - - -# datetime formats for timetable parsing -TIMETABLE_FORMATS = ['%M:%S', '%M:%S.%f', '%H:%M:%S', '%H:%M:%S.%f'] - - -# named tuple for a single scheduled command -ScheduledCommand = namedtuple('ScheduledCommand', 'delta command cmd') - - -def parse_time_table(timetable, available_commands): - ''' parses a (text) time table into a list of ScheduledCommands - - timetable: - a textual representaion of time and commands like - - 00:12.0 close - 00:14.0 open - 00:16.0 close - 00:18.0 open - - available_commands: - a dictionary containing the available human readable commands as - keys and the commands to send as values - - {'open': 0, 'close': 1} - - returns a list consisting of ScheduledCommands - ''' - # split the time table text by line break - raw_lines = timetable.split('\n') - # remove surrounding white space - content_lines = (line.strip() for line in raw_lines) - # remove empty lines - lines = (line for line in content_lines if line) - - timed_commands =[] - for line in lines: - # split the lines into time and command - try: - raw_time, raw_command = line.split(None, 1) - except ValueError: - msg = "error in line '{}'".format(line) - raise ValueError(msg) - # parse time and commands - delta = parse_time(raw_time) - cmd = parse_command(raw_command, available_commands) - # add a ScheduledCommand to the resulting list - tc = ScheduledCommand(delta, raw_command, cmd) - timed_commands.append(tc) - return timed_commands - - -def parse_excel_file(path, available_commands): - ''' parses an time table in an excel file into a list of ScheduledCommands - - path: - path to excel file - the time must be in the first column of the first sheet, the command in - the second column - - available_commands: - a dictionary containing the available human readable commands as - keys and the commands to send as values - - {'open': 0, 'close': 1} - - returns a list consisting of ScheduledCommands - ''' - workbook = load_workbook(path, read_only=True) - sheets = workbook.get_sheet_names() - sheet_name = sheets[0] - sheet = workbook[sheet_name] - - is_header = True - timed_commands = [] - for i, row in enumerate(sheet.rows): - delta = parse_time_cell(row[0]) - print(i, row[0].value, delta) - if delta is None: - # no time in the cell - if is_header: - # this is still a header - continue - else: - # not the header and not a time cell - # premature end of list - break - else: - # a time in the cell, this is not a header any more - is_header = False - raw_command = row[1].value - cmd = parse_command(raw_command, available_commands) - # add a ScheduledCommand to the resulting list - tc = ScheduledCommand(delta, raw_command, cmd) - timed_commands.append(tc) - return timed_commands - - - -def parse_time_cell(excel_cell): - if excel_cell.is_date: - return time_as_timedelta(excel_cell.value) - if isinstance(excel_cell.value, float): - return timedelta(days=excel_cell.value) - try: - delta = parse_time(excel_cell.value) - return delta - except ValueError: - return None - - - - -def parse_time(time_str): - ''' parses a string to extract the time information - - time_str: - string representaion of a time like '00:03.0' - - returns a timedelta object, e.g. timedelta(seconds=3) - ''' - # try the available time formats - for format_str in TIMETABLE_FORMATS: - try: - time_obj = datetime.strptime(time_str, format_str) - break - except ValueError: - pass - else: - msg = "time data '{}' does not match any format".format(time_str) - raise ValueError(msg) - return time_as_timedelta(time_obj) - - -def time_as_timedelta(time_object): - ''' converts a time object to a timedelta ''' - return timedelta( - hours=time_object.hour, - minutes=time_object.minute, - seconds=time_object.second, - microseconds=time_object.microsecond - ) - -def parse_command(command, available_commands): - ''' parses a string and checks if it is a valid command - - raw_command: - human readable representation of the command - - available_commands: - a dictionary containing the available human readable commands as - keys and the commands to send as values - - {'open': 0, 'close': 1} - - returns the command to send over serial - ''' - try: - cmd = available_commands[command.lower()] - except KeyError: - msg = "unknown command '{}'".format(command) - raise ValueError(msg) - if isinstance(cmd, str): - cmd = cmd.encode('utf-8') - return cmd - - -def find_arduino_port(): - ''' returns the port where an arduino is connected ''' - # some regular expressions matching arduino and genuino - tmp = serial.tools.list_ports.grep('.+uino.+') - port_list = list(tmp) - # rais an error, if no or more than one arduinos are found - if len(port_list) == 0: - raise IOError('no arduino port found') - elif len(port_list) > 1: - raise IOError('{} arduino ports found'.format(len(port_list))) - # return only the device name - port_info = port_list[0] - return port_info.device - - -def send_command(scheduled_command, serial_connection): - ''' sends a command over a serial connection ''' - serial_connection.write(scheduled_command.cmd) - print(str(scheduled_command.delta), scheduled_command.command) - - -def run(scheduled_commands, serial_connection): - ''' run scheduled commands - - scheduled_commands: - list of ScheduledCommands - - serial_connection: - serial connection object to send commands - ''' - # use a new event loop every time - # if only one eventloop is used, the interactive use of run() - # does not work since the loop is closed in the end - loop = asyncio.new_event_loop() - for scheduled_command in scheduled_commands: - loop.call_later( - scheduled_command.delta.total_seconds(), - send_command, - scheduled_command, - serial_connection - ) - # loop should stop one second after last command in list - timed_loop_stop = scheduled_command.delta + timedelta(seconds=1) - loop.call_later(timed_loop_stop.total_seconds(), loop.stop) - try: - loop.run_forever() - except KeyboardInterrupt: - loop.stop() - loop.close() - - -class TimedCommands(object): - ''' lightweight encapsulation of the functions in the module ''' - - def __init__(self, - timetable, - available_commands, - port=None, - baudrate=9600, - **serial_kwargs): - ''' parses a time table and establishes a serial connection - - timetable: - a file path to an excel file or - a textual representaion of time and commands like - - 00:12.0 close - 00:14.0 open - 00:16.0 close - 00:18.0 open - - available_commands: - a dictionary containing the available human readable commands - as keys and the commands to send as values. - If an other iterable with strings is provided, the lowercase - version of the strings will be used as human readable commands - and the first letter of the strings as commands to send over the - wire. - - port [None] - port where the Arduino is connected - if port is None, the ports are scanned for an Arduino - - baudrate [9600] - speed of the serial connection - - serial_kwargs - dictionary with further arguments for the serial connection - ''' - # make sure the available commands are a dictionary - cmd_dict = self._ensure_command_dict(available_commands) - # parse the time table into something suitable - if '.xls' in timetable: - self.commands = parse_excel_file(timetable, cmd_dict) - else: - self.commands = parse_time_table(timetable, cmd_dict) - - # establish the serial connection - port = port or find_arduino_port() - self.serial = serial.Serial(port, baudrate, **serial_kwargs) - - - def _ensure_command_dict(self, iterable): - ''' ensures, that the available commands are a dictionary - - iterable: - if the itarable is a dictionary, the keys will be - transformed to lowercase - if it is not a dictionary, the lowercase version of the - items is used as a human readable command and the first - character of this command will be sent over the wire - ''' - - try: - if isinstance(iterable, dict): - pairs = list(iterable.items()) - keys = [key.lower() for key, value in pairs] - values = [value for key, value in pairs] - else: - tmp = (str(item) for item in iterable) - keys = [item.lower() for item in tmp] - values = [item[0] for item in keys] - pairs = zip(keys, values) - return dict(pairs) - except: - msg = 'available commands should be a list or dict of strings' - raise TypeError(msg) - - - def run(self): - ''' run the scheduled commands ''' - run(self.commands, self.serial) - - -class TrapControl(TimedCommands): - ''' A simple, stripped down version for the magnetic plug trap ''' - - def __init__(self, timetable): - super().__init__(timetable, ['open', 'close']) diff --git a/arduino_timetable/timetable.py b/arduino_timetable/timetable.py new file mode 100644 index 0000000..276be74 --- /dev/null +++ b/arduino_timetable/timetable.py @@ -0,0 +1,344 @@ +''' Controlling an Arduino with a time table ''' + +import asyncio +import serial +import serial.tools.list_ports + +from collections import namedtuple +from datetime import datetime, timedelta +from openpyxl import load_workbook + +# example time table for the TrapControl +example = ''' + 00:00.0 close + 00:06.0 open + 00:08.0 close + 00:10.0 open + 00:12.0 close + 00:14.0 open + 00:16.0 close + 00:18.0 open + 00:20.0 close + 00:22.0 open + ''' + + +# datetime formats for timetable parsing +TIMETABLE_FORMATS = ['%M:%S', '%M:%S.%f', '%H:%M:%S', '%H:%M:%S.%f'] + + +# named tuple for a single scheduled command +ScheduledCommand = namedtuple('ScheduledCommand', 'delta command cmd') + + +def parse_time_table(timetable, available_commands=None): + ''' parses a (text) time table into a list of ScheduledCommands + + timetable: + a textual representaion of time and commands like + + 00:12.0 close + 00:14.0 open + 00:16.0 close + 00:18.0 open + + available_commands: + a dictionary containing the available human readable commands as + keys and the commands to send as values + + {'open': 0, 'close': 1} + + if None is provided, the check is skipped and the first lower case + letter used + + returns a list consisting of ScheduledCommands + ''' + # split the time table text by line break + raw_lines = timetable.split('\n') + # remove surrounding white space + content_lines = (line.strip() for line in raw_lines) + # remove empty lines + lines = (line for line in content_lines if line) + + timed_commands =[] + for line in lines: + # split the lines into time and command + try: + raw_time, raw_command = line.split(None, 1) + except ValueError: + msg = "error in line '{}'".format(line) + raise ValueError(msg) + # parse time and commands + delta = parse_time(raw_time) + cmd = parse_command(raw_command, available_commands) + # add a ScheduledCommand to the resulting list + tc = ScheduledCommand(delta, raw_command, cmd) + timed_commands.append(tc) + return timed_commands + + +def parse_excel_file(path, available_commands=None): + ''' parses an time table in an excel file into a list of ScheduledCommands + + path: + path to excel file + the time must be in the first column of the first sheet, the command in + the second column + + available_commands: + a dictionary containing the available human readable commands as + keys and the commands to send as values + + {'open': 0, 'close': 1} + + if None is provided, the check is skipped and the first lower case + letter used + + returns a list consisting of ScheduledCommands + ''' + workbook = load_workbook(path, read_only=True) + sheets = workbook.get_sheet_names() + sheet_name = sheets[0] + sheet = workbook[sheet_name] + + is_header = True + timed_commands = [] + for i, row in enumerate(sheet.rows): + delta = parse_time_cell(row[0]) + if delta is None: + # no time in the cell + if is_header: + # this is still a header + continue + else: + # not the header and not a time cell + # premature end of list + break + else: + # a time in the cell, this is not a header any more + is_header = False + raw_command = row[1].value + cmd = parse_command(raw_command, available_commands) + # add a ScheduledCommand to the resulting list + tc = ScheduledCommand(delta, raw_command, cmd) + timed_commands.append(tc) + return timed_commands + + +def parse_time_cell(excel_cell): + ''' parse an excel cell that contains a time value + + if no time value is found in the excel cell None will be returned + ''' + if excel_cell.is_date: + # the excel value is a datetime object + return time_as_timedelta(excel_cell.value) + if isinstance(excel_cell.value, float): + # sometimes excel stores times as a float value + return timedelta(days=excel_cell.value) + try: + # if it is not a common date fomat, try to parse it + delta = parse_time(excel_cell.value) + return delta + except ValueError: + return None + + +def parse_time(time_str): + ''' parses a string to extract the time information + + time_str: + string representaion of a time like '00:03.0' + + returns a timedelta object, e.g. timedelta(seconds=3) + ''' + # try the available time formats + time_str = str(time_str) + for format_str in TIMETABLE_FORMATS: + try: + time_obj = datetime.strptime(time_str, format_str) + break + except ValueError: + pass + else: + msg = "time data '{}' does not match any format".format(time_str) + raise ValueError(msg) + return time_as_timedelta(time_obj) + + +def time_as_timedelta(time_object): + ''' converts a time object to a timedelta ''' + return timedelta( + hours=time_object.hour, + minutes=time_object.minute, + seconds=time_object.second, + microseconds=time_object.microsecond + ) + +def parse_command(command, available_commands=None): + ''' parses a string and checks if it is a valid command + + raw_command: + human readable representation of the command + + available_commands: + a dictionary containing the available human readable commands as + keys and the commands to send as values + + {'open': 0, 'close': 1} + + if None is provided, the check is skipped and the first lower case + letter used + + returns the command to send over serial + ''' + if available_commands is None: + cmd = str(command)[0] + else: + try: + cmd = available_commands[command.lower()] + except KeyError: + msg = "unknown command '{}'".format(command) + raise ValueError(msg) + if isinstance(cmd, str): + cmd = cmd.encode('utf-8') + return cmd + + +def find_arduino_port(): + ''' returns the port where an arduino is connected ''' + # some regular expressions matching arduino and genuino + tmp = serial.tools.list_ports.grep('.+uino.+') + port_list = list(tmp) + # rais an error, if no or more than one arduinos are found + if len(port_list) == 0: + raise IOError('no arduino port found') + elif len(port_list) > 1: + raise IOError('{} arduino ports found'.format(len(port_list))) + # return only the device name + port_info = port_list[0] + return port_info.device + + +def send_command(scheduled_command, serial_connection): + ''' sends a command over a serial connection ''' + serial_connection.write(scheduled_command.cmd) + print(str(scheduled_command.delta), scheduled_command.command) + + +def run(scheduled_commands, serial_connection): + ''' run scheduled commands + + scheduled_commands: + list of ScheduledCommands + + serial_connection: + serial connection object to send commands + ''' + # use a new event loop every time + # if only one eventloop is used, the interactive use of run() + # does not work since the loop is closed in the end + loop = asyncio.new_event_loop() + for scheduled_command in scheduled_commands: + loop.call_later( + scheduled_command.delta.total_seconds(), + send_command, + scheduled_command, + serial_connection + ) + # loop should stop one second after last command in list + timed_loop_stop = scheduled_command.delta + timedelta(seconds=1) + loop.call_later(timed_loop_stop.total_seconds(), loop.stop) + try: + loop.run_forever() + except KeyboardInterrupt: + loop.stop() + loop.close() + + +class TimedCommands(object): + ''' lightweight encapsulation of the functions in the module ''' + + def __init__(self, + timetable, + available_commands, + port=None, + baudrate=9600, + **serial_kwargs): + ''' parses a time table and establishes a serial connection + + timetable: + a file path to an excel file or + a textual representaion of time and commands like + + 00:12.0 close + 00:14.0 open + 00:16.0 close + 00:18.0 open + + available_commands: + a dictionary containing the available human readable commands + as keys and the commands to send as values. + If an other iterable with strings is provided, the lowercase + version of the strings will be used as human readable commands + and the first letter of the strings as commands to send over the + wire. + + port [None] + port where the Arduino is connected + if port is None, the ports are scanned for an Arduino + + baudrate [9600] + speed of the serial connection + + serial_kwargs + dictionary with further arguments for the serial connection + ''' + # make sure the available commands are a dictionary + cmd_dict = self._ensure_command_dict(available_commands) + # parse the time table into something suitable + if '.xls' in timetable: + self.commands = parse_excel_file(timetable, cmd_dict) + else: + self.commands = parse_time_table(timetable, cmd_dict) + + # establish the serial connection + port = port or find_arduino_port() + self.serial = serial.Serial(port, baudrate, **serial_kwargs) + + + def _ensure_command_dict(self, iterable): + ''' ensures, that the available commands are a dictionary + + iterable: + if the itarable is a dictionary, the keys will be + transformed to lowercase + if it is not a dictionary, the lowercase version of the + items is used as a human readable command and the first + character of this command will be sent over the wire + ''' + + try: + if isinstance(iterable, dict): + pairs = list(iterable.items()) + keys = [key.lower() for key, value in pairs] + values = [value for key, value in pairs] + else: + tmp = (str(item) for item in iterable) + keys = [item.lower() for item in tmp] + values = [item[0] for item in keys] + pairs = zip(keys, values) + return dict(pairs) + except: + msg = 'available commands should be a list or dict of strings' + raise TypeError(msg) + + + def run(self): + ''' run the scheduled commands ''' + run(self.commands, self.serial) + + + def close(self): + ''' closes an open serial connection ''' + self.serial.close() diff --git a/run_magnetic_trap.py b/run_magnetic_trap.py new file mode 100644 index 0000000..1ae08cd --- /dev/null +++ b/run_magnetic_trap.py @@ -0,0 +1,5 @@ +from arduino_timetable import run_application + +if __name__ == '__main__': + arduino_commands = {'open': 'o', 'close': 'c'} + run_application(arduino_commands, "It's a trap!")