''' A Tkinter Gui Application for running scheduled commands ''' # library imports import asyncio import os import serial import tkinter as tk import tkinter.ttk as ttk from enum import Enum from datetime import timedelta from multiprocessing import Process, Lock from tkinter import filedialog from tkinter import messagebox # local imports from .timetable import parse_excel_file, find_arduino_port # delay for checking if commands are running on the Arduino CHECK_DELAY = 100 # delay after connecting to the arduino before a programm can be run CONNECT_DELAY = 2500 class ArduinoState(Enum): ''' State of the current Arduino setup ''' not_initialized = 'not_initialized' connected = 'connected' running = 'running' finished = 'finished' def arduino_subprocess(lock, com_port, scheduled_commands, **kwargs): ''' connecting to the Arduiono and running the scheduled commands lock: Separaion controll between establishing connection and running commands. Must be of class multiprocessing.Lock com_port: port where the Arduino is connected scheduled_commands: list of .timetable.TimedCommands to run **kwargs: arguments used to open serial connection the most importent one is 'baudrate', defaults to 9600 To run the scheduled commands a separate process is used. Otherwise the execution of the gui would stop until the commands finish. This would lead to an unresponsive gui programm. The serial connection is not pickable and therfore the connection to the arduino and running the commands must be made in the same process. To have a separation between establishing the connection and running the commands, a simple lock is used. Before starting the subprocess, the lock will be acquired by the main programm and releasing the lock will start the commands. ''' # get a custom baudrate or use the default 9600 baudrate = kwargs.pop('baudrate', 9600) with serial.Serial(com_port, baudrate=baudrate, **kwargs) as connection: # use a fresh asyncio event loop every time loop = asyncio.new_event_loop() # wait until the lock is released to start the commands lock.acquire() try: # schedule the commands for execution for scheduled_command in scheduled_commands: loop.call_later( scheduled_command.delta.total_seconds(), connection.write, scheduled_command.cmd ) # loop should stop one second after last command in list timed_loop_stop = scheduled_command.delta + timedelta(seconds=1) loop.call_later(timed_loop_stop.total_seconds(), loop.stop) # run the loop try: loop.run_forever() except KeyboardInterrupt: loop.stop() loop.close() finally: lock.release() def timedelta_as_str(timedelta): ''' a nice string output for a timedelta shamlessly copied from the original timedelta implementation, but the original implementation displays the microseconds always with six characters. ''' mm, ss = divmod(timedelta.seconds, 60) hh, mm = divmod(mm, 60) s = "%d:%02d:%02d" % (hh, mm, ss) if timedelta.days: def plural(n): return n, abs(n) != 1 and "s" or "" s = ("%d day%s, " % plural(timedelta.days)) + s # microseconds are stored as int between 0 and 999999 inclusive s = s + ".%d" % (timedelta.microseconds / 10**5) return s class FilePanel(tk.Frame): ''' Tkinter Panel for selecting files ''' def __init__(self, parent): tk.Frame.__init__(self, parent.master) self._parent = parent self.btn_files = tk.Button( self, text="Select Excel File", command=self._parent.select_files ) self.btn_files.pack(pady=5, padx=5) sep = ttk.Separator(self, orient=tk.HORIZONTAL) sep.pack(side=tk.BOTTOM, fill=tk.X) self.pack(side=tk.TOP, fill=tk.X) class ParsedCommandsPanel(tk.Frame): ''' Tkinter Panel displaying the scheduled commands to be run ''' def __init__(self, parent): tk.Frame.__init__(self, parent.master) self._parent = parent self.label = tk.Label(self) self.label.pack(fill=tk.X, pady=5, padx=5) self.messages = tk.Message(self, text='', font=("Courier", 8)) self.messages.pack(fill=tk.X, pady=5, padx=5) self.pack(side=tk.TOP, fill=tk.X) def set_schedule_text(self, text): ''' display the scheuled commands to run ''' self.messages.config(text=text) self.messages.update_idletasks() def set_label_text(self, text): ''' display the filename of the scheduled commands ''' self.label.config(text=text) self.label.update_idletasks() def clear_text(self): ''' clear all text in this panel ''' self.set_schedule_text('') self.set_label_text('') def show_schedule(self, command_list): ''' transformes the command list to a textual representation ''' times = [timedelta_as_str(cmd.delta) for cmd in command_list] commands = [cmd.command for cmd in command_list] # search for the longest time text max_len_limes = max(len(time_str) for time_str in times) # template for each command line template = '{delta:<{size}} {cmd}' # assemble all lines lines = [ template.format(delta=delta, size=max_len_limes, cmd=cmd) for delta, cmd in zip(times, commands) ] # return the text for the command list self.set_schedule_text('\n'.join(lines)) class ActionPanel(tk.Frame): ''' Tkinter Panel for the action buttons ''' def __init__(self, parent): tk.Frame.__init__(self, parent.master) self._parent = parent # different configurations for the 'run' button self.btn_config_disabled = { 'text': 'Run Commands', 'background': 'SystemButtonFace', 'activebackground': 'SystemButtonFace', 'state': 'disabled' } self.btn_config_wait = { 'text': 'Connecting', 'background': 'SystemButtonFace', 'activebackground': 'SystemButtonFace', 'state': 'disabled' } self.btn_config_ready = { 'text': 'Run Comands', 'background': 'green', 'activebackground': 'green', 'state': 'normal' } self.btn_config_running = { 'text': 'Stop Comands', 'background': 'red', 'activebackground': 'red', 'state': 'active' } ttk.Separator(self, orient=tk.HORIZONTAL).pack(side=tk.TOP, fill=tk.X) self.btn_go = tk.Button( self, command=self._parent.go_button_clicked, **self.btn_config_disabled ) self.btn_go.pack(side=tk.LEFT, pady=5, padx=5) self.btn_quit = tk.Button(self, text="Quit", command=self._parent.quit) self.btn_quit.pack(side=tk.RIGHT, pady=5, padx=5) self.pack(side=tk.BOTTOM, fill=tk.X) def update_button_state(self): ''' update the button state reflecting the state of the Arduino ''' if self._parent.arduino_state == ArduinoState.running: self.btn_go.config(**self.btn_config_running) elif self._parent.arduino_state == ArduinoState.not_initialized: self.btn_go.config(**self.btn_config_disabled) else: # establishing the connection takes some time, show a 'connecting' # state and after some time show the 'ready' state self.btn_go.config(**self.btn_config_wait) self.after(CONNECT_DELAY, self._enable_button) self.btn_go.update_idletasks() def _enable_button(self): ''' shows the 'ready' state of the button ''' self.btn_go.config(**self.btn_config_ready) self.btn_go.update_idletasks() class Application(tk.Frame): ''' The Tkinter application containing the buisiness logic ''' def __init__(self, master, available_commands): ''' initialization master: tkinter root object available_commands: list of available commands for the Arduino ''' master.minsize(height=330, width=300) tk.Frame.__init__(self, master) self._master = master self.pack(fill=tk.BOTH) # the layout sections self.file_panel = FilePanel(self) self.program_panel = ParsedCommandsPanel(self) self.action_panel = ActionPanel(self) # properties for the buisiness logic self.available_commands = available_commands self.scheduled_commands = [] self.arduino_process = None self.arduino_lock = None self._arduino_state = ArduinoState.not_initialized self.reset() @property def arduino_state(self): ''' property for the current state of the arduino ''' return self._arduino_state @arduino_state.setter def arduino_state(self, new_state): ''' setter for the arduino state property as soon as the property changes, the 'go'-button will be updated ''' self._arduino_state = new_state self.action_panel.update_button_state() def select_files(self): ''' selecting a new excel file with scheduled_commands ''' # open the file selection dialog opts = { 'initialdir': '~/', 'filetypes': [('Excel Files', '.xlsx')], 'multiple': False} excel_file = tk.filedialog.askopenfilename(**opts) # if no excel file is selected, do nothing if not excel_file: return # reset the application self.reset() # try to parse the excel file self.scheduled_commands = parse_excel_file( excel_file, self.available_commands ) if self.scheduled_commands: # if parsing resultet in a schedule, prepare everything to run self.arduino_prepare_process() self.program_panel.set_label_text(os.path.basename(excel_file)) self.program_panel.show_schedule(self.scheduled_commands) def reset(self): ''' reset the application state ''' # stop any connection to the Arduino if self.arduino_state in (ArduinoState.running, ArduinoState.connected): self.arduino_stop_process() self.action_panel.update_button_state() self.program_panel.clear_text() def go_button_clicked(self): ''' start scheduled commands or stop the execution ''' if self.arduino_state == ArduinoState.running: # 'Stop Commands' clicked. self.arduino_stop_process() self.arduino_prepare_process() else: # 'Run Commands' clicked. # the connection is already established, we just need to release # the lock self.arduino_lock.release() self.arduino_state = ArduinoState.running # regular check if the scheduled commands finished self.after(CHECK_DELAY, self._check_process) def _check_process(self): ''' checks if the scheduled commands have already finished ''' if self.arduino_process: if self.arduino_process.is_alive(): # still running, recheck after a short delay self.after(CHECK_DELAY, self._check_process) else: # commands finished self.arduino_stop_process() self.arduino_prepare_process() def arduino_stop_process(self): ''' stops a connected Arduino ''' if self.arduino_state != ArduinoState.not_initialized: # only do this if a connection was established loop = asyncio.get_event_loop() loop.stop() loop.close() self.arduino_process.terminate() self.arduino_process = None self.arduino_lock = None self.arduino_state = ArduinoState.not_initialized def arduino_prepare_process(self): ''' open a subprocess setting up the arduino ''' # find the com port of the Arduino try: com_port = find_arduino_port() except IOError as e: messagebox.showerror('COM Port', 'No Arduino found') return except serial.SerialException: messagebox.showerror('COM Port', 'Could not talk to Arduino') return # Create a lock and acquire it. This will put the subprocess on hold # after establishing the connection. As soon as the lock is released, # the scheduled commands will be run self.arduino_lock = Lock() self.arduino_lock.acquire() # start the subprocess for the Arduino connection self.arduino_process = Process( target=arduino_subprocess, args=( self.arduino_lock, com_port, self.scheduled_commands ) ) self.arduino_process.start() self.arduino_state = ArduinoState.connected def quit(self): ''' stop the programm ''' self.arduino_stop_process() self._master.destroy() def run_application(available_commands, title='Arduino Scheduled Commands'): ''' run the gui application available_commands: list of available commands for the Arduino title: window title bar text ''' # create the Tkinter root and instantiate the application root = tk.Tk() root.wm_title(title) app = Application(master=root, available_commands=available_commands) # use app.quit() as callback if the window is closed (with the 'X' button) # if an Arduino connection is established, the standard callback does not # stop the subprocess and therfore the application would not quit root.protocol("WM_DELETE_WINDOW", app.quit) # start the application app.mainloop() try: root.destroy() except tk.TclError: pass