diff --git a/.gitignore b/.gitignore index 9f917b4..14370f6 100644 --- a/.gitignore +++ b/.gitignore @@ -1,10 +1,5 @@ # example data -example 1/ -example 2/ -example 3/ -example fail 1/ -example fail 2/ -example fail 3/ +example data/ # ---> Python # Byte-compiled / optimized / DLL files diff --git a/s3printlog/__init__.py b/s3printlog/__init__.py index 0c42496..e69de29 100644 --- a/s3printlog/__init__.py +++ b/s3printlog/__init__.py @@ -1,2 +0,0 @@ -from . import gui -from . import main diff --git a/s3printlog/analysis.py b/s3printlog/analysis.py deleted file mode 100644 index cdc506f..0000000 --- a/s3printlog/analysis.py +++ /dev/null @@ -1,127 +0,0 @@ -import numpy as np -import matplotlib.pyplot as plt -import matplotlib.ticker as ticker -import pandas as pd -import seaborn as sns -import pathlib - -from pandas.plotting import register_matplotlib_converters - -register_matplotlib_converters() - -from .logparser import CheckWhen, CheckResult - -# set plotting styles -sns.set_style("darkgrid") -sns.set_style( - "ticks", - { - "legend.frameon": True, - "xtick.direction": "in", - "ytick.direction": "in", - "axes.linewidth": 2, - }, -) -sns.set(rc={"figure.figsize": (12, 6)}) -sns.set_context("paper") - - -def generate_point_plot(df, figure_index, what, y_limit, y_label, colors, hue_order): - ax = sns.pointplot( - data=df, - x="well", - y=what, - hue="when", - hue_order=hue_order, - style="when", - markers=list("X."), - ax=figure_index, - style_order=hue_order[::-1], - palette=colors, - join=False, - ) - ax.set_ylim(y_limit) - ax.set_ylabel(y_label) - return ax - - -def adjust_common_figure_style(ax): - show_ticks = [] - selected_tick_labels = [] - source_columns = set() - for i, tick_label in enumerate(ax.get_xticklabels()): - well = tick_label.get_text() - column = well[0] - if column not in source_columns: - show_ticks.append(i) - selected_tick_labels.append(well) - source_columns.add(column) - - ax.set_xticks(show_ticks) - 
ax.set_xticklabels(selected_tick_labels) - ax.xaxis.grid(True) - ax.set_xlabel("") - - -def generate_figure(*args): - ax = generate_point_plot(*args) - adjust_common_figure_style(ax) - - -def generate_drop_check_chart(df): - df_sorted = df.sort_values(by="path", ascending=True) - colors = sns.palettes.color_palette() - hue_order = [CheckWhen.PRE_RUN.value, CheckWhen.POST_RUN.value] - palette = {CheckWhen.PRE_RUN.value: colors[1], CheckWhen.POST_RUN.value: colors[0]} - - plt.clf() - # figsize looks strange, but is fittet for pdf report - fig, axs = plt.subplots(nrows=3, sharex=True, figsize=(8.75, 8.75)) - - generate_figure( - df_sorted, axs[0], "distance", (0, 400), "Distance [pixels]", palette, hue_order - ) - generate_figure( - df_sorted, - axs[1], - "traverse", - (-100, 100), - "Traverse [pixels]", - palette, - hue_order, - ) - generate_figure( - df_sorted, axs[2], "volume", (0, 600), "Drop Volume [pl]", palette, hue_order - ) - axs[-1].set_xlabel("Print Solution Well") - - -def generate_environment_graph(df): - plt.clf() - fig, axs = plt.subplots(nrows=2, sharex=True, figsize=(8.5, 5.8)) - - ax = sns.lineplot(data=df["temperature"], ax=axs[0]) - ax.set_ylabel("Temperature [°C]") - ax.set_ylim((10, 40)) - ax.set_xlabel("") - - ax = sns.lineplot(data=df["humidity"], ax=axs[1]) - ax.set_ylabel("Humidity [%rH]") - ax.set_ylim((10, 90)) - ax.set_xlabel("Date and Time") - - -def find_missing_drops(df): - mask = df["result"] != "ok" - missing = df.loc[mask].copy() - pivot = missing.pivot(index="well", columns="when", values="well") - if "pre run" not in pivot.columns: - pivot["pre run"] = np.nan - if "post run" not in pivot.columns: - pivot["post run"] = np.nan - pivot = pivot.fillna("") - # remove labels for post run fails if there are pre run fails - pivot["nodups"] = pivot["post run"] - mask = pivot["pre run"] == pivot["post run"] - pivot["nodups"][mask] = "" - return pivot.drop(columns=["post run"]).rename(columns={"nodups": "post run"}) diff --git 
a/s3printlog/graphs.py b/s3printlog/graphs.py new file mode 100644 index 0000000..a4f027e --- /dev/null +++ b/s3printlog/graphs.py @@ -0,0 +1,153 @@ +import matplotlib.pyplot as plt +import matplotlib.ticker as ticker +import pandas +import pathlib +import seaborn + +from pandas.plotting import register_matplotlib_converters +from collections import namedtuple + +register_matplotlib_converters() + +# set plotting styles +seaborn.set_style("darkgrid") +seaborn.set_style( + "ticks", + { + "legend.frameon": True, + "xtick.direction": "in", + "ytick.direction": "in", + "axes.linewidth": 2, + }, +) +seaborn.set(rc={"figure.figsize": (12, 6)}) +seaborn.set_context("paper") + + +GraphPaths = namedtuple("GraphPaths", ["environment", "drops"]) + + +def save_plot(data, label, suffix=".png"): + if not suffix.startswith("."): + suffix = f".{suffix}" + folder = data.files.folder + path = folder / f"{folder.name}_{label}{suffix}" + plt.savefig(path) + return path + + +def generate_environment_graph(data): + dataframe = data.print.environment + plt.close("all") # release figures of earlier graphs; plt.subplots() below always opens a new one + fig, axs = plt.subplots(nrows=2, sharex=True, figsize=(8.5, 5.8)) + + ax = seaborn.lineplot(data=dataframe["temperature"], ax=axs[0]) + ax.set_ylabel("Temperature [°C]") + ax.set_ylim((10, 40)) + ax.set_xlabel("") + + ax = seaborn.lineplot(data=dataframe["humidity"], ax=axs[1]) + ax.set_ylabel("Humidity [%rH]") + ax.set_ylim((10, 90)) + ax.set_xlabel("Date / Time") + + return save_plot(data, "environment") + + +def _drop_point_plot(df, figure_index, what, y_limit, y_label, colors, hue_order): + ax = seaborn.pointplot( + data=df, + x="well", + y=what, + hue="measurement", + hue_order=hue_order, + style="measurement", + markers=list("X."), + ax=figure_index, + style_order=hue_order[::-1], + palette=colors, + join=False, + ) + ax.set_ylim(y_limit) + ax.set_ylabel(y_label) + return ax + + +def _drop_figure_styles(ax): + show_ticks = [] + selected_tick_labels = [] + source_columns = set() + for i, tick_label in 
enumerate(ax.get_xticklabels()): + well = tick_label.get_text() + column = well[0] + if column not in source_columns: + show_ticks.append(i) + selected_tick_labels.append(well) + source_columns.add(column) + + ax.set_xticks(show_ticks) + ax.set_xticklabels(selected_tick_labels) + ax.xaxis.grid(True) + ax.set_xlabel("") + + +def _make_drop_figure(*args): + ax = _drop_point_plot(*args) + _drop_figure_styles(ax) + + +def generate_drop_graph(data, nozzle): + # select the data of the nozzle + selection = data.drops["nozzle"] == nozzle + nozzle_df = data.drops[selection] + sorted_df = nozzle_df.sort_values(by="when", ascending=True) + + # setup some parameters + colors = seaborn.palettes.color_palette() + hue_order = ["pre run", "post run"] + palette = {"pre run": colors[1], "post run": colors[0]} + settings = data.print.graph_settings + + plt.close("all") # release figures of earlier graphs; one figure per nozzle would otherwise pile up + # figsize looks strange, but is fitted for pdf report + fig, axs = plt.subplots(nrows=3, sharex=True, figsize=(8.75, 8.75)) + + _make_drop_figure( + sorted_df, + axs[0], + "distance", + (settings.distance.min, settings.distance.max), + settings.distance.label, + palette, + hue_order, + ) + _make_drop_figure( + sorted_df, + axs[1], + "offset", + (settings.offset.min, settings.offset.max), + settings.offset.label, + palette, + hue_order, + ) + _make_drop_figure( + sorted_df, + axs[2], + "volume", + (settings.volume.min, settings.volume.max), + settings.volume.label, + palette, + hue_order, + ) + axs[-1].set_xlabel("Print Solution Well") + + return save_plot(data, f"nozzle_{nozzle}") + + +def generate_all_graphs(data): + env_graph = generate_environment_graph(data) + + nozzles = data.drops["nozzle"].unique() + drop_graphs = {n: generate_drop_graph(data, n) for n in nozzles} + + return GraphPaths(env_graph, drop_graphs) diff --git a/s3printlog/gui.py b/s3printlog/gui.py index fbdb499..af9b6fe 100644 --- a/s3printlog/gui.py +++ b/s3printlog/gui.py @@ -5,7 +5,8 @@ import tkinter.ttk as ttk from pathlib import Path from tkinter 
import filedialog -from .main import get_log_files, process_log_files, open_with_default_app +from .main import process_log_files +from .utils import find_log_files, open_with_default_app if getattr(sys, "frozen", False): @@ -98,7 +99,7 @@ class Application(tk.Frame): opts = {"initialdir": initial_dir, "mustexist": True} selection = tk.filedialog.askdirectory(**opts) if selection: - self.log_files = get_log_files(selection) + self.log_files = find_log_files(selection) self.set_active_state() def set_active_state(self, event=None): diff --git a/s3printlog/logparser.py b/s3printlog/logparser.py deleted file mode 100644 index dd525c9..0000000 --- a/s3printlog/logparser.py +++ /dev/null @@ -1,201 +0,0 @@ -# basic imports -import numpy as np -import matplotlib.pyplot as plt -import matplotlib.ticker as ticker -import pandas as pd -import seaborn as sns -import pathlib - -from collections import namedtuple -from enum import Enum -from io import StringIO - - -PrintLogResult = namedtuple("PrintLogResult", ["environment", "info"]) - - -class CheckWhen(Enum): - PRE_RUN = "pre run" - POST_RUN = "post run" - - -class CheckResult(Enum): - OK = "ok" - FAIL = "fail" - SKIPPED = "skipped" - - -class DropCheckResult: - def __init__( - self, - path, - well, - result, - distance=np.nan, - traverse=np.nan, - volume=np.nan, - when=None, - ): - self.well = well - self.path = path - self.result = result - self.distance = distance - self.traverse = traverse - self.volume = volume - self.when = when - - def as_dict(self): - return { - "well": self.well, - "path": self.path, - "result": self.result.value, - "distance": self.distance, - "traverse": self.traverse, - "volume": self.volume, - "when": self.when.value, - } - - @classmethod - def from_file(cls, path, encoding="iso-8859-1"): - with open(path, "r", encoding=encoding) as file_handle: - lines = file_handle.readlines() - - # get x and y values, will be distance and traverse - xy_line = lines[1] - x_part, y_part = xy_line.split("\t") - 
x = parse_str_value(x_part, float) - y = parse_str_value(y_part, float) - - # get other data values - for line in lines: - if line.startswith("Well"): - well = parse_log_line(line, str) - if well.startswith("1"): - # the source plate number is encoded, we remove it, - # our printers have only one source plate - well = well[1:] - elif line.startswith("Drop Volume"): - volume = parse_log_line(line, float) - - # check for status - if path.stem.lower().endswith("ok"): - return cls( - path, well, CheckResult.OK, distance=x, traverse=y, volume=volume - ) - else: - return cls(path, well, CheckResult.FAIL) - - -# helper functions - - -def parse_str_value(str_data, cast_to, default_value=np.nan): - try: - return cast_to(str_data.strip()) - except Exception: - return default_value - - -def parse_log_line(line, cast_to, default_value=np.nan, separator="="): - _, str_data = line.rsplit(separator, 1) - return parse_str_value(str_data, cast_to, default_value) - - -def parse_log_files(log_list): - pre_run = dict() - post_run = dict() - well_list = list() - # use the files sorted by date and time - for path in sorted(log_list): - log_result = DropCheckResult.from_file(path) - if log_result.well not in pre_run: - log_result.when = CheckWhen.PRE_RUN - pre_run[log_result.well] = log_result - # we keep a separate list of wells in the order they appear - # there might be skipped wells after the pre run check - well_list.append(log_result.well) - else: - log_result.when = CheckWhen.POST_RUN - post_run[log_result.well] = log_result - - skipped_runs = {well for well in pre_run if well not in post_run} - for well in skipped_runs: - post_result = DropCheckResult( - "", well, CheckResult.SKIPPED, when=CheckWhen.POST_RUN - ) - post_run[well] = post_result - - parsed_files = [] - for well in well_list: - parsed_files.append(pre_run[well]) - parsed_files.append(post_run[well]) - - return pd.DataFrame([pf.as_dict() for pf in parsed_files]) - - -def split_print_log_line(line): - _, value = 
line.split(":", 1) - return value.strip() - - -def count_solutions(file_handle): - solutions = set() - for line in file_handle: - line = line.strip() - if not line or line[0] in ("X", "Y", "F", "["): - # empty line or uninteresting one, pick next one - continue - elif line.startswith("Drops/Field"): - # finished with all field definition, leave loop - break - entries = (item.strip() for item in line.split("\t")) - wells = (well for well in entries if well) - solutions.update(wells) - return len(solutions) - - -def parse_print_log(log_files): - env_lines = [] - print_info = {} - with open(log_files, "r", encoding="iso-8859-1") as file_handle: - for line in file_handle: - if "\tHumidity=\t" in line: - env_lines.append(line) - elif line.startswith("Probe:"): - print_info["source"] = split_print_log_line(line) - elif line.startswith("Target:"): - target_and_fields = split_print_log_line(line) - target, fields = target_and_fields.rsplit(":", 1) - print_info["target"] = target.strip() - print_info["fields"] = len(fields.split(",")) - elif line.startswith("Humidity:"): - print_info["humidity"] = split_print_log_line(line) - elif line.startswith("Run Name:"): - print_info["run"] = split_print_log_line(line) - elif line.startswith("Dot Pitch:"): - # important to pass the filehandle iterator here - print_info["solutions"] = count_solutions(file_handle) - - buff = StringIO("".join(env_lines)) - columns = ["datetime", "garbage 1", "humidity", "garbage 2", "temperature"] - tmp_df = pd.read_csv( - buff, sep="\t", header=None, names=columns, index_col=0, parse_dates=True - ) - environment_df = tmp_df.drop(columns=["garbage 1", "garbage 2"]) - return PrintLogResult(environment_df, print_info) - - -def augment_print_info(print_log_result, drop_log_list, encoding="iso-8859-1"): - """ gets voltage and pulse from a drop log file - - Since the voltage and pulse should not change during a print run, - we add this information to the print log info - """ - one_log_file = drop_log_list[0] 
- with open(one_log_file, "r", encoding=encoding) as file_handle: - for line in file_handle: - if line.startswith("Nozzle Voltage"): - print_log_result.info["voltage"] = parse_log_line(line, str) - elif line.startswith("Nozzle Pulse"): - print_log_result.info["pulse"] = parse_log_line(line, str) - return print_log_result diff --git a/s3printlog/main.py b/s3printlog/main.py index 3ead6d4..dca9360 100644 --- a/s3printlog/main.py +++ b/s3printlog/main.py @@ -1,106 +1,17 @@ -import matplotlib.pyplot as plt -import os import pathlib -import subprocess -import sys -import warnings -from collections import namedtuple - -from .analysis import ( - generate_drop_check_chart, - generate_environment_graph, - find_missing_drops, -) -from .logparser import parse_log_files, parse_print_log, augment_print_info -from .report import generate_report - -DROP_CHECK_SUFFIX = ".cor" -ENVIRONMENT_SUFFIX = "_Logfile.log" - - -DropProcessResult = namedtuple("DropProcessResult", ["drops", "missing"]) -PrintLogResult = namedtuple("PrintLogResult", ["environment", "info"]) -ProcessResult = namedtuple("ProcessResult", ["data_frame", "file_path"]) - - -class LogFiles(namedtuple("LogFiles", ["folder", "drop_check", "environment"])): - __slots__ = () - - def __bool__(self): - if self.drop_check and self.environment: - return True - else: - return False - - -class NoLogFileError(IOError): - pass - - -def get_log_files(folder): - folder = pathlib.Path(folder) - visible = [p for p in folder.iterdir() if not p.name.startswith(".")] - drop_files = [p for p in visible if p.name.endswith(DROP_CHECK_SUFFIX)] - env_files = [p for p in visible if p.name.endswith(ENVIRONMENT_SUFFIX)] - if len(env_files) != 1: - env_files = [None] - return LogFiles(folder, drop_files, env_files[0]) - - -def process_drop_checks(log_files): - drop_log_df = parse_log_files(log_files.drop_check) - - generate_drop_check_chart(drop_log_df) - image_path = log_files.folder / f"{log_files.folder.name}_drop_check.png" - 
plt.savefig(image_path) - - missing_drop_df = find_missing_drops(drop_log_df) - misssing_drop_list_path = ( - log_files.folder / f"{log_files.folder.name}_missed_spots.xlsx" - ) - missing_drop_df.to_excel(misssing_drop_list_path) - - return DropProcessResult( - ProcessResult(drop_log_df, image_path), - ProcessResult(missing_drop_df, image_path), - ) - - -def process_print_log(log_files): - print_log = parse_print_log(log_files.environment) - - generate_environment_graph(print_log.environment) - image_path = log_files.folder / f"{log_files.folder.name}_environment.png" - plt.savefig(image_path) - - tmp_result = PrintLogResult( - ProcessResult(print_log.environment, image_path), print_log.info - ) - return augment_print_info(tmp_result, log_files.drop_check) +from . import utils +from . import parsers +from . import graphs +from . import report def process_log_files(log_files): - drop_check_result = process_drop_checks(log_files) - print_log_result = process_print_log(log_files) - return generate_report( - log_files, - drop_check_result.drops, - drop_check_result.missing, - print_log_result.environment, - print_log_result.info, - ) - + data = parsers.parse_logs(log_files) + graph_paths = graphs.generate_all_graphs(data) + pdf_path = report.generate_report(data, graph_paths) + return pdf_path def process_log_folder(folder): - log_files = get_log_files(folder) + log_files = utils.find_log_files(folder) return process_log_files(log_files) - - -def open_with_default_app(some_path): - if sys.platform.startswith("linux"): - subprocess.call(["xdg-open", some_path]) - elif sys.platform.startswith("darwin"): - subprocess.call(["open", some_path]) - elif sys.platform.startswith("win"): - os.startfile(some_path) diff --git a/s3printlog/parsers.py b/s3printlog/parsers.py new file mode 100644 index 0000000..c6aa45c --- /dev/null +++ b/s3printlog/parsers.py @@ -0,0 +1,243 @@ +import io +import numpy +import pandas +import datetime + +from collections import namedtuple + +from . 
import utils + + +DropStatusInfo = namedtuple("DropStatusInfo", ["when", "status"]) +GraphProperties = namedtuple("GraphProperties", ["min", "max", "label"]) +GraphSettings = namedtuple("GraphSettings", ["distance", "offset", "volume"]) +LogResult = namedtuple("LogResult", ["files", "print", "drops", "statistics"]) +Nozzle = namedtuple("Nozzle", ["number", "voltage", "pulse", "drops_failed"]) +SoftwareVersion = namedtuple("Version", ["major", "minor", "patch"]) +Statistics = namedtuple("Statistics", ["nozzles", "failed_pre_run", "failed_post_run"]) + +GRAPH_SETTINGS = { + 3: GraphSettings( + distance=GraphProperties(min=0, max=400, label="Distance [pixels]"), + offset=GraphProperties(min=-100, max=100, label="Traverse [pixels]"), + volume=GraphProperties(min=0, max=600, label="Volume [pl]"), + ), + 10: GraphSettings( + distance=GraphProperties(min=0, max=3, label="Speed [m/s]"), + offset=GraphProperties(min=-140, max=140, label="Deviaton [µm]"), + volume=GraphProperties(min=0, max=600, label="Volume [pl]"), + ), +} + + +class PrintLog: + def __init__(self, log_file, printer, version): + # construction parameters + self.log_file = log_file + self.printer = printer + self.software_version = version + + # runid is derived from the filename + run_id, _ = log_file.stem.rsplit("_", 1) + self.run_id = run_id + + try: + self.graph_settings = GRAPH_SETTINGS[version.major] + except KeyError: + raise ValueError(f"Unknown Scienion Software Version {version.major}") + + # common parameters of the print log + self.humidity_setting = None + self.pattern_file = None + self.print_solutions = None + self.run_method = None + self.source_plate = None + self.target_substrate = None + self.target_count = None + + # dataframe for humidity and temperature + self.environment = None + + def parse(self, filehandle): + self.parse_header(filehandle) + self.parse_source_wells(filehandle) + self.parse_environment(filehandle) + + def parse_header(self, iterator): + for line in iterator: + if 
line.startswith("Field(s):"): + break + + parts = line.split(":", 1) + if len(parts) != 2: + continue + + key, value = parts[0].strip(), parts[1].strip() + if key == "Probe": + self.source_plate = value + elif key == "Target": + substrate, targets_str = value.rsplit(":", 1) # split at the last colon only, the substrate name may contain colons itself + self.target_substrate = substrate.strip() + self.target_count = len(targets_str.split(",")) + elif key.startswith("Pattern File"): + self.pattern_file = value + elif key == "Humidity": + self.humidity_setting = value + elif key == "Run Name": + self.run_method = value + + def parse_source_wells(self, iterator): + # first we need to move ahead a little bit + for line in iterator: + if line.startswith("Field "): + break + raw_wells = [] + + for line in iterator: + if line.startswith("Drops"): + break + line = line.strip() + if line == "" or line[0] in ("F", "["): + continue + else: + raw_wells.extend(line.split("\t")) + + stripped = (entry.strip() for entry in raw_wells) + wells = (entry for entry in stripped if entry) + self.print_solutions = len(set(wells)) + + def parse_environment(self, iterator): + buff = io.StringIO() + for line in iterator: + if "\tHumidity=\t" in line: + buff.write(line) + buff.seek(0) + + f = lambda s: datetime.datetime.strptime(s, "%d.%m.%y-%H:%M:%S.%f") + tmp_df = pandas.read_csv( + buff, sep="\t", header=None, index_col=0, parse_dates=True, date_parser=f + ) + self.environment = pandas.DataFrame( + {"humidity": tmp_df.iloc[:, 1], "temperature": tmp_df.iloc[:, 3]} + ) + + +def parse_print_log(log_files): + with open(log_files.print, "r", encoding="iso-8859-1") as filehandle: + # parse the printer name + printer_line = next(filehandle) + printer = printer_line.split()[0] + + # get the software version info + version_line = next(filehandle) + _, version_info = version_line.split(":", 1) + major, minor, patch, _ = version_info.strip().split(".", 3) + version = SoftwareVersion(int(major), int(minor), int(patch)) + + log_parser = PrintLog(log_files.print, printer, version) + 
log_parser.parse(filehandle) + return log_parser + + +def cast(original, to, default=numpy.nan): + if hasattr(original, "strip"): + original = original.strip() + try: + return to(original) + except Exception: # best-effort conversion, but never swallow KeyboardInterrupt / SystemExit + return default + + +def parse_value(log_line, to, default=numpy.nan): + _, value = log_line.split("=", 1) + return cast(value, to, default) + + +def parse_file_name(file_path): + name_parts = [p for p in file_path.stem.split("_") if p] + *_, date, unknown, autodrop, time, info = name_parts + when = date + time # parsing datetime is done in the pandas dataframe + if info.lower().endswith("ok"): + status = utils.DropState.OK + else: + status = utils.DropState.FAULT + return DropStatusInfo(when, status) + + +def parse_drop_file(file_path): + status_info = parse_file_name(file_path) + data = { + "path": file_path, + "when": status_info.when, + "status": status_info.status.value, + "distance": numpy.nan, # as default value + "offset": numpy.nan, # as default value + "volume": numpy.nan, # as default value + } + + with open(file_path, "r", encoding="iso-8859-1") as filehandle: + if status_info.status == utils.DropState.OK: + # only parse distance and offset if it is not a failed check + next(filehandle) # ignore first line + flight_info = next(filehandle) + distance, offset = flight_info.split() + data["distance"] = cast(distance, float) + data["offset"] = cast(offset, float) + + for line in filehandle: + if line.startswith("Well"): + well_id = parse_value(line, str) + data["plate"] = cast(well_id[0], int) + data["well"] = well_id[1:] + elif line.startswith("Nozzle No"): + data["nozzle"] = parse_value(line, int) + elif line.startswith("Nozzle Voltage"): + data["voltage"] = parse_value(line, int) + elif line.startswith("Nozzle Pulse"): + data["pulse"] = parse_value(line, int) + elif ( + line.startswith("Drop Volume") + and status_info.status == utils.DropState.OK + ): + data["volume"] = parse_value(line, int) + + data["well_id"] = f"{data['nozzle']}.{well_id}" # nozzle is 
added for a complete id + return data + + +def parse_drop_logs(log_files): + collection = (parse_drop_file(f) for f in log_files.drops) + df = pandas.DataFrame(collection) + df["when"] = pandas.to_datetime(df["when"], format="%Y%m%d%H%M%S") + + # find the pre run values + grouped = df.groupby("well_id") + pre_run_df = grouped["when"].min().reset_index() + pre_run_df["measurement"] = "pre run" + + # merge them back into the dataframe + df = df.merge(pre_run_df, on=["well_id", "when"], how="outer") + + # the ones with not set values are post runs + df = df.fillna({"measurement": "post run"}) + return df + + +def collect_statistics(drop_log): + nozzle_df = drop_log.groupby("nozzle").first() + nozzles = [] + for nozzle_nr, row in nozzle_df.iterrows(): + failures = utils.find_failed_drops(drop_log, nozzle_nr) + nozzles.append(Nozzle(nozzle_nr, row["voltage"], row["pulse"], failures)) + + total_failures = utils.find_failed_drops(drop_log, nozzle=None) + return Statistics( + nozzles, len(total_failures.pre_run), len(total_failures.post_run) + ) + + +def parse_logs(log_files): + print_log = parse_print_log(log_files) + drop_log = parse_drop_logs(log_files) + stats = collect_statistics(drop_log) + return LogResult(log_files, print_log, drop_log, stats) diff --git a/s3printlog/report.py b/s3printlog/report.py index 44521eb..bb1af0e 100644 --- a/s3printlog/report.py +++ b/s3printlog/report.py @@ -18,16 +18,22 @@ from reportlab.platypus import ( Table, ) - ImageBuffer = namedtuple("ImageBuffer", ["buffer", "width", "height"]) FailedDropImage = namedtuple("FailedDropImage", ["path", "well"]) - styles = getSampleStyleSheet() style_n = styles["Normal"] style_h1 = styles["Heading1"] style_h2 = styles["Heading2"] +TABLE_STYLE = [ + ("TOPPADDING", (0, 0), (-1, -1), 0), + ("RIGHTPADDING", (0, 0), (-1, -1), 7), + ("BOTTOMPADDING", (0, 0), (-1, -1), 0), + ("LEFTPADDING", (0, 0), (-1, -1), 0), + ("FONTSIZE", (0, 0), (-1, -1), 8), +] + class DropPictures(Flowable): """A row of drop 
pictures flowable.""" @@ -48,6 +54,39 @@ class DropPictures(Flowable): canvas.drawString(offset + 0.5 * cm, 3.0 * cm, picture.well) + +def print_info_flowable(data): + version = data.print.software_version + content = [ + ("Printer:", data.print.printer), + ("Software version:", f"{version.major}.{version.minor}.{version.patch}"), + ( + "Humidity Setting:", + f"{data.print.humidity_setting} (humidifier might be turned off)", + ), + ("Run Method:", data.print.run_method), + ("Source Plate:", data.print.source_plate), + ("Print Solutions:", f"{data.print.print_solutions} solutions"), + ("Target Substrate:", data.print.target_substrate), + ("Number of Targets:", f"{data.print.target_count} targets printed"), + ] + if data.print.pattern_file: + content.append(("Pattern File:", data.print.pattern_file)) + nozzles = sorted(data.statistics.nozzles) + content.append(("Number of Nozzles:", len(nozzles))) + for nozzle in nozzles: + content.append( + ( + f"Settings Nozzle #{nozzle.number}:", + f"{nozzle.voltage}V, {nozzle.pulse}µs", + ) + ) + content.append(("Failed Drop Checks, Pre Run:", data.statistics.failed_pre_run)) + content.append(("Failed Drop Checks, Post Run:", data.statistics.failed_post_run)) + + return Table(content, style=TABLE_STYLE, hAlign="LEFT") + + def trim_image(image_path): original = PIL.Image.open(image_path) background = PIL.Image.new(original.mode, original.size, original.getpixel((0, 0))) @@ -67,23 +106,6 @@ def scaled_image_flowable(image_path, width=17 * cm): return Image(image_buffer.buffer, width=width, height=height) -def get_failed_drop_images(drops, missing, when): - mask = drops.data_frame["when"] == when - partial_df = drops.data_frame[mask] - - mask = partial_df["result"] == "fail" - failed_df = partial_df[mask] - - missing_wells = missing.data_frame[when] - mask = failed_df["well"].isin(missing_wells) - - failed_images = [ - FailedDropImage(item.path.with_suffix(".jpg"), item.well) - for item in failed_df[mask].itertuples() - ] - return 
sorted(failed_images) - - def graph_flowable(title, file_path): section = [ Paragraph(title, style_h2), @@ -92,16 +114,29 @@ def graph_flowable(title, file_path): ] return KeepTogether(section) +def get_failed_drop_images(failed_checks): + return [ + FailedDropImage(item.path.with_suffix(".jpg"), item.well) + for item in failed_checks.itertuples() + ] -def failed_drops_flowable(drops, missing, what): - failed_images = get_failed_drop_images(drops, missing, what) +def failed_drops_flowable(nozzle, measurement): + if measurement == "Pre Run": + failed_checks = nozzle.drops_failed.pre_run + elif measurement == "Post Run": + failed_checks = nozzle.drops_failed.post_run + else: + raise ValueError(f"Unknown mesurement: {measurement}") + failed_images = get_failed_drop_images(failed_checks) if len(failed_images) == 0: # no images to display here, we return early return [] - what_title = what.capitalize() - section = [PageBreak(), Paragraph(f"Failed Drop Check: {what_title}", style_h2)] + section = [ + PageBreak(), + Paragraph(f"Failed Drop Images: Nozzle #{nozzle.number}, {measurement}", style_h2) + ] # group three images together failed_iterator = iter(failed_images) @@ -113,62 +148,50 @@ def failed_drops_flowable(drops, missing, what): return section -def print_info_flowable(print_info): - data = [ - ("Source Plate:", print_info["source"]), - ("Print Solutions:", f"{print_info['solutions']} solutions"), - ("Target Substrate:", print_info["target"]), - ("Number of Fields:", f"{print_info['fields']} fields printed"), - ("Run Method:", print_info["run"]), - ("Voltage:", f"{print_info['voltage']} V"), - ("Pulse:", f"{print_info['pulse']} µs"), - ( - "Humidity Setting:", - f"{print_info['humidity']} (humidifier might be turned off)", - ), - ] - return Table( - data, - style=[ - ("TOPPADDING", (0, 0), (-1, -1), 0), - ("RIGHTPADDING", (0, 0), (-1, -1), 7), - ("BOTTOMPADDING", (0, 0), (-1, -1), 0), - ("LEFTPADDING", (0, 0), (-1, -1), 0), - ("FONTSIZE", (0, 0), (-1, -1), 8), - 
], - hAlign="LEFT", - ) - - -def generate_report(log_files, drops, missing, environment, print_info): +def generate_report(data, graphs): story = [] - start = environment.data_frame.index.min() + start = data.print.environment.index.min() - start_str = start.strftime("%Y-%m-%d %H:%m") + start_str = start.strftime("%Y-%m-%d %H:%M") # %M is minutes; %m would repeat the month - end = environment.data_frame.index.max() + end = data.print.environment.index.max() - end_str = end.strftime("%Y-%m-%d %H:%m") + end_str = end.strftime("%Y-%m-%d %H:%M") # %M is minutes; %m would repeat the month headline = Paragraph(f"Print {start_str} - {end_str}", style_h1) story.append(headline) story.append(Spacer(width=17 * cm, height=0.5 * cm)) - story.append(print_info_flowable(print_info)) + story.append(print_info_flowable(data)) story.append(Spacer(width=17 * cm, height=0.5 * cm)) - story.append(graph_flowable("Drop Check Graphs", drops.file_path)) + story.append(graph_flowable("Environment Graphs", graphs.environment)) - story.extend(failed_drops_flowable(drops, missing, "pre run")) - story.extend(failed_drops_flowable(drops, missing, "post run")) + for nozzle in sorted(data.statistics.nozzles): + story.append(PageBreak()) + + path = graphs.drops[nozzle.number] + story.append( + graph_flowable(f"Drop Check Graphs, Nozzle #{nozzle.number}", path) + ) - if len(story) == 5: - # no failed drop checks where reported story.append(Spacer(width=17 * cm, height=0.5 * cm)) - story.append(Paragraph("No failed drop checks found.", style_n)) - - story.append(PageBreak()) - story.append(graph_flowable("Environment Graphs", environment.file_path)) - - pdf_path = log_files.folder / f"{log_files.folder.name}_report.pdf" + + if len(nozzle.drops_failed.pre_run) == 0: + failed_wells_pre_run = "-" + else: + failed_wells_pre_run = ", ".join(nozzle.drops_failed.pre_run["well"]) + if len(nozzle.drops_failed.post_run) == 0: + failed_wells_post_run = "-" + else: + failed_wells_post_run = ", ".join(nozzle.drops_failed.post_run["well"]) + content = [ + ("Failed Pre Run Checks:", failed_wells_pre_run), + ("Failed Post Run Checks:", failed_wells_post_run), + ] + 
story.append(Table(content, style=TABLE_STYLE, hAlign="LEFT")) + story.extend(failed_drops_flowable(nozzle, "Pre Run")) + story.extend(failed_drops_flowable(nozzle, "Post Run")) + + pdf_path = data.files.folder / f"{data.files.folder.name}_report.pdf" doc = SimpleDocTemplate( str(pdf_path), pagesize=A4, diff --git a/s3printlog/utils.py b/s3printlog/utils.py new file mode 100644 index 0000000..cb201f8 --- /dev/null +++ b/s3printlog/utils.py @@ -0,0 +1,72 @@ +import enum +import os +import pathlib +import subprocess +import sys + +from collections import namedtuple + +FailedWells = namedtuple("FailedWells", ["pre_run", "post_run"]) + + +class DropState(enum.Enum): + OK = "ok" + FAULT = "fault" + + +class LogFiles(namedtuple("_LogFiles", ["folder", "print", "drops"])): + __slots__ = () + + def __bool__(self): + if self.print and self.drops: + return True + else: + return False + + +def _find_files(dir_path, endswith): + visible = (i for i in dir_path.iterdir() if not i.name.startswith(".")) + return [item for item in visible if item.name.endswith(endswith)] + + +def find_log_files(folder): + dir_path = pathlib.Path(folder) + tmp_print_log = _find_files(dir_path, "_Logfile.log") + if len(tmp_print_log) == 1: + print_log = tmp_print_log[0] + else: + print_log = None + drop_logs = _find_files(dir_path, ".cor") + return LogFiles(dir_path, print_log, drop_logs) + + +def get_failed_drop_checks(dataframe, measurement, nozzle=None): + if nozzle is not None: + selection = dataframe["nozzle"] == nozzle + nozzle_df = dataframe[selection] + else: + nozzle_df = dataframe + # select first only the failed rows + selection = nozzle_df["status"] == DropState.FAULT.value + failure_df = nozzle_df[selection] + # selection based on measurement type + selection = failure_df["measurement"] == measurement + return failure_df[selection] + + +def find_failed_drops(dataframe, nozzle=None): + pre_run_df = get_failed_drop_checks(dataframe, "pre run", nozzle) + all_post_run_df = 
get_failed_drop_checks(dataframe, "post run", nozzle) + # if a check already failed in the pre run, we exclude it from the post run + selection = all_post_run_df["well_id"].isin(pre_run_df["well_id"]) + post_run_df = all_post_run_df[~selection] + return FailedWells(pre_run_df, post_run_df) + + +def open_with_default_app(some_path): + if sys.platform.startswith("linux"): + subprocess.call(["xdg-open", some_path]) + elif sys.platform.startswith("darwin"): + subprocess.call(["open", some_path]) + elif sys.platform.startswith("win"): + os.startfile(some_path) diff --git a/test.py b/test.py index 7a8cb11..b7c247e 100644 --- a/test.py +++ b/test.py @@ -3,9 +3,10 @@ from s3printlog import main -folder = 'C:/Users/Holgi/Developer/python-libraries/s3printlog/example 1' -#folder = "example 2" -folder = "example 3" +folder = 'C:/Users/Holgi/Developer/python-libraries/s3printlog/example data/example 1' +#folder = "example data/example 2" +#folder = "example data/example 3" +folder = "example data/example sx printer" print("Generating report, PDF should be opened in a couple of seconds") report_file = main.process_log_folder(folder)