Holger Frey
7 years ago
1 changed files with 403 additions and 0 deletions
@ -0,0 +1,403 @@
@@ -0,0 +1,403 @@
|
||||
''' A Tkinter Gui Application for running scheduled commands ''' |
||||
|
||||
# library imports |
||||
import asyncio |
||||
import os |
||||
import serial |
||||
import tkinter as tk |
||||
import tkinter.ttk as ttk |
||||
|
||||
from enum import Enum |
||||
from datetime import timedelta |
||||
from multiprocessing import Process, Lock |
||||
from tkinter import filedialog |
||||
from tkinter import messagebox |
||||
|
||||
# local imports |
||||
from .timetable import parse_excel_file, find_arduino_port |
||||
|
||||
# delay for checking if commands are running on the Arduino |
||||
CHECK_DELAY = 100 |
||||
# delay after connecting to the arduino before a programm can be run |
||||
CONNECT_DELAY = 2500 |
||||
|
||||
|
||||
class ArduinoState(Enum): |
||||
''' State of the current Arduino setup ''' |
||||
not_initialized = 'not_initialized' |
||||
connected = 'connected' |
||||
running = 'running' |
||||
finished = 'finished' |
||||
|
||||
|
||||
def arduino_subprocess(lock, com_port, scheduled_commands, **kwargs): |
||||
''' connecting to the Arduiono and running the scheduled commands |
||||
|
||||
lock: |
||||
Separaion controll between establishing connection and running |
||||
commands. Must be of class multiprocessing.Lock |
||||
com_port: |
||||
port where the Arduino is connected |
||||
scheduled_commands: |
||||
list of .timetable.TimedCommands to run |
||||
**kwargs: |
||||
arguments used to open serial connection |
||||
the most importent one is 'baudrate', defaults to 9600 |
||||
|
||||
To run the scheduled commands a separate process is used. Otherwise the |
||||
execution of the gui would stop until the commands finish. This would |
||||
lead to an unresponsive gui programm. |
||||
|
||||
The serial connection is not pickable and therfore the connection to the |
||||
arduino and running the commands must be made in the same process. To |
||||
have a separation between establishing the connection and running the |
||||
commands, a simple lock is used. Before starting the subprocess, the lock |
||||
will be acquired by the main programm and releasing the lock will start |
||||
the commands. |
||||
''' |
||||
# get a custom baudrate or use the default 9600 |
||||
baudrate = kwargs.pop('baudrate', 9600) |
||||
with serial.Serial(com_port, baudrate=baudrate, **kwargs) as connection: |
||||
# use a fresh asyncio event loop every time |
||||
loop = asyncio.new_event_loop() |
||||
# wait until the lock is released to start the commands |
||||
lock.acquire() |
||||
try: |
||||
# schedule the commands for execution |
||||
for scheduled_command in scheduled_commands: |
||||
loop.call_later( |
||||
scheduled_command.delta.total_seconds(), |
||||
connection.write, |
||||
scheduled_command.cmd |
||||
) |
||||
# loop should stop one second after last command in list |
||||
timed_loop_stop = scheduled_command.delta + timedelta(seconds=1) |
||||
loop.call_later(timed_loop_stop.total_seconds(), loop.stop) |
||||
# run the loop |
||||
try: |
||||
loop.run_forever() |
||||
except KeyboardInterrupt: |
||||
loop.stop() |
||||
loop.close() |
||||
finally: |
||||
lock.release() |
||||
|
||||
|
||||
def timedelta_as_str(timedelta): |
||||
''' a nice string output for a timedelta |
||||
|
||||
shamlessly copied from the original timedelta implementation, but the |
||||
original implementation displays the microseconds always with six |
||||
characters. |
||||
''' |
||||
mm, ss = divmod(timedelta.seconds, 60) |
||||
hh, mm = divmod(mm, 60) |
||||
s = "%d:%02d:%02d" % (hh, mm, ss) |
||||
if timedelta.days: |
||||
def plural(n): |
||||
return n, abs(n) != 1 and "s" or "" |
||||
s = ("%d day%s, " % plural(timedelta.days)) + s |
||||
|
||||
# microseconds are stored as int between 0 and 999999 inclusive |
||||
s = s + ".%d" % (timedelta.microseconds / 10**5) |
||||
|
||||
return s |
||||
|
||||
|
||||
class FilePanel(tk.Frame): |
||||
''' Tkinter Panel for selecting files ''' |
||||
|
||||
def __init__(self, parent): |
||||
tk.Frame.__init__(self, parent.master) |
||||
self._parent = parent |
||||
self.btn_files = tk.Button( |
||||
self, |
||||
text="Select Excel File", |
||||
command=self._parent.select_files |
||||
) |
||||
self.btn_files.pack(pady=5, padx=5) |
||||
sep = ttk.Separator(self, orient=tk.HORIZONTAL) |
||||
sep.pack(side=tk.BOTTOM, fill=tk.X) |
||||
self.pack(side=tk.TOP, fill=tk.X) |
||||
|
||||
|
||||
class ParsedCommandsPanel(tk.Frame): |
||||
''' Tkinter Panel displaying the scheduled commands to be run ''' |
||||
|
||||
def __init__(self, parent): |
||||
tk.Frame.__init__(self, parent.master) |
||||
self._parent = parent |
||||
self.label = tk.Label(self) |
||||
self.label.pack(fill=tk.X, pady=5, padx=5) |
||||
self.messages = tk.Message(self, text='', font=("Courier", 8)) |
||||
self.messages.pack(fill=tk.X, pady=5, padx=5) |
||||
self.pack(side=tk.TOP, fill=tk.X) |
||||
|
||||
def set_schedule_text(self, text): |
||||
''' display the scheuled commands to run ''' |
||||
self.messages.config(text=text) |
||||
self.messages.update_idletasks() |
||||
|
||||
def set_label_text(self, text): |
||||
''' display the filename of the scheduled commands ''' |
||||
self.label.config(text=text) |
||||
self.label.update_idletasks() |
||||
|
||||
def clear_text(self): |
||||
''' clear all text in this panel ''' |
||||
self.set_schedule_text('') |
||||
self.set_label_text('') |
||||
|
||||
def show_schedule(self, command_list): |
||||
''' transformes the command list to a textual representation ''' |
||||
times = [timedelta_as_str(cmd.delta) for cmd in command_list] |
||||
commands = [cmd.command for cmd in command_list] |
||||
# search for the longest time text |
||||
max_len_limes = max(len(time_str) for time_str in times) |
||||
# template for each command line |
||||
template = '{delta:<{size}} {cmd}' |
||||
# assemble all lines |
||||
lines = [ |
||||
template.format(delta=delta, size=max_len_limes, cmd=cmd) |
||||
for delta,cmd |
||||
in zip(times, commands) |
||||
] |
||||
# return the text for the command list |
||||
self.set_schedule_text('\n'.join(lines)) |
||||
|
||||
|
||||
class ActionPanel(tk.Frame): |
||||
''' Tkinter Panel for the action buttons ''' |
||||
|
||||
def __init__(self, parent): |
||||
tk.Frame.__init__(self, parent.master) |
||||
self._parent = parent |
||||
# different configurations for the 'run' button |
||||
self.btn_config_disabled = { |
||||
'text': 'Run Commands', |
||||
'background': 'SystemButtonFace', |
||||
'activebackground': 'SystemButtonFace', |
||||
'state': 'disabled' |
||||
} |
||||
self.btn_config_wait = { |
||||
'text': 'Connecting', |
||||
'background': 'SystemButtonFace', |
||||
'activebackground': 'SystemButtonFace', |
||||
'state': 'disabled' |
||||
} |
||||
self.btn_config_ready = { |
||||
'text': 'Run Comands', |
||||
'background': 'green', |
||||
'activebackground': 'green', |
||||
'state': 'normal' |
||||
} |
||||
self.btn_config_running = { |
||||
'text': 'Stop Comands', |
||||
'background': 'red', |
||||
'activebackground': 'red', |
||||
'state': 'active' |
||||
} |
||||
ttk.Separator(self, orient=tk.HORIZONTAL).pack(side=tk.TOP, fill=tk.X) |
||||
self.btn_go = tk.Button( |
||||
self, |
||||
command=self._parent.go_button_clicked, |
||||
**self.btn_config_disabled |
||||
) |
||||
self.btn_go.pack(side=tk.LEFT, pady=5, padx=5) |
||||
self.btn_quit = tk.Button(self, text="Quit", command=self._parent.quit) |
||||
self.btn_quit.pack(side=tk.RIGHT, pady=5, padx=5) |
||||
self.pack(side=tk.BOTTOM, fill=tk.X) |
||||
|
||||
def update_button_state(self): |
||||
''' update the button state reflecting the state of the Arduino ''' |
||||
if self._parent.arduino_state == ArduinoState.running: |
||||
self.btn_go.config(**self.btn_config_running) |
||||
elif self._parent.arduino_state == ArduinoState.not_initialized: |
||||
self.btn_go.config(**self.btn_config_disabled) |
||||
else: |
||||
# establishing the connection takes some time, show a 'connecting' |
||||
# state and after some time show the 'ready' state |
||||
self.btn_go.config(**self.btn_config_wait) |
||||
self.after(CONNECT_DELAY, self._enable_button) |
||||
self.btn_go.update_idletasks() |
||||
|
||||
def _enable_button(self): |
||||
''' shows the 'ready' state of the button ''' |
||||
self.btn_go.config(**self.btn_config_ready) |
||||
self.btn_go.update_idletasks() |
||||
|
||||
|
||||
class Application(tk.Frame): |
||||
''' The Tkinter application containing the buisiness logic ''' |
||||
|
||||
def __init__(self, master, available_commands): |
||||
''' initialization |
||||
|
||||
master: |
||||
tkinter root object |
||||
|
||||
available_commands: |
||||
list of available commands for the Arduino |
||||
''' |
||||
master.minsize(height=330, width=300) |
||||
tk.Frame.__init__(self, master) |
||||
self._master = master |
||||
self.pack(fill=tk.BOTH) |
||||
|
||||
# the layout sections |
||||
self.file_panel = FilePanel(self) |
||||
self.program_panel = ParsedCommandsPanel(self) |
||||
self.action_panel = ActionPanel(self) |
||||
|
||||
# properties for the buisiness logic |
||||
self.available_commands = available_commands |
||||
self.scheduled_commands = [] |
||||
self.arduino_process = None |
||||
self.arduino_lock = None |
||||
self._arduino_state = ArduinoState.not_initialized |
||||
self.reset() |
||||
|
||||
@property |
||||
def arduino_state(self): |
||||
''' property for the current state of the arduino ''' |
||||
return self._arduino_state |
||||
|
||||
@arduino_state.setter |
||||
def arduino_state(self, new_state): |
||||
''' setter for the arduino state property |
||||
|
||||
as soon as the property changes, the 'go'-button will be updated |
||||
''' |
||||
self._arduino_state = new_state |
||||
self.action_panel.update_button_state() |
||||
|
||||
def select_files(self): |
||||
''' selecting a new excel file with scheduled_commands ''' |
||||
# open the file selection dialog |
||||
opts = { |
||||
'initialdir': '~/', |
||||
'filetypes': [('Excel Files', '.xlsx')], |
||||
'multiple': False} |
||||
excel_file = tk.filedialog.askopenfilename(**opts) |
||||
# if no excel file is selected, do nothing |
||||
if not excel_file: |
||||
return |
||||
# reset the application |
||||
self.reset() |
||||
# try to parse the excel file |
||||
self.scheduled_commands = parse_excel_file( |
||||
excel_file, |
||||
self.available_commands |
||||
) |
||||
if self.scheduled_commands: |
||||
# if parsing resultet in a schedule, prepare everything to run |
||||
self.arduino_prepare_process() |
||||
self.program_panel.set_label_text(os.path.basename(excel_file)) |
||||
self.program_panel.show_schedule(self.scheduled_commands) |
||||
|
||||
def reset(self): |
||||
''' reset the application state ''' |
||||
# stop any connection to the Arduino |
||||
if self.arduino_state in (ArduinoState.running, ArduinoState.connected): |
||||
self.arduino_stop_process() |
||||
self.action_panel.update_button_state() |
||||
self.program_panel.clear_text() |
||||
|
||||
def go_button_clicked(self): |
||||
''' start scheduled commands or stop the execution ''' |
||||
if self.arduino_state == ArduinoState.running: |
||||
# 'Stop Commands' clicked. |
||||
self.arduino_stop_process() |
||||
self.arduino_prepare_process() |
||||
else: |
||||
# 'Run Commands' clicked. |
||||
# the connection is already established, we just need to release |
||||
# the lock |
||||
self.arduino_lock.release() |
||||
self.arduino_state = ArduinoState.running |
||||
# regular check if the scheduled commands finished |
||||
self.after(CHECK_DELAY, self._check_process) |
||||
|
||||
def _check_process(self): |
||||
''' checks if the scheduled commands have already finished ''' |
||||
if self.arduino_process: |
||||
if self.arduino_process.is_alive(): |
||||
# still running, recheck after a short delay |
||||
self.after(CHECK_DELAY, self._check_process) |
||||
else: |
||||
# commands finished |
||||
self.arduino_stop_process() |
||||
self.arduino_prepare_process() |
||||
|
||||
def arduino_stop_process(self): |
||||
''' stops a connected Arduino ''' |
||||
if self.arduino_state != ArduinoState.not_initialized: |
||||
# only do this if a connection was established |
||||
loop = asyncio.get_event_loop() |
||||
loop.stop() |
||||
loop.close() |
||||
self.arduino_process.terminate() |
||||
self.arduino_process = None |
||||
self.arduino_lock = None |
||||
self.arduino_state = ArduinoState.not_initialized |
||||
|
||||
def arduino_prepare_process(self): |
||||
''' open a subprocess setting up the arduino ''' |
||||
# find the com port of the Arduino |
||||
try: |
||||
com_port = find_arduino_port() |
||||
except IOError as e: |
||||
messagebox.showerror('COM Port', 'No Arduino found') |
||||
return |
||||
except serial.SerialException: |
||||
messagebox.showerror('COM Port', 'Could not talk to Arduino') |
||||
return |
||||
# Create a lock and acquire it. This will put the subprocess on hold |
||||
# after establishing the connection. As soon as the lock is released, |
||||
# the scheduled commands will be run |
||||
self.arduino_lock = Lock() |
||||
self.arduino_lock.acquire() |
||||
# start the subprocess for the Arduino connection |
||||
self.arduino_process = Process( |
||||
target=arduino_subprocess, |
||||
args = ( |
||||
self.arduino_lock, |
||||
com_port, |
||||
self.scheduled_commands |
||||
) |
||||
) |
||||
self.arduino_process.start() |
||||
self.arduino_state = ArduinoState.connected |
||||
|
||||
def quit(self): |
||||
''' stop the programm ''' |
||||
self.arduino_stop_process() |
||||
self._master.destroy() |
||||
|
||||
|
||||
def run_application(available_commands, title='Arduino Scheduled Commands'): |
||||
''' run the gui application |
||||
|
||||
available_commands: |
||||
list of available commands for the Arduino |
||||
|
||||
title: |
||||
window title bar text |
||||
''' |
||||
|
||||
# create the Tkinter root and instantiate the application |
||||
root = tk.Tk() |
||||
root.wm_title(title) |
||||
app = Application(master=root, available_commands=available_commands) |
||||
|
||||
# use app.quit() as callback if the window is closed (with the 'X' button) |
||||
# if an Arduino connection is established, the standard callback does not |
||||
# stop the subprocess and therfore the application would not quit |
||||
root.protocol("WM_DELETE_WINDOW", app.quit) |
||||
|
||||
# start the application |
||||
app.mainloop() |
||||
try: |
||||
root.destroy() |
||||
except tk.TclError: |
||||
pass |
Loading…
Reference in new issue