
Rewrite, adaptive to Scienion software versions

sx_rewrite
Holger Frey 6 years ago
parent commit 74022e4c9f
  1. 7
      .gitignore
  2. 2
      s3printlog/__init__.py
  3. 127
      s3printlog/analysis.py
  4. 153
      s3printlog/graphs.py
  5. 5
      s3printlog/gui.py
  6. 201
      s3printlog/logparser.py
  7. 107
      s3printlog/main.py
  8. 243
      s3printlog/parsers.py
  9. 149
      s3printlog/report.py
  10. 72
      s3printlog/utils.py
  11. 7
      test.py

7
.gitignore vendored

@@ -1,10 +1,5 @@
# example data
example 1/
example 2/
example 3/
example fail 1/
example fail 2/
example fail 3/
example data/
# ---> Python
# Byte-compiled / optimized / DLL files

2
s3printlog/__init__.py

@@ -1,2 +0,0 @@
from . import gui
from . import main

127
s3printlog/analysis.py

@@ -1,127 +0,0 @@
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
import pandas as pd
import seaborn as sns
import pathlib

from pandas.plotting import register_matplotlib_converters

register_matplotlib_converters()

from .logparser import CheckWhen, CheckResult

# set plotting styles
sns.set_style("darkgrid")
sns.set_style(
    "ticks",
    {
        "legend.frameon": True,
        "xtick.direction": "in",
        "ytick.direction": "in",
        "axes.linewidth": 2,
    },
)
sns.set(rc={"figure.figsize": (12, 6)})
sns.set_context("paper")


def generate_point_plot(df, figure_index, what, y_limit, y_label, colors, hue_order):
    ax = sns.pointplot(
        data=df,
        x="well",
        y=what,
        hue="when",
        hue_order=hue_order,
        style="when",
        markers=list("X."),
        ax=figure_index,
        style_order=hue_order[::-1],
        palette=colors,
        join=False,
    )
    ax.set_ylim(y_limit)
    ax.set_ylabel(y_label)
    return ax


def adjust_common_figure_style(ax):
    show_ticks = []
    selected_tick_labels = []
    source_columns = set()
    for i, tick_label in enumerate(ax.get_xticklabels()):
        well = tick_label.get_text()
        column = well[0]
        if column not in source_columns:
            show_ticks.append(i)
            selected_tick_labels.append(well)
            source_columns.add(column)
    ax.set_xticks(show_ticks)
    ax.set_xticklabels(selected_tick_labels)
    ax.xaxis.grid(True)
    ax.set_xlabel("")


def generate_figure(*args):
    ax = generate_point_plot(*args)
    adjust_common_figure_style(ax)


def generate_drop_check_chart(df):
    df_sorted = df.sort_values(by="path", ascending=True)
    colors = sns.palettes.color_palette()
    hue_order = [CheckWhen.PRE_RUN.value, CheckWhen.POST_RUN.value]
    palette = {CheckWhen.PRE_RUN.value: colors[1], CheckWhen.POST_RUN.value: colors[0]}
    plt.clf()
    # figsize looks strange, but is fitted for the pdf report
    fig, axs = plt.subplots(nrows=3, sharex=True, figsize=(8.75, 8.75))
    generate_figure(
        df_sorted, axs[0], "distance", (0, 400), "Distance [pixels]", palette, hue_order
    )
    generate_figure(
        df_sorted,
        axs[1],
        "traverse",
        (-100, 100),
        "Traverse [pixels]",
        palette,
        hue_order,
    )
    generate_figure(
        df_sorted, axs[2], "volume", (0, 600), "Drop Volume [pl]", palette, hue_order
    )
    axs[-1].set_xlabel("Print Solution Well")


def generate_environment_graph(df):
    plt.clf()
    fig, axs = plt.subplots(nrows=2, sharex=True, figsize=(8.5, 5.8))
    ax = sns.lineplot(data=df["temperature"], ax=axs[0])
    ax.set_ylabel("Temperature [°C]")
    ax.set_ylim((10, 40))
    ax.set_xlabel("")
    ax = sns.lineplot(data=df["humidity"], ax=axs[1])
    ax.set_ylabel("Humidity [%rH]")
    ax.set_ylim((10, 90))
    ax.set_xlabel("Date and Time")


def find_missing_drops(df):
    mask = df["result"] != "ok"
    missing = df.loc[mask].copy()
    pivot = missing.pivot(index="well", columns="when", values="well")
    if "pre run" not in pivot.columns:
        pivot["pre run"] = np.nan
    if "post run" not in pivot.columns:
        pivot["post run"] = np.nan
    pivot = pivot.fillna("")
    # remove labels for post run fails if there are pre run fails
    pivot["nodups"] = pivot["post run"]
    mask = pivot["pre run"] == pivot["post run"]
    pivot["nodups"][mask] = ""
    return pivot.drop(columns=["post run"]).rename(columns={"nodups": "post run"})
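Editor's note: the pivot trick in the removed find_missing_drops is easy to misread. A hedged, self-contained sketch with invented well names (not data from this repository):

    import pandas as pd

    checks = pd.DataFrame({
        "well": ["A01", "A01", "B03", "B03"],
        "when": ["pre run", "post run", "pre run", "post run"],
        "result": ["fail", "fail", "ok", "fail"],
    })
    # find_missing_drops(checks) lists A01 only under "pre run" (it already
    # failed there) and B03 only under "post run":
    #
    #          pre run  post run
    #   well
    #   A01    A01
    #   B03             B03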

153
s3printlog/graphs.py

@@ -0,0 +1,153 @@
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
import pandas
import pathlib
import seaborn

from pandas.plotting import register_matplotlib_converters
from collections import namedtuple

register_matplotlib_converters()

# set plotting styles
seaborn.set_style("darkgrid")
seaborn.set_style(
    "ticks",
    {
        "legend.frameon": True,
        "xtick.direction": "in",
        "ytick.direction": "in",
        "axes.linewidth": 2,
    },
)
seaborn.set(rc={"figure.figsize": (12, 6)})
seaborn.set_context("paper")

GraphPaths = namedtuple("GraphPaths", ["environment", "drops"])


def save_plot(data, label, suffix=".png"):
    if not suffix.startswith("."):
        suffix = f".{suffix}"
    folder = data.files.folder
    path = folder / f"{folder.name}_{label}{suffix}"
    plt.savefig(path)
    return path


def generate_environment_graph(data):
    dataframe = data.print.environment
    plt.clf()
    fig, axs = plt.subplots(nrows=2, sharex=True, figsize=(8.5, 5.8))
    ax = seaborn.lineplot(data=dataframe["temperature"], ax=axs[0])
    ax.set_ylabel("Temperature [°C]")
    ax.set_ylim((10, 40))
    ax.set_xlabel("")
    ax = seaborn.lineplot(data=dataframe["humidity"], ax=axs[1])
    ax.set_ylabel("Humidity [%rH]")
    ax.set_ylim((10, 90))
    ax.set_xlabel("Date / Time")
    return save_plot(data, "environment")


def _drop_point_plot(df, figure_index, what, y_limit, y_label, colors, hue_order):
    ax = seaborn.pointplot(
        data=df,
        x="well",
        y=what,
        hue="measurement",
        hue_order=hue_order,
        style="measurement",
        markers=list("X."),
        ax=figure_index,
        style_order=hue_order[::-1],
        palette=colors,
        join=False,
    )
    ax.set_ylim(y_limit)
    ax.set_ylabel(y_label)
    return ax


def _drop_figure_styles(ax):
    show_ticks = []
    selected_tick_labels = []
    source_columns = set()
    for i, tick_label in enumerate(ax.get_xticklabels()):
        well = tick_label.get_text()
        column = well[0]
        if column not in source_columns:
            show_ticks.append(i)
            selected_tick_labels.append(well)
            source_columns.add(column)
    ax.set_xticks(show_ticks)
    ax.set_xticklabels(selected_tick_labels)
    ax.xaxis.grid(True)
    ax.set_xlabel("")


def _make_drop_figure(*args):
    ax = _drop_point_plot(*args)
    _drop_figure_styles(ax)


def generate_drop_graph(data, nozzle):
    # select the data of the nozzle
    selection = data.drops["nozzle"] == nozzle
    nozzle_df = data.drops[selection]
    sorted_df = nozzle_df.sort_values(by="when", ascending=True)
    # set up some parameters
    colors = seaborn.palettes.color_palette()
    hue_order = ["pre run", "post run"]
    palette = {"pre run": colors[1], "post run": colors[0]}
    settings = data.print.graph_settings
    plt.clf()
    # figsize looks strange, but is fitted for the pdf report
    fig, axs = plt.subplots(nrows=3, sharex=True, figsize=(8.75, 8.75))
    _make_drop_figure(
        sorted_df,
        axs[0],
        "distance",
        (settings.distance.min, settings.distance.max),
        settings.distance.label,
        palette,
        hue_order,
    )
    _make_drop_figure(
        sorted_df,
        axs[1],
        "offset",
        (settings.offset.min, settings.offset.max),
        settings.offset.label,
        palette,
        hue_order,
    )
    _make_drop_figure(
        sorted_df,
        axs[2],
        "volume",
        (settings.volume.min, settings.volume.max),
        settings.volume.label,
        palette,
        hue_order,
    )
    axs[-1].set_xlabel("Print Solution Well")
    return save_plot(data, f"nozzle_{nozzle}")


def generate_all_graphs(data):
    env_graph = generate_environment_graph(data)
    nozzles = data.drops["nozzle"].unique()
    drop_graphs = {n: generate_drop_graph(data, n) for n in nozzles}
    return GraphPaths(env_graph, drop_graphs)
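Editor's note: save_plot keys every image to the log folder name. A hedged usage sketch, assuming `data` came from parsers.parse_logs(...) for a folder named "run_2019_05":

    from s3printlog import graphs

    paths = graphs.generate_all_graphs(data)
    # paths.environment -> run_2019_05/run_2019_05_environment.png
    # paths.drops       -> {1: run_2019_05/run_2019_05_nozzle_1.png, ...}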

5
s3printlog/gui.py

@@ -5,7 +5,8 @@ import tkinter.ttk as ttk
from pathlib import Path
from tkinter import filedialog

from .main import get_log_files, process_log_files, open_with_default_app
from .main import process_log_files
from .utils import find_log_files, open_with_default_app

if getattr(sys, "frozen", False):
@@ -98,7 +99,7 @@ class Application(tk.Frame):
        opts = {"initialdir": initial_dir, "mustexist": True}
        selection = tk.filedialog.askdirectory(**opts)
        if selection:
            self.log_files = get_log_files(selection)
            self.log_files = find_log_files(selection)
            self.set_active_state()

    def set_active_state(self, event=None):

201
s3printlog/logparser.py

@@ -1,201 +0,0 @@
# basic imports
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
import pandas as pd
import seaborn as sns
import pathlib

from collections import namedtuple
from enum import Enum
from io import StringIO

PrintLogResult = namedtuple("PrintLogResult", ["environment", "info"])


class CheckWhen(Enum):
    PRE_RUN = "pre run"
    POST_RUN = "post run"


class CheckResult(Enum):
    OK = "ok"
    FAIL = "fail"
    SKIPPED = "skipped"


class DropCheckResult:
    def __init__(
        self,
        path,
        well,
        result,
        distance=np.nan,
        traverse=np.nan,
        volume=np.nan,
        when=None,
    ):
        self.well = well
        self.path = path
        self.result = result
        self.distance = distance
        self.traverse = traverse
        self.volume = volume
        self.when = when

    def as_dict(self):
        return {
            "well": self.well,
            "path": self.path,
            "result": self.result.value,
            "distance": self.distance,
            "traverse": self.traverse,
            "volume": self.volume,
            "when": self.when.value,
        }

    @classmethod
    def from_file(cls, path, encoding="iso-8859-1"):
        with open(path, "r", encoding=encoding) as file_handle:
            lines = file_handle.readlines()
        # get x and y values, will be distance and traverse
        xy_line = lines[1]
        x_part, y_part = xy_line.split("\t")
        x = parse_str_value(x_part, float)
        y = parse_str_value(y_part, float)
        # get other data values
        for line in lines:
            if line.startswith("Well"):
                well = parse_log_line(line, str)
                if well.startswith("1"):
                    # the source plate number is encoded, we remove it,
                    # our printers have only one source plate
                    well = well[1:]
            elif line.startswith("Drop Volume"):
                volume = parse_log_line(line, float)
        # check for status
        if path.stem.lower().endswith("ok"):
            return cls(
                path, well, CheckResult.OK, distance=x, traverse=y, volume=volume
            )
        else:
            return cls(path, well, CheckResult.FAIL)


# helper functions

def parse_str_value(str_data, cast_to, default_value=np.nan):
    try:
        return cast_to(str_data.strip())
    except Exception:
        return default_value


def parse_log_line(line, cast_to, default_value=np.nan, separator="="):
    _, str_data = line.rsplit(separator, 1)
    return parse_str_value(str_data, cast_to, default_value)


def parse_log_files(log_list):
    pre_run = dict()
    post_run = dict()
    well_list = list()
    # use the files sorted by date and time
    for path in sorted(log_list):
        log_result = DropCheckResult.from_file(path)
        if log_result.well not in pre_run:
            log_result.when = CheckWhen.PRE_RUN
            pre_run[log_result.well] = log_result
            # we keep a separate list of wells in the order they appear
            # there might be skipped wells after the pre run check
            well_list.append(log_result.well)
        else:
            log_result.when = CheckWhen.POST_RUN
            post_run[log_result.well] = log_result
    skipped_runs = {well for well in pre_run if well not in post_run}
    for well in skipped_runs:
        post_result = DropCheckResult(
            "", well, CheckResult.SKIPPED, when=CheckWhen.POST_RUN
        )
        post_run[well] = post_result
    parsed_files = []
    for well in well_list:
        parsed_files.append(pre_run[well])
        parsed_files.append(post_run[well])
    return pd.DataFrame([pf.as_dict() for pf in parsed_files])


def split_print_log_line(line):
    _, value = line.split(":", 1)
    return value.strip()


def count_solutions(file_handle):
    solutions = set()
    for line in file_handle:
        line = line.strip()
        if not line or line[0] in ("X", "Y", "F", "["):
            # empty line or uninteresting one, pick next one
            continue
        elif line.startswith("Drops/Field"):
            # finished with all field definitions, leave loop
            break
        entries = (item.strip() for item in line.split("\t"))
        wells = (well for well in entries if well)
        solutions.update(wells)
    return len(solutions)


def parse_print_log(log_files):
    env_lines = []
    print_info = {}
    with open(log_files, "r", encoding="iso-8859-1") as file_handle:
        for line in file_handle:
            if "\tHumidity=\t" in line:
                env_lines.append(line)
            elif line.startswith("Probe:"):
                print_info["source"] = split_print_log_line(line)
            elif line.startswith("Target:"):
                target_and_fields = split_print_log_line(line)
                target, fields = target_and_fields.rsplit(":", 1)
                print_info["target"] = target.strip()
                print_info["fields"] = len(fields.split(","))
            elif line.startswith("Humidity:"):
                print_info["humidity"] = split_print_log_line(line)
            elif line.startswith("Run Name:"):
                print_info["run"] = split_print_log_line(line)
            elif line.startswith("Dot Pitch:"):
                # important to pass the file handle iterator here
                print_info["solutions"] = count_solutions(file_handle)
    buff = StringIO("".join(env_lines))
    columns = ["datetime", "garbage 1", "humidity", "garbage 2", "temperature"]
    tmp_df = pd.read_csv(
        buff, sep="\t", header=None, names=columns, index_col=0, parse_dates=True
    )
    environment_df = tmp_df.drop(columns=["garbage 1", "garbage 2"])
    return PrintLogResult(environment_df, print_info)


def augment_print_info(print_log_result, drop_log_list, encoding="iso-8859-1"):
    """gets voltage and pulse from a drop log file

    Since the voltage and pulse should not change during a print run,
    we add this information to the print log info.
    """
    one_log_file = drop_log_list[0]
    with open(one_log_file, "r", encoding=encoding) as file_handle:
        for line in file_handle:
            if line.startswith("Nozzle Voltage"):
                print_log_result.info["voltage"] = parse_log_line(line, str)
            elif line.startswith("Nozzle Pulse"):
                print_log_result.info["pulse"] = parse_log_line(line, str)
    return print_log_result

107
s3printlog/main.py

@@ -1,106 +1,17 @@
import matplotlib.pyplot as plt
import os
import pathlib
import subprocess
import sys
import warnings

from collections import namedtuple

from .analysis import (
    generate_drop_check_chart,
    generate_environment_graph,
    find_missing_drops,
)
from .logparser import parse_log_files, parse_print_log, augment_print_info
from .report import generate_report

DROP_CHECK_SUFFIX = ".cor"
ENVIRONMENT_SUFFIX = "_Logfile.log"

DropProcessResult = namedtuple("DropProcessResult", ["drops", "missing"])
PrintLogResult = namedtuple("PrintLogResult", ["environment", "info"])
ProcessResult = namedtuple("ProcessResult", ["data_frame", "file_path"])


class LogFiles(namedtuple("LogFiles", ["folder", "drop_check", "environment"])):
    __slots__ = ()

    def __bool__(self):
        if self.drop_check and self.environment:
            return True
        else:
            return False


class NoLogFileError(IOError):
    pass


def get_log_files(folder):
    folder = pathlib.Path(folder)
    visible = [p for p in folder.iterdir() if not p.name.startswith(".")]
    drop_files = [p for p in visible if p.name.endswith(DROP_CHECK_SUFFIX)]
    env_files = [p for p in visible if p.name.endswith(ENVIRONMENT_SUFFIX)]
    if len(env_files) != 1:
        env_files = [None]
    return LogFiles(folder, drop_files, env_files[0])


def process_drop_checks(log_files):
    drop_log_df = parse_log_files(log_files.drop_check)
    generate_drop_check_chart(drop_log_df)
    image_path = log_files.folder / f"{log_files.folder.name}_drop_check.png"
    plt.savefig(image_path)
    missing_drop_df = find_missing_drops(drop_log_df)
    missing_drop_list_path = (
        log_files.folder / f"{log_files.folder.name}_missed_spots.xlsx"
    )
    missing_drop_df.to_excel(missing_drop_list_path)
    return DropProcessResult(
        ProcessResult(drop_log_df, image_path),
        ProcessResult(missing_drop_df, image_path),
    )


def process_print_log(log_files):
    print_log = parse_print_log(log_files.environment)
    generate_environment_graph(print_log.environment)
    image_path = log_files.folder / f"{log_files.folder.name}_environment.png"
    plt.savefig(image_path)
    tmp_result = PrintLogResult(
        ProcessResult(print_log.environment, image_path), print_log.info
    )
    return augment_print_info(tmp_result, log_files.drop_check)


from . import utils
from . import parsers
from . import graphs
from . import report


def process_log_files(log_files):
    drop_check_result = process_drop_checks(log_files)
    print_log_result = process_print_log(log_files)
    return generate_report(
        log_files,
        drop_check_result.drops,
        drop_check_result.missing,
        print_log_result.environment,
        print_log_result.info,
    )
    data = parsers.parse_logs(log_files)
    graph_paths = graphs.generate_all_graphs(data)
    pdf_path = report.generate_report(data, graph_paths)
    return pdf_path


def process_log_folder(folder):
    log_files = get_log_files(folder)
    log_files = utils.find_log_files(folder)
    return process_log_files(log_files)


def open_with_default_app(some_path):
    if sys.platform.startswith("linux"):
        subprocess.call(["xdg-open", some_path])
    elif sys.platform.startswith("darwin"):
        subprocess.call(["open", some_path])
    elif sys.platform.startswith("win"):
        os.startfile(some_path)
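Editor's note: after the rewrite, the module body is a thin chain over utils, parsers, graphs and report. A minimal driver, mirroring test.py (the folder path is an assumption, not shipped data):

    from s3printlog import main, utils

    pdf_path = main.process_log_folder("example data/example sx printer")
    utils.open_with_default_app(pdf_path)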

243
s3printlog/parsers.py

@@ -0,0 +1,243 @@
import io
import numpy
import pandas
import datetime

from collections import namedtuple

from . import utils

DropStatusInfo = namedtuple("DropStatusInfo", ["when", "status"])
GraphProperties = namedtuple("GraphProperties", ["min", "max", "label"])
GraphSettings = namedtuple("GraphSettings", ["distance", "offset", "volume"])
LogResult = namedtuple("LogResult", ["files", "print", "drops", "statistics"])
Nozzle = namedtuple("Nozzle", ["number", "voltage", "pulse", "drops_failed"])
SoftwareVersion = namedtuple("Version", ["major", "minor", "patch"])
Statistics = namedtuple("Statistics", ["nozzles", "failed_pre_run", "failed_post_run"])

GRAPH_SETTINGS = {
    3: GraphSettings(
        distance=GraphProperties(min=0, max=400, label="Distance [pixels]"),
        offset=GraphProperties(min=-100, max=100, label="Traverse [pixels]"),
        volume=GraphProperties(min=0, max=600, label="Volume [pl]"),
    ),
    10: GraphSettings(
        distance=GraphProperties(min=0, max=3, label="Speed [m/s]"),
        offset=GraphProperties(min=-140, max=140, label="Deviation [µm]"),
        volume=GraphProperties(min=0, max=600, label="Volume [pl]"),
    ),
}


class PrintLog:
    def __init__(self, log_file, printer, version):
        # construction parameters
        self.log_file = log_file
        self.printer = printer
        self.software_version = version
        # the run id is derived from the file name
        run_id, _ = log_file.stem.rsplit("_", 1)
        self.run_id = run_id
        try:
            self.graph_settings = GRAPH_SETTINGS[version.major]
        except KeyError:
            raise ValueError(f"Unknown Scienion Software Version {version.major}")
        # common parameters of the print log
        self.humidity_setting = None
        self.pattern_file = None
        self.print_solutions = None
        self.run_method = None
        self.source_plate = None
        self.target_substrate = None
        self.target_count = None
        # dataframe for humidity and temperature
        self.environment = None

    def parse(self, filehandle):
        self.parse_header(filehandle)
        self.parse_source_wells(filehandle)
        self.parse_environment(filehandle)

    def parse_header(self, iterator):
        for line in iterator:
            if line.startswith("Field(s):"):
                break
            parts = line.split(":", 1)
            if len(parts) != 2:
                continue
            key, value = parts[0].strip(), parts[1].strip()
            if key == "Probe":
                self.source_plate = value
            elif key == "Target":
                substrate, targets_str = value.split(":")
                self.target_substrate = substrate.strip()
                self.target_count = len(targets_str.split(","))
            elif key.startswith("Pattern File"):
                self.pattern_file = value
            elif key == "Humidity":
                self.humidity_setting = value
            elif key == "Run Name":
                self.run_method = value

    def parse_source_wells(self, iterator):
        # first we need to move ahead a little bit
        for line in iterator:
            if line.startswith("Field "):
                break
        raw_wells = []
        for line in iterator:
            if line.startswith("Drops"):
                break
            line = line.strip()
            if line == "" or line[0] in ("F", "["):
                continue
            else:
                raw_wells.extend(line.split("\t"))
        stripped = (entry.strip() for entry in raw_wells)
        wells = (entry for entry in stripped if entry)
        self.print_solutions = len(set(wells))

    def parse_environment(self, iterator):
        buff = io.StringIO()
        for line in iterator:
            if "\tHumidity=\t" in line:
                buff.write(line)
        buff.seek(0)
        f = lambda s: datetime.datetime.strptime(s, "%d.%m.%y-%H:%M:%S.%f")
        tmp_df = pandas.read_csv(
            buff, sep="\t", header=None, index_col=0, parse_dates=True, date_parser=f
        )
        self.environment = pandas.DataFrame(
            {"humidity": tmp_df.iloc[:, 1], "temperature": tmp_df.iloc[:, 3]}
        )


def parse_print_log(log_files):
    with open(log_files.print, "r", encoding="iso-8859-1") as filehandle:
        # parse the printer name
        printer_line = next(filehandle)
        printer = printer_line.split()[0]
        # get the software version info
        version_line = next(filehandle)
        _, version_info = version_line.split(":", 1)
        major, minor, patch, _ = version_info.strip().split(".", 3)
        version = SoftwareVersion(int(major), int(minor), int(patch))
        log_parser = PrintLog(log_files.print, printer, version)
        log_parser.parse(filehandle)
    return log_parser


def cast(original, to, default=numpy.nan):
    if hasattr(original, "strip"):
        original = original.strip()
    try:
        return to(original)
    except Exception:
        return default


def parse_value(log_line, to, default=numpy.nan):
    _, value = log_line.split("=", 1)
    return cast(value, to, default)


def parse_file_name(file_path):
    name_parts = [p for p in file_path.stem.split("_") if p]
    *_, date, unknown, autodrop, time, info = name_parts
    when = date + time  # parsing the datetime is done in the pandas dataframe
    if info.lower().endswith("ok"):
        status = utils.DropState.OK
    else:
        status = utils.DropState.FAULT
    return DropStatusInfo(when, status)


def parse_drop_file(file_path):
    status_info = parse_file_name(file_path)
    data = {
        "path": file_path,
        "when": status_info.when,
        "status": status_info.status.value,
        "distance": numpy.nan,  # as default value
        "offset": numpy.nan,  # as default value
        "volume": numpy.nan,  # as default value
    }
    with open(file_path, "r", encoding="iso-8859-1") as filehandle:
        if status_info.status == utils.DropState.OK:
            # only parse distance and offset if it is not a failed check
            next(filehandle)  # ignore first line
            flight_info = next(filehandle)
            distance, offset = flight_info.split()
            data["distance"] = cast(distance, float)
            data["offset"] = cast(offset, float)
        for line in filehandle:
            if line.startswith("Well"):
                well_id = parse_value(line, str)
                data["plate"] = cast(well_id[0], int)
                data["well"] = well_id[1:]
            elif line.startswith("Nozzle No"):
                data["nozzle"] = parse_value(line, int)
            elif line.startswith("Nozzle Voltage"):
                data["voltage"] = parse_value(line, int)
            elif line.startswith("Nozzle Pulse"):
                data["pulse"] = parse_value(line, int)
            elif (
                line.startswith("Drop Volume")
                and status_info.status == utils.DropState.OK
            ):
                data["volume"] = parse_value(line, int)
    data["well_id"] = f"{data['nozzle']}.{well_id}"  # nozzle is added for a complete id
    return data


def parse_drop_logs(log_files):
    collection = (parse_drop_file(f) for f in log_files.drops)
    df = pandas.DataFrame(collection)
    df["when"] = pandas.to_datetime(df["when"], format="%Y%m%d%H%M%S")
    # find the pre run values
    grouped = df.groupby("well_id")
    pre_run_df = grouped["when"].min().reset_index()
    pre_run_df["measurement"] = "pre run"
    # merge them back into the dataframe
    df = df.merge(pre_run_df, on=["well_id", "when"], how="outer")
    # the rows with no value set are post runs
    df = df.fillna({"measurement": "post run"})
    return df


def collect_statistics(drop_log):
    nozzle_df = drop_log.groupby("nozzle").first()
    nozzles = []
    for nozzle_nr, row in nozzle_df.iterrows():
        failures = utils.find_failed_drops(drop_log, nozzle_nr)
        nozzles.append(Nozzle(nozzle_nr, row["voltage"], row["pulse"], failures))
    total_failures = utils.find_failed_drops(drop_log, nozzle=None)
    return Statistics(
        nozzles, len(total_failures.pre_run), len(total_failures.post_run)
    )


def parse_logs(log_files):
    print_log = parse_print_log(log_files)
    drop_log = parse_drop_logs(log_files)
    stats = collect_statistics(drop_log)
    return LogResult(log_files, print_log, drop_log, stats)
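Editor's note: the pre/post run split in parse_drop_logs leans on timestamps alone — the earliest check per well_id is the pre run, everything later is a post run. A hedged, runnable miniature with invented values:

    import pandas

    df = pandas.DataFrame({
        "well_id": ["1.A01", "1.A01"],
        "when": pandas.to_datetime(["2019-05-13 09:00", "2019-05-13 17:30"]),
    })
    pre = df.groupby("well_id")["when"].min().reset_index()
    pre["measurement"] = "pre run"
    df = df.merge(pre, on=["well_id", "when"], how="outer")
    df = df.fillna({"measurement": "post run"})
    # the 09:00 row is now labelled "pre run", the 17:30 row "post run"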

149
s3printlog/report.py

@@ -18,16 +18,22 @@ from reportlab.platypus import (
    Table,
)

ImageBuffer = namedtuple("ImageBuffer", ["buffer", "width", "height"])
FailedDropImage = namedtuple("FailedDropImage", ["path", "well"])

styles = getSampleStyleSheet()
style_n = styles["Normal"]
style_h1 = styles["Heading1"]
style_h2 = styles["Heading2"]

TABLE_STYLE = [
    ("TOPPADDING", (0, 0), (-1, -1), 0),
    ("RIGHTPADDING", (0, 0), (-1, -1), 7),
    ("BOTTOMPADDING", (0, 0), (-1, -1), 0),
    ("LEFTPADDING", (0, 0), (-1, -1), 0),
    ("FONTSIZE", (0, 0), (-1, -1), 8),
]


class DropPictures(Flowable):
    """A row of drop pictures flowable."""

@@ -48,6 +54,39 @@ class DropPictures(Flowable):
        canvas.drawString(offset + 0.5 * cm, 3.0 * cm, picture.well)


def print_info_flowable(data):
    version = data.print.software_version
    content = [
        ("Printer:", data.print.printer),
        ("Software version:", f"{version.major}.{version.minor}.{version.patch}"),
        (
            "Humidity Setting:",
            f"{data.print.humidity_setting} (humidifier might be turned off)",
        ),
        ("Run Method:", data.print.run_method),
        ("Source Plate:", data.print.source_plate),
        ("Print Solutions:", f"{data.print.print_solutions} solutions"),
        ("Target Substrate:", data.print.target_substrate),
        ("Number of Targets:", f"{data.print.target_count} targets printed"),
    ]
    if data.print.pattern_file:
        content.append(("Pattern File:", data.print.pattern_file))
    nozzles = sorted(data.statistics.nozzles)
    content.append(("Number of Nozzles:", len(nozzles)))
    for nozzle in nozzles:
        content.append(
            (
                f"Settings Nozzle #{nozzle.number}:",
                f"{nozzle.voltage}V, {nozzle.pulse}µs",
            )
        )
    content.append(("Failed Drop Checks, Pre Run:", data.statistics.failed_pre_run))
    content.append(("Failed Drop Checks, Post Run:", data.statistics.failed_post_run))
    return Table(content, style=TABLE_STYLE, hAlign="LEFT")


def trim_image(image_path):
    original = PIL.Image.open(image_path)
    background = PIL.Image.new(original.mode, original.size, original.getpixel((0, 0)))

@@ -67,23 +106,6 @@ def scaled_image_flowable(image_path, width=17 * cm):
    return Image(image_buffer.buffer, width=width, height=height)


def get_failed_drop_images(drops, missing, when):
    mask = drops.data_frame["when"] == when
    partial_df = drops.data_frame[mask]
    mask = partial_df["result"] == "fail"
    failed_df = partial_df[mask]
    missing_wells = missing.data_frame[when]
    mask = failed_df["well"].isin(missing_wells)
    failed_images = [
        FailedDropImage(item.path.with_suffix(".jpg"), item.well)
        for item in failed_df[mask].itertuples()
    ]
    return sorted(failed_images)


def graph_flowable(title, file_path):
    section = [
        Paragraph(title, style_h2),

@@ -92,16 +114,29 @@ def graph_flowable(title, file_path):
    ]
    return KeepTogether(section)


def get_failed_drop_images(failed_checks):
    return [
        FailedDropImage(item.path.with_suffix(".jpg"), item.well)
        for item in failed_checks.itertuples()
    ]


def failed_drops_flowable(drops, missing, what):
    failed_images = get_failed_drop_images(drops, missing, what)


def failed_drops_flowable(nozzle, measurement):
    if measurement == "Pre Run":
        failed_checks = nozzle.drops_failed.pre_run
    elif measurement == "Post Run":
        failed_checks = nozzle.drops_failed.post_run
    else:
        raise ValueError(f"Unknown measurement: {measurement}")
    failed_images = get_failed_drop_images(failed_checks)
    if len(failed_images) == 0:
        # no images to display here, we return early
        return []
    what_title = what.capitalize()
    section = [PageBreak(), Paragraph(f"Failed Drop Check: {what_title}", style_h2)]
    section = [
        PageBreak(),
        Paragraph(f"Failed Drop Images: Nozzle #{nozzle.number}, {measurement}", style_h2),
    ]
    # group three images together
    failed_iterator = iter(failed_images)

@@ -113,62 +148,50 @@ def failed_drops_flowable(drops, missing, what):
    return section


def print_info_flowable(print_info):
    data = [
        ("Source Plate:", print_info["source"]),
        ("Print Solutions:", f"{print_info['solutions']} solutions"),
        ("Target Substrate:", print_info["target"]),
        ("Number of Fields:", f"{print_info['fields']} fields printed"),
        ("Run Method:", print_info["run"]),
        ("Voltage:", f"{print_info['voltage']} V"),
        ("Pulse:", f"{print_info['pulse']} µs"),
        (
            "Humidity Setting:",
            f"{print_info['humidity']} (humidifier might be turned off)",
        ),
    ]
    return Table(
        data,
        style=[
            ("TOPPADDING", (0, 0), (-1, -1), 0),
            ("RIGHTPADDING", (0, 0), (-1, -1), 7),
            ("BOTTOMPADDING", (0, 0), (-1, -1), 0),
            ("LEFTPADDING", (0, 0), (-1, -1), 0),
            ("FONTSIZE", (0, 0), (-1, -1), 8),
        ],
        hAlign="LEFT",
    )


def generate_report(log_files, drops, missing, environment, print_info):
def generate_report(data, graphs):
    story = []
    start = environment.data_frame.index.min()
    start = data.print.environment.index.min()
    start_str = start.strftime("%Y-%m-%d %H:%M")
    end = environment.data_frame.index.max()
    end = data.print.environment.index.max()
    end_str = end.strftime("%Y-%m-%d %H:%M")
    headline = Paragraph(f"Print {start_str} - {end_str}", style_h1)
    story.append(headline)
    story.append(Spacer(width=17 * cm, height=0.5 * cm))
    story.append(print_info_flowable(print_info))
    story.append(print_info_flowable(data))
    story.append(Spacer(width=17 * cm, height=0.5 * cm))
    story.append(graph_flowable("Drop Check Graphs", drops.file_path))
    story.append(graph_flowable("Environment Graphs", graphs.environment))
    story.extend(failed_drops_flowable(drops, missing, "pre run"))
    story.extend(failed_drops_flowable(drops, missing, "post run"))
    for nozzle in sorted(data.statistics.nozzles):
        story.append(PageBreak())
        path = graphs.drops[nozzle.number]
        story.append(
            graph_flowable(f"Drop Check Graphs, Nozzle #{nozzle.number}", path)
        )
    if len(story) == 5:
        # no failed drop checks were reported
        story.append(Spacer(width=17 * cm, height=0.5 * cm))
        story.append(Paragraph("No failed drop checks found.", style_n))
    story.append(PageBreak())
    story.append(graph_flowable("Environment Graphs", environment.file_path))
        if len(nozzle.drops_failed.pre_run) == 0:
            failed_wells_pre_run = "-"
        else:
            failed_wells_pre_run = ", ".join(nozzle.drops_failed.pre_run["well"])
        if len(nozzle.drops_failed.post_run) == 0:
            failed_wells_post_run = "-"
        else:
            failed_wells_post_run = ", ".join(nozzle.drops_failed.post_run["well"])
        content = [
            ("Failed Pre Run Checks:", failed_wells_pre_run),
            ("Failed Post Run Checks:", failed_wells_post_run),
        ]
        story.append(Table(content, style=TABLE_STYLE, hAlign="LEFT"))
        story.extend(failed_drops_flowable(nozzle, "Pre Run"))
        story.extend(failed_drops_flowable(nozzle, "Post Run"))

    pdf_path = log_files.folder / f"{log_files.folder.name}_report.pdf"
    pdf_path = data.files.folder / f"{data.files.folder.name}_report.pdf"
    doc = SimpleDocTemplate(
        str(pdf_path),
        pagesize=A4,
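Editor's note: read together, generate_report now builds one summary page followed by a section per nozzle. A hedged outline of the assembled story:

    # page 1: headline, print info table, environment graphs
    # per nozzle: page break, drop check graphs, a table of failed wells,
    #             then "Failed Drop Images" pages for pre and post run
    #             (only when failed drop pictures exist)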

72
s3printlog/utils.py

@@ -0,0 +1,72 @@
import enum
import os
import pathlib
import subprocess
import sys

from collections import namedtuple

FailedWells = namedtuple("FailedWells", ["pre_run", "post_run"])


class DropState(enum.Enum):
    OK = "ok"
    FAULT = "fault"


class LogFiles(namedtuple("_LogFiles", ["folder", "print", "drops"])):
    __slots__ = ()

    def __bool__(self):
        if self.print and self.drops:
            return True
        else:
            return False


def _find_files(dir_path, endswith):
    visible = (i for i in dir_path.iterdir() if not i.name.startswith("."))
    return [item for item in visible if item.name.endswith(endswith)]


def find_log_files(folder):
    dir_path = pathlib.Path(folder)
    tmp_print_log = _find_files(dir_path, "_Logfile.log")
    if len(tmp_print_log) == 1:
        print_log = tmp_print_log[0]
    else:
        print_log = None
    drop_logs = _find_files(dir_path, ".cor")
    return LogFiles(dir_path, print_log, drop_logs)


def get_failed_drop_checks(dataframe, measurement, nozzle=None):
    if nozzle is not None:
        selection = dataframe["nozzle"] == nozzle
        nozzle_df = dataframe[selection]
    else:
        nozzle_df = dataframe
    # first select only the failed rows
    selection = nozzle_df["status"] == DropState.FAULT.value
    failure_df = nozzle_df[selection]
    # then select based on the measurement type
    selection = failure_df["measurement"] == measurement
    return failure_df[selection]


def find_failed_drops(dataframe, nozzle=None):
    pre_run_df = get_failed_drop_checks(dataframe, "pre run", nozzle)
    all_post_run_df = get_failed_drop_checks(dataframe, "post run", nozzle)
    # if a check already failed in the pre run, we exclude it from the post run
    selection = all_post_run_df["well_id"].isin(pre_run_df["well_id"])
    post_run_df = all_post_run_df[~selection]
    return FailedWells(pre_run_df, post_run_df)


def open_with_default_app(some_path):
    if sys.platform.startswith("linux"):
        subprocess.call(["xdg-open", some_path])
    elif sys.platform.startswith("darwin"):
        subprocess.call(["open", some_path])
    elif sys.platform.startswith("win"):
        os.startfile(some_path)
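Editor's note: find_failed_drops deduplicates across the two measurements — a well that already failed the pre run check is not reported again for the post run. A hedged usage sketch, assuming drop_log is the dataframe from parsers.parse_drop_logs:

    from s3printlog import utils

    failures = utils.find_failed_drops(drop_log, nozzle=1)
    print(len(failures.pre_run), len(failures.post_run))
    # failures.post_run omits well_ids already present in failures.pre_run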

7
test.py

@@ -3,9 +3,10 @@
from s3printlog import main
folder = 'C:/Users/Holgi/Developer/python-libraries/s3printlog/example 1'
#folder = "example 2"
folder = "example 3"
folder = 'C:/Users/Holgi/Developer/python-libraries/s3printlog/example data/example 1'
#folder = "example data/example 2"
#folder = "example data/example 3"
folder = "example data/example sx printer"
print("Generating report, PDF should be opened in a couple of seconds")
report_file = main.process_log_folder(folder)
