You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
412 lines
14 KiB
412 lines
14 KiB
''' A Tkinter GUI application for running scheduled commands '''
|
|
|
# library imports |
|
import asyncio |
|
import os |
|
import serial |
|
import tkinter as tk |
|
import tkinter.ttk as ttk |
|
|
|
from enum import Enum |
|
from datetime import timedelta |
|
from multiprocessing import Process, Lock |
|
from tkinter import filedialog |
|
from tkinter import messagebox |
|
|
|
# local imports |
|
from .timetable import parse_excel_file, find_arduino_port |
|
|
|
# polling interval in milliseconds (passed to Tk's after()) for checking
# whether the scheduled commands are still running on the Arduino
CHECK_DELAY = 100

# waiting time in milliseconds (passed to Tk's after()) between connecting
# to the Arduino and allowing a program to be started
CONNECT_DELAY = 2500
|
|
|
|
|
class ArduinoState(Enum):
    ''' Connection and execution states of the Arduino setup '''

    # no subprocess is attached to the Arduino
    not_initialized = 'not_initialized'

    # the subprocess was started and waits for the signal to run
    connected = 'connected'

    # the scheduled commands are being executed
    running = 'running'

    # NOTE(review): never assigned in the code visible in this module
    finished = 'finished'
|
|
|
|
|
def arduino_subprocess(lock, com_port, scheduled_commands, **kwargs):
    ''' connect to the Arduino and run the scheduled commands

    lock:
        Separation control between establishing the connection and running
        the commands. Must be of class multiprocessing.Lock
    com_port:
        port where the Arduino is connected
    scheduled_commands:
        list of .timetable.TimedCommands to run; an empty list only opens
        and closes the connection
    **kwargs:
        arguments used to open the serial connection
        the most important one is 'baudrate', defaults to 9600

    To run the scheduled commands a separate process is used. Otherwise the
    execution of the gui would stop until the commands finish. This would
    lead to an unresponsive gui program.

    The serial connection is not picklable and therefore the connection to
    the Arduino and running the commands must be made in the same process.
    To have a separation between establishing the connection and running the
    commands, a simple lock is used. Before starting the subprocess, the lock
    will be acquired by the main program and releasing the lock will start
    the commands.
    '''
    # get a custom baudrate or use the default 9600
    baudrate = kwargs.pop('baudrate', 9600)
    with serial.Serial(com_port, baudrate=baudrate, **kwargs) as connection:
        # use a fresh asyncio event loop every time
        loop = asyncio.new_event_loop()
        # wait until the lock is released to start the commands
        lock.acquire()
        try:
            # without commands there is nothing to schedule or to wait for
            if scheduled_commands:
                # schedule the commands for execution
                for scheduled_command in scheduled_commands:
                    loop.call_later(
                        scheduled_command.delta.total_seconds(),
                        connection.write,
                        scheduled_command.cmd
                    )
                # the loop should stop one second after the latest command;
                # the largest delay is used so that an unsorted list does
                # not cut off commands scheduled later than the last entry
                latest = max(
                    scheduled_command.delta.total_seconds()
                    for scheduled_command in scheduled_commands
                )
                loop.call_later(latest + 1, loop.stop)
                # run the loop
                try:
                    loop.run_forever()
                except KeyboardInterrupt:
                    loop.stop()
        finally:
            # always free the loop and the lock, even if a command failed
            try:
                loop.close()
            finally:
                lock.release()
|
|
|
|
|
def timedelta_as_str(timedelta):
    ''' render a timedelta as '[D day[s], ]H:MM:SS.f'

    Modelled after the string conversion of datetime.timedelta, which
    displays the microseconds with six characters. Here a single digit
    (tenths of a second) is always appended instead.
    '''
    minutes, seconds = divmod(timedelta.seconds, 60)
    hours, minutes = divmod(minutes, 60)

    # microseconds are stored as int between 0 and 999999 inclusive, so the
    # integer division yields exactly one digit
    tenths = timedelta.microseconds // 10**5
    clock = '{}:{:02d}:{:02d}.{}'.format(hours, minutes, seconds, tenths)

    if not timedelta.days:
        return clock

    # 'day' for exactly one day (positive or negative), 'days' otherwise
    suffix = '' if abs(timedelta.days) == 1 else 's'
    return '{} day{}, {}'.format(timedelta.days, suffix, clock)
|
|
|
|
|
class FilePanel(tk.Frame):
    ''' Tkinter Panel holding the button for selecting an Excel file '''

    def __init__(self, parent):
        ''' build the panel inside the master widget of parent

        parent:
            object providing the 'master' widget and the 'select_files'
            callback used by the button
        '''
        super().__init__(parent.master)
        self._parent = parent

        # button opening the file selection of the parent
        select_button = tk.Button(
            self,
            text="Select Excel File",
            command=parent.select_files
        )
        select_button.pack(pady=5, padx=5)
        self.btn_files = select_button

        # horizontal line at the lower edge of the panel
        ttk.Separator(self, orient=tk.HORIZONTAL).pack(
            side=tk.BOTTOM,
            fill=tk.X
        )

        self.pack(side=tk.TOP, fill=tk.X)
|
|
|
|
|
class ParsedCommandsPanel(tk.Frame):
    ''' Tkinter Panel displaying the scheduled commands to be run '''

    def __init__(self, parent):
        ''' build the panel inside the master widget of parent

        parent:
            object providing the 'master' widget
        '''
        tk.Frame.__init__(self, parent.master)
        self._parent = parent
        # label for the name of the selected file
        self.label = tk.Label(self)
        self.label.pack(fill=tk.X, pady=5, padx=5)
        # text area for the schedule, one line per command
        self.messages = tk.Message(self, text='', font=("Courier", 8))
        self.messages.pack(fill=tk.X, pady=5, padx=5)
        self.pack(side=tk.TOP, fill=tk.X)

    def set_schedule_text(self, text):
        ''' display the scheduled commands to run '''
        self.messages.config(text=text)
        self.messages.update_idletasks()

    def set_label_text(self, text):
        ''' display the filename of the scheduled commands '''
        self.label.config(text=text)
        self.label.update_idletasks()

    def clear_text(self):
        ''' clear all text in this panel '''
        self.set_schedule_text('')
        self.set_label_text('')

    def show_schedule(self, command_list):
        ''' transform the command list to a textual representation

        command_list:
            scheduled commands; the 'delta' and 'command' attributes of
            each entry are displayed

        Every command is shown on its own line, the time left aligned and
        padded to the width of the longest time text. An empty command list
        results in an empty schedule text.
        '''
        rows = [
            (timedelta_as_str(cmd.delta), cmd.command)
            for cmd in command_list
        ]
        # search for the longest time text; 0 if there are no commands
        max_len_times = max(
            (len(time_str) for time_str, _ in rows),
            default=0
        )
        # template for each command line
        template = '{delta:<{size}} {cmd}'
        # assemble all lines
        lines = [
            template.format(delta=time_str, size=max_len_times, cmd=command)
            for time_str, command in rows
        ]
        # display the text for the command list
        self.set_schedule_text('\n'.join(lines))
|
|
|
|
|
class ActionPanel(tk.Frame):
    ''' Tkinter Panel for the action buttons '''

    def __init__(self, parent):
        ''' build the panel inside the master widget of parent

        parent:
            object providing the 'master' widget, the 'arduino_state'
            attribute and the 'go_button_clicked' and 'quit' callbacks
        '''
        tk.Frame.__init__(self, parent.master)
        self._parent = parent
        # id of the pending callback switching the button to 'ready'
        self._enable_job = None

        ttk.Separator(self, orient=tk.HORIZONTAL).pack(side=tk.TOP, fill=tk.X)
        self.btn_go = tk.Button(
            self,
            text='Run Commands',
            state='disabled',
            command=self._parent.go_button_clicked
        )
        # The color name 'SystemButtonFace' is only known on Windows, so the
        # default colors are read from the freshly created button instead.
        default_bg = self.btn_go.cget('background')
        default_active_bg = self.btn_go.cget('activebackground')

        # different configurations for the 'run' button
        self.btn_config_disabled = {
            'text': 'Run Commands',
            'background': default_bg,
            'activebackground': default_active_bg,
            'state': 'disabled'
        }
        self.btn_config_wait = {
            'text': 'Connecting',
            'background': default_bg,
            'activebackground': default_active_bg,
            'state': 'disabled'
        }
        self.btn_config_ready = {
            'text': 'Run Commands',
            'background': 'green',
            'activebackground': 'green',
            'state': 'normal'
        }
        self.btn_config_running = {
            'text': 'Stop Commands',
            'background': 'red',
            'activebackground': 'red',
            'state': 'active'
        }

        self.btn_go.config(**self.btn_config_disabled)
        self.btn_go.pack(side=tk.LEFT, pady=5, padx=5)
        self.btn_quit = tk.Button(self, text="Quit", command=self._parent.quit)
        self.btn_quit.pack(side=tk.RIGHT, pady=5, padx=5)
        self.pack(side=tk.BOTTOM, fill=tk.X)

    def update_button_state(self):
        ''' update the button state reflecting the state of the Arduino '''
        # a pending 'ready' callback belongs to an earlier state and must
        # not enable the button for the current one
        if self._enable_job is not None:
            self.after_cancel(self._enable_job)
            self._enable_job = None

        state = self._parent.arduino_state
        if state == ArduinoState.running:
            self.btn_go.config(**self.btn_config_running)
        elif state == ArduinoState.not_initialized:
            self.btn_go.config(**self.btn_config_disabled)
        else:
            # establishing the connection takes some time, show a 'connecting'
            # state and after some time show the 'ready' state
            self.btn_go.config(**self.btn_config_wait)
            self._enable_job = self.after(CONNECT_DELAY, self._enable_button)
        self.btn_go.update_idletasks()

    def _enable_button(self):
        ''' shows the 'ready' state of the button '''
        self._enable_job = None
        # only show 'ready' if the state still asks for it
        if self._parent.arduino_state in (
                ArduinoState.running,
                ArduinoState.not_initialized
        ):
            return
        self.btn_go.config(**self.btn_config_ready)
        self.btn_go.update_idletasks()
|
|
|
|
|
class Application(tk.Frame):
    ''' The Tkinter application containing the business logic '''

    def __init__(self, master, available_commands):
        ''' initialization

        master:
            tkinter root object

        available_commands:
            list of available commands for the Arduino
        '''
        master.minsize(height=330, width=300)
        tk.Frame.__init__(self, master)
        self._master = master
        self.pack(fill=tk.BOTH)

        # the layout sections
        self.file_panel = FilePanel(self)
        self.program_panel = ParsedCommandsPanel(self)
        self.action_panel = ActionPanel(self)

        # properties for the business logic
        self.available_commands = available_commands
        self.scheduled_commands = []
        self.arduino_process = None
        self.arduino_lock = None
        self._arduino_state = ArduinoState.not_initialized
        self.reset()

    @property
    def arduino_state(self):
        ''' property for the current state of the arduino '''
        return self._arduino_state

    @arduino_state.setter
    def arduino_state(self, new_state):
        ''' setter for the arduino state property

        as soon as the property changes, the 'go'-button will be updated
        '''
        self._arduino_state = new_state
        self.action_panel.update_button_state()

    def select_files(self):
        ''' selecting a new excel file with scheduled_commands '''
        # open the file selection dialog; the home directory is expanded
        # here instead of relying on Tk to interpret the '~'
        opts = {
            'initialdir': os.path.expanduser('~/Desktop'),
            'filetypes': [('Excel Files', '.xlsx')],
            'multiple': False}
        excel_file = filedialog.askopenfilename(**opts)
        # if no excel file is selected, do nothing
        if not excel_file:
            return
        # reset the application
        self.reset()
        # parse the excel file
        self.scheduled_commands = parse_excel_file(
            excel_file,
            self.available_commands
        )
        if self.scheduled_commands:
            # if parsing resulted in a schedule, prepare everything to run
            self.arduino_prepare_process()
            self.program_panel.set_label_text(os.path.basename(excel_file))
            self.program_panel.show_schedule(self.scheduled_commands)

    def reset(self):
        ''' reset the application state '''
        # stop any connection to the Arduino
        if self.arduino_state in (ArduinoState.running, ArduinoState.connected):
            self.arduino_stop_process()
        self.action_panel.update_button_state()
        self.program_panel.clear_text()

    def go_button_clicked(self):
        ''' start scheduled commands or stop the execution '''
        if self.arduino_state == ArduinoState.running:
            # 'Stop Commands' clicked.
            self.arduino_stop_process()
            self.arduino_prepare_process()
        elif self.arduino_lock is not None:
            # 'Run Commands' clicked.
            # the connection is already established, we just need to release
            # the lock; without a lock there is no subprocess to start
            self.arduino_lock.release()
            self.arduino_state = ArduinoState.running
            # regular check if the scheduled commands finished
            self.after(CHECK_DELAY, self._check_process)

    def _check_process(self):
        ''' checks if the scheduled commands have already finished '''
        # Stop polling as soon as the commands are no longer running (e.g.
        # after a manual stop); otherwise every run would leave another
        # poller behind.
        if self.arduino_state != ArduinoState.running:
            return
        if self.arduino_process is None:
            return
        if self.arduino_process.is_alive():
            # still running, recheck after a short delay
            self.after(CHECK_DELAY, self._check_process)
        else:
            # commands finished
            self.arduino_stop_process()
            self.arduino_prepare_process()

    def arduino_stop_process(self):
        ''' stops a connected Arduino

        Terminates the subprocess (which also ends its event loop and closes
        its serial connection) and waits for it to exit, so that the port is
        free before a new connection is prepared.
        '''
        if self.arduino_state != ArduinoState.not_initialized:
            # only do this if a connection was established
            process = self.arduino_process
            if process is not None:
                process.terminate()
                # reap the process; avoids zombies and a still occupied port
                process.join()
            self.arduino_process = None
            self.arduino_lock = None
            self.arduino_state = ArduinoState.not_initialized

    def arduino_prepare_process(self):
        ''' open a subprocess setting up the arduino '''
        # find the com port of the Arduino
        try:
            com_port = find_arduino_port()
        except serial.SerialException:
            # must be handled before IOError: SerialException derives from
            # IOError and would otherwise never reach its own handler
            messagebox.showerror('COM Port', 'Could not talk to Arduino')
            return
        except IOError:
            messagebox.showerror('COM Port', 'No Arduino found')
            return
        # Create a lock and acquire it. This will put the subprocess on hold
        # after establishing the connection. As soon as the lock is released,
        # the scheduled commands will be run
        self.arduino_lock = Lock()
        self.arduino_lock.acquire()
        # start the subprocess for the Arduino connection
        self.arduino_process = Process(
            target=arduino_subprocess,
            args=(
                self.arduino_lock,
                com_port,
                self.scheduled_commands
            )
        )
        self.arduino_process.start()
        self.arduino_state = ArduinoState.connected

    def quit(self):
        ''' stop the program '''
        self.arduino_stop_process()
        self._master.destroy()
|
|
|
|
|
def run_application(
        available_commands,
        title='Arduino Scheduled Commands',
        icon=None
):
    ''' create the main window and run the gui application

    available_commands:
        list of available commands for the Arduino

    title:
        window title bar text

    icon:
        program icon to use, no icon is set if None
    '''
    # set up the Tkinter root window
    window = tk.Tk()
    window.wm_title(title)
    if icon is not None:
        window.iconbitmap(icon)

    # instantiate the application inside the root window
    application = Application(
        master=window,
        available_commands=available_commands
    )

    # Closing the window with the 'X' button must go through the quit()
    # method of the application: the standard callback does not stop the
    # subprocess of an established Arduino connection and therefore the
    # application would not quit.
    window.protocol("WM_DELETE_WINDOW", application.quit)

    # start the application
    application.mainloop()

    # the window may already be destroyed at this point (quit() of the
    # application destroys it), which raises a TclError
    try:
        window.destroy()
    except tk.TclError:
        pass
|
|
|