diff --git a/sensospot_data/columns.py b/sensospot_data/columns.py index 98ea7e1..22b5220 100644 --- a/sensospot_data/columns.py +++ b/sensospot_data/columns.py @@ -72,9 +72,11 @@ SETTINGS_EXPOSURE_TIME = "Exposure.Time" # calculated value for dynamic range normalization CALC_SPOT_OVERFLOW = "Calc.Spot.Overflow" +# settings for normalized exposure time +SETTINGS_NORMALIZED_EXPOSURE_TIME = "Settings.Normalized.Exposure.Time" + # normalized columns n_prefix = "Calc.Normalized." -CALC_NORMALIZED_EXPOSURE_TIME = f"{n_prefix}{SETTINGS_EXPOSURE_TIME}" CALC_NORMALIZED_BKG_MEAN = f"{n_prefix}{RAW_DATA_BKG_MEAN}" CALC_NORMALIZED_SPOT_MEAN = f"{n_prefix}{RAW_DATA_SPOT_MEAN}" CALC_NORMALIZED_BKG_MEDIAN = f"{n_prefix}{RAW_DATA_BKG_MEDIAN}" @@ -85,7 +87,7 @@ CALC_NORMALIZED_BKG_SUM = f"{n_prefix}{RAW_DATA_BKG_SUM}" CALC_NORMALIZED_SPOT_SUM = f"{n_prefix}{RAW_DATA_SPOT_SUM}" # what columns to convert for normalization -COLUMN_NORMALIZATION_MAP = { +RAW_DATA_NORMALIZATION_MAP = { RAW_DATA_BKG_MEAN: CALC_NORMALIZED_BKG_MEAN, RAW_DATA_SPOT_MEAN: CALC_NORMALIZED_SPOT_MEAN, RAW_DATA_BKG_MEDIAN: CALC_NORMALIZED_BKG_MEDIAN, diff --git a/sensospot_data/normalisation.py b/sensospot_data/normalisation.py new file mode 100644 index 0000000..8e37bcc --- /dev/null +++ b/sensospot_data/normalisation.py @@ -0,0 +1,177 @@ +import numpy + +from .columns import ( + RAW_DATA_POS_ID, + CALC_SPOT_OVERFLOW, + META_DATA_WELL_ROW, + RAW_DATA_SPOT_MEAN, + META_DATA_EXPOSURE_ID, + META_DATA_WELL_COLUMN, + SETTINGS_EXPOSURE_TIME, + META_DATA_PARAMETERS_TIME, + SETTINGS_EXPOSURE_CHANNEL, + RAW_DATA_NORMALIZATION_MAP, + META_DATA_PARAMETERS_CHANNEL, + SETTINGS_NORMALIZED_EXPOSURE_TIME, +) + +PROBE_MULTI_INDEX = [ + META_DATA_WELL_ROW, + META_DATA_WELL_COLUMN, + RAW_DATA_POS_ID, +] + + +def _split_data_frame(data_frame, column): + """ splits a data frame on unique column values """ + values = data_frame[column].unique() + masks = {value: (data_frame[column] == value) for value in values} + return {value: data_frame[mask] for value, mask in masks.items()} + + +def _infer_exposure_from_parameters(data_frame): + """infer the exposures from measurement parameters + + will raise a ValueError if the parameters contain NaNs + """ + df = data_frame # shorthand for cleaner code + + if ( + df[META_DATA_PARAMETERS_CHANNEL].hasnans + or df[META_DATA_PARAMETERS_TIME].hasnans + ): + raise ValueError("Exposure Map: measurement parameters incomplete") + + df[SETTINGS_EXPOSURE_CHANNEL] = df[META_DATA_PARAMETERS_CHANNEL] + df[SETTINGS_EXPOSURE_TIME] = df[META_DATA_PARAMETERS_TIME] + return df + + +def apply_exposure_map(data_frame, exposure_map=None): + """applies the parameters of a exposure map to the data frame + + exposure map: + keys: must be the same as the exposure ids, + values: objects with at least time and channel attributes + + if the exposure map is None, the values from the optionally parsed + measurement parameters are used. + + will raise an ValueError, if the provided exposure map does not map to the + exposure ids. + """ + + if exposure_map is None: + return _infer_exposure_from_parameters(data_frame) + + existing = set(data_frame[META_DATA_EXPOSURE_ID].unique()) + provided = set(exposure_map.keys()) + if existing != provided: + raise ValueError( + f"Exposure Map differs from data frame: {provided} != {existing}" + ) + + data_frame[SETTINGS_EXPOSURE_CHANNEL] = numpy.nan + data_frame[SETTINGS_EXPOSURE_TIME] = numpy.nan + for exposure_id, exposure_info in exposure_map.items(): + mask = data_frame[META_DATA_EXPOSURE_ID] == exposure_id + data_frame.loc[mask, SETTINGS_EXPOSURE_CHANNEL] = exposure_info.channel + data_frame.loc[mask, SETTINGS_EXPOSURE_TIME] = exposure_info.time + return data_frame + + +def _check_overflow_limit(data_frame, column=RAW_DATA_SPOT_MEAN, limit=0.5): + """ add overflow info, based on column and limit """ + data_frame[CALC_SPOT_OVERFLOW] = data_frame[column] > limit + return data_frame + + +def reduce_overflow(data_frame, column=RAW_DATA_SPOT_MEAN, limit=0.5): + """ reduces the data set per channel, eliminating overflowing spots """ + data_frame = _check_overflow_limit(data_frame, column, limit) + + split_frames = _split_data_frame(data_frame, SETTINGS_EXPOSURE_CHANNEL) + + return { + channel_id: _reduce_overflow_in_channel(channel_frame) + for channel_id, channel_frame in split_frames.items() + } + + +def _reduce_overflow_in_channel(channel_frame): + """ does the heavy lifting for reduce_overflow """ + + split_frames = _split_data_frame(channel_frame, SETTINGS_EXPOSURE_TIME) + + if len(split_frames) == 1: + # shortcut, if there is only one exposure in the channel + return channel_frame + + exposure_times = sorted(split_frames.keys(), reverse=True) + max_time, *rest_times = exposure_times + + result_frame = split_frames[max_time].set_index(PROBE_MULTI_INDEX) + + for next_time in rest_times: + mask = result_frame[CALC_SPOT_OVERFLOW] == True # noqa: E712 + next_frame = split_frames[next_time].set_index(PROBE_MULTI_INDEX) + result_frame.loc[mask] = next_frame.loc[mask] + + return result_frame.reset_index() + + +def _infer_normalization_map(split_data_frames): + """ extract a time normalization map from split data frames """ + return { + key: frame[SETTINGS_EXPOSURE_TIME].max() + for key, frame in split_data_frames.items() + } + + +def normalize_exposure_time(split_data_frames): + """add time normalized values to the split data frames + + The max exposure time per channel is used for normalization. + """ + normalization_map = _infer_normalization_map(split_data_frames) + return { + key: normalize_channel(frame, normalization_map[key]) + for key, frame in split_data_frames.items() + } + + +def normalize_channel(channel_frame, normalized_time): + """ add time normalized values to a channel data frames """ + channel_frame = channel_frame.copy() + channel_frame[SETTINGS_NORMALIZED_EXPOSURE_TIME] = normalized_time + + for original_col, normalized_col in RAW_DATA_NORMALIZATION_MAP.items(): + channel_frame[normalized_col] = ( + channel_frame[original_col] / channel_frame[SETTINGS_EXPOSURE_TIME] + ) * channel_frame[SETTINGS_NORMALIZED_EXPOSURE_TIME] + + return channel_frame + + +def split_channels( + data_frame, + exposure_map=None, + overflow_column=RAW_DATA_SPOT_MEAN, + overflow_limit=0.5, +): + """augment normalize the measurement exposures + + exposure map: + keys: must be the same as the exposure ids, + values: objects with at least time and channel attributes + if the exposure map is None, the values from the optionally parsed + measurement parameters are used. + + The max exposure time per channel is used for normalization. + """ + + exposure_data_frame = apply_exposure_map(data_frame, exposure_map) + split_data_frames = reduce_overflow( + exposure_data_frame, overflow_column, overflow_limit + ) + return normalize_exposure_time(split_data_frames) diff --git a/tests/conftest.py b/tests/conftest.py index 735e87d..310ce99 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -9,7 +9,7 @@ EXAMPLE_DIR_WO_PARAMS = "mtp_wo_parameters" EXAMPLE_DIR_WITH_PARAMS = "mtp_with_parameters" -@pytest.fixture +@pytest.fixture(scope="session") def example_dir(request): root_dir = Path(request.config.rootdir) yield root_dir / "example_data" @@ -40,7 +40,7 @@ def dir_for_caching(tmpdir, example_file): @pytest.fixture def normalization_data_frame(): - from sensospot_data.columns import COLUMN_NORMALIZATION + from sensospot_data.columns import RAW_DATA_NORMALIZATION_MAP overflow_test_values = [ (1, 1, 1, 50, 1, 0), @@ -94,7 +94,7 @@ def normalization_data_frame(): data_frame = pandas.DataFrame(overflow_test_data) data_frame["Exposure.Channel"] = "Cy5" - for value_column in COLUMN_NORMALIZATION.keys(): + for value_column in RAW_DATA_NORMALIZATION_MAP.keys(): data_frame[value_column] = data_frame["Value"] yield data_frame diff --git a/tests/test_normailsation.py b/tests/test_normailsation.py new file mode 100644 index 0000000..b9160d3 --- /dev/null +++ b/tests/test_normailsation.py @@ -0,0 +1,292 @@ +from collections import namedtuple + +import pandas +import pytest + +from .conftest import EXAMPLE_DIR_WO_PARAMS, EXAMPLE_DIR_WITH_PARAMS + +ExposureSetting = namedtuple("ExposureSetting", ["channel", "time"]) + + +@pytest.fixture(scope="session") +def data_frame_with_params(example_dir): + from sensospot_data.parser import parse_folder + + return parse_folder(example_dir / EXAMPLE_DIR_WITH_PARAMS) + + +@pytest.fixture(scope="session") +def data_frame_without_params(example_dir): + from sensospot_data.parser import parse_folder + + return parse_folder(example_dir / EXAMPLE_DIR_WO_PARAMS) + + +@pytest.fixture +def df_wp(data_frame_with_params): + return data_frame_with_params.copy() + + +@pytest.fixture +def df_wop(data_frame_without_params): + return data_frame_without_params.copy() + + +def test_split_data_frame(df_wp): + from sensospot_data.normalisation import _split_data_frame + + result = _split_data_frame(df_wp, "Well.Row") + + assert set(result.keys()) == set("ABC") + for key, value_df in result.items(): + assert set(value_df["Well.Row"].unique()) == {key} + + +def test_infer_exposure_from_parameters(df_wp): + from sensospot_data.normalisation import _infer_exposure_from_parameters + + result = _infer_exposure_from_parameters(df_wp) + + assert all(result["Exposure.Channel"] == result["Parameters.Channel"]) + assert all(result["Exposure.Time"] == result["Parameters.Time"]) + + +def test_infer_exposure_from_parameters_raises_error(df_wop): + from sensospot_data.normalisation import _infer_exposure_from_parameters + + with pytest.raises(ValueError) as excinfo: + _infer_exposure_from_parameters(df_wop) + + assert str(excinfo.value).startswith("Exposure Map: measurement") + + +def test_apply_exposure_map(df_wp): + from sensospot_data.normalisation import apply_exposure_map + + exposure_map = { + 1: ExposureSetting("Cy3", 100), + 2: ExposureSetting("Cy5", 15), + 3: ExposureSetting("Cy5", 150), + } + + result = apply_exposure_map(df_wp, exposure_map) + + for key, value in exposure_map.items(): + mask = result["Exposure.Id"] == key + partial = result.loc[mask] + assert set(partial["Exposure.Channel"].unique()) == {value.channel} + assert set(partial["Exposure.Time"].unique()) == {value.time} + + +def test_apply_exposure_map_raises_error(df_wp): + from sensospot_data.normalisation import apply_exposure_map + + exposure_map = { + 1: ExposureSetting("Cy3", 100), + 2: ExposureSetting("Cy5", 15), + "X": ExposureSetting("Cy5", 150), + } + + with pytest.raises(ValueError) as excinfo: + apply_exposure_map(df_wp, exposure_map) + + assert str(excinfo.value).startswith("Exposure Map differs") + + +def test_apply_exposure_map_from_parameters(df_wp): + from sensospot_data.normalisation import apply_exposure_map + + result = apply_exposure_map(df_wp, None) + + assert all(result["Exposure.Channel"] == result["Parameters.Channel"]) + assert all(result["Exposure.Time"] == result["Parameters.Time"]) + + +def test_apply_exposure_map_from_parameters_raises_error(df_wop): + from sensospot_data.normalisation import apply_exposure_map + + with pytest.raises(ValueError) as excinfo: + apply_exposure_map(df_wop, None) + + assert str(excinfo.value).startswith("Exposure Map: measurement") + + +def test_check_overflow_limit_defaults(): + from sensospot_data.normalisation import _check_overflow_limit + + data_frame = pandas.DataFrame(data={"Spot.Mean": [0.1, 0.5, 0.6]}) + + result = _check_overflow_limit(data_frame) + + assert list(result["Calc.Spot.Overflow"]) == [False, False, True] + + +def test_check_overflow_limit_custom_limit(): + from sensospot_data.normalisation import _check_overflow_limit + + data_frame = pandas.DataFrame(data={"Spot.Saturation": [4, 2, 3, 4]}) + + result = _check_overflow_limit(data_frame, "Spot.Saturation", 2) + + assert list(result["Calc.Spot.Overflow"]) == [True, False, True, True] + + +def test_reduce_overflow_in_channel(normalization_data_frame): + from sensospot_data.normalisation import ( + _check_overflow_limit, + _reduce_overflow_in_channel, + ) + + data_frame = _check_overflow_limit( + normalization_data_frame, "Saturation", 1 + ) + result = _reduce_overflow_in_channel(data_frame) + + sorted_results = result.sort_values( + by=["Well.Row", "Well.Column", "Pos.Id"] + ) + + assert list(sorted_results["Value"]) == [ + 1, + 2, + 3, + 1, + 10, + 10, + 10, + 10, + 100, + 100, + 100, + 100, + ] + + +def test_reduce_overflow_in_channel_shortcut(normalization_data_frame): + from sensospot_data.normalisation import ( + _check_overflow_limit, + _reduce_overflow_in_channel, + ) + + normalization_data_frame["Exposure.Time"] = 1 + + data_frame = _check_overflow_limit( + normalization_data_frame, "Saturation", 1 + ) + result = _reduce_overflow_in_channel(data_frame) + + assert result is data_frame + + +def test_reduce_overflow(normalization_data_frame): + from sensospot_data.normalisation import reduce_overflow + + result = reduce_overflow(normalization_data_frame, "Saturation", 1) + + assert "Cy5" in result + + sorted_results = result["Cy5"].sort_values( + by=["Well.Row", "Well.Column", "Pos.Id"] + ) + + assert list(sorted_results["Value"]) == [ + 1, + 2, + 3, + 1, + 10, + 10, + 10, + 10, + 100, + 100, + 100, + 100, + ] + + +def test_infer_normalization_map(normalization_data_frame): + from sensospot_data.normalisation import ( + _split_data_frame, + _infer_normalization_map, + ) + + normalization_data_frame.loc[5, "Exposure.Channel"] = "Cy3" + split_frames = _split_data_frame( + normalization_data_frame, "Exposure.Channel" + ) + + result = _infer_normalization_map(split_frames) + + assert result == {"Cy3": 25, "Cy5": 50} + + +def test_normalize_channel(normalization_data_frame): + from sensospot_data.columns import RAW_DATA_NORMALIZATION_MAP + from sensospot_data.normalisation import reduce_overflow, normalize_channel + + reduced = reduce_overflow(normalization_data_frame, "Saturation", 1) + result = normalize_channel(reduced["Cy5"], 50) + + sorted_results = result.sort_values( + by=["Well.Row", "Well.Column", "Pos.Id"] + ) + expected_values = [2, 8, 30, 2, 20, 20, 20, 20, 200, 200, 200, 200] + + for normalized_col in RAW_DATA_NORMALIZATION_MAP.values(): + list(sorted_results[normalized_col]) == expected_values + + +def test_normalize_exposure_time(normalization_data_frame): + from sensospot_data.normalisation import ( + reduce_overflow, + normalize_exposure_time, + ) + + reduced = reduce_overflow(normalization_data_frame, "Saturation", 1) + result = normalize_exposure_time(reduced) + + assert "Cy5" in result + + sorted_results = result["Cy5"].sort_values( + by=["Well.Row", "Well.Column", "Pos.Id"] + ) + expected_values = [1, 4, 15, 1, 10, 10, 10, 10, 100, 100, 100, 100] + + assert list(sorted_results["Calc.Normalized.Spot.Mean"]) == expected_values + + +def test_normalize_exposure_time_infered_map(normalization_data_frame): + from sensospot_data.normalisation import ( + reduce_overflow, + normalize_exposure_time, + ) + + reduced = reduce_overflow(normalization_data_frame, "Saturation", 1) + result = normalize_exposure_time(reduced) + + assert "Cy5" in result + + sorted_results = result["Cy5"].sort_values( + by=["Well.Row", "Well.Column", "Pos.Id"] + ) + expected_values = [1, 4, 15, 1, 10, 10, 10, 10, 100, 100, 100, 100] + + assert list(sorted_results["Calc.Normalized.Spot.Mean"]) == expected_values + + +def test_normalize_measurement(df_wp): + from sensospot_data.normalisation import split_channels + + exposure_map = { + 1: ExposureSetting("Cy3", 100), + 2: ExposureSetting("Cy5", 15), + 3: ExposureSetting("Cy5", 150), + } + + result = split_channels(df_wp, exposure_map) + cy3_df, cy5_df = result["Cy3"], result["Cy5"] + + assert set(result.keys()) == {"Cy3", "Cy5"} + assert cy3_df["Settings.Normalized.Exposure.Time"].unique() == 100 + assert cy5_df["Settings.Normalized.Exposure.Time"].unique() == 150