From ae1427c8ac90d42945074fbc8a9bd2165429f557 Mon Sep 17 00:00:00 2001 From: Holger Frey Date: Mon, 11 Nov 2019 09:20:53 +0100 Subject: [PATCH] initial import --- .gitignore | 4 + Makefile | 62 +++++ README.md | 4 +- pyproject.toml | 53 ++++ sartorius_logger/__init__.py | 175 +++++++++++++ sartorius_logger/datalogger.py | 117 +++++++++ sartorius_logger/parsers.py | 99 ++++++++ tests/__init__.py | 1 + tests/stubs.py | 95 +++++++ tests/test_sartorius_logger.py | 290 ++++++++++++++++++++++ tests/test_sartorius_logger_datalogger.py | 206 +++++++++++++++ tests/test_sartorius_logger_parsers.py | 131 ++++++++++ 12 files changed, 1235 insertions(+), 2 deletions(-) create mode 100644 Makefile create mode 100644 pyproject.toml create mode 100644 sartorius_logger/__init__.py create mode 100644 sartorius_logger/datalogger.py create mode 100644 sartorius_logger/parsers.py create mode 100644 tests/__init__.py create mode 100644 tests/stubs.py create mode 100644 tests/test_sartorius_logger.py create mode 100644 tests/test_sartorius_logger_datalogger.py create mode 100644 tests/test_sartorius_logger_parsers.py diff --git a/.gitignore b/.gitignore index 7f7cccc..1d77a72 100644 --- a/.gitignore +++ b/.gitignore @@ -9,6 +9,7 @@ __pycache__/ # Distribution / packaging .Python +.venv env/ build/ develop-eggs/ @@ -58,3 +59,6 @@ docs/_build/ # PyBuilder target/ +# Mac Stuff +.DS_Store + diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..0d6cc13 --- /dev/null +++ b/Makefile @@ -0,0 +1,62 @@ +.DEFAULT_GOAL := help + +define BROWSER_PYSCRIPT +import os, webbrowser, sys + +try: + from urllib import pathname2url +except: + from urllib.request import pathname2url + +webbrowser.open("file://" + pathname2url(os.path.abspath(sys.argv[1]))) +endef +export BROWSER_PYSCRIPT + +define PRINT_HELP_PYSCRIPT +import re, sys + +for line in sys.stdin: + match = re.match(r'^([a-zA-Z_-]+):.*?## (.*)$$', line) + if match: + target, help = match.groups() + print("%-20s %s" % (target, help)) +endef +export PRINT_HELP_PYSCRIPT + +BROWSER := python -c "$$BROWSER_PYSCRIPT" + +help: + @python -c "$$PRINT_HELP_PYSCRIPT" < $(MAKEFILE_LIST) + +clean: clean-build clean-pyc clean-test ## remove all build, test, coverage and Python artifacts + +clean-build: ## remove build artifacts + rm -fr build/ + rm -fr dist/ + rm -fr .eggs/ + find . -name '*.egg-info' -exec rm -fr {} + + find . -name '*.egg' -exec rm -f {} + + +clean-pyc: ## remove Python file artifacts + find . -name '*.pyc' -exec rm -f {} + + find . -name '*.pyo' -exec rm -f {} + + find . -name '*~' -exec rm -f {} + + find . -name '__pycache__' -exec rm -fr {} + + +clean-test: ## remove test and coverage artifacts + rm -f .coverage + rm -fr htmlcov/ + +lint: ## check style with flake8 + black sartorius_logger tests + flake8 sartorius_logger tests + +test: ## run tests quickly (without tests for example.py) with the default Python + pytest tests -x --disable-warnings -k "not app" + +coverage: ## check code coverage with the default Python + pytest tests --cov=sartorius_logger + coverage html + $(BROWSER) htmlcov/index.html + + diff --git a/README.md b/README.md index dffacbf..a72e50b 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,3 @@ -# sartoriuslogger +# Sartorius Logger -Record serial measurements of a Sartorius Quintix Scale \ No newline at end of file +Make time series measurements with a Sartorius scale. diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..8152683 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,53 @@ +[build-system] +requires = ["flit"] +build-backend = "flit.buildapi" + +[tool.flit.metadata] +module = "sartorius_logger" +author = "Holger Frey" +author-email = "frey@imtek.de" +home-page = "https://git.cpi.imtek.uni-freiburg.de/holgi/sartorius_logger" +description-file = "README.md" +classifiers = [ + "Development Status :: 4 - Beta", + "Intended Audience :: Developers", + "License :: Freely Distributable", + "Programming Language :: Python :: 3.7", + "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3 :: Only", + "Topic :: Scientific/Engineering", +] +requires = [ + "sartoriusb >= 0.2.0", + "pandas >= 0.25.0", + "openpyxl >= 3.0.0", + "tqdm >= 4.37.0" +] +requires-python = ">=3.7" + +[tool.flit.metadata.requires-extra] +test = [ + "black", + "flake8", + "flake8-comprehensions", + "pytest >=4.0.0", + "pytest-cov", + "pytest-mock", +] +dev = [ + "keyring", +] + +[tool.black] +line-length = 79 +py37 = true +include = '\.pyi?$' +exclude = ''' +/( + \.git + | \.tox + | \.venv + | build + | dist +)/ +''' diff --git a/sartorius_logger/__init__.py b/sartorius_logger/__init__.py new file mode 100644 index 0000000..8ef0ef7 --- /dev/null +++ b/sartorius_logger/__init__.py @@ -0,0 +1,175 @@ +""" Sartorius Logger + +Make time series measurements with a Sartorius sartoriusb. +""" + +__version__ = "0.0.1" + +import pandas +import sartoriusb +import time + +from collections import namedtuple +from datetime import datetime +from tqdm import tqdm + +from .datalogger import DataLogger, NullLogger +from .parsers import parse_cli_arguments + + +SCALE_INFO_LABELS = { + sartoriusb.CMD_INFO_TYPE: "Scale Model", + sartoriusb.CMD_INFO_SNR: "Scale Serial Number", + sartoriusb.CMD_INFO_VERSION_SCALE: "Software Version of Scale", + sartoriusb.CMD_INFO_VERSION_CONTROL_UNIT: "Software Version of Control Unit", # noqa: E501 +} + +MEASUREMENT_KEYS = [ + "nr", + "time", + "mode", + "value", + "unit", + "stable", + "message", +] + + +Result = namedtuple("Result", ["info", "scale", "data", "log_file"]) + + +def get_scale_info(conn): + """ returns the available scale information """ + data = {} + + for command, label in SCALE_INFO_LABELS.items(): + raw_data_lines = conn.get(command) + if raw_data_lines: + raw_data = raw_data_lines[0] + raw_data = raw_data.strip() + parts = raw_data.split(maxsplit=1) + info = parts[1] if len(parts) > 1 else "" + else: + # propably a timeout of the serial connection + info = "" + data[label] = info + + return data + + +def _measure_and_log(nr, conn, logger): + """ performs and logs one measurement + + :params nr: number of measurement + :params conn: connection to the scale + :params log: data logger instance + :returns: dict for measurement point + """ + measurement = conn.measure() + data_list = [nr, datetime.now()] + list(measurement) + logger.add_list(data_list) + + data_dict = dict(zip(MEASUREMENT_KEYS, data_list)) + + # for the pandas data frame the value should be transformed into a float + try: + data_dict["value"] = float(data_dict["value"]) + except (ValueError, TypeError): + pass + + return data_dict + + +def no_progress_bar(iterator): + """" as stub function for not displaying a progress bar """ + return iterator + + +def _get_log_file_path(settings): + """ constructs the path to the log file """ + + now = datetime.now() + log_file_name = now.strftime("%Y-%m-%d %H-%M-%S") + ".txt" + return settings.directory / log_file_name + + +def _log_measurement_info(logger, settings): + """ logs all measurement info """ + nr_of_measurements = 1 + ( + settings.duration.seconds // settings.interval.seconds + ) + measurement_info = { + "Measurements": nr_of_measurements, + "Duration": f"{settings.duration.value}{settings.duration.unit}", + "Interval": f"{settings.interval.value}{settings.interval.unit}", + "Com-Port": settings.port, + } + logger.add_section("Measurement Settings", measurement_info.items()) + return measurement_info + + +def _log_scale_info(logger, conn): + """ logs common scale info """ + scale_info = get_scale_info(conn) + logger.add_section("Scale Info", scale_info.items()) + return scale_info + + +def measure_series(settings, progress_bar=no_progress_bar, data_logger=None): + """ serial measurements + + will return the data as pandas data frames in a "Result" named tuple + + :params settings: parser.Settings named tuple + :params progress_bar: progress bar function to use + :params data_logger: class of the data logger to use + :returns: named tuple "Result" + """ + data_logger = data_logger or NullLogger() + + data_collection = [] + + with data_logger as logger: + measurement_info = _log_measurement_info(logger, settings) + + with sartoriusb.SartoriusUsb(settings.port) as conn: + scale_info = _log_scale_info(logger, conn) + + # add column headers + headers = [item.capitalize() for item in MEASUREMENT_KEYS] + logger.add_section( + "Measured Data", [headers], append_empty_line=False + ) + + nr_of_measurements = measurement_info["Measurements"] + for i in progress_bar(range(1, nr_of_measurements)): + data = _measure_and_log(i, conn, logger) + data_collection.append(data) + time.sleep(settings.interval.seconds) + + data = _measure_and_log(nr_of_measurements, conn, logger) + data_collection.append(data) + + data_df = pandas.DataFrame(data_collection).set_index("time") + info_df = pandas.DataFrame(measurement_info.items()).set_index(0) + scale_df = pandas.DataFrame(scale_info.items()).set_index(0) + + return Result(info_df, scale_df, data_df, data_logger.path) + + +def export_as_excel(measurement_result): + """ saves the collected data as an Excel file """ + excel_path = measurement_result.log_file.with_suffix(".xlsx") + with pandas.ExcelWriter(excel_path) as writer: + measurement_result.data.to_excel(writer, sheet_name="Measurements") + measurement_result.info.to_excel(writer, sheet_name="Settings") + measurement_result.scale.to_excel(writer, sheet_name="Scale") + + +def cli(): + settings = parse_cli_arguments() + log_file_path = _get_log_file_path(settings) + result = measure_series( + settings, progress_bar=tqdm, data_logger=DataLogger(log_file_path) + ) + export_as_excel(result) diff --git a/sartorius_logger/datalogger.py b/sartorius_logger/datalogger.py new file mode 100644 index 0000000..51fe899 --- /dev/null +++ b/sartorius_logger/datalogger.py @@ -0,0 +1,117 @@ +""" data logger """ + +from pathlib import Path + + +CR = chr(13) +LF = chr(10) + + +def is_container(obj): + """' checks if the object is iterable but not string or bytes """ + return hasattr(obj, "__iter__") and not isinstance(obj, (str, bytes)) + + +class DataLogger: + def __init__(self, log_path, none_rep="", sep="\t", end=CR + LF): + """ initializes the data logger class + + :params log_path: file path of the log file + :params none_rep: string representation for None values + :params sep: character used to separate values in a list + :params end: character used for line ending + """ + + self.path = Path(log_path) + self.end = end + self.sep = sep + self.none_rep = none_rep + + self.handle = None + + def open(self): + """ opens the log file for writing """ + if self.handle is None: + self.path.parent.mkdir(parents=True, exist_ok=True) + self.handle = self.path.open("a") + + def close(self): + """ closes the log file for writing """ + if self.handle: + self.handle.close() + self.handle = None + + def __call__(self, data): + """ writes data to the log + + :params data: either a string or a container, like a list + """ + if is_container(data): + self.add_list(data) + else: + self.add(data) + + def add(self, text): + """ adds one line of text to the logfile """ + if self.handle is None: + raise IOError("Log file not opened for writing") + text = self._universal_new_lines(text) + self.handle.write(text) + + def add_list(self, data): + """ adds the values in a list to one line in the log FileCache + + The string set in the `sep` property is used to separate the values + in the output, None values are represented by the string in `none_rep` + property. + """ + conveted_nones = ((self.none_rep if v is None else v) for v in data) + value_strings = (str(v) for v in conveted_nones) + line = self.sep.join(value_strings) + self.add(line) + + def add_section(self, title, list_of_data, append_empty_line=True): + """ adds a section to the log file """ + title = title.strip() + self.add(f"[{title}]") + for data in list_of_data: + self.add_list(data) + if append_empty_line: + self.add("") + + def __enter__(self): + """ Context manager: opens the log file for writing """ + self.open() + return self + + def __exit__(self, exc_type, exc_value, exc_traceback): + """ Context manager: closes the log file """ + self.close() + + def _universal_new_lines(self, text): + """ unifies new line characters of a text + + The `end` property is used as a newline character and ensures one is + present at the end of the text. + """ + lines = text.splitlines() + return self.end.join(lines) + self.end + + +class NullLogger: + """ A stub logger with the DataLogger interface that does nothing """ + + def __init__(self, *args, **kargs): + pass + + def __call__(self, *args, **kargs): + pass + + def __getattr__(self, key): + return self + + def __enter__(self): + return self + + def __exit__(self, *args, **kargs): + pass diff --git a/sartorius_logger/parsers.py b/sartorius_logger/parsers.py new file mode 100644 index 0000000..11d1317 --- /dev/null +++ b/sartorius_logger/parsers.py @@ -0,0 +1,99 @@ +import argparse +import re + +from collections import namedtuple +from pathlib import Path + + +TIME_UNIT_FACTORS = { + "s": 1, + "m": 60, + "h": 60 * 60, +} + +ParsedDuration = namedtuple("ParsedDuration", ["value", "unit", "seconds"]) + +Settings = namedtuple( + "Settings", ["duration", "interval", "directory", "port"] +) + + +def parse_duration(raw_value, default_unit="m"): + """' parses a duration like 10sec, 5min or 2h + + :params raw_value: string value to parse + :params default_unit: default unit to use, if no (known) unit is present + :returns: ParsedTime named tuple or None + """ + match = re.match(r"(?P\d+)(?P.*)", raw_value) + if not match: + return None + value = int(match["value"]) + unit = _normalize_time_unit(match["unit"], default_unit=default_unit) + factor = TIME_UNIT_FACTORS[unit] + return ParsedDuration(value, unit, value * factor) + + +def parse_cli_arguments(): + """ parses command line interface arguments """ + parser = argparse.ArgumentParser( + description="Make time series measurements with a Sartorius scale.", + epilog=( + "Times can be specified as 10s, 10m, or 10h" + "for seconds, minutes or hours respectively." + "A relative directory path starts at the Desktop." + ), + ) + parser.add_argument("port", nargs="?", default="COM4", metavar="COMPORT") + parser.add_argument("-d", "--duration", nargs="?", default="30m") + parser.add_argument("-i", "--interval", nargs="?", default="10s") + parser.add_argument("-o", "--output", nargs="?", metavar="DIRECTORY") + raw_arguments = parser.parse_args() + return _normalize_cli_arguments(raw_arguments) + + +# helper functions + + +def _normalize_time_unit(raw_unit, default_unit): + """ checks if a known time unit is present, else uses the default unit """ + known_units = "hms" + found = [unit for unit in known_units if unit in raw_unit.lower()] + unit = found[0] if found else default_unit + return unit + + +def _normalize_cli_arguments(raw_arguments): + """ transforms cli arguments into a suitable form """ + # time related stuff + duration = parse_duration(raw_arguments.duration, default_unit="m") + interval = parse_duration(raw_arguments.interval, default_unit="s") + + # directory stuff + dir_path = _check_output_directory_path(raw_arguments.output) + + return Settings(duration, interval, dir_path, raw_arguments.port) + + +def _check_output_directory_path(raw_path): + """ returns the absolue path of the output directory + + The desktop path (~/Desktop) is considered the default directory. If a + relative path is provided, it is considered relative to the desktop. + """ + # the desktop path is the default output directory + out_path = desktop_path = Path.home() / "Desktop" + + # if a relative path is provided, make it relative to the desktop + if raw_path: + out_path = Path(raw_path) + if not out_path.is_absolute(): + out_path = desktop_path / out_path + out_path = out_path.resolve() + + # if the provided path is something strange like an existing file, + # use the default path (Desktop) + if out_path.exists() and not out_path.is_dir(): + out_path = desktop_path + + return out_path diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..611b679 --- /dev/null +++ b/tests/__init__.py @@ -0,0 +1 @@ +""" Test package for the sartorius_logger package """ diff --git a/tests/stubs.py b/tests/stubs.py new file mode 100644 index 0000000..9e76428 --- /dev/null +++ b/tests/stubs.py @@ -0,0 +1,95 @@ +import sartoriusb + +from sartorius_logger.datalogger import DataLogger + + +class DummyLogger(DataLogger): + """ A DataLogger implementation for testing """ + + def __init__(self, *args, **kargs): + super().__init__("log/file.txt", end="") + self.handle = StubFile() + + def open(self): + """ simulates opening the log file for writing """ + pass + + def close(self): + """ simulates closing the log file for writing """ + pass + + +class StubFile: + """ a stub file handle that captures everything written to it """ + + def __init__(self): + self.captured = [] + + def write(self, data): + """ simulates a filehandle write """ + self.captured.append(data) + + def __eq__(self, other): + """ quick way of comparing the captured output to something other """ + return self.captured == other + + +class DummyScale: + """ simulates a sartoriusb.SartoriusUSB class """ + + _responses = { + sartoriusb.CMD_INFO_TYPE: "Model: Quintix-1\r\n", + sartoriusb.CMD_INFO_SNR: "SerialNumber: 0815 \r", + sartoriusb.CMD_INFO_VERSION_SCALE: "ScaleVersion beta1\n", + sartoriusb.CMD_INFO_VERSION_CONTROL_UNIT: "Version.command 1.2.3", + } + + _measurements = [ + "G + 850.1 \r\n", + "G + 851.0 \r\n", + "G + 852.2 \r\n", + "G + 853.0 mg \r\n", + "G + 854.4 mg \r\n", + "G + 857.0 mg \r\n", + "G + 857.0 mg \r\n", + "G + 858.7 mg \r\n", + "G + 859.0 mg \r\n", + "G - 1000.0 mg \r\n", + " Low \r\n", + "G 0.0 \r\n", + "G 0.0 mg \r\n", + ] + + def __init__(self, *args, measurements=None, **kargs): + """ intializes the dummy scale + + :params measurements: overides for the build in measurements + :params responses: overides for the build in responses + """ + if measurements is None: + self.measurements = self._measurements.copy() + else: + self.measurements = measurements + self.responses = self._responses.copy() + self.iterator = iter(self.measurements) + + def get(self, command): + """ simulates the get() method """ + response = self.responses.get(command, False) + return [response] if response else [] + + def measure(self): + """ simulates the measure() method """ + try: + value = next(self.iterator) + return sartoriusb.parse_measurement(value) + except StopIteration: + return sartoriusb.Measurement( + None, None, None, None, "Connection Timeout" + ) + + def __enter__(self): + return self + + def __exit__(self, *args, **kargs): + pass diff --git a/tests/test_sartorius_logger.py b/tests/test_sartorius_logger.py new file mode 100644 index 0000000..a93d029 --- /dev/null +++ b/tests/test_sartorius_logger.py @@ -0,0 +1,290 @@ +""" tests for the sartorius_logger package """ + +import pytest + + +@pytest.fixture(scope="session") +def settings_fixture(): + from sartorius_logger.parsers import ParsedDuration, Settings + from pathlib import Path + + duration = ParsedDuration(5, "m", 5 * 60) + interval = ParsedDuration(10, "s", 10) + directory = Path("/something/made/up") + + yield Settings(duration, interval, directory, "usb") + + +@pytest.fixture() +def logger_fixture(): + from . import stubs + + yield stubs.DummyLogger() + + +@pytest.fixture() +def scale_fixture(): + from . import stubs + + yield stubs.DummyScale() + + +def test_get_scale_info(scale_fixture): + from sartorius_logger import get_scale_info + + result = get_scale_info(scale_fixture) + + assert result == { + "Scale Model": "Quintix-1", + "Scale Serial Number": "0815", + "Software Version of Scale": "beta1", + "Software Version of Control Unit": "1.2.3", + } + +def test_get_scale_info_with_timeout(scale_fixture): + from sartorius_logger import get_scale_info + from sartoriusb import CMD_INFO_SNR + + scale_fixture.responses[CMD_INFO_SNR] = "" + result = get_scale_info(scale_fixture) + + assert result == { + "Scale Model": "Quintix-1", + "Scale Serial Number": "", + "Software Version of Scale": "beta1", + "Software Version of Control Unit": "1.2.3", + } + + +def test_measure_and_log(scale_fixture, logger_fixture): + from sartorius_logger import _measure_and_log + from datetime import datetime + + result = _measure_and_log(42, scale_fixture, logger_fixture) + + reported_time = result.pop("time") + assert isinstance(reported_time, datetime) + assert (datetime.now() - reported_time).total_seconds() < 10 + assert result == { + "nr": 42, + "mode": "G", + "value": 850.1, + "unit": None, + "stable": False, + "message": None, + } + assert logger_fixture.handle == [ + "\t".join(["42", str(reported_time), "G", "+850.1", "", "False", ""]) + ] + + +def test_measure_and_log_with_none_value(logger_fixture): + from sartorius_logger import _measure_and_log + from datetime import datetime + from . import stubs + + scale = stubs.DummyScale(measurements=[]) + result = _measure_and_log(42, scale, logger_fixture) + + reported_time = result.pop("time") + assert isinstance(reported_time, datetime) + assert result == { + "nr": 42, + "mode": None, + "value": None, + "unit": None, + "stable": None, + "message": "Connection Timeout", + } + assert logger_fixture.handle.captured == [ + "\t".join( + ["42", str(reported_time), "", "", "", "", "Connection Timeout"] + ) + ] + + +def test_no_progress_bar_returns_iterator_unchanged(): + from sartorius_logger import no_progress_bar + + iterator = [] + + assert no_progress_bar(iterator) is iterator + + +def test_get_log_file_path(settings_fixture): + from sartorius_logger import _get_log_file_path + from datetime import datetime + from pathlib import Path + + result = _get_log_file_path(settings_fixture) + + assert isinstance(result, Path) + assert result.suffix == ".txt" + dt = datetime.strptime(result.stem, "%Y-%m-%d %H-%M-%S") + assert (datetime.now() - dt).total_seconds() < 10 + + +def test_log_measurement_info(logger_fixture, settings_fixture): + from sartorius_logger import _log_measurement_info + + result = _log_measurement_info(logger_fixture, settings_fixture) + + assert result == { + "Measurements": 31, + "Duration": "5m", + "Interval": "10s", + "Com-Port": "usb", + } + assert logger_fixture.handle == [ + "[Measurement Settings]", + "Measurements\t31", + "Duration\t5m", + "Interval\t10s", + "Com-Port\tusb", + "", + ] + + +def test_log_scale_info(logger_fixture, scale_fixture): + from sartorius_logger import _log_scale_info + + result = _log_scale_info(logger_fixture, scale_fixture) + + assert result == { + "Scale Model": "Quintix-1", + "Scale Serial Number": "0815", + "Software Version of Scale": "beta1", + "Software Version of Control Unit": "1.2.3", + } + assert logger_fixture.handle == [ + "[Scale Info]", + "Scale Model\tQuintix-1", + "Scale Serial Number\t0815", + "Software Version of Scale\tbeta1", + "Software Version of Control Unit\t1.2.3", + "", + ] + + +def test_measure_series(mocker, settings_fixture, logger_fixture): + from sartorius_logger import measure_series, Result + from pandas import DataFrame + from pathlib import Path + + from .stubs import DummyScale + + mocker.patch("sartorius_logger.sartoriusb.SartoriusUsb", DummyScale) + mocker.patch("sartorius_logger.time.sleep") + result = measure_series(settings_fixture, data_logger=logger_fixture) + + assert isinstance(result, Result) + + assert isinstance(result.log_file, Path) + assert result.log_file.parent == Path("log") + assert result.log_file.suffix == ".txt" + + assert isinstance(result.info, DataFrame) + assert result.info.to_dict() == { + 1: { + "Measurements": 31, + "Duration": "5m", + "Interval": "10s", + "Com-Port": "usb", + } + } + + assert isinstance(result.scale, DataFrame) + assert result.scale.to_dict() == { + 1: { + "Scale Model": "Quintix-1", + "Scale Serial Number": "0815", + "Software Version of Scale": "beta1", + "Software Version of Control Unit": "1.2.3", + } + } + + assert isinstance(result.data, DataFrame) + df_dict = {k: list(v.values()) for k, v in result.data.to_dict().items()} + + assert df_dict["nr"] == list(range(1, 32)) + assert df_dict["mode"] == ["G"] * 10 + [None, "G", "G"] + [None] * 18 + # some hassle with nan values + values = [v if "." in str(v) else None for v in df_dict["value"]] + assert ( + values + == [850.1, 851.0, 852.2, 853.0, 854.4, 857.0, 857.0] + + [858.7, 859.0, -1000.0, None, 0.0, 0.0] + + [None] * 18 + ) + assert ( + df_dict["unit"] + == [None, None, None, "mg", "mg", "mg", "mg"] + + ["mg", "mg", "mg", None, None, "mg"] + + [None] * 18 + ) + assert ( + df_dict["stable"] + == [False, False, False, True, True, True, True] + + [True, True, True, None, False, True] + + [None] * 18 + ) + assert ( + df_dict["message"] + == [None] * 10 + ["Low", None, None] + ["Connection Timeout"] * 18 + ) + + log = logger_fixture.handle.captured + assert len(log) == 45 + assert log[0] == "[Measurement Settings]" + assert log[6] == "[Scale Info]" + assert log[12] == "[Measured Data]" + assert log[13] == "Nr\tTime\tMode\tValue\tUnit\tStable\tMessage" + assert log[14].startswith("1\t") + assert log[14].endswith("\tG\t+850.1\t\tFalse\t") + assert log[20].startswith("7\t") + assert log[20].endswith("\tG\t+857.0\tmg\tTrue\t") + assert log[24].startswith("11\t") + assert log[24].endswith("\t\t\t\t\tLow") + assert log[-1].startswith("31\t") + assert log[-1].endswith("\t\t\t\t\tConnection Timeout") + + +def test_export_as_excel(mocker, settings_fixture, logger_fixture): + from sartorius_logger import measure_series, export_as_excel + from pathlib import Path + from tempfile import TemporaryDirectory + from .stubs import DummyScale + + mocker.patch("sartorius_logger.sartoriusb.SartoriusUsb", DummyScale) + mocker.patch("sartorius_logger.time.sleep") + result = measure_series(settings_fixture, data_logger=logger_fixture) + + with TemporaryDirectory() as temp_path: + temp_log_file = Path(temp_path) / "measurement.txt" + expected_xls_path = Path(temp_path) / "measurement.xlsx" + result = result._replace(log_file=temp_log_file) + export_as_excel(result) + + assert expected_xls_path.is_file() + + +def test_cli(mocker, settings_fixture): + mocker.patch("sartorius_logger.parse_cli_arguments", return_value=settings_fixture) + mocker.patch("sartorius_logger.measure_series", return_value="Results") + mocker.patch("sartorius_logger.export_as_excel") + + from sartorius_logger import cli, export_as_excel, measure_series + from sartorius_logger.datalogger import DataLogger + from tqdm import tqdm + from unittest.mock import call + + cli() + + assert measure_series.call_count == 1 + print(measure_series.call_args) + call_args, call_kargs = measure_series.call_args + assert call_args == (settings_fixture, ) + assert call_kargs["progress_bar"] == tqdm + assert isinstance(call_kargs["data_logger"], DataLogger) + assert export_as_excel.call_count == 1 + assert export_as_excel.call_args == call("Results") diff --git a/tests/test_sartorius_logger_datalogger.py b/tests/test_sartorius_logger_datalogger.py new file mode 100644 index 0000000..0f1c4cb --- /dev/null +++ b/tests/test_sartorius_logger_datalogger.py @@ -0,0 +1,206 @@ +""" tests for the sartorius_logger.datalogger module """ + +import pytest + +from unittest.mock import call + + +@pytest.mark.parametrize( + "value,expected", + [ + ([], True), + ((), True), + ({}, True), + (set(), True), + (iter("A"), True), + ("string", False), + (b"bytes", False), + ], +) +def test_is_container(value, expected): + from sartorius_logger.datalogger import is_container + + result = is_container(value) + + assert result == expected + + +def test_datalogger_init(): + from sartorius_logger.datalogger import DataLogger + from pathlib import Path + + logger = DataLogger("/logs", none_rep="", sep=";", end="
") + + assert isinstance(logger.path, Path) + assert logger.path == Path("/logs") + assert logger.none_rep == "" + assert logger.sep == ";" + assert logger.end == "
" + assert logger.handle is None + + +def test_datalogger_open(mocker): + from sartorius_logger.datalogger import DataLogger + from pathlib import Path + from tempfile import TemporaryDirectory + + with TemporaryDirectory() as tmp_dir: + tmp_dir_path = Path(tmp_dir) + log_folder = tmp_dir_path / "new_folder" + log_path = log_folder / "test.log" + + logger = DataLogger(log_path) + logger.open() + + assert log_folder.exists() + assert log_path.exists() + + logger.close() + + +def test_datalogger_open_on_already_opened_connection(mocker): + from sartorius_logger.datalogger import DataLogger + + logger = DataLogger("log_path") + logger.handle = "not None" + logger.path = mocker.Mock() + + logger.open() + + assert logger.path.open.call_count == 0 + + +def test_datalogger_close(mocker): + from sartorius_logger.datalogger import DataLogger + + logger = DataLogger("log_path") + logger.handle = mocker.Mock() + mocked_handle = logger.handle + + logger.close() + + assert mocked_handle.close.call_count == 1 + assert logger.handle is None + + +@pytest.mark.parametrize( + "value,add_count,list_count", [(["a"], 0, 1), ("b", 1, 0)] +) +def test_datalogger_call(mocker, value, add_count, list_count): + from sartorius_logger.datalogger import DataLogger + + logger = DataLogger("log_path") + mocker.patch.object(logger, "add") + mocker.patch.object(logger, "add_list") + + logger(value) + + assert logger.add.call_count == add_count + assert logger.add_list.call_count == list_count + + +def test_datalogger_add(mocker): + from sartorius_logger.datalogger import DataLogger + + logger = DataLogger("log_path", end="
") + logger.handle = mocker.Mock() + + logger.add("some \n text \r lines") + + assert logger.handle.write.call_count == 1 + assert logger.handle.write.call_args == call( + "some
text
lines
" + ) + + +def test_datalogger_add_raises_error_if_logfile_not_opened(): + from sartorius_logger.datalogger import DataLogger + + logger = DataLogger("log_path") + + with pytest.raises(IOError): + logger.add("whoops") + + +def test_datalogger_add_list(mocker): + from sartorius_logger.datalogger import DataLogger + + logger = DataLogger("log_path", none_rep="REDACTED", sep="; ", end="
") + logger.add = mocker.Mock() + + logger.add_list(["foo", 1, None, "", "bar"]) + + assert logger.add.call_count == 1 + assert logger.add.call_args == call("foo; 1; REDACTED; ; bar") + + +@pytest.mark.parametrize("add_empty_line", [True, False]) +def test_datalogger_add_section(mocker, add_empty_line): + from sartorius_logger.datalogger import DataLogger + + logger = DataLogger("log_path", none_rep="REDACTED", sep="; ", end="
") + logger.add = mocker.Mock() + + logger.add_section( + "A really cool title\n", [[1, 2], [3, 4, 5]], add_empty_line + ) + + assert logger.add.call_count == 4 if add_empty_line else 3 + assert logger.add.call_args_list[:3] == [ + call("[A really cool title]"), + call("1; 2"), + call("3; 4; 5"), + ] + if add_empty_line: + logger.add.call_args = call("") + + +def test_datalogger_as_context_manager(mocker): + from sartorius_logger.datalogger import DataLogger + + logger = DataLogger("log_path") + logger.open = mocker.Mock() + logger.close = mocker.Mock() + + with logger as context: + assert logger is context + + assert logger.open.call_count == 1 + assert logger.close.call_count == 1 + + +def test_datalogger_null_logger_init_should_not_raise_error(mocker): + from sartorius_logger.datalogger import NullLogger + + NullLogger("log_path", end="
", unknown="nothing") + + +def test_datalogger_null_logger_is_callable(mocker): + from sartorius_logger.datalogger import NullLogger + + nl = NullLogger() + + assert nl("whatever") is None + + +def test_datalogger_null_logger_attribute_access_returns_instance(mocker): + from sartorius_logger.datalogger import NullLogger + + nl = NullLogger() + + assert nl.some_really_weird_attribute is nl + + +def test_datalogger_null_logger_allows_method_calls(mocker): + from sartorius_logger.datalogger import NullLogger + + nl = NullLogger() + + assert nl.calling_any_method(1, b=2) is None + + +def test_datalogger_null_as_context_manager(mocker): + from sartorius_logger.datalogger import NullLogger + + with NullLogger() as nl: + assert isinstance(nl, NullLogger) diff --git a/tests/test_sartorius_logger_parsers.py b/tests/test_sartorius_logger_parsers.py new file mode 100644 index 0000000..cd3e54f --- /dev/null +++ b/tests/test_sartorius_logger_parsers.py @@ -0,0 +1,131 @@ +""" tests for the sartorius_logger.parsers module """ + +import pytest + +from collections import namedtuple + + +@pytest.mark.parametrize( + "text,value,unit,seconds", + [ + ("1", 1, "h", 60 * 60), + ("12", 12, "h", 12 * 60 * 60), + ("3s", 3, "s", 3), + ("4sec", 4, "s", 4), + ("5M", 5, "m", 5 * 60), + ("6mins", 6, "m", 6 * 60), + ("7h", 7, "h", 7 * 60 * 60), + ("8hours", 8, "h", 8 * 60 * 60), + ("9x", 9, "h", 9 * 60 * 60), + ("10s", 10, "s", 10), + ("11.5 m", 11, "m", 11 * 60), + ], +) +def test_parse_duration(text, value, unit, seconds): + from sartorius_logger.parsers import parse_duration, ParsedDuration + + result = parse_duration(text, default_unit="h") + + assert isinstance(result, ParsedDuration) + assert result == (value, unit, seconds) + + +@pytest.mark.parametrize("value", ["m", "", "x1"]) +def test_parse_duration_none_if_not_number_on_start(value): + from sartorius_logger.parsers import parse_duration + + result = parse_duration(value) + + assert result is None + + +@pytest.mark.parametrize( + "value,expected", + [ + ("s", "s"), + ("m", "m"), + ("h", "h"), + ("sec", "s"), + ("min", "m"), + ("hours", "h"), + ("seconds", "s"), + ("minutes", "m"), + ("hours", "h"), + ("", ""), + ("unknown", ""), + ], +) +def test_normalize_time_unit(value, expected): + from sartorius_logger.parsers import _normalize_time_unit + + result = _normalize_time_unit(value, default_unit="") + + assert result == expected + + +def test_parse_cli_arguments(mocker): + from sartorius_logger.parsers import parse_cli_arguments + + mocked_argparse = mocker.patch( + "sartorius_logger.parsers.argparse.ArgumentParser" + ) + mocked_normalize = mocker.patch( + "sartorius_logger.parsers._normalize_cli_arguments" + ) + + parse_cli_arguments() + + assert mocked_argparse.call_count == 1 + assert mocked_normalize.call_count == 1 + + +def test_normalize_cli_arguments(mocker): + from sartorius_logger.parsers import ( + _normalize_cli_arguments, + Settings, + ) + import pathlib + + Dummy = namedtuple("Dummy", ["duration", "interval", "output", "port"]) + arguments = Dummy("2h", "30min", "New Folder", "usb") + + result = _normalize_cli_arguments(arguments) + + assert isinstance(result, Settings) + assert result.duration == (2, "h", 2 * 60 * 60) + assert result.interval == (30, "m", 30 * 60) + assert result.port == "usb" + assert isinstance(result.directory, pathlib.Path) + assert result.directory.name == "New Folder" + assert result.directory.parent.name == "Desktop" + + +@pytest.mark.parametrize( + "value,expected", + [ + ("", ("example", "Desktop")), + ("Some Folder", ("example", "Desktop", "Some Folder")), + ("/Some/Folder", ("Some", "Folder",)), + ], +) +def test_check_output_directory_path(mocker, value, expected): + from sartorius_logger.parsers import _check_output_directory_path, Path + + mocker.patch.object(Path, "home", return_value=Path("/example")) + mocker.patch.object(Path, "exists", return_value=False) + + result = _check_output_directory_path(value) + + assert result == Path("/").joinpath(*expected) + + +def test_check_output_directory_path_exist_not_dir(mocker): + from sartorius_logger.parsers import _check_output_directory_path, Path + + mocker.patch.object(Path, "home", return_value=Path("/example")) + mocker.patch.object(Path, "exists", return_value=True) + mocker.patch.object(Path, "is_dir", return_value=False) + + result = _check_output_directory_path("/some/existing/folder") + + assert result == Path("/") / "example" / "Desktop"