From 0290c0a9baecd0de3a7c76fbe28eca74dab146b0 Mon Sep 17 00:00:00 2001
From: Holger Frey
Date: Tue, 15 Mar 2022 10:04:56 +0100
Subject: [PATCH] removed "utils" and "dynamic_range"

These two modules should be added to a new project. This project should
concentrate on just parsing the data.
---
 CHANGES.md                      |   6 +
 README.md                       |  54 ++-----
 sensospot_data/__init__.py      |  10 +-
 sensospot_data/dynamic_range.py | 110 --------------
 sensospot_data/parameters.py    |  34 ++++-
 sensospot_data/utils.py         | 140 ------------------
 tests/test_dynamic_range.py     | 249 --------------------------------
 tests/test_parameters.py        |  59 ++++++++
 tests/test_sensovation_data.py  |   6 -
 tests/test_utils.py             | 238 ------------------------------
 10 files changed, 112 insertions(+), 794 deletions(-)
 delete mode 100644 sensospot_data/dynamic_range.py
 delete mode 100644 sensospot_data/utils.py
 delete mode 100644 tests/test_dynamic_range.py
 delete mode 100644 tests/test_utils.py

diff --git a/CHANGES.md b/CHANGES.md
index dd8d206..9e44fd4 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -1,3 +1,9 @@
+0.6.0 - doing splits
+--------------------
+
+ - the modules `utils` and `dynamic_range` were deleted and will be moved into a separate project
+ - the resulting output file format is now a tab-delimited csv for better compatibility
+
 0.5.0 - real life fixes
 -----------------------
 
diff --git a/README.md b/README.md
index 5f20926..7267348 100644
--- a/README.md
+++ b/README.md
@@ -1,8 +1,7 @@
 Sensospot Data Parser
 =====================
 
-Parsing the numerical output from Sensovation Sensospot image analysis and some
-other useful functions for working with the data.
+Parsing the numerical output from Sensovation Sensospot image analysis.
 
 ## Example:
 
@@ -13,21 +12,14 @@ other useful functions for working with the data.
 ```python
 import sensospot_data
 
 # read the raw data of a folder
 raw_data = sensospot_data.parse_folder()
 
-# apply an exposure map to add more data:
-#   key relates to column "Exposure.Id"
-#   values are (Exposure.Channel, Exposure.Time)
-exposure_map = {
-    1: ("Cy3", 100),
-    2: ("Cy5", 150),
-    3: ("Cy5", 15),
-}
-enhanced_data = sensospot_data.apply_exposure_map(raw_data, exposure_map)
-
-# split the measurement according to channels
-channels = sensospot_data.split(enhanced_data "Exposure.Channel")
-
-# merge the two cy5 measurements together, creating an extended dynamic range
-cy5_xdr = sensospot_data.create_xdr(channels["cy5"], normalized_time=25)
+sorted(raw_data.columns) == [
+    'Bkg.Area', 'Bkg.Mean', 'Bkg.Median', 'Bkg.StdDev', 'Bkg.Sum',
+    'Exposure.Id',
+    'Parameters.Channel', 'Parameters.Time',
+    'Pos.Id', 'Pos.Nom.X', 'Pos.Nom.Y', 'Pos.X', 'Pos.Y',
+    'Spot.Area', 'Spot.Diameter', 'Spot.Found', 'Spot.Mean', 'Spot.Median', 'Spot.Saturation', 'Spot.StdDev', 'Spot.Sum',
+    'Well.Column', 'Well.Name', 'Well.Row',
+]
 ```
 
 ## Avaliable functions:
 
 from .parser import parse_file, parse_folder  # noqa: F401
 
 - **parse_file(path_to_csv_file)**
   Parses the csv file into a pandas data frame and will add additional some
   meta data from the file name. Is internally also used by `parse_folder()`
-- **split(data_frame, column)**
-  Splits a data frame based on the unique values of a column. Will return a
-  dict, with the unique values as keys and the corresponding data frame as
-  value
-- **apply_map(data_frame, map, index_col)**
-  Adds information provided in the nested dictionary `map` to a data frame,
-  based on the values in the data_frame column `index_col`.
- - **apply_exposure_map(data_frame, exposure_map)** - Adds information about the channel and exposure time to a data frame, based - on the exposure id. Will get bonus karma points, if the named tuple - `ExposureInfo` is used: - `{1:ExposureInfo("Cy3", 100), 2:ExposureInfo("Cy3", 100), }` - - **ExposureInfo(exposure_channel, exposure_time)** - A named tuple for defining an exposure map. Usage will increase readability - and karma points. - - **blend(data_frame, [column="Spot.Saturation", limit=2])** - If provided with a data frame with multiple exposure times for the same - exposure channel, the function will blend theese two times together based - on given column and limit. - - **normalize_values(data_frame, [normalized_time=None])** - Adds new columns to the data frame with intensity values recalculated to the - normalized exposure time. If no time is given, the max exposure time is used. - - **create_xdr(data_frame, [normalized_time=None, column="Spot.Saturation", limit=2])** - This combines the methods `blend()` and `normalize_values()` into one call. - What a joy! + ## CLI @@ -76,7 +44,7 @@ Arguments: SOURCE: Folder with Sensospot measurement Options: - -o, --outfile TEXT Output file name, relative to SOURCE, defaults to 'raw_data.h5' + -o, --outfile TEXT Output file name, relative to SOURCE, defaults to 'collected_data.csv' --help Show this message and exit. ``` diff --git a/sensospot_data/__init__.py b/sensospot_data/__init__.py index af73fe1..9dad0a0 100644 --- a/sensospot_data/__init__.py +++ b/sensospot_data/__init__.py @@ -3,17 +3,15 @@ Parsing the numerical output from Sensovations Sensospot image analysis. """ -__version__ = "0.5.4" +__version__ = "0.6.0" from pathlib import Path import click -from .utils import split, apply_map, apply_exposure_map # noqa: F401 from .parser import parse_file, parse_folder # noqa: F401 from .parameters import ExposureInfo # noqa: F401 -from .dynamic_range import blend, create_xdr, normalize_values # noqa: F401 @click.command() @@ -30,7 +28,7 @@ from .dynamic_range import blend, create_xdr, normalize_values # noqa: F401 @click.option( "-o", "--outfile", - default="raw_data.h5", + default="collected_data.csv", help="Output file name", ) @click.option( @@ -44,5 +42,5 @@ def run(source, outfile, quiet=False): source_path = Path(source) # read the raw data of a folder raw_data = parse_folder(source_path, quiet=quiet) - hdf5_path = source_path / outfile - raw_data.to_hdf(hdf5_path, key="raw_data", format="table") + csv_file = source_path / outfile + raw_data.to_csv(csv_file, sep="\t") diff --git a/sensospot_data/dynamic_range.py b/sensospot_data/dynamic_range.py deleted file mode 100644 index aa65bbc..0000000 --- a/sensospot_data/dynamic_range.py +++ /dev/null @@ -1,110 +0,0 @@ -from pandas.api.types import is_numeric_dtype - -from .utils import split -from .columns import ( - RAW_DATA_POS_ID, - RAW_DATA_SPOT_SAT, - CALC_SPOT_OVERFLOW, - META_DATA_WELL_ROW, - META_DATA_WELL_COLUMN, - SETTINGS_EXPOSURE_TIME, - SETTINGS_EXPOSURE_CHANNEL, - RAW_DATA_NORMALIZATION_MAP, - SETTINGS_NORMALIZED_EXPOSURE_TIME, -) - -PROBE_MULTI_INDEX = [ - META_DATA_WELL_ROW, - META_DATA_WELL_COLUMN, - RAW_DATA_POS_ID, -] - - -def _check_if_xdr_ready(data_frame): - """check if a data frame meets the constraints for xdr""" - required_columns = {SETTINGS_EXPOSURE_CHANNEL, SETTINGS_EXPOSURE_TIME} - if not required_columns.issubset(data_frame.columns): - raise ValueError("XDR: Apply an exposure map first") - if len(data_frame[SETTINGS_EXPOSURE_CHANNEL].unique()) != 1: - raise 
ValueError("XDR: Mixed Exposure Channels") - if not is_numeric_dtype(data_frame[SETTINGS_EXPOSURE_TIME]): - raise ValueError("XDR: Exposure time is not numerical") - if data_frame[SETTINGS_EXPOSURE_TIME].hasnans: - raise ValueError("XDR: Exposure time contains NaNs") - - -def _calc_overflow_info(data_frame, column=RAW_DATA_SPOT_SAT, limit=2): - """add overflow info, based on column and limit""" - data_frame.loc[:, CALC_SPOT_OVERFLOW] = data_frame[column] > limit - return data_frame - - -def _reduce_overflow(data_frame): - """the heavy lifting for creating an extended dynamic range""" - - split_frames = split(data_frame, SETTINGS_EXPOSURE_TIME) - - # get the exposure times, longest first - exposure_times = sorted(split_frames.keys(), reverse=True) - max_time, *rest_times = exposure_times - - result_frame = split_frames[max_time].set_index(PROBE_MULTI_INDEX) - - for next_time in rest_times: - mask = result_frame[CALC_SPOT_OVERFLOW] == True # noqa: E712 - next_frame = split_frames[next_time].set_index(PROBE_MULTI_INDEX) - rf_index = set(result_frame.index) - nf_index = set(next_frame.index) - diff = rf_index - nf_index | nf_index - rf_index - if diff: - num = len(diff) - raise ValueError( - f"XDR: Scan Data is incomplete, differs on {num} probes" - ) - result_frame.loc[mask] = next_frame.loc[mask] - - return result_frame.reset_index() - - -def blend(data_frame, column=RAW_DATA_SPOT_SAT, limit=2): - """creates an extended dynamic range, eliminating overflowing spots""" - _check_if_xdr_ready(data_frame) - if CALC_SPOT_OVERFLOW not in data_frame.columns: - data_frame = _calc_overflow_info(data_frame, column, limit) - return _reduce_overflow(data_frame) - - -def normalize_values(data_frame, normalized_time=None): - """add exposure time normalized values to a data frame - - will use the maximum exposure time, if none is provided - and the column SETTINGS_NORMALIZED_EXPOSURE_TIME was not - set before. - """ - if normalized_time: - data_frame[SETTINGS_NORMALIZED_EXPOSURE_TIME] = normalized_time - elif SETTINGS_NORMALIZED_EXPOSURE_TIME not in data_frame.columns: - normalized_time = data_frame[SETTINGS_EXPOSURE_TIME].max() - data_frame[SETTINGS_NORMALIZED_EXPOSURE_TIME] = normalized_time - - for original_col, normalized_col in RAW_DATA_NORMALIZATION_MAP.items(): - data_frame[normalized_col] = ( - data_frame[original_col] / data_frame[SETTINGS_EXPOSURE_TIME] - ) * data_frame[SETTINGS_NORMALIZED_EXPOSURE_TIME] - - return data_frame - - -def create_xdr( - data_frame, - normalized_time=None, - column=RAW_DATA_SPOT_SAT, - limit=2, -): - """normalize measurement exposures - - normalized_time: - if it is None, the max exposure time is used for normalization. 
- """ - data_frame = blend(data_frame, column, limit) - return normalize_values(data_frame, normalized_time) diff --git a/sensospot_data/parameters.py b/sensospot_data/parameters.py index f7dcd61..f4367ba 100644 --- a/sensospot_data/parameters.py +++ b/sensospot_data/parameters.py @@ -7,9 +7,9 @@ from pathlib import Path from collections import namedtuple import numpy +import pandas from defusedxml import ElementTree -from .utils import apply_map from .columns import ( META_DATA_EXPOSURE_ID, META_DATA_PARAMETERS_TIME, @@ -61,7 +61,37 @@ def _add_measurement_params(data_frame, params): """adds measurement parameters to a data frame""" columns = [META_DATA_PARAMETERS_CHANNEL, META_DATA_PARAMETERS_TIME] map = {k: dict(zip(columns, v)) for k, v in params.items()} - return apply_map(data_frame, map, META_DATA_EXPOSURE_ID) + return _apply_map(data_frame, map, META_DATA_EXPOSURE_ID) + + +def _apply_map(data_frame, map, index_col): + """adds a nested dictionary to a data frame on a specific index column + + map: + keys: must be the same as the values in the index column, + values: dictionary with new column names as keys and the values + + example: + + >>> df = DataFrame(data={"MyIndex": [10, 10, 20]}) + >>> map = { + ... 10: {"NewCol": "foo"}, + ... 20: {"NewCol": "Bar"}, + ... } + >>> apply_map(df, map, "MyIndex") + MyIndex NewCol + 0 10 foo + 1 10 foo + 2 20 bar + + """ + map_df = pandas.DataFrame.from_dict(map, orient="index") + return data_frame.merge( + map_df, + how="left", + left_on=index_col, + right_index=True, + ) def add_optional_measurement_parameters(data_frame, folder): diff --git a/sensospot_data/utils.py b/sensospot_data/utils.py deleted file mode 100644 index 90b7f52..0000000 --- a/sensospot_data/utils.py +++ /dev/null @@ -1,140 +0,0 @@ -from collections.abc import Mapping, Sequence - -import pandas - -from .columns import ( - META_DATA_WELL_ROW, - META_DATA_EXPOSURE_ID, - META_DATA_WELL_COLUMN, - SETTINGS_EXPOSURE_TIME, - META_DATA_PARAMETERS_TIME, - SETTINGS_EXPOSURE_CHANNEL, - META_DATA_PARAMETERS_CHANNEL, -) - -DEFAULT_AGGREGATION_INDEX = [ - META_DATA_EXPOSURE_ID, - META_DATA_WELL_ROW, - META_DATA_WELL_COLUMN, -] - - -def split(data_frame, column): - """splits a data frame on unique column values""" - values = data_frame[column].unique() - masks = {value: (data_frame[column] == value) for value in values} - return {value: data_frame[mask] for value, mask in masks.items()} - - -def _is_list_or_tuple(something): - """returns true if something is a list or tuple""" - if isinstance(something, Sequence): - return not isinstance(something, str) - return False - - -def _is_numerical(something): - """returns true if something is an int or float""" - return isinstance(something, int) or isinstance(something, float) - - -def _check_valid_exposure_map_entry(entry): - """raises a ValueError, if an exposure map entry is not suitable""" - if not _is_list_or_tuple(entry): - raise ValueError("Eposure Map: entries must be tuples or lists") - if not len(entry) == 2: - raise ValueError("Eposure Map: entries must consist of two items") - if not _is_numerical(entry[1]): - raise ValueError("Exposure Map: second entry must be numerical") - - -def _check_exposure_map(data_frame, exposure_map): - """checks if an exposure maps fits the requirements - - Will raise an ValueError if requirements are not met - """ - if not isinstance(exposure_map, Mapping): - raise ValueError("Exposure Map: map must be a dict") - exposure_ids_in_df = set(data_frame[META_DATA_EXPOSURE_ID].unique()) - exposure_ids_in_map = 
set(exposure_map.keys()) - if exposure_ids_in_df != exposure_ids_in_map: - msg = ( - f"Exposure Ids {exposure_ids_in_df} don't match " - f"provided map {exposure_ids_in_map}" - ) - raise ValueError(msg) - for entry in exposure_map.values(): - _check_valid_exposure_map_entry(entry) - - -def _set_exposure_data_from_parameters(data_frame): - """infer the exposures from measurement parameters - - will raise a ValueError if the parameters contain NaNs - """ - df = data_frame # shorthand for cleaner code - - if ( - df[META_DATA_PARAMETERS_CHANNEL].hasnans - or df[META_DATA_PARAMETERS_TIME].hasnans - ): - raise ValueError("Exposure Map: measurement parameters incomplete") - - df[SETTINGS_EXPOSURE_CHANNEL] = df[META_DATA_PARAMETERS_CHANNEL] - df[SETTINGS_EXPOSURE_TIME] = df[META_DATA_PARAMETERS_TIME] - return df - - -def apply_exposure_map(data_frame, exposure_map=None): - """applies the parameters of a exposure map to the data frame - - exposure map: - keys: must be the same as the exposure ids, - values: objects with at least time and channel attributes - - if the exposure map is None, the values from the optionally parsed - measurement parameters are used. - - will raise an ValueError, if the provided exposure map does not map to the - exposure ids. - """ - - if exposure_map is None: - return _set_exposure_data_from_parameters(data_frame) - - _check_exposure_map(data_frame, exposure_map) - - columns = [SETTINGS_EXPOSURE_CHANNEL, SETTINGS_EXPOSURE_TIME] - map = {k: dict(zip(columns, v)) for k, v in exposure_map.items()} - - return apply_map(data_frame, map, META_DATA_EXPOSURE_ID) - - -def apply_map(data_frame, map, index_col): - """adds a nested dictionary to a data frame on a specific index column - - map: - keys: must be the same as the values in the index column, - values: dictionary with new column names as keys and the values - - example: - - >>> df = DataFrame(data={"MyIndex": [10, 10, 20]}) - >>> map = { - ... 10: {"NewCol": "foo"}, - ... 20: {"NewCol": "Bar"}, - ... 
} - >>> apply_map(df, map, "MyIndex") - MyIndex NewCol - 0 10 foo - 1 10 foo - 2 20 bar - - """ - map_df = pandas.DataFrame.from_dict(map, orient="index") - return data_frame.merge( - map_df, - how="left", - left_on=index_col, - right_index=True, - ) diff --git a/tests/test_dynamic_range.py b/tests/test_dynamic_range.py deleted file mode 100644 index 34fe23b..0000000 --- a/tests/test_dynamic_range.py +++ /dev/null @@ -1,249 +0,0 @@ -import numpy -import pandas -import pytest - - -def test_check_if_xdr_ready_ok(exposure_df): - from sensospot_data.columns import ( - SETTINGS_EXPOSURE_TIME, - SETTINGS_EXPOSURE_CHANNEL, - ) - from sensospot_data.dynamic_range import _check_if_xdr_ready - - exposure_df[SETTINGS_EXPOSURE_TIME] = 1 - exposure_df[SETTINGS_EXPOSURE_CHANNEL] = 2 - - result = _check_if_xdr_ready(exposure_df) - - assert result is None - - -@pytest.mark.parametrize(["run"], [[0], [1], [2]]) -def test_check_if_xdr_ready_raises_error_missing_column(exposure_df, run): - from sensospot_data.columns import ( - SETTINGS_EXPOSURE_TIME, - SETTINGS_EXPOSURE_CHANNEL, - ) - from sensospot_data.dynamic_range import _check_if_xdr_ready - - columns = [SETTINGS_EXPOSURE_TIME, SETTINGS_EXPOSURE_CHANNEL, "X"] - extra_col = columns[run] - - exposure_df[extra_col] = 1 - - with pytest.raises(ValueError): - _check_if_xdr_ready(exposure_df) - - -def test_check_if_xdr_ready_raises_error_mixed_channels(exposure_df): - from sensospot_data.columns import ( - META_DATA_EXPOSURE_ID, - SETTINGS_EXPOSURE_TIME, - SETTINGS_EXPOSURE_CHANNEL, - ) - from sensospot_data.dynamic_range import _check_if_xdr_ready - - exposure_df[SETTINGS_EXPOSURE_TIME] = 1 - exposure_df[SETTINGS_EXPOSURE_CHANNEL] = exposure_df[META_DATA_EXPOSURE_ID] - - with pytest.raises(ValueError): - _check_if_xdr_ready(exposure_df) - - -def test_check_if_xdr_ready_raises_error_non_numeric_time(exposure_df): - from sensospot_data.columns import ( - SETTINGS_EXPOSURE_TIME, - SETTINGS_EXPOSURE_CHANNEL, - ) - from sensospot_data.dynamic_range import _check_if_xdr_ready - - exposure_df[SETTINGS_EXPOSURE_TIME] = "X" - exposure_df[SETTINGS_EXPOSURE_CHANNEL] = 2 - - with pytest.raises(ValueError): - _check_if_xdr_ready(exposure_df) - - -def test_check_if_xdr_ready_raises_error_on_nan(exposure_df): - from sensospot_data.columns import ( - SETTINGS_EXPOSURE_TIME, - SETTINGS_EXPOSURE_CHANNEL, - ) - from sensospot_data.dynamic_range import _check_if_xdr_ready - - exposure_df[SETTINGS_EXPOSURE_TIME] = numpy.nan - exposure_df[SETTINGS_EXPOSURE_CHANNEL] = 2 - - with pytest.raises(ValueError): - _check_if_xdr_ready(exposure_df) - - -def test_check_overflow_limit_defaults(): - from sensospot_data.columns import RAW_DATA_SPOT_SAT, CALC_SPOT_OVERFLOW - from sensospot_data.dynamic_range import _calc_overflow_info - - data_frame = pandas.DataFrame(data={RAW_DATA_SPOT_SAT: [1, 2, 3]}) - - result = _calc_overflow_info(data_frame) - - assert list(result[CALC_SPOT_OVERFLOW]) == [False, False, True] - - -def test_check_overflow_limit_custom_limit(): - from sensospot_data.columns import CALC_SPOT_OVERFLOW - from sensospot_data.dynamic_range import _calc_overflow_info - - data_frame = pandas.DataFrame(data={"X": [4, 2, 3, 4]}) - - result = _calc_overflow_info(data_frame, "X", 2) - - assert list(result[CALC_SPOT_OVERFLOW]) == [True, False, True, True] - - -def test_reduce_overflow_multiple_times(normalization_data_frame): - from sensospot_data.dynamic_range import ( - PROBE_MULTI_INDEX, - _reduce_overflow, - _calc_overflow_info, - ) - - data_frame = 
_calc_overflow_info(normalization_data_frame, "Saturation", 1) - result = _reduce_overflow(data_frame) - - sorted_results = result.sort_values(by=PROBE_MULTI_INDEX) - - assert list(sorted_results["Value"]) == [ - 1, - 2, - 3, - 1, - 10, - 10, - 10, - 10, - 100, - 100, - 100, - 100, - ] - - -def test_reduce_overflow_only_one_exposure_time(normalization_data_frame): - from sensospot_data.dynamic_range import ( - SETTINGS_EXPOSURE_TIME, - _reduce_overflow, - _calc_overflow_info, - ) - - normalization_data_frame[SETTINGS_EXPOSURE_TIME] = 1 - - data_frame = _calc_overflow_info(normalization_data_frame, "Saturation", 1) - result = _reduce_overflow(data_frame) - - assert list(result["Value"]) == list(normalization_data_frame["Value"]) - - -def test_blend(normalization_data_frame): - from sensospot_data.dynamic_range import PROBE_MULTI_INDEX, blend - - result = blend(normalization_data_frame, "Saturation", 1) - - sorted_results = result.sort_values(by=PROBE_MULTI_INDEX) - - assert list(sorted_results["Value"]) == [ - 1, - 2, - 3, - 1, - 10, - 10, - 10, - 10, - 100, - 100, - 100, - 100, - ] - - -def test_blend_raises_error(normalization_data_frame): - from sensospot_data.dynamic_range import SETTINGS_EXPOSURE_TIME, blend - - normalization_data_frame[SETTINGS_EXPOSURE_TIME] = "A" - - with pytest.raises(ValueError): - blend(normalization_data_frame, "Saturation", 1) - - -def test_normalize_values_no_param(normalization_data_frame): - from sensospot_data.columns import RAW_DATA_NORMALIZATION_MAP - from sensospot_data.dynamic_range import ( - PROBE_MULTI_INDEX, - blend, - normalize_values, - ) - - reduced = blend(normalization_data_frame, "Saturation", 1) - - result = normalize_values(reduced) - - sorted_results = result.sort_values(by=PROBE_MULTI_INDEX) - expected_values = [1, 4, 15, 1, 10, 10, 10, 10, 100, 100, 100, 100] - - for normalized_col in RAW_DATA_NORMALIZATION_MAP.values(): - assert list(sorted_results[normalized_col]) == expected_values - - -def test_normalize_values_custom_param(normalization_data_frame): - from sensospot_data.columns import RAW_DATA_NORMALIZATION_MAP - from sensospot_data.dynamic_range import ( - PROBE_MULTI_INDEX, - blend, - normalize_values, - ) - - reduced = blend(normalization_data_frame, "Saturation", 1) - - result = normalize_values(reduced, 100) - - sorted_results = result.sort_values(by=PROBE_MULTI_INDEX) - expected_values = [2, 8, 30, 2, 20, 20, 20, 20, 200, 200, 200, 200] - - for normalized_col in RAW_DATA_NORMALIZATION_MAP.values(): - assert list(sorted_results[normalized_col]) == expected_values - - -def test_normalize_values_preset_param(normalization_data_frame): - from sensospot_data.columns import ( - RAW_DATA_NORMALIZATION_MAP, - SETTINGS_NORMALIZED_EXPOSURE_TIME, - ) - from sensospot_data.dynamic_range import ( - PROBE_MULTI_INDEX, - blend, - normalize_values, - ) - - reduced = blend(normalization_data_frame, "Saturation", 1) - reduced[SETTINGS_NORMALIZED_EXPOSURE_TIME] = 100 - - result = normalize_values(reduced) - - sorted_results = result.sort_values(by=PROBE_MULTI_INDEX) - expected_values = [2, 8, 30, 2, 20, 20, 20, 20, 200, 200, 200, 200] - - for normalized_col in RAW_DATA_NORMALIZATION_MAP.values(): - assert list(sorted_results[normalized_col]) == expected_values - - -def test_create_xdr(normalization_data_frame): - from sensospot_data.columns import RAW_DATA_NORMALIZATION_MAP - from sensospot_data.dynamic_range import PROBE_MULTI_INDEX, create_xdr - - result = create_xdr(normalization_data_frame, 100, "Saturation", 1) - - sorted_results = 
result.sort_values(by=PROBE_MULTI_INDEX) - expected_values = [2, 8, 30, 2, 20, 20, 20, 20, 200, 200, 200, 200] - - for normalized_col in RAW_DATA_NORMALIZATION_MAP.values(): - assert list(sorted_results[normalized_col]) == expected_values diff --git a/tests/test_parameters.py b/tests/test_parameters.py index 815c511..dc4a7c3 100644 --- a/tests/test_parameters.py +++ b/tests/test_parameters.py @@ -119,3 +119,62 @@ def test_add_optional_measurement_parameters_without_params_file( one_exposure_data_frame = exposure_df.loc[mask] assert one_exposure_data_frame["Parameters.Channel"].hasnans assert one_exposure_data_frame["Parameters.Time"].hasnans + + +def test_apply_map(exposure_df): + from sensospot_data.parameters import _apply_map + + map = { + 1: {"SomeColumn": "A", "OtherColumn": 9}, + 2: {"SomeColumn": "B", "OtherColumn": 8}, + 3: {"SomeColumn": "C", "OtherColumn": 7}, + } + + result = _apply_map(exposure_df, map, "Exposure.Id") + + for key, value in map.items(): + mask = result["Exposure.Id"] == key + partial = result.loc[mask] + assert set(partial["SomeColumn"].unique()) == {value["SomeColumn"]} + assert set(partial["OtherColumn"].unique()) == {value["OtherColumn"]} + + +def test_apply_map_keys_not_in_df(exposure_df): + from sensospot_data.parameters import _apply_map + + map = { + 1: {"some_col": "A", "other_col": 9}, + 2: {"some_col": "B", "other_col": 8}, + 3: {"some_col": "C", "other_col": 7}, + 4: {"some_col": "D", "other_col": 6}, + } + + result = _apply_map(exposure_df, map, "Exposure.Id") + + for key in (1, 2, 3): + value = map[key] + mask = result["Exposure.Id"] == key + partial = result.loc[mask] + assert set(partial["some_col"].unique()) == {value["some_col"]} + assert set(partial["other_col"].unique()) == {value["other_col"]} + + assert "D" not in set(result["some_col"].unique()) + assert "6" not in set(result["other_col"].unique()) + + +def test_apply_map_not_all_keys_map_to_df(exposure_df): + from sensospot_data.parameters import _apply_map + + map = { + 1: {"some_col": "A", "other_col": 9}, + 3: {"some_col": "C", "other_col": 7}, + } + + result = _apply_map(exposure_df, map, "Exposure.Id") + + assert not result.iloc[0].hasnans + assert result.iloc[1].hasnans + assert not result.iloc[2].hasnans + + assert result["some_col"].hasnans + assert result["other_col"].hasnans diff --git a/tests/test_sensovation_data.py b/tests/test_sensovation_data.py index 484bd5a..6fd50af 100644 --- a/tests/test_sensovation_data.py +++ b/tests/test_sensovation_data.py @@ -4,11 +4,5 @@ def test_import_api(): from sensospot_data import ExposureInfo # noqa: F401 from sensospot_data import run # noqa: F401 - from sensospot_data import blend # noqa: F401 - from sensospot_data import split # noqa: F401 - from sensospot_data import apply_map # noqa: F401 - from sensospot_data import create_xdr # noqa: F401 from sensospot_data import parse_file # noqa: F401 from sensospot_data import parse_folder # noqa: F401 - from sensospot_data import normalize_values # noqa: F401 - from sensospot_data import apply_exposure_map # noqa: F401 diff --git a/tests/test_utils.py b/tests/test_utils.py deleted file mode 100644 index 5913376..0000000 --- a/tests/test_utils.py +++ /dev/null @@ -1,238 +0,0 @@ -from collections import namedtuple - -import pytest - -ExposureSetting = namedtuple("ExposureSetting", ["channel", "time"]) - - -def test_split(data_frame_with_params): - from sensospot_data.utils import split - - result = split(data_frame_with_params, "Well.Row") - - assert set(result.keys()) == set("ABC") - for key, 
value_df in result.items(): - assert set(value_df["Well.Row"].unique()) == {key} - - -@pytest.mark.parametrize( - "value,expected", - [ - [[1, 2], True], - [(1, 2), True], - [{1, 2}, False], - [{1: 2}, False], - ["1, 2", False], - [None, False], - ], -) -def test_is_list_or_tuple(value, expected): - from sensospot_data.utils import _is_list_or_tuple - - result = _is_list_or_tuple(value) - - assert result is expected - - -@pytest.mark.parametrize( - "value,expected", - [ - [1, True], - [1.2, True], - [{1, 2}, False], - [{1: 2}, False], - ["1", False], - [None, False], - ], -) -def test_is_numerical(value, expected): - from sensospot_data.utils import _is_numerical - - result = _is_numerical(value) - - assert result is expected - - -def test_check_valid_exposure_map_entry_ok(): - from sensospot_data.utils import _check_valid_exposure_map_entry - - result = _check_valid_exposure_map_entry((2, 1)) - - assert result is None - - -@pytest.mark.parametrize( - "value", [[], [1], (1, 2, 3), {"a": 1, "b": 2}, ("A", "B")] -) -def test_check_valid_exposure_map_entry_raises_error(value): - from sensospot_data.utils import _check_valid_exposure_map_entry - - with pytest.raises(ValueError): - _check_valid_exposure_map_entry(value) - - -def test_check_exposure_map_ok(exposure_df): - from sensospot_data.utils import _check_exposure_map - - exposure_map = {1: ("A", 10), 2: ("B", 20), 3: ("C", 30)} - - result = _check_exposure_map(exposure_df, exposure_map) - - assert result is None - - -def test_check_exposure_map_wrong_type(exposure_df): - from sensospot_data.utils import _check_exposure_map - - exposure_map = [] - - with pytest.raises(ValueError): - _check_exposure_map(exposure_df, exposure_map) - - -def test_check_exposure_map_wrong_ids(exposure_df): - from sensospot_data.utils import _check_exposure_map - - exposure_map = {1: ("A", 10), 2: ("B", 20), 4: ("D", 40)} - - with pytest.raises(ValueError): - _check_exposure_map(exposure_df, exposure_map) - - -def test_check_exposure_map_invalid_entries(exposure_df): - from sensospot_data.utils import _check_exposure_map - - exposure_map = {1: ("A", 10), 2: ("B", 20), 3: "ERROR"} - - with pytest.raises(ValueError): - _check_exposure_map(exposure_df, exposure_map) - - -def test_infer_exposure_from_parameters(data_frame_with_params): - from sensospot_data.utils import _set_exposure_data_from_parameters - - result = _set_exposure_data_from_parameters(data_frame_with_params) - - assert all(result["Exposure.Channel"] == result["Parameters.Channel"]) - assert all(result["Exposure.Time"] == result["Parameters.Time"]) - - -def test_infer_exposure_from_parameters_raises_error( - data_frame_without_params, -): - from sensospot_data.utils import _set_exposure_data_from_parameters - - with pytest.raises(ValueError) as excinfo: - _set_exposure_data_from_parameters(data_frame_without_params) - - assert str(excinfo.value).startswith("Exposure Map: measurement") - - -def test_apply_exposure_map(data_frame_with_params): - from sensospot_data.utils import apply_exposure_map - - exposure_map = { - 1: ExposureSetting("Cy3", 100), - 2: ExposureSetting("Cy5", 15), - 3: ExposureSetting("Cy5", 150), - } - - result = apply_exposure_map(data_frame_with_params, exposure_map) - - for key, value in exposure_map.items(): - mask = result["Exposure.Id"] == key - partial = result.loc[mask] - assert set(partial["Exposure.Channel"].unique()) == {value.channel} - assert set(partial["Exposure.Time"].unique()) == {value.time} - - -def test_apply_exposure_map_raises_error(data_frame_with_params): - 
from sensospot_data.utils import apply_exposure_map - - exposure_map = { - 1: ExposureSetting("Cy3", 100), - 2: ExposureSetting("Cy5", 15), - "X": ExposureSetting("Cy5", 150), - } - - with pytest.raises(ValueError): - apply_exposure_map(data_frame_with_params, exposure_map) - - -def test_apply_exposure_map_from_parameters(data_frame_with_params): - from sensospot_data.utils import apply_exposure_map - - result = apply_exposure_map(data_frame_with_params, None) - - assert all(result["Exposure.Channel"] == result["Parameters.Channel"]) - assert all(result["Exposure.Time"] == result["Parameters.Time"]) - - -def test_apply_exposure_map_from_parameters_raises_error( - data_frame_without_params, -): - from sensospot_data.utils import apply_exposure_map - - with pytest.raises(ValueError) as excinfo: - apply_exposure_map(data_frame_without_params, None) - - assert str(excinfo.value).startswith("Exposure Map: measurement") - - -def test_apply_map(exposure_df): - from sensospot_data.utils import apply_map - - map = { - 1: {"SomeColumn": "A", "OtherColumn": 9}, - 2: {"SomeColumn": "B", "OtherColumn": 8}, - 3: {"SomeColumn": "C", "OtherColumn": 7}, - } - - result = apply_map(exposure_df, map, "Exposure.Id") - - for key, value in map.items(): - mask = result["Exposure.Id"] == key - partial = result.loc[mask] - assert set(partial["SomeColumn"].unique()) == {value["SomeColumn"]} - assert set(partial["OtherColumn"].unique()) == {value["OtherColumn"]} - - -def test_apply_map_keys_not_in_df(exposure_df): - from sensospot_data.utils import apply_map - - map = { - 1: {"some_col": "A", "other_col": 9}, - 2: {"some_col": "B", "other_col": 8}, - 3: {"some_col": "C", "other_col": 7}, - 4: {"some_col": "D", "other_col": 6}, - } - - result = apply_map(exposure_df, map, "Exposure.Id") - - for key in (1, 2, 3): - value = map[key] - mask = result["Exposure.Id"] == key - partial = result.loc[mask] - assert set(partial["some_col"].unique()) == {value["some_col"]} - assert set(partial["other_col"].unique()) == {value["other_col"]} - - assert "D" not in set(result["some_col"].unique()) - assert "6" not in set(result["other_col"].unique()) - - -def test_apply_map_not_all_keys_map_to_df(exposure_df): - from sensospot_data.utils import apply_map - - map = { - 1: {"some_col": "A", "other_col": 9}, - 3: {"some_col": "C", "other_col": 7}, - } - - result = apply_map(exposure_df, map, "Exposure.Id") - - assert not result.iloc[0].hasnans - assert result.iloc[1].hasnans - assert not result.iloc[2].hasnans - - assert result["some_col"].hasnans - assert result["other_col"].hasnans
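
Note for downstream code (not part of the patch above): with `utils.split()` removed and the CLI now writing a tab-delimited CSV instead of an HDF5 file, callers can get the old behaviour back with plain pandas. The snippet below is a minimal sketch under stated assumptions: the file name `collected_data.csv` matches the new CLI default, the grouping column `Exposure.Id` comes from the column list in the README example, and everything else is only one possible way to consume the output.

```python
import pandas

# the CLI writes the collected frame with to_csv(csv_file, sep="\t"),
# so the same separator reads it back; column 0 holds the pandas index
data = pandas.read_csv("collected_data.csv", sep="\t", index_col=0)

# the removed utils.split(data_frame, column) returned a dict keyed by the
# unique values of a column; a plain groupby provides the same mapping
per_exposure = {key: frame for key, frame in data.groupby("Exposure.Id")}
```

Reading back with `index_col=0` relies on the `to_csv()` default of `index=True`; passing `index=False` in `run()` would avoid writing the extra index column in the first place.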