From ef00e807004d957dfb50d548c0a0de3c3f0e4e3a Mon Sep 17 00:00:00 2001
From: Holger Frey
Date: Thu, 12 Mar 2020 23:03:26 +0100
Subject: [PATCH] moved main code into .parser submodule

This should lead to a cleaner structure when a CLI module is added. Also,
the public-facing methods are now clearly defined.
---
 sensovation_data_parser/__init__.py   | 202 +---------
 sensovation_data_parser/parser.py     | 211 ++++++++++
 tests/test_parser.py                  | 552 ++++++++++++++++++++++++++
 tests/test_sensovation_data_parser.py | 534 +------------------------
 4 files changed, 776 insertions(+), 723 deletions(-)
 create mode 100644 sensovation_data_parser/parser.py
 create mode 100644 tests/test_parser.py

diff --git a/sensovation_data_parser/__init__.py b/sensovation_data_parser/__init__.py
index f386e74..0211431 100644
--- a/sensovation_data_parser/__init__.py
+++ b/sensovation_data_parser/__init__.py
@@ -6,200 +6,10 @@ Parsing the numerical output from Sensovation image analysis.
 
 __version__ = "0.0.1"
 
-import re
-from pathlib import Path
-from collections import namedtuple
-
-import pandas
-from defusedxml import ElementTree
-
-REGEX_WELL = re.compile(
-    r"""
-    (?P<row>([A-Z]+))  # row name containing one or more letters
-    (?P<column>(\d+))  # column, one or more decimals
-    """,
-    re.VERBOSE | re.IGNORECASE,
+from .parser import (  # noqa: F401
+    ExposureInfo,
+    parse_file,
+    parse_folder,
+    process_folder,
+    parse_multiple_files,
 )
-
-COLUMNS_TO_DROP = ["Rect.", "Contour"]
-COLUMNS_RENAME_MAP = {
-    " ID ": "Pos.Id",
-    "Found": "Spot.Found",
-    "Dia.": "Spot.Diameter",
-}
-
-CACHE_FILE_NAME = "cached_data.h5"
-CACHE_TABLE_NAME = f"raw_data_v{__version__}"
-
-FileInfo = namedtuple("FileInfo", ["row", "column", "exposure"])
-ExposureInfo = namedtuple("ExposureInfo", ["channel", "time"])
-
-
-def _guess_decimal_separator(file_handle):
-    """ guesses the decimal spearator of a opened data file """
-    file_handle.seek(0)
-    headers = next(file_handle)  # noqa: F841
-    data = next(file_handle)
-    separator = "," if data.count(",") > data.count(".") else "."
-    file_handle.seek(0)
-    return separator
-
-
-def _parse_csv(data_file):
-    """ parse a csv sensovation data file """
-    data_path = Path(data_file)
-    with data_path.open("r") as handle:
-        decimal_sep = _guess_decimal_separator(handle)
-        return pandas.read_csv(handle, sep="\t", decimal=decimal_sep)
-
-
-def _extract_measurement_info(data_file):
-    """ extract measurement meta data from a file name """
-    data_path = Path(data_file)
-    *rest, well, exposure = data_path.stem.rsplit("_", 2)  # noqa: F841
-    matched = REGEX_WELL.match(well)
-    if matched is None:
-        raise ValueError(f"not a valid well: '{well}'")
-    row = matched["row"].upper()
-    column = int(matched["column"])
-    exposure = int(exposure)
-    return FileInfo(row, column, exposure)
-
-
-def _cleanup_data_columns(data_frame):
-    """ renames some data columns for consistency and drops unused columns """
-    renamed = data_frame.rename(columns=COLUMNS_RENAME_MAP)
-    return renamed.drop(columns=COLUMNS_TO_DROP)
-
-
-def parse_file(data_file):
-    """ parses one data file and adds metadata to result """
-    measurement_info = _extract_measurement_info(data_file)
-    data_frame = _parse_csv(data_file)
-    data_frame["Field.Row"] = measurement_info.row
-    data_frame["Field.Column"] = measurement_info.column
-    data_frame["Exposure.Id"] = measurement_info.exposure
-    return _cleanup_data_columns(data_frame)
-
-
-def parse_multiple_files(file_list):
-    """ parses a list of file paths to one combined dataframe """
-    if not file_list:
-        raise ValueError("Empty file list provided")
-    collection = (parse_file(path) for path in file_list)
-    data_frame = next(collection)
-    for next_frame in collection:
-        data_frame = data_frame.append(next_frame, ignore_index=True)
-    return data_frame
-
-
-def _list_csv_files(folder):
-    """ returns all csv files in a folder """
-    folder_path = Path(folder)
-    files = (item for item in folder_path.iterdir() if item.is_file())
-    visible = (item for item in files if not item.stem.startswith("."))
-    return (item for item in visible if item.suffix.lower() == ".csv")
-
-
-def _sanity_check(data_frame):
-    """ checks some basic constrains of a combined data frame """
-    field_rows = len(data_frame["Field.Row"].unique())
-    field_cols = len(data_frame["Field.Column"].unique())
-    exposures = len(data_frame["Exposure.Id"].unique())
-    spot_positions = len(data_frame["Pos.Id"].unique())
-    expected_rows = field_rows * field_cols * exposures * spot_positions
-    if expected_rows != len(data_frame):
-        raise ValueError("Measurements are missing")
-    return data_frame
-
-
-def parse_folder(folder):
-    """ parses all csv files in a folder to one large dataframe """
-    file_list = _list_csv_files(folder)
-    data_frame = parse_multiple_files(file_list)
-    return data_frame
-
-
-def _search_channel_info_file(folder):
-    """ searches for a exposure settings file in a folder """
-    folder_path = Path(folder)
-    params_folder = folder_path / "Parameters"
-    if not params_folder.is_dir():
-        return None
-    param_files = list(params_folder.glob("**/*.svexp"))
-    if len(param_files) == 1:
-        return param_files[0]
-    else:
-        return None
-
-
-def _parse_channel_info(channel_file):
-    """ parses the cannel informations from a settings file """
-    file_path = Path(channel_file)
-    with file_path.open("r") as file_handle:
-        tree = ElementTree.parse(file_handle)
-    result = {}
-    for child in tree.find("Channels"):
-        # child.tag == "ChannelConfig1"
-        exposure = int(child.tag[-1])
-        channel_description = child.attrib["Description"]
-        # channel_description == "Cy3/Cy5 Green"
-        channel = channel_description.rsplit(" ", 1)[-1]
-        time = int(child.attrib["ExposureTimeMs"])
-        result[exposure] = ExposureInfo(channel.lower(), time)
-    return result
-
-
-def _get_valid_exposure_info(folder, data_frame, exposure_info=None):
-    """ returns valid exposure information """
-    available_exposures = set(data_frame["Exposure.Id"].unique())
-    if exposure_info is None:
-        params_file = _search_channel_info_file(folder)
-        if params_file is not None:
-            exposure_info = _parse_channel_info(params_file)
-    if exposure_info is not None:
-        if available_exposures == set(exposure_info.keys()):
-            return exposure_info
-    return {c: ExposureInfo(None, None) for c in available_exposures}
-
-
-def _augment_exposure_info(data_frame, exposure_info):
-    data_frame["Exposure.Channel"] = ""
-    data_frame["Exposure.Time"] = 0
-    for exposure_id, info in exposure_info.items():
-        mask = data_frame["Exposure.Id"] == exposure_id
-        data_frame.loc[mask, "Exposure.Channel"] = info.channel
-        data_frame.loc[mask, "Exposure.Time"] = info.time
-    return data_frame
-
-
-def _process_folder(folder, exposures=None):
-    """ parses all csv files in a folder, adds some checks and more data """
-    data_frame = parse_folder(folder)
-    exposures = _get_valid_exposure_info(folder, data_frame, exposures)
-    data_frame = _augment_exposure_info(data_frame, exposures)
-    data_frame["Field.Row"] = data_frame["Field.Row"].astype("category")
-    data_frame["Exposure.Channel"] = data_frame["Exposure.Channel"].astype(
-        "category"
-    )
-    return data_frame
-
-
-def process_folder(folder, exposures=None, use_cache=True):
-    """ parses all csv files in a folder, adds some checks and more data """
-    hdf5_path = folder / CACHE_FILE_NAME
-    if use_cache:
-        try:
-            return pandas.read_hdf(hdf5_path, CACHE_TABLE_NAME)
-        except (FileNotFoundError, KeyError):
-            # either file or table doesn't exist
-            pass
-    data_frame = _process_folder(folder, exposures)
-    if use_cache:
-        try:
-            data_frame.to_hdf(hdf5_path, CACHE_TABLE_NAME, format="table")
-        except OSError:
-            # capturing high level OSError
-            # read only filesystems don't throw a more specific exception
-            pass
-    return data_frame
diff --git a/sensovation_data_parser/parser.py b/sensovation_data_parser/parser.py
new file mode 100644
index 0000000..84756eb
--- /dev/null
+++ b/sensovation_data_parser/parser.py
@@ -0,0 +1,211 @@
+""" Sensovation Data Parser
+
+Parsing the numerical output from Sensovation image analysis.
+"""
+
+import re
+from pathlib import Path
+from collections import namedtuple
+
+import pandas
+from defusedxml import ElementTree
+
+REGEX_WELL = re.compile(
+    r"""
+    (?P<row>([A-Z]+))  # row name containing one or more letters
+    (?P<column>(\d+))  # column, one or more decimals
+    """,
+    re.VERBOSE | re.IGNORECASE,
+)
+
+COLUMNS_TO_DROP = ["Rect.", "Contour"]
+COLUMNS_RENAME_MAP = {
+    " ID ": "Pos.Id",
+    "Found": "Spot.Found",
+    "Dia.": "Spot.Diameter",
+}
+
+CACHE_FILE_NAME = "raw_data.h5"
+
+FileInfo = namedtuple("FileInfo", ["row", "column", "exposure"])
+ExposureInfo = namedtuple("ExposureInfo", ["channel", "time"])
+
+
+def _get_cache_table_name():
+    """ automatic hdf5 table name, avoids a circular import """
+    from . import __version__
+
+    return f"v{__version__}"
+
+
+def _guess_decimal_separator(file_handle):
+    """ guesses the decimal separator of an opened data file """
+    file_handle.seek(0)
+    headers = next(file_handle)  # noqa: F841
+    data = next(file_handle)
+    separator = "," if data.count(",") > data.count(".") else "."
+    file_handle.seek(0)
+    return separator
+
+
+def _parse_csv(data_file):
+    """ parse a csv sensovation data file """
+    data_path = Path(data_file)
+    with data_path.open("r") as handle:
+        decimal_sep = _guess_decimal_separator(handle)
+        return pandas.read_csv(handle, sep="\t", decimal=decimal_sep)
+
+
+def _extract_measurement_info(data_file):
+    """ extract measurement metadata from a file name """
+    data_path = Path(data_file)
+    *rest, well, exposure = data_path.stem.rsplit("_", 2)  # noqa: F841
+    matched = REGEX_WELL.match(well)
+    if matched is None:
+        raise ValueError(f"not a valid well: '{well}'")
+    row = matched["row"].upper()
+    column = int(matched["column"])
+    exposure = int(exposure)
+    return FileInfo(row, column, exposure)
+
+
+def _cleanup_data_columns(data_frame):
+    """ renames some data columns for consistency and drops unused columns """
+    renamed = data_frame.rename(columns=COLUMNS_RENAME_MAP)
+    return renamed.drop(columns=COLUMNS_TO_DROP)
+
+
+def parse_file(data_file):
+    """ parses one data file and adds metadata to the result """
+    measurement_info = _extract_measurement_info(data_file)
+    data_frame = _parse_csv(data_file)
+    data_frame["Field.Row"] = measurement_info.row
+    data_frame["Field.Column"] = measurement_info.column
+    data_frame["Exposure.Id"] = measurement_info.exposure
+    return _cleanup_data_columns(data_frame)
+
+
+def parse_multiple_files(file_list):
+    """ parses a list of file paths to one combined dataframe """
+    if not file_list:
+        raise ValueError("Empty file list provided")
+    collection = (parse_file(path) for path in file_list)
+    data_frame = next(collection)
+    for next_frame in collection:
+        data_frame = data_frame.append(next_frame, ignore_index=True)
+    return data_frame
+
+
+def _list_csv_files(folder):
+    """ returns all csv files in a folder """
+    folder_path = Path(folder)
+    files = (item for item in folder_path.iterdir() if item.is_file())
+    visible = (item for item in files if not item.stem.startswith("."))
+    return (item for item in visible if item.suffix.lower() == ".csv")
+
+
+def _sanity_check(data_frame):
+    """ checks some basic constraints of a combined data frame """
+    field_rows = len(data_frame["Field.Row"].unique())
+    field_cols = len(data_frame["Field.Column"].unique())
+    exposures = len(data_frame["Exposure.Id"].unique())
+    spot_positions = len(data_frame["Pos.Id"].unique())
+    expected_rows = field_rows * field_cols * exposures * spot_positions
+    if expected_rows != len(data_frame):
+        raise ValueError("Measurements are missing")
+    return data_frame
+
+
+def parse_folder(folder):
+    """ parses all csv files in a folder to one large dataframe """
+    file_list = _list_csv_files(folder)
+    data_frame = parse_multiple_files(file_list)
+    return data_frame
+
+
+def _search_channel_info_file(folder):
+    """ searches for an exposure settings file in a folder """
+    folder_path = Path(folder)
+    params_folder = folder_path / "Parameters"
+    if not params_folder.is_dir():
+        return None
+    param_files = list(params_folder.glob("**/*.svexp"))
+    if len(param_files) == 1:
+        return param_files[0]
+    else:
+        return None
+
+
+def _parse_channel_info(channel_file):
+    """ parses the channel information from a settings file """
+    file_path = Path(channel_file)
+    with file_path.open("r") as file_handle:
+        tree = ElementTree.parse(file_handle)
+    result = {}
+    for child in tree.find("Channels"):
+        # child.tag == "ChannelConfig1"
+        exposure = int(child.tag[-1])
+        channel_description = child.attrib["Description"]
+        # channel_description == "Cy3/Cy5 Green"
+        channel = channel_description.rsplit(" ", 1)[-1]
+        time = int(child.attrib["ExposureTimeMs"])
+        result[exposure] = ExposureInfo(channel.lower(), time)
+    return result
+
+
+def _get_valid_exposure_map(folder, data_frame, exposure_map=None):
+    """ returns valid exposure information """
+    available_exposures = set(data_frame["Exposure.Id"].unique())
+    if exposure_map is None:
+        params_file = _search_channel_info_file(folder)
+        if params_file is not None:
+            exposure_map = _parse_channel_info(params_file)
+    if exposure_map is not None:
+        if available_exposures == set(exposure_map.keys()):
+            return exposure_map
+    return {c: ExposureInfo(None, None) for c in available_exposures}
+
+
+def _augment_exposure_map(data_frame, exposure_map):
+    data_frame["Exposure.Channel"] = ""
+    data_frame["Exposure.Time"] = 0
+    for exposure_id, info in exposure_map.items():
+        channel, time = info
+        mask = data_frame["Exposure.Id"] == exposure_id
+        data_frame.loc[mask, "Exposure.Channel"] = channel
+        data_frame.loc[mask, "Exposure.Time"] = time
+    return data_frame
+
+
+def _process_folder(folder, exposures=None):
+    """ parses all csv files in a folder, adds some checks and more data """
+    data_frame = parse_folder(folder)
+    exposures = _get_valid_exposure_map(folder, data_frame, exposures)
+    data_frame = _augment_exposure_map(data_frame, exposures)
+    data_frame["Field.Row"] = data_frame["Field.Row"].astype("category")
+    data_frame["Exposure.Channel"] = data_frame["Exposure.Channel"].astype(
+        "category"
+    )
+    return data_frame
+
+
+def process_folder(folder, exposures=None, use_cache=True):
+    """ parses all csv files in a folder, adds some checks and more data """
+    hdf5_path = folder / CACHE_FILE_NAME
+    if use_cache:
+        try:
+            return pandas.read_hdf(hdf5_path, _get_cache_table_name())
+        except (FileNotFoundError, KeyError):
+            # either file or table doesn't exist
+            pass
+    data_frame = _process_folder(folder, exposures)
+    if use_cache:
+        try:
+            data_frame.to_hdf(
+                hdf5_path, _get_cache_table_name(), format="table"
+            )
+        except OSError:
+            # capturing high level OSError
+            # read only filesystems don't throw a more specific exception
+            pass
+    return data_frame
diff --git a/tests/test_parser.py b/tests/test_parser.py
new file mode 100644
index 0000000..3bcdfe9
--- /dev/null
+++ b/tests/test_parser.py
@@ -0,0 +1,552 @@
+""" testing the .parser module """
+
+from pathlib import Path
+
+import numpy
+import pytest
+
+EXAMPLE_DIR_WO_PARAMS = "mtp_wo_parameters"
+EXAMPLE_DIR_WITH_PARAMS = "mtp_with_parameters"
+
+
+@pytest.fixture
+def example_dir(request):
+    root_dir = Path(request.config.rootdir)
+    yield root_dir / "example_data"
+
+
+@pytest.fixture
+def example_file(example_dir):
+    data_dir = example_dir / EXAMPLE_DIR_WO_PARAMS
+    yield data_dir / "160218_SG2-013-001_Regen1_Cy3-100_1_A1_1.csv"
+
+
+@pytest.fixture
+def exposure_df():
+    from pandas import DataFrame
+
+    yield DataFrame(data={"Exposure.Id": [1, 2, 3]})
+
+
+@pytest.fixture
+def dir_for_caching(tmpdir, example_file):
+    import shutil
+
+    temp_path = Path(tmpdir)
+    dest = temp_path / example_file.name
+    shutil.copy(example_file, dest)
+    yield temp_path
+
+
+@pytest.mark.parametrize(
+    "sub_dir, file_name",
+    [
+        (
+            EXAMPLE_DIR_WO_PARAMS,
+            "160218_SG2-013-001_Regen1_Cy3-100_1_A1_1.csv",
+        ),
+        (
+            EXAMPLE_DIR_WITH_PARAMS,
+            "160210_SG2-010-001_Regen_cy3100_1_A1_1.csv",
+        ),
+    ],
+)
+def test_parse_csv(example_dir, sub_dir, file_name):
+    from sensovation_data_parser.parser import _parse_csv
+
+    result = _parse_csv(example_dir / sub_dir / file_name)
+
+    columns = {
" ID ": numpy.int64, + "Pos.X": numpy.int64, + "Pos.Y": numpy.int64, + "Bkg.Mean": float, + "Spot.Mean": float, + "Bkg.Median": float, + "Spot.Median": float, + "Bkg.StdDev": float, + "Spot.StdDev": float, + "Bkg.Sum": numpy.int64, + "Spot.Sum": numpy.int64, + "Bkg.Area": numpy.int64, + "Spot.Area": numpy.int64, + "Spot.Sat. (%)": numpy.int64, + "Found": numpy.bool_, + "Pos.Nom.X": numpy.int64, + "Pos.Nom.Y": numpy.int64, + "Dia.": numpy.int64, + "Rect.": str, + "Contour": object, # ignore the type of contour + } + + assert set(result.columns) == set(columns.keys()) + assert len(result[" ID "].unique()) == 100 + assert len(result) == 100 + for column, value_type in columns.items(): + assert isinstance(result[column][0], value_type) + + +def test_parse_csv_no_array(example_dir): + from sensovation_data_parser.parser import _parse_csv + + result = _parse_csv(example_dir / "no_array_A1_1.csv") + + assert len(result) == 1 + assert result[" ID "][0] == 0 + + +@pytest.mark.parametrize( + "input, expected", [("", "."), ("..,", "."), (".,,", ","), ("..,,", "."),] +) +def test_guess_decimal_separator_returns_correct_separator(input, expected): + from sensovation_data_parser.parser import _guess_decimal_separator + from io import StringIO + + handle = StringIO(f"header\n{input}\n") + result = _guess_decimal_separator(handle) + + assert result == expected + + +def test_guess_decimal_separator_rewinds_handle(): + from sensovation_data_parser.parser import _guess_decimal_separator + from io import StringIO + + handle = StringIO(f"header\n{input}\n") + _guess_decimal_separator(handle) + + assert next(handle) == "header\n" + + +def test_well_regex_ok(): + from sensovation_data_parser.parser import REGEX_WELL + + result = REGEX_WELL.match("AbC123") + + assert result["row"] == "AbC" + assert result["column"] == "123" + + +@pytest.mark.parametrize("input", ["", "A", "1", "1A", "-1", "A-"]) +def test_well_regex_no_match(input): + from sensovation_data_parser.parser import REGEX_WELL + + result = REGEX_WELL.match(input) + + assert result is None + + +@pytest.mark.parametrize( + "filename, expected", + [("A1_1.csv", ("A", 1, 1)), ("test/measurement_1_H12_2", ("H", 12, 2)),], +) +def test_extract_measurement_info_ok(filename, expected): + from sensovation_data_parser.parser import _extract_measurement_info + + result = _extract_measurement_info(filename) + + assert result == expected + + +@pytest.mark.parametrize("filename", ["wrong_exposure_A1_B", "no_well_XX_1"]) +def test_extract_measurement_info_raises_error(filename): + from sensovation_data_parser.parser import _extract_measurement_info + + with pytest.raises(ValueError): + _extract_measurement_info(filename) + + +def test_cleanup_data_columns(): + from sensovation_data_parser.parser import _cleanup_data_columns + from pandas import DataFrame + + columns = ["Rect.", "Contour", " ID ", "Found", "Dia."] + data = {col: [i] for i, col in enumerate(columns)} + data_frame = DataFrame(data=data) + + result = _cleanup_data_columns(data_frame) + + assert set(result.columns) == {"Pos.Id", "Spot.Found", "Spot.Diameter"} + assert result["Pos.Id"][0] == 2 + assert result["Spot.Found"][0] == 3 + assert result["Spot.Diameter"][0] == 4 + + +def test_parse_file(example_file): + from sensovation_data_parser.parser import parse_file + + result = parse_file(example_file) + + columns = { + "Pos.Id", + "Pos.X", + "Pos.Y", + "Bkg.Mean", + "Spot.Mean", + "Bkg.Median", + "Spot.Median", + "Bkg.StdDev", + "Spot.StdDev", + "Bkg.Sum", + "Spot.Sum", + "Bkg.Area", + "Spot.Area", + 
"Spot.Sat. (%)", + "Spot.Found", + "Pos.Nom.X", + "Pos.Nom.Y", + "Spot.Diameter", + "Field.Row", + "Field.Column", + "Exposure.Id", + } + + assert set(result.columns) == columns + assert result["Field.Row"][0] == "A" + assert result["Field.Column"][0] == 1 + assert result["Exposure.Id"][0] == 1 + + +@pytest.mark.parametrize( + "file_list", + [ + [ + "160218_SG2-013-001_Regen1_Cy3-100_1_A1_1.csv", + "160218_SG2-013-001_Regen1_Cy3-100_1_A1_2.csv", + ], + ["160218_SG2-013-001_Regen1_Cy3-100_1_A1_1.csv"], + ], +) +def testparse_multiple_files_ok(example_dir, file_list): + from sensovation_data_parser.parser import parse_multiple_files + + sub_dir = example_dir / EXAMPLE_DIR_WO_PARAMS + files = [sub_dir / file for file in file_list] + + data_frame = parse_multiple_files(files) + print(data_frame["Exposure.Id"].unique()) + + assert len(data_frame) == 100 * len(files) + assert len(data_frame["Exposure.Id"].unique()) == len(files) + + +def testparse_multiple_files_empty_file_list(): + from sensovation_data_parser.parser import parse_multiple_files + + with pytest.raises(ValueError): + parse_multiple_files([]) + + +def testparse_multiple_files_empty_array(example_dir): + from sensovation_data_parser.parser import parse_multiple_files + + files = [example_dir / "no_array_A1_1.csv"] + + data_frame = parse_multiple_files(files) + print(data_frame["Exposure.Id"].unique()) + + assert len(data_frame) == 1 + + +def test_list_csv_files(example_dir): + from sensovation_data_parser.parser import _list_csv_files + + result = list(_list_csv_files(example_dir / EXAMPLE_DIR_WITH_PARAMS)) + + assert len(result) == 36 * 3 + assert all(str(item).endswith(".csv") for item in result) + assert all(not item.stem.startswith(".") for item in result) + + +def test_parse_folder(example_dir): + from sensovation_data_parser.parser import parse_folder + + data_frame = parse_folder(example_dir / EXAMPLE_DIR_WITH_PARAMS) + + assert len(data_frame) == 36 * 3 * 100 + assert len(data_frame["Field.Row"].unique()) == 3 + assert len(data_frame["Field.Column"].unique()) == 12 + assert len(data_frame["Exposure.Id"].unique()) == 3 + assert len(data_frame["Pos.Id"].unique()) == 100 + + +def test_sanity_check_ok(example_dir): + from sensovation_data_parser.parser import ( + _sanity_check, + parse_multiple_files, + ) + + sub_dir = example_dir / EXAMPLE_DIR_WO_PARAMS + file_list = [ + "160218_SG2-013-001_Regen1_Cy3-100_1_A1_1.csv", + "160218_SG2-013-001_Regen1_Cy3-100_1_A1_2.csv", + ] + files = [sub_dir / file for file in file_list] + data_frame = parse_multiple_files(files) + + result = _sanity_check(data_frame) + + assert len(result) == len(data_frame) + + +def test_sanity_check_raises_value_error(example_dir): + from sensovation_data_parser.parser import ( + _sanity_check, + parse_multiple_files, + ) + + sub_dir = example_dir / EXAMPLE_DIR_WO_PARAMS + file_list = [ + "160218_SG2-013-001_Regen1_Cy3-100_1_A1_1.csv", + "160218_SG2-013-001_Regen1_Cy3-100_1_A1_2.csv", + ] + files = [sub_dir / file for file in file_list] + data_frame = parse_multiple_files(files) + data_frame = data_frame.drop(data_frame.index[1]) + + with pytest.raises(ValueError): + _sanity_check(data_frame) + + +def test_search_channel_info_file_ok(example_dir): + from sensovation_data_parser.parser import _search_channel_info_file + + result = _search_channel_info_file(example_dir / EXAMPLE_DIR_WITH_PARAMS) + + assert result.suffix == ".svexp" + + +def test_search_channel_info_file_no_parameters_folder(example_dir): + from sensovation_data_parser.parser import 
+
+    result = _search_channel_info_file(example_dir / EXAMPLE_DIR_WO_PARAMS)
+
+    assert result is None
+
+
+def test_search_channel_info_file_no_parameters_file(tmpdir):
+    from sensovation_data_parser.parser import _search_channel_info_file
+
+    params_dir = tmpdir / "Parameters"
+    params_dir.mkdir()
+
+    result = _search_channel_info_file(tmpdir)
+
+    assert result is None
+
+
+def test_parse_channel_info(example_dir):
+    from sensovation_data_parser.parser import (
+        _search_channel_info_file,
+        _parse_channel_info,
+    )
+
+    params = _search_channel_info_file(example_dir / EXAMPLE_DIR_WITH_PARAMS)
+    result = _parse_channel_info(params)
+
+    assert set(result.keys()) == {1, 2, 3}
+    assert result[1] == ("green", 100)
+    assert result[2] == ("red", 150)
+    assert result[3] == ("red", 15)
+
+
+def test_get_valid_exposure_map_provided_ok(exposure_df):
+    from sensovation_data_parser.parser import (
+        _get_valid_exposure_map,
+        ExposureInfo,
+    )
+
+    dummy_value = ExposureInfo(None, None)
+    exposure_map = {1: dummy_value, 2: dummy_value, 3: dummy_value}
+
+    result = _get_valid_exposure_map(
+        "/nonexistent", exposure_df, exposure_map=exposure_map
+    )
+
+    assert result == exposure_map
+
+
+def test_get_valid_exposure_map_provided_not_ok(exposure_df):
+    from sensovation_data_parser.parser import _get_valid_exposure_map
+
+    exposure_map = {1: None, 2: None}
+
+    result = _get_valid_exposure_map(
+        "/nonexistent", exposure_df, exposure_map=exposure_map
+    )
+
+    assert set(result.keys()) == {1, 2, 3}
+    assert all(v == (None, None) for v in result.values())
+
+
+def test_get_valid_exposure_map_info_from_file_ok(example_dir, exposure_df):
+    from sensovation_data_parser.parser import _get_valid_exposure_map
+
+    result = _get_valid_exposure_map(
+        example_dir / EXAMPLE_DIR_WITH_PARAMS, exposure_df, exposure_map=None
+    )
+
+    assert set(result.keys()) == {1, 2, 3}
+    assert result[1] == ("green", 100)
+    assert result[2] == ("red", 150)
+    assert result[3] == ("red", 15)
+
+
+def test_get_valid_exposure_map_info_from_file_not_ok(
+    example_dir, exposure_df
+):
+    from sensovation_data_parser.parser import _get_valid_exposure_map
+
+    data_frame = exposure_df.drop(exposure_df.index[1])
+
+    result = _get_valid_exposure_map(
+        example_dir / EXAMPLE_DIR_WITH_PARAMS, data_frame, exposure_map=None
+    )
+
+    assert set(result.keys()) == {1, 3}
+    assert all(v == (None, None) for v in result.values())
+
+
+def test_augment_exposure_map(exposure_df):
+    from sensovation_data_parser.parser import (
+        _augment_exposure_map,
+        ExposureInfo,
+    )
+
+    exposure_map = {
+        1: ExposureInfo("red", 10),
+        2: ExposureInfo("green", 20),
+        3: ExposureInfo("blue", 50),
+    }
+
+    result = _augment_exposure_map(exposure_df, exposure_map)
+
+    assert result["Exposure.Id"][0] == 1
+    assert result["Exposure.Channel"][0] == "red"
+    assert result["Exposure.Time"][0] == 10
+    assert result["Exposure.Id"][1] == 2
+    assert result["Exposure.Channel"][1] == "green"
+    assert result["Exposure.Time"][1] == 20
+    assert result["Exposure.Id"][2] == 3
+    assert result["Exposure.Channel"][2] == "blue"
+    assert result["Exposure.Time"][2] == 50
+
+
+def test_process_folder_with_exposure_map(example_dir):
+    from sensovation_data_parser.parser import _process_folder
+
+    result = _process_folder(example_dir / EXAMPLE_DIR_WITH_PARAMS)
+
+    assert len(result) == 36 * 100 * 3
+
+    expected = [(1, "green", 100), (2, "red", 150), (3, "red", 15)]
+    for exposure_id, channel, time in expected:
+        mask = result["Exposure.Id"] == exposure_id
+        example_row = result.loc[mask].iloc[1]
+        assert example_row["Exposure.Channel"] == channel
+        assert example_row["Exposure.Time"] == time
+
+
+def test_process_folder_without_exposure_map(example_dir):
+    from sensovation_data_parser.parser import _process_folder
+    from pandas import isnull
+
+    result = _process_folder(example_dir / EXAMPLE_DIR_WO_PARAMS)
+
+    assert len(result) == 96 * 100 * 3
+
+    for exposure_id in range(1, 4):
+        mask = result["Exposure.Id"] == exposure_id
+        example_row = result.loc[mask].iloc[1]
+        print(type(example_row["Exposure.Channel"]))
+        assert isnull(example_row["Exposure.Channel"])
+        assert isnull(example_row["Exposure.Time"])
+
+
+def test_process_folder_creates_cache(dir_for_caching):
+    from sensovation_data_parser.parser import (
+        process_folder,
+        CACHE_FILE_NAME,
+    )
+
+    cache_path = dir_for_caching / CACHE_FILE_NAME
+    assert not cache_path.is_file()
+
+    result = process_folder(dir_for_caching)
+
+    assert len(result) == 100
+    assert cache_path.is_file()
+
+
+def test_process_folder_reads_from_cache(dir_for_caching, example_file):
+    from sensovation_data_parser.parser import process_folder
+
+    process_folder(dir_for_caching)
+
+    csv_file = dir_for_caching / example_file.name
+    csv_file.unlink()
+
+    result = process_folder(dir_for_caching)
+    assert len(result) == 100
+
+
+def test_process_folder_read_cache_fails_silently(
+    dir_for_caching, exposure_df
+):
+    from sensovation_data_parser.parser import (
+        process_folder,
+        CACHE_FILE_NAME,
+    )
+
+    cache_path = dir_for_caching / CACHE_FILE_NAME
+    exposure_df.to_hdf(cache_path, "unknown table")
+
+    result = process_folder(dir_for_caching)
+
+    assert result["Field.Row"][0] == "A"
+
+
+def test_get_cache_table_name():
+    from sensovation_data_parser.parser import _get_cache_table_name
+    from sensovation_data_parser import __version__
+
+    result = _get_cache_table_name()
+
+    assert result.startswith("v")
+    assert result[1:] == __version__
+
+
+def test_process_folder_read_cache_no_cache_arg(dir_for_caching, exposure_df):
+    from sensovation_data_parser.parser import (
+        process_folder,
+        _get_cache_table_name,
+        CACHE_FILE_NAME,
+    )
+
+    cache_path = dir_for_caching / CACHE_FILE_NAME
+    exposure_df.to_hdf(cache_path, _get_cache_table_name())
+
+    result = process_folder(dir_for_caching, use_cache=False)
+
+    assert result["Field.Row"][0] == "A"
+
+
+def test_process_folder_writes_cache(dir_for_caching):
+    from sensovation_data_parser.parser import (
+        process_folder,
+        CACHE_FILE_NAME,
+    )
+
+    process_folder(dir_for_caching, use_cache=True)
+
+    cache_path = dir_for_caching / CACHE_FILE_NAME
+    assert cache_path.is_file()
+
+
+def test_process_folder_writes_cache_no_cache_arg(dir_for_caching):
+    from sensovation_data_parser.parser import process_folder, CACHE_FILE_NAME
+
+    process_folder(dir_for_caching, use_cache=False)
+
+    cache_path = dir_for_caching / CACHE_FILE_NAME
+    assert not cache_path.is_file()
diff --git a/tests/test_sensovation_data_parser.py b/tests/test_sensovation_data_parser.py
index a08ba0a..7870bb8 100644
--- a/tests/test_sensovation_data_parser.py
+++ b/tests/test_sensovation_data_parser.py
@@ -1,529 +1,9 @@
-""" Stub file for testing the project """
+""" testing the __init__ file """
 
-from pathlib import Path
-
-import numpy
-import pytest
-
-EXAMPLE_DIR_WO_PARAMS = "mtp_wo_parameters"
-EXAMPLE_DIR_WITH_PARAMS = "mtp_with_parameters"
-
-
-@pytest.fixture
-def example_dir(request):
-    root_dir = Path(request.config.rootdir)
-    yield root_dir / "example_data"
-
-
-@pytest.fixture
-def example_file(example_dir):
-    data_dir = example_dir / EXAMPLE_DIR_WO_PARAMS
-    yield data_dir / "160218_SG2-013-001_Regen1_Cy3-100_1_A1_1.csv"
-
-
-@pytest.fixture
-def exposure_df():
-    from pandas import DataFrame
-
-    yield DataFrame(data={"Exposure.Id": [1, 2, 3]})
-
-
-@pytest.fixture
-def dir_for_caching(tmpdir, example_file):
-    import shutil
-
-    temp_path = Path(tmpdir)
-    dest = temp_path / example_file.name
-    shutil.copy(example_file, dest)
-    yield temp_path
-
-
-@pytest.mark.parametrize(
-    "sub_dir, file_name",
-    [
-        (
-            EXAMPLE_DIR_WO_PARAMS,
-            "160218_SG2-013-001_Regen1_Cy3-100_1_A1_1.csv",
-        ),
-        (
-            EXAMPLE_DIR_WITH_PARAMS,
-            "160210_SG2-010-001_Regen_cy3100_1_A1_1.csv",
-        ),
-    ],
-)
-def test_parse_csv(example_dir, sub_dir, file_name):
-    from sensovation_data_parser import _parse_csv
-
-    result = _parse_csv(example_dir / sub_dir / file_name)
-
-    columns = {
-        " ID ": numpy.int64,
-        "Pos.X": numpy.int64,
-        "Pos.Y": numpy.int64,
-        "Bkg.Mean": float,
-        "Spot.Mean": float,
-        "Bkg.Median": float,
-        "Spot.Median": float,
-        "Bkg.StdDev": float,
-        "Spot.StdDev": float,
-        "Bkg.Sum": numpy.int64,
-        "Spot.Sum": numpy.int64,
-        "Bkg.Area": numpy.int64,
-        "Spot.Area": numpy.int64,
-        "Spot.Sat. (%)": numpy.int64,
-        "Found": numpy.bool_,
-        "Pos.Nom.X": numpy.int64,
-        "Pos.Nom.Y": numpy.int64,
-        "Dia.": numpy.int64,
-        "Rect.": str,
-        "Contour": object,  # ignore the type of contour
-    }
-
-    assert set(result.columns) == set(columns.keys())
-    assert len(result[" ID "].unique()) == 100
-    assert len(result) == 100
-    for column, value_type in columns.items():
-        assert isinstance(result[column][0], value_type)
-
-
-def test_parse_csv_no_array(example_dir):
-    from sensovation_data_parser import _parse_csv
-
-    result = _parse_csv(example_dir / "no_array_A1_1.csv")
-
-    assert len(result) == 1
-    assert result[" ID "][0] == 0
-
-
-@pytest.mark.parametrize(
-    "input, expected", [("", "."), ("..,", "."), (".,,", ","), ("..,,", "."),]
-)
-def test_guess_decimal_separator_returns_correct_separator(input, expected):
-    from sensovation_data_parser import _guess_decimal_separator
-    from io import StringIO
-
-    handle = StringIO(f"header\n{input}\n")
-    result = _guess_decimal_separator(handle)
-
-    assert result == expected
-
-
-def test_guess_decimal_separator_rewinds_handle():
-    from sensovation_data_parser import _guess_decimal_separator
-    from io import StringIO
-
-    handle = StringIO(f"header\n{input}\n")
-    _guess_decimal_separator(handle)
-
-    assert next(handle) == "header\n"
-
-
-def test_well_regex_ok():
-    from sensovation_data_parser import REGEX_WELL
-
-    result = REGEX_WELL.match("AbC123")
-
-    assert result["row"] == "AbC"
-    assert result["column"] == "123"
-
-
-@pytest.mark.parametrize("input", ["", "A", "1", "1A", "-1", "A-"])
-def test_well_regex_no_match(input):
-    from sensovation_data_parser import REGEX_WELL
-
-    result = REGEX_WELL.match(input)
-
-    assert result is None
-
-
-@pytest.mark.parametrize(
-    "filename, expected",
-    [("A1_1.csv", ("A", 1, 1)), ("test/measurement_1_H12_2", ("H", 12, 2)),],
-)
-def test_extract_measurement_info_ok(filename, expected):
-    from sensovation_data_parser import _extract_measurement_info
-
-    result = _extract_measurement_info(filename)
-
-    assert result == expected
-
-
-@pytest.mark.parametrize("filename", ["wrong_exposure_A1_B", "no_well_XX_1"])
-def test_extract_measurement_info_raises_error(filename):
-    from sensovation_data_parser import _extract_measurement_info
-
-    with pytest.raises(ValueError):
-        _extract_measurement_info(filename)
-
-
-def test_cleanup_data_columns():
-    from sensovation_data_parser import _cleanup_data_columns
-    from pandas import DataFrame
-
-    columns = ["Rect.", "Contour", " ID ", "Found", "Dia."]
-    data = {col: [i] for i, col in enumerate(columns)}
-    data_frame = DataFrame(data=data)
-
-    result = _cleanup_data_columns(data_frame)
-
-    assert set(result.columns) == {"Pos.Id", "Spot.Found", "Spot.Diameter"}
-    assert result["Pos.Id"][0] == 2
-    assert result["Spot.Found"][0] == 3
-    assert result["Spot.Diameter"][0] == 4
-
-
-def test_parse_file(example_file):
-    from sensovation_data_parser import parse_file
-
-    result = parse_file(example_file)
-
-    columns = {
-        "Pos.Id",
-        "Pos.X",
-        "Pos.Y",
-        "Bkg.Mean",
-        "Spot.Mean",
-        "Bkg.Median",
-        "Spot.Median",
-        "Bkg.StdDev",
-        "Spot.StdDev",
-        "Bkg.Sum",
-        "Spot.Sum",
-        "Bkg.Area",
-        "Spot.Area",
-        "Spot.Sat. (%)",
-        "Spot.Found",
-        "Pos.Nom.X",
-        "Pos.Nom.Y",
-        "Spot.Diameter",
-        "Field.Row",
-        "Field.Column",
-        "Exposure.Id",
-    }
-
-    assert set(result.columns) == columns
-    assert result["Field.Row"][0] == "A"
-    assert result["Field.Column"][0] == 1
-    assert result["Exposure.Id"][0] == 1
-
-
-@pytest.mark.parametrize(
-    "file_list",
-    [
-        [
-            "160218_SG2-013-001_Regen1_Cy3-100_1_A1_1.csv",
-            "160218_SG2-013-001_Regen1_Cy3-100_1_A1_2.csv",
-        ],
-        ["160218_SG2-013-001_Regen1_Cy3-100_1_A1_1.csv"],
-    ],
-)
-def testparse_multiple_files_ok(example_dir, file_list):
-    from sensovation_data_parser import parse_multiple_files
-
-    sub_dir = example_dir / EXAMPLE_DIR_WO_PARAMS
-    files = [sub_dir / file for file in file_list]
-
-    data_frame = parse_multiple_files(files)
-    print(data_frame["Exposure.Id"].unique())
-
-    assert len(data_frame) == 100 * len(files)
-    assert len(data_frame["Exposure.Id"].unique()) == len(files)
-
-
-def testparse_multiple_files_empty_file_list():
-    from sensovation_data_parser import parse_multiple_files
-
-    with pytest.raises(ValueError):
-        parse_multiple_files([])
-
-
-def testparse_multiple_files_empty_array(example_dir):
-    from sensovation_data_parser import parse_multiple_files
-
-    files = [example_dir / "no_array_A1_1.csv"]
-
-    data_frame = parse_multiple_files(files)
-    print(data_frame["Exposure.Id"].unique())
-
-    assert len(data_frame) == 1
-
-
-def test_list_csv_files(example_dir):
-    from sensovation_data_parser import _list_csv_files
-
-    result = list(_list_csv_files(example_dir / EXAMPLE_DIR_WITH_PARAMS))
-
-    assert len(result) == 36 * 3
-    assert all(str(item).endswith(".csv") for item in result)
-    assert all(not item.stem.startswith(".") for item in result)
-
-
-def test_parse_folder(example_dir):
-    from sensovation_data_parser import parse_folder
-
-    data_frame = parse_folder(example_dir / EXAMPLE_DIR_WITH_PARAMS)
-
-    assert len(data_frame) == 36 * 3 * 100
-    assert len(data_frame["Field.Row"].unique()) == 3
-    assert len(data_frame["Field.Column"].unique()) == 12
-    assert len(data_frame["Exposure.Id"].unique()) == 3
-    assert len(data_frame["Pos.Id"].unique()) == 100
-
-
-def test_sanity_check_ok(example_dir):
-    from sensovation_data_parser import _sanity_check, parse_multiple_files
-
-    sub_dir = example_dir / EXAMPLE_DIR_WO_PARAMS
-    file_list = [
-        "160218_SG2-013-001_Regen1_Cy3-100_1_A1_1.csv",
-        "160218_SG2-013-001_Regen1_Cy3-100_1_A1_2.csv",
-    ]
-    files = [sub_dir / file for file in file_list]
-    data_frame = parse_multiple_files(files)
-
-    result = _sanity_check(data_frame)
-
-    assert len(result) == len(data_frame)
-
-
-def test_sanity_check_raises_value_error(example_dir):
-    from sensovation_data_parser import _sanity_check, parse_multiple_files
-
-    sub_dir = example_dir / EXAMPLE_DIR_WO_PARAMS
-    file_list = [
-        "160218_SG2-013-001_Regen1_Cy3-100_1_A1_1.csv",
-        "160218_SG2-013-001_Regen1_Cy3-100_1_A1_2.csv",
-    ]
-    files = [sub_dir / file for file in file_list]
-    data_frame = parse_multiple_files(files)
-    data_frame = data_frame.drop(data_frame.index[1])
-
-    with pytest.raises(ValueError):
-        _sanity_check(data_frame)
-
-
-def test_search_channel_info_file_ok(example_dir):
-    from sensovation_data_parser import _search_channel_info_file
-
-    result = _search_channel_info_file(example_dir / EXAMPLE_DIR_WITH_PARAMS)
-
-    assert result.suffix == ".svexp"
-
-
-def test_search_channel_info_file_no_parameters_folder(example_dir):
-    from sensovation_data_parser import _search_channel_info_file
-
-    result = _search_channel_info_file(example_dir / EXAMPLE_DIR_WO_PARAMS)
-
-    assert result is None
-
-
-def test_search_channel_info_file_no_parameters_file(tmpdir):
-    from sensovation_data_parser import _search_channel_info_file
-
-    params_dir = tmpdir / "Parameters"
-    params_dir.mkdir()
-
-    result = _search_channel_info_file(tmpdir)
-
-    assert result is None
-
-
-def test_parse_channel_info(example_dir):
-    from sensovation_data_parser import (
-        _search_channel_info_file,
-        _parse_channel_info,
-    )
-
-    params = _search_channel_info_file(example_dir / EXAMPLE_DIR_WITH_PARAMS)
-    result = _parse_channel_info(params)
-
-    assert set(result.keys()) == {1, 2, 3}
-    assert result[1] == ("green", 100)
-    assert result[2] == ("red", 150)
-    assert result[3] == ("red", 15)
-
-
-def test_get_valid_exposure_info_provided_ok(exposure_df):
-    from sensovation_data_parser import _get_valid_exposure_info
-
-    exposure_info = {1: None, 2: None, 3: None}
-
-    result = _get_valid_exposure_info(
-        "/nonexistent", exposure_df, exposure_info=exposure_info
-    )
-
-    assert result == exposure_info
-
-
-def test_get_valid_exposure_info_provided_not_ok(exposure_df):
-    from sensovation_data_parser import _get_valid_exposure_info
-
-    exposure_info = {1: None, 2: None}
-
-    result = _get_valid_exposure_info(
-        "/nonexistent", exposure_df, exposure_info=exposure_info
-    )
-
-    assert set(result.keys()) == {1, 2, 3}
-    assert all(v == (None, None) for v in result.values())
-
-
-def test_get_valid_exposure_info_info_from_file_ok(example_dir, exposure_df):
-    from sensovation_data_parser import _get_valid_exposure_info
-
-    result = _get_valid_exposure_info(
-        example_dir / EXAMPLE_DIR_WITH_PARAMS, exposure_df, exposure_info=None
-    )
-
-    assert set(result.keys()) == {1, 2, 3}
-    assert result[1] == ("green", 100)
-    assert result[2] == ("red", 150)
-    assert result[3] == ("red", 15)
-
-
-def test_get_valid_exposure_info_info_from_file_not_ok(
-    example_dir, exposure_df
-):
-    from sensovation_data_parser import _get_valid_exposure_info
-
-    data_frame = exposure_df.drop(exposure_df.index[1])
-
-    result = _get_valid_exposure_info(
-        example_dir / EXAMPLE_DIR_WITH_PARAMS, data_frame, exposure_info=None
-    )
-
-    assert set(result.keys()) == {1, 3}
-    assert all(v == (None, None) for v in result.values())
-
-
-def test_augment_exposure_info(exposure_df):
-    from sensovation_data_parser import _augment_exposure_info, ExposureInfo
-
-    exposure_info = {
-        1: ExposureInfo("red", 10),
-        2: ExposureInfo("green", 20),
-        3: ExposureInfo("blue", 50),
-    }
-
-    result = _augment_exposure_info(exposure_df, exposure_info)
-
-    assert result["Exposure.Id"][0] == 1
-    assert result["Exposure.Channel"][0] == "red"
-    assert result["Exposure.Time"][0] == 10
-    assert result["Exposure.Id"][1] == 2
-    assert result["Exposure.Channel"][1] == "green"
-    assert result["Exposure.Time"][1] == 20
result["Exposure.Time"][1] == 20 - assert result["Exposure.Id"][2] == 3 - assert result["Exposure.Channel"][2] == "blue" - assert result["Exposure.Time"][2] == 50 - - -def test_process_folder_with_exposure_info(example_dir): - from sensovation_data_parser import _process_folder - - result = _process_folder(example_dir / EXAMPLE_DIR_WITH_PARAMS) - - assert len(result) == 36 * 100 * 3 - - expected = [(1, "green", 100), (2, "red", 150), (3, "red", 15)] - for exposure_id, channel, time in expected: - mask = result["Exposure.Id"] == exposure_id - example_row = result.loc[mask].iloc[1] - assert example_row["Exposure.Channel"] == channel - assert example_row["Exposure.Time"] == time - - -def test_process_folder_without_exposure_info(example_dir): - from sensovation_data_parser import _process_folder - from pandas import isnull - - result = _process_folder(example_dir / EXAMPLE_DIR_WO_PARAMS) - - assert len(result) == 96 * 100 * 3 - - for exposure_id in range(1, 4): - mask = result["Exposure.Id"] == exposure_id - example_row = result.loc[mask].iloc[1] - print(type(example_row["Exposure.Channel"])) - assert isnull(example_row["Exposure.Channel"]) - assert isnull(example_row["Exposure.Time"]) - - -def test_process_folder_creates_cache(dir_for_caching): - from sensovation_data_parser import ( - process_folder, - CACHE_FILE_NAME, - ) - - cache_path = dir_for_caching / CACHE_FILE_NAME - assert not cache_path.is_file() - - result = process_folder(dir_for_caching) - - assert len(result) == 100 - assert cache_path.is_file() - - -def test_process_folder_reads_from_cache(dir_for_caching, example_file): - from sensovation_data_parser import process_folder - - process_folder(dir_for_caching) - - csv_file = dir_for_caching / example_file.name - csv_file.unlink() - - result = process_folder(dir_for_caching) - assert len(result) == 100 - - -def test_process_folder_read_cache_fails_silently( - dir_for_caching, exposure_df -): - from sensovation_data_parser import ( - process_folder, - CACHE_FILE_NAME, - ) - - cache_path = dir_for_caching / CACHE_FILE_NAME - exposure_df.to_hdf(cache_path, "unknown table") - - result = process_folder(dir_for_caching) - - assert result["Field.Row"][0] == "A" - - -def test_process_folder_read_cache_no_cache_arg(dir_for_caching, exposure_df): - from sensovation_data_parser import ( - process_folder, - CACHE_FILE_NAME, - CACHE_TABLE_NAME, - ) - - cache_path = dir_for_caching / CACHE_FILE_NAME - exposure_df.to_hdf(cache_path, CACHE_TABLE_NAME) - - result = process_folder(dir_for_caching, use_cache=False) - - assert result["Field.Row"][0] == "A" - - -def test_process_folder_writes_cache(dir_for_caching): - from sensovation_data_parser import ( - process_folder, - CACHE_FILE_NAME, - ) - - process_folder(dir_for_caching, use_cache=True) - - cache_path = dir_for_caching / CACHE_FILE_NAME - assert cache_path.is_file() - - -def test_process_folder_writes_cache_no_cache_arg(dir_for_caching): - from sensovation_data_parser import process_folder, CACHE_FILE_NAME - - process_folder(dir_for_caching, use_cache=False) - - cache_path = dir_for_caching / CACHE_FILE_NAME - assert not cache_path.is_file() +def test_import_api(): + from sensovation_data_parser import ExposureInfo # noqa: F401 + from sensovation_data_parser import parse_file # noqa: F401 + from sensovation_data_parser import parse_multiple_files # noqa: F401 + from sensovation_data_parser import parse_folder # noqa: F401 + from sensovation_data_parser import process_folder # noqa: F401