""" Sartorius Logger |
|
|
|
Make time series measurements with a Sartorius sartoriusb. |
|
""" |
|
|
|
__version__ = "0.0.1" |
|
|
|
import pandas |
|
import sartoriusb |
|
import time |
|
|
|
from collections import namedtuple |
|
from datetime import datetime |
|
from tqdm import tqdm |
|
|
|
from .datalogger import DataLogger, NullLogger |
|
from .parsers import parse_cli_arguments |
|
|


SCALE_INFO_LABELS = {
    sartoriusb.CMD_INFO_TYPE: "Scale Model",
    sartoriusb.CMD_INFO_SNR: "Scale Serial Number",
    sartoriusb.CMD_INFO_VERSION_SCALE: "Software Version of Scale",
    sartoriusb.CMD_INFO_VERSION_CONTROL_UNIT: "Software Version of Control Unit",  # noqa: E501
}

MEASUREMENT_KEYS = [
    "nr",
    "time",
    "mode",
    "value",
    "unit",
    "stable",
    "message",
]


Result = namedtuple("Result", ["info", "scale", "data", "log_file"])
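# In the Result returned by measure_series(), ``info``, ``scale`` and ``data``
# are pandas DataFrames; ``log_file`` is whatever the data logger reports as
# its ``path`` (for a DataLogger this is the plain-text log file).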


def get_scale_info(conn):
    """ returns the available scale information """
    data = {}

    for command, label in SCALE_INFO_LABELS.items():
        raw_data_lines = conn.get(command)
        if raw_data_lines:
            raw_data = raw_data_lines[0]
            raw_data = raw_data.strip()
            parts = raw_data.split(maxsplit=1)
            info = parts[1] if len(parts) > 1 else ""
        else:
            # probably a timeout of the serial connection
            info = ""
        data[label] = info

    return data


def _measure_and_log(nr, conn, logger):
    """ performs and logs one measurement

    :param nr: number of the measurement
    :param conn: connection to the scale
    :param logger: data logger instance
    :returns: dict for one measurement point
    """
    measurement = conn.measure()
    data_list = [nr, datetime.now()] + list(measurement)
    logger.add_list(data_list)

    data_dict = dict(zip(MEASUREMENT_KEYS, data_list))
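    # data_dict now maps MEASUREMENT_KEYS ("nr", "time", "mode", "value",
    # "unit", "stable", "message") to the logged values; "nr" and "time" are
    # set here, the remaining fields come from the sartoriusb measurement.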

    # for the pandas DataFrame the value should be converted to a float
    try:
        data_dict["value"] = float(data_dict["value"])
    except (ValueError, TypeError):
        pass

    return data_dict


def no_progress_bar(iterator):
    """ a stub function for not displaying a progress bar """
    return iterator


def _get_log_file_path(settings):
    """ constructs the path to the log file """

    now = datetime.now()
    log_file_name = now.strftime("%Y-%m-%d %H-%M-%S") + ".txt"
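    # e.g. "2023-01-31 14-05-09.txt" (illustrative timestamp), created inside
    # settings.directory below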
    return settings.directory / log_file_name


def _log_measurement_info(logger, settings):
    """ logs all measurement info """
    nr_of_measurements = 1 + (
        settings.duration.seconds // settings.interval.seconds
    )
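    # e.g. a 60 s duration sampled every 10 s gives 1 + 60 // 10 = 7 points;
    # the "+ 1" accounts for the final reading taken after the sleep loop in
    # measure_series()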
    measurement_info = {
        "Measurements": nr_of_measurements,
        "Duration": f"{settings.duration.value}{settings.duration.unit}",
        "Interval": f"{settings.interval.value}{settings.interval.unit}",
        "Com-Port": settings.port,
    }
    logger.add_section("Measurement Settings", measurement_info.items())
    return measurement_info


def _log_scale_info(logger, conn):
    """ logs common scale info """
    scale_info = get_scale_info(conn)
    logger.add_section("Scale Info", scale_info.items())
    return scale_info


def measure_series(settings, progress_bar=no_progress_bar, data_logger=None):
    """ performs a series of measurements

    returns the collected data as pandas DataFrames in a "Result" named tuple

    :param settings: parsers.Settings named tuple
    :param progress_bar: progress bar function to use
    :param data_logger: data logger instance to use
    :returns: named tuple "Result"
    """
    data_logger = data_logger or NullLogger()

    data_collection = []

    with data_logger as logger:
        measurement_info = _log_measurement_info(logger, settings)

        with sartoriusb.SartoriusUsb(settings.port) as conn:
            scale_info = _log_scale_info(logger, conn)

            # add column headers
            headers = [item.capitalize() for item in MEASUREMENT_KEYS]
            logger.add_section(
                "Measured Data", [headers], append_empty_line=False
            )

            nr_of_measurements = measurement_info["Measurements"]
            for i in progress_bar(range(1, nr_of_measurements)):
                data = _measure_and_log(i, conn, logger)
                data_collection.append(data)
                time.sleep(settings.interval.seconds)

            data = _measure_and_log(nr_of_measurements, conn, logger)
            data_collection.append(data)

    data_df = pandas.DataFrame(data_collection).set_index("time")
    info_df = pandas.DataFrame(measurement_info.items()).set_index(0)
    scale_df = pandas.DataFrame(scale_info.items()).set_index(0)

    return Result(info_df, scale_df, data_df, data_logger.path)


def export_as_excel(measurement_result):
    """ saves the collected data as an Excel file """
    excel_path = measurement_result.log_file.with_suffix(".xlsx")
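    # one sheet per DataFrame in the Result tuple ("Measurements", "Settings",
    # "Scale"), written next to the plain-text log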
    with pandas.ExcelWriter(excel_path) as writer:
        measurement_result.data.to_excel(writer, sheet_name="Measurements")
        measurement_result.info.to_excel(writer, sheet_name="Settings")
        measurement_result.scale.to_excel(writer, sheet_name="Scale")


def cli():
    settings = parse_cli_arguments()
    log_file_path = _get_log_file_path(settings)
    result = measure_series(
        settings, progress_bar=tqdm, data_logger=DataLogger(log_file_path)
    )
    export_as_excel(result)
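

# Optional convenience, a minimal sketch of a ``__main__`` guard: it assumes
# the module is executed with ``python -m`` (the relative imports above rule
# out running the file directly as a script).
if __name__ == "__main__":
    cli()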