Holger Frey
5 years ago
15 changed files with 725 additions and 2 deletions
@ -1,3 +1,26 @@
@@ -1,3 +1,26 @@
|
||||
# s3printlog |
||||
|
||||
Generates a report as pdf from a Scienion S3 print log folder |
||||
|
||||
The following packages are required and can be installed via ``pip install -r requirements.txt`` |
||||
|
||||
black |
||||
pandas |
||||
matplotlib < 3.1 |
||||
seaborn |
||||
reportlab |
||||
openpyxl |
||||
pyinstaller |
||||
|
||||
To build the app use the following command: |
||||
|
||||
``pyinstaller --onefile -i images\program_icon.ico "S3 Print Log Report.spec"`` |
||||
|
||||
To have it working on the computer attached to the S3 printer, you need to use a 32 bit python version 3.7 |
||||
|
||||
Hint: If pyinstaller throws a "TypeError", you might need to replace a file due to a bug in pyinstaller. |
||||
- [stackoverflow question on this][1] |
||||
- [fixed file on github][2] |
||||
|
||||
[1]: https://stackoverflow.com/questions/54138898/an-error-for-generating-an-exe-file-using-pyinstaller-typeerror-expected-str |
||||
[2]: https://github.com/Loran425/pyinstaller/commit/14b6e65642e4b07a4358bab278019a48dedf7460 |
||||
|
@ -0,0 +1,36 @@
@@ -0,0 +1,36 @@
|
||||
# -*- mode: python -*- |
||||
|
||||
block_cipher = None |
||||
|
||||
|
||||
a = Analysis( |
||||
["run_gui.py"], |
||||
pathex=["C:\\Users\\Holgi\\Developer\\python-libraries\\s3-print-log"], |
||||
binaries=[], |
||||
datas=[("images\\program_icon.ico", ".")], |
||||
hiddenimports=[], |
||||
hookspath=[], |
||||
runtime_hooks=[], |
||||
excludes=[], |
||||
win_no_prefer_redirects=False, |
||||
win_private_assemblies=False, |
||||
cipher=block_cipher, |
||||
noarchive=False, |
||||
) |
||||
pyz = PYZ(a.pure, a.zipped_data, cipher=block_cipher) |
||||
exe = EXE( |
||||
pyz, |
||||
a.scripts, |
||||
a.binaries, |
||||
a.zipfiles, |
||||
a.datas, |
||||
[], |
||||
name="S3 Print Log Report", |
||||
debug=False, |
||||
bootloader_ignore_signals=False, |
||||
strip=False, |
||||
upx=True, |
||||
runtime_tmpdir=None, |
||||
console=False, |
||||
icon="images\\program_icon.ico", |
||||
) |
After Width: | Height: | Size: 27 KiB |
After Width: | Height: | Size: 27 KiB |
Binary file not shown.
After Width: | Height: | Size: 36 KiB |
@ -0,0 +1,7 @@
@@ -0,0 +1,7 @@
|
||||
black |
||||
pandas |
||||
matplotlib<3.1 |
||||
seaborn |
||||
reportlab |
||||
openpyxl |
||||
pyinstaller |
@ -0,0 +1,6 @@
@@ -0,0 +1,6 @@
|
||||
# import s3printlog |
||||
# s3printlog.run("example 1") |
||||
|
||||
from s3printlog import gui |
||||
|
||||
gui.run() |
@ -0,0 +1,123 @@
@@ -0,0 +1,123 @@
|
||||
import numpy as np |
||||
import matplotlib.pyplot as plt |
||||
import matplotlib.ticker as ticker |
||||
import pandas as pd |
||||
import seaborn as sns |
||||
import pathlib |
||||
|
||||
from .logparser import CheckWhen, CheckResult |
||||
|
||||
# set plotting styles |
||||
sns.set_style("darkgrid") |
||||
sns.set_style( |
||||
"ticks", |
||||
{ |
||||
"legend.frameon": True, |
||||
"xtick.direction": "in", |
||||
"ytick.direction": "in", |
||||
"axes.linewidth": 2, |
||||
}, |
||||
) |
||||
sns.set(rc={"figure.figsize": (12, 6)}) |
||||
sns.set_context("paper") |
||||
|
||||
|
||||
def generate_point_plot(df, figure_index, what, y_limit, y_label, colors, hue_order): |
||||
ax = sns.pointplot( |
||||
data=df, |
||||
x="well", |
||||
y=what, |
||||
hue="when", |
||||
hue_order=hue_order, |
||||
style="when", |
||||
markers=list("X."), |
||||
ax=figure_index, |
||||
style_order=hue_order[::-1], |
||||
palette=colors, |
||||
join=False, |
||||
) |
||||
ax.set_ylim(y_limit) |
||||
ax.set_ylabel(y_label) |
||||
return ax |
||||
|
||||
|
||||
def adjust_common_figure_style(ax): |
||||
show_ticks = [] |
||||
selected_tick_labels = [] |
||||
source_columns = set() |
||||
for i, tick_label in enumerate(ax.get_xticklabels()): |
||||
well = tick_label.get_text() |
||||
column = well[0] |
||||
if column not in source_columns: |
||||
show_ticks.append(i) |
||||
selected_tick_labels.append(well) |
||||
source_columns.add(column) |
||||
|
||||
ax.set_xticks(show_ticks) |
||||
ax.set_xticklabels(selected_tick_labels) |
||||
ax.xaxis.grid(True) |
||||
ax.set_xlabel("") |
||||
|
||||
|
||||
def generate_figure(*args): |
||||
ax = generate_point_plot(*args) |
||||
adjust_common_figure_style(ax) |
||||
|
||||
|
||||
def generate_drop_check_chart(df): |
||||
df_sorted = df.sort_values(by="path", ascending=True) |
||||
colors = sns.palettes.color_palette() |
||||
hue_order = [CheckWhen.PRE_RUN.value, CheckWhen.POST_RUN.value] |
||||
palette = {CheckWhen.PRE_RUN.value: colors[1], CheckWhen.POST_RUN.value: colors[0]} |
||||
|
||||
plt.clf() |
||||
# figsize looks strange, but is fittet for pdf report |
||||
fig, axs = plt.subplots(nrows=3, sharex=True, figsize=(8.75, 8.75)) |
||||
|
||||
generate_figure( |
||||
df_sorted, axs[0], "distance", (0, 400), "Distance [pixels]", palette, hue_order |
||||
) |
||||
generate_figure( |
||||
df_sorted, |
||||
axs[1], |
||||
"traverse", |
||||
(-100, 100), |
||||
"Traverse [pixels]", |
||||
palette, |
||||
hue_order, |
||||
) |
||||
generate_figure( |
||||
df_sorted, axs[2], "volume", (0, 600), "Drop Volume [pl]", palette, hue_order |
||||
) |
||||
axs[-1].set_xlabel("Print Solution Well") |
||||
|
||||
|
||||
def generate_environment_graph(df): |
||||
plt.clf() |
||||
fig, axs = plt.subplots(nrows=2, sharex=True, figsize=(8.5, 5.8)) |
||||
|
||||
ax = sns.lineplot(data=df["temperature"], ax=axs[0]) |
||||
ax.set_ylabel("Temperature [°C]") |
||||
ax.set_ylim((10, 40)) |
||||
ax.set_xlabel("") |
||||
|
||||
ax = sns.lineplot(data=df["humidity"], ax=axs[1]) |
||||
ax.set_ylabel("Humidity [%rH]") |
||||
ax.set_ylim((10, 90)) |
||||
ax.set_xlabel("Date and Time") |
||||
|
||||
|
||||
def find_missing_drops(df): |
||||
mask = df["result"] != "ok" |
||||
missing = df.loc[mask].copy() |
||||
pivot = missing.pivot(index="well", columns="when", values="well") |
||||
if "pre run" not in pivot.columns: |
||||
pivot["pre run"] = np.nan |
||||
if "post run" not in pivot.columns: |
||||
pivot["post run"] = np.nan |
||||
pivot = pivot.fillna("") |
||||
# remove labels for post run fails if there are pre run fails |
||||
pivot["nodups"] = pivot["post run"] |
||||
mask = pivot["pre run"] == pivot["post run"] |
||||
pivot["nodups"][mask] = "" |
||||
return pivot.drop(columns=["post run"]).rename(columns={"nodups": "post run"}) |
@ -0,0 +1,141 @@
@@ -0,0 +1,141 @@
|
||||
import sys |
||||
import tkinter as tk |
||||
import tkinter.ttk as ttk |
||||
|
||||
from pathlib import Path |
||||
from tkinter import filedialog |
||||
|
||||
from .logparser import get_log_files, DROP_CHECK_SUFFIX, ENVIRONMENT_SUFFIX |
||||
from .main import process_log_folder, open_with_default_app |
||||
|
||||
|
||||
if getattr(sys, "frozen", False): |
||||
# we are inside the standalone gui app |
||||
icon_path = Path(sys._MEIPASS) / "program_icon.ico" |
||||
elif __file__: |
||||
# this should be the normal cli thingy |
||||
icon_path = Path(__file__).parent.parent / "images" / "program_icon.ico" |
||||
else: |
||||
# there's something strange |
||||
icon_path = None |
||||
|
||||
|
||||
def validate_folder(folder): |
||||
dir_path = Path(folder) |
||||
drop_checks = get_log_files(dir_path, DROP_CHECK_SUFFIX) |
||||
env_log = get_log_files(dir_path, ENVIRONMENT_SUFFIX) |
||||
if drop_checks and env_log: |
||||
return folder |
||||
else: |
||||
return None |
||||
|
||||
|
||||
class StatusPanel(tk.Frame): |
||||
def __init__(self, parent): |
||||
tk.Frame.__init__(self, parent.master) |
||||
self._parent = parent |
||||
self.label = tk.Label(self, bd=1, relief=tk.SUNKEN, anchor=tk.S) |
||||
self.label.pack(fill=tk.X) |
||||
self.pack(fill=tk.X) |
||||
|
||||
def set_text(self, text): |
||||
self.label.config(text=text) |
||||
self.label.update_idletasks() |
||||
|
||||
def clear_text(self): |
||||
self.set_text("") |
||||
|
||||
|
||||
class FilePanel(tk.Frame): |
||||
def __init__(self, parent): |
||||
tk.Frame.__init__(self, parent.master) |
||||
self._parent = parent |
||||
self.btn_files = tk.Button( |
||||
self, text="Select Folder", command=self._parent.select_folder |
||||
) |
||||
self.btn_files.pack(pady=5, padx=5) |
||||
self.label = tk.Label(self, anchor=tk.CENTER) |
||||
self.label.pack(fill=tk.X, pady=5, padx=5) |
||||
ttk.Separator(self, orient=tk.HORIZONTAL).pack(side=tk.BOTTOM, fill=tk.X) |
||||
self.pack(side=tk.TOP, fill=tk.X) |
||||
|
||||
def set_text(self, text): |
||||
self.label.config(text=text) |
||||
self.label.update_idletasks() |
||||
|
||||
def clear_text(self): |
||||
self.set_text("") |
||||
|
||||
|
||||
class ActionPanel(tk.Frame): |
||||
def __init__(self, parent): |
||||
tk.Frame.__init__(self, parent.master) |
||||
self._parent = parent |
||||
self.btn_quit = tk.Button(self, text="Quit", command=self._parent.quit) |
||||
self.btn_quit.pack(side=tk.LEFT, pady=35, padx=35) |
||||
self.btn_go = tk.Button(self, text="GO!", command=self._parent.generate_report) |
||||
self.btn_go.pack(side=tk.RIGHT, pady=35, padx=35) |
||||
self.pack(fill=tk.X) |
||||
|
||||
def disable(self): |
||||
self.btn_go.config(state="disabled") |
||||
|
||||
def enable(self): |
||||
self.btn_go.config(state="active") |
||||
|
||||
|
||||
class Application(tk.Frame): |
||||
def __init__(self, master): |
||||
master.minsize(height=150, width=400) |
||||
tk.Frame.__init__(self, master) |
||||
self._master = master |
||||
self.selected_folder = None |
||||
self.pack(fill=tk.BOTH) |
||||
self.file_panel = FilePanel(self) |
||||
self.action_panel = ActionPanel(self) |
||||
self.status_panel = StatusPanel(self) |
||||
self.reset() |
||||
|
||||
def reset(self): |
||||
self.status_panel.clear_text() |
||||
self.action_panel.disable() |
||||
|
||||
def select_folder(self): |
||||
self.reset() |
||||
opts = {"initialdir": "~/Desktop", "mustexist": True} |
||||
selection = tk.filedialog.askdirectory(**opts) |
||||
if selection: |
||||
self.selected_folder = validate_folder(selection) |
||||
self.set_active_state() |
||||
|
||||
def set_active_state(self, event=None): |
||||
if self.selected_folder is not None: |
||||
self.file_panel.set_text(self.selected_folder) |
||||
self.action_panel.enable() |
||||
else: |
||||
self.file_panel.set_text("This is not a S3 Print Log Folder") |
||||
self.action_panel.disable() |
||||
|
||||
def quit(self): |
||||
self._master.quit() |
||||
|
||||
def generate_report(self): |
||||
self.status_panel.set_text( |
||||
"Generating report, PDF should be opened in a couple of seconds" |
||||
) |
||||
report_file = process_log_folder(self.selected_folder) |
||||
open_with_default_app(report_file) |
||||
self.status_panel.set_text("Report Generated.") |
||||
|
||||
|
||||
def run(): |
||||
root = tk.Tk() |
||||
root.title("S3 Print Log Report") |
||||
if icon_path is not None: |
||||
root.iconbitmap(str(icon_path)) |
||||
app = Application(master=root) |
||||
app.mainloop() |
||||
try: |
||||
root.destroy() |
||||
except tk.TclError: |
||||
pass |
@ -0,0 +1,147 @@
@@ -0,0 +1,147 @@
|
||||
# basic imports |
||||
import numpy as np |
||||
import matplotlib.pyplot as plt |
||||
import matplotlib.ticker as ticker |
||||
import pandas as pd |
||||
import seaborn as sns |
||||
import pathlib |
||||
|
||||
from collections import namedtuple |
||||
from enum import Enum |
||||
from io import StringIO |
||||
|
||||
DROP_CHECK_SUFFIX = ".cor" |
||||
ENVIRONMENT_SUFFIX = "_Logfile.log" |
||||
|
||||
|
||||
class CheckWhen(Enum): |
||||
PRE_RUN = "pre run" |
||||
POST_RUN = "post run" |
||||
|
||||
|
||||
class CheckResult(Enum): |
||||
OK = "ok" |
||||
FAIL = "fail" |
||||
SKIPPED = "skipped" |
||||
|
||||
|
||||
class LogResult: |
||||
def __init__( |
||||
self, |
||||
path, |
||||
well, |
||||
result, |
||||
distance=np.nan, |
||||
traverse=np.nan, |
||||
volume=np.nan, |
||||
when=None, |
||||
): |
||||
self.well = well |
||||
self.path = path |
||||
self.result = result |
||||
self.distance = distance |
||||
self.traverse = traverse |
||||
self.volume = volume |
||||
self.when = when |
||||
|
||||
def as_dict(self): |
||||
return { |
||||
"well": self.well, |
||||
"path": self.path, |
||||
"result": self.result.value, |
||||
"distance": self.distance, |
||||
"traverse": self.traverse, |
||||
"volume": self.volume, |
||||
"when": self.when.value, |
||||
} |
||||
|
||||
@classmethod |
||||
def from_file(cls, path, encoding="iso-8859-1"): |
||||
with open(path, "r", encoding=encoding) as file_handle: |
||||
lines = file_handle.readlines() |
||||
|
||||
# get x and y values, will be distance and traverse |
||||
xy_line = lines[1] |
||||
x_part, y_part = xy_line.split("\t") |
||||
x = parse_str_value(x_part, float) |
||||
y = parse_str_value(y_part, float) |
||||
|
||||
# get other data values |
||||
for line in lines: |
||||
if line.startswith("Well"): |
||||
well = parse_log_line(line, str) |
||||
if well.startswith("1"): |
||||
# the source plate number is encoded, we remove it, |
||||
# our printers have only one source plate |
||||
well = well[1:] |
||||
elif line.startswith("Drop Volume"): |
||||
volume = parse_log_line(line, float) |
||||
|
||||
# check for status |
||||
if path.stem.lower().endswith("ok"): |
||||
return cls( |
||||
path, well, CheckResult.OK, distance=x, traverse=y, volume=volume |
||||
) |
||||
else: |
||||
return cls(path, well, CheckResult.FAIL) |
||||
|
||||
|
||||
# helper functions |
||||
|
||||
|
||||
def parse_str_value(str_data, cast_to, default_value=np.nan): |
||||
try: |
||||
return cast_to(str_data.strip()) |
||||
except Exception: |
||||
return default_value |
||||
|
||||
|
||||
def parse_log_line(line, cast_to, default_value=np.nan, separator="="): |
||||
_, str_data = line.rsplit(separator, 1) |
||||
return parse_str_value(str_data, cast_to, default_value) |
||||
|
||||
|
||||
def get_log_files(folder, suffix=".cor"): |
||||
visible = (p for p in folder.iterdir() if not p.name.startswith(".")) |
||||
return [p for p in visible if p.name.endswith(suffix)] |
||||
|
||||
|
||||
def parse_log_files(log_list): |
||||
pre_run = dict() |
||||
post_run = dict() |
||||
well_list = list() |
||||
# use the files sorted by date and time |
||||
for path in sorted(log_list): |
||||
log_result = LogResult.from_file(path) |
||||
if log_result.well not in pre_run: |
||||
log_result.when = CheckWhen.PRE_RUN |
||||
pre_run[log_result.well] = log_result |
||||
# we keep a separate list of wells in the order they appear |
||||
# there might be skipped wells after the pre run check |
||||
well_list.append(log_result.well) |
||||
else: |
||||
log_result.when = CheckWhen.POST_RUN |
||||
post_run[log_result.well] = log_result |
||||
|
||||
skipped_runs = {well for well in pre_run if well not in post_run} |
||||
for well in skipped_runs: |
||||
post_result = LogResult("", well, CheckResult.SKIPPED, when=CheckWhen.POST_RUN) |
||||
post_run[well] = post_result |
||||
|
||||
parsed_files = [] |
||||
for well in well_list: |
||||
parsed_files.append(pre_run[well]) |
||||
parsed_files.append(post_run[well]) |
||||
|
||||
return pd.DataFrame([pf.as_dict() for pf in parsed_files]) |
||||
|
||||
|
||||
def parse_environment_log(log_file_path): |
||||
with open(log_file_path, "r", encoding="iso-8859-1") as file_handle: |
||||
env_lines = [l for l in file_handle if "\tHumidity=\t" in l] |
||||
buff = StringIO("".join(env_lines)) |
||||
columns = ["datetime", "garbage 1", "humidity", "garbage 2", "temperature"] |
||||
df = pd.read_csv( |
||||
buff, sep="\t", header=None, names=columns, index_col=0, parse_dates=True |
||||
) |
||||
return df.drop(columns=["garbage 1", "garbage 2"]) |
@ -0,0 +1,81 @@
@@ -0,0 +1,81 @@
|
||||
import matplotlib.pyplot as plt |
||||
import os |
||||
import pathlib |
||||
import subprocess |
||||
import sys |
||||
import warnings |
||||
|
||||
from collections import namedtuple |
||||
|
||||
from .analysis import ( |
||||
generate_drop_check_chart, |
||||
generate_environment_graph, |
||||
find_missing_drops, |
||||
) |
||||
from .logparser import ( |
||||
get_log_files, |
||||
parse_log_files, |
||||
parse_environment_log, |
||||
DROP_CHECK_SUFFIX, |
||||
ENVIRONMENT_SUFFIX, |
||||
) |
||||
from .report import generate_report |
||||
|
||||
|
||||
class NoLogFileError(IOError): |
||||
pass |
||||
|
||||
|
||||
ProcessResult = namedtuple("ProcessResult", ["data_frame", "file_path"]) |
||||
DropProcessResult = namedtuple("DropProcessResult", ["drops", "missing"]) |
||||
|
||||
|
||||
def process_drop_checks(folder): |
||||
drop_log_paths = get_log_files(folder, suffix=DROP_CHECK_SUFFIX) |
||||
if len(drop_log_paths) == 0: |
||||
raise NoLogFileError("Drop Check Files Not Found") |
||||
drop_log_df = parse_log_files(drop_log_paths) |
||||
|
||||
generate_drop_check_chart(drop_log_df) |
||||
image_path = folder / "Drop Check.png" |
||||
plt.savefig(image_path) |
||||
|
||||
missing_drop_df = find_missing_drops(drop_log_df) |
||||
misssing_drop_list_path = folder / "Missed spots.xlsx" |
||||
missing_drop_df.to_excel(misssing_drop_list_path) |
||||
|
||||
return DropProcessResult( |
||||
ProcessResult(drop_log_df, image_path), |
||||
ProcessResult(missing_drop_df, image_path), |
||||
) |
||||
|
||||
|
||||
def process_environment(folder): |
||||
env_log_paths = get_log_files(folder, suffix=ENVIRONMENT_SUFFIX) |
||||
if len(env_log_paths) != 1: |
||||
raise NoLogFileError("Log File Not Found") |
||||
env_log_df = parse_environment_log(env_log_paths[0]) |
||||
|
||||
generate_environment_graph(env_log_df) |
||||
image_path = folder / "Environment.png" |
||||
plt.savefig(image_path) |
||||
|
||||
return ProcessResult(env_log_df, image_path) |
||||
|
||||
|
||||
def process_log_folder(folder): |
||||
with warnings.catch_warnings(): |
||||
warnings.simplefilter("ignore") |
||||
folder = pathlib.Path(folder) |
||||
drops, missing = process_drop_checks(folder) |
||||
environment = process_environment(folder) |
||||
return generate_report(folder, drops, missing, environment) |
||||
|
||||
|
||||
def open_with_default_app(some_path): |
||||
if sys.platform.startswith("linux"): |
||||
subprocess.call(["xdg-open", some_path]) |
||||
elif sys.platform.startswith("darwin"): |
||||
subprocess.call(["open", some_path]) |
||||
elif sys.platform.startswith("win"): |
||||
os.startfile(some_path) |
@ -0,0 +1,149 @@
@@ -0,0 +1,149 @@
|
||||
import io |
||||
import PIL |
||||
|
||||
from collections import namedtuple |
||||
from itertools import zip_longest |
||||
from reportlab.pdfgen.canvas import Canvas |
||||
from reportlab.lib.pagesizes import A4 |
||||
from reportlab.lib.styles import getSampleStyleSheet |
||||
from reportlab.lib.units import inch, cm |
||||
from reportlab.platypus import ( |
||||
Flowable, |
||||
Image, |
||||
KeepTogether, |
||||
PageBreak, |
||||
Paragraph, |
||||
Spacer, |
||||
SimpleDocTemplate, |
||||
) |
||||
|
||||
|
||||
ImageBuffer = namedtuple("ImageBuffer", ["buffer", "width", "height"]) |
||||
FailedDropImage = namedtuple("FailedDropImage", ["path", "well"]) |
||||
|
||||
styles = getSampleStyleSheet() |
||||
style_n = styles["Normal"] |
||||
style_h1 = styles["Heading1"] |
||||
style_h2 = styles["Heading2"] |
||||
|
||||
|
||||
class DropPictures(Flowable): |
||||
"""A row of drop pictures flowable.""" |
||||
|
||||
def __init__(self, pictures, xoffset=0): |
||||
self.pictures = filter(None, pictures) |
||||
self.xoffset = xoffset |
||||
self.size = 3.75 * cm |
||||
self.offsets = [0 * cm, 6 * cm, 12 * cm] |
||||
|
||||
def wrap(self, *args): |
||||
return (self.xoffset, self.size + 1 * cm) |
||||
|
||||
def draw(self): |
||||
canvas = self.canv |
||||
for offset, picture in zip(self.offsets, self.pictures): |
||||
canvas.drawImage(picture.path, offset, 0, width=5 * cm, height=self.size) |
||||
canvas.drawString(offset + 0.5 * cm, 3.0 * cm, picture.well) |
||||
|
||||
|
||||
def trim_image(image_path): |
||||
original = PIL.Image.open(image_path) |
||||
background = PIL.Image.new(original.mode, original.size, original.getpixel((0, 0))) |
||||
diff = PIL.ImageChops.difference(original, background) |
||||
diff = PIL.ImageChops.add(diff, diff, 2.0, -100) |
||||
bbox = diff.getbbox() |
||||
cropped = original.crop(bbox) if bbox else original |
||||
buffer = io.BytesIO() |
||||
cropped.save(buffer, format="png") |
||||
return ImageBuffer(buffer, cropped.width, cropped.height) |
||||
|
||||
|
||||
def scaled_image_flowable(image_path, width=17 * cm): |
||||
image_buffer = trim_image(image_path) |
||||
height = (width / image_buffer.width) * image_buffer.height |
||||
return Image(image_buffer.buffer, width=width, height=height) |
||||
|
||||
|
||||
def get_failed_drop_images(drops, missing, when): |
||||
mask = drops.data_frame["when"] == when |
||||
partial_df = drops.data_frame[mask] |
||||
|
||||
mask = partial_df["result"] == "fail" |
||||
failed_df = partial_df[mask] |
||||
|
||||
missing_wells = missing.data_frame[when] |
||||
mask = failed_df["well"].isin(missing_wells) |
||||
|
||||
failed_images = [ |
||||
FailedDropImage(item.path.with_suffix(".jpg"), item.well) |
||||
for item in failed_df[mask].itertuples() |
||||
] |
||||
return sorted(failed_images) |
||||
|
||||
|
||||
def graph_flowable(title, file_path): |
||||
section = [ |
||||
Paragraph(title, style_h2), |
||||
Spacer(width=17 * cm, height=0.5 * cm), |
||||
scaled_image_flowable(file_path), |
||||
] |
||||
return KeepTogether(section) |
||||
|
||||
|
||||
def failed_drops_flowable(drops, missing, what): |
||||
failed_images = get_failed_drop_images(drops, missing, what) |
||||
|
||||
if len(failed_images) == 0: |
||||
# no images to display here, we return early |
||||
return [] |
||||
|
||||
what_title = what.capitalize() |
||||
section = [PageBreak(), Paragraph(f"Failed Drop Check: {what_title}", style_h2)] |
||||
|
||||
# group three images together |
||||
failed_iterator = iter(failed_images) |
||||
failed_groups = zip_longest(failed_iterator, failed_iterator, failed_iterator) |
||||
|
||||
for group in failed_groups: |
||||
section.append(DropPictures(group)) |
||||
|
||||
return section |
||||
|
||||
|
||||
def generate_report(folder, drops, missing, environment): |
||||
|
||||
story = [] |
||||
|
||||
start = environment.data_frame.index.min() |
||||
start_str = start.strftime("%Y-%m-%d %H:%m") |
||||
end = environment.data_frame.index.max() |
||||
end_str = end.strftime("%Y-%m-%d %H:%m") |
||||
headline = Paragraph(f"Print {start_str} - {end_str}", style_h1) |
||||
story.append(headline) |
||||
story.append(Spacer(width=17 * cm, height=0.5 * cm)) |
||||
|
||||
story.append(graph_flowable("Drop Check Graphs", drops.file_path)) |
||||
|
||||
story.extend(failed_drops_flowable(drops, missing, "pre run")) |
||||
story.extend(failed_drops_flowable(drops, missing, "post run")) |
||||
|
||||
if len(story) == 3: |
||||
# no failed drop checks where reported |
||||
story.append(Spacer(width=17 * cm, height=1 * cm)) |
||||
story.append(Paragraph("No failed drop checks found.", style_n)) |
||||
|
||||
story.append(PageBreak()) |
||||
story.append(graph_flowable("Environment Graphs", environment.file_path)) |
||||
|
||||
pdf_path = folder / "print_report.pdf" |
||||
doc = SimpleDocTemplate( |
||||
str(pdf_path), |
||||
pagesize=A4, |
||||
leftMargin=2 * cm, |
||||
rightMargin=2 * cm, |
||||
topMargin=2 * cm, |
||||
bottomMargin=2 * cm, |
||||
) |
||||
doc.build(story) |
||||
|
||||
return pdf_path |
Loading…
Reference in new issue