diff --git a/sensospot_data/__init__.py b/sensospot_data/__init__.py index 255d17d..bb075a0 100644 --- a/sensospot_data/__init__.py +++ b/sensospot_data/__init__.py @@ -13,6 +13,7 @@ import click from .utils import split_data_frame, apply_exposure_map # noqa: F401 from .parser import parse_file, parse_folder # noqa: F401 from .parameters import ExposureInfo, get_measurement_params # noqa: F401 +from .dynamic_range import blend, create_xdr, normalize_values # noqa: F401 @click.command() diff --git a/sensospot_data/dynamic_range.py b/sensospot_data/dynamic_range.py new file mode 100644 index 0000000..b095957 --- /dev/null +++ b/sensospot_data/dynamic_range.py @@ -0,0 +1,99 @@ +from pandas.api.types import is_numeric_dtype + +from .utils import split_data_frame +from .columns import ( + RAW_DATA_POS_ID, + CALC_SPOT_OVERFLOW, + META_DATA_WELL_ROW, + RAW_DATA_SPOT_MEAN, + META_DATA_WELL_COLUMN, + SETTINGS_EXPOSURE_TIME, + SETTINGS_EXPOSURE_CHANNEL, + RAW_DATA_NORMALIZATION_MAP, + SETTINGS_NORMALIZED_EXPOSURE_TIME, +) + +PROBE_MULTI_INDEX = [ + META_DATA_WELL_ROW, + META_DATA_WELL_COLUMN, + RAW_DATA_POS_ID, +] + + +def _check_if_xdr_ready(data_frame): + """ check if a data frame meets the constraints for xdr """ + required_columns = {SETTINGS_EXPOSURE_CHANNEL, SETTINGS_EXPOSURE_TIME} + if not required_columns.issubset(data_frame.columns): + raise ValueError("XDR: Apply an exposure map first") + if len(data_frame[SETTINGS_EXPOSURE_CHANNEL].unique()) != 1: + raise ValueError("XDR: Mixed Exposure Channels") + if not is_numeric_dtype(data_frame[SETTINGS_EXPOSURE_TIME]): + raise ValueError("XDR: Exposure time is not numerical") + if data_frame[SETTINGS_EXPOSURE_TIME].hasnans: + raise ValueError("XDR: Exposure time contains NaNs") + + +def _calc_overflow_info(data_frame, column=RAW_DATA_SPOT_MEAN, limit=0.5): + """ add overflow info, based on column and limit """ + data_frame[CALC_SPOT_OVERFLOW] = data_frame[column] > limit + return data_frame + + +def _reduce_overflow(data_frame): + """ the heavy lifting for creating an extended dynamic range """ + + split_frames = split_data_frame(data_frame, SETTINGS_EXPOSURE_TIME) + + # get the exposure times, longest first + exposure_times = sorted(split_frames.keys(), reverse=True) + max_time, *rest_times = exposure_times + + result_frame = split_frames[max_time].set_index(PROBE_MULTI_INDEX) + + for next_time in rest_times: + mask = result_frame[CALC_SPOT_OVERFLOW] == True # noqa: E712 + next_frame = split_frames[next_time].set_index(PROBE_MULTI_INDEX) + result_frame.loc[mask] = next_frame.loc[mask] + + return result_frame.reset_index() + + +def blend(data_frame, column=RAW_DATA_SPOT_MEAN, limit=0.5): + """ creates an extended dynamic range, eliminating overflowing spots """ + _check_if_xdr_ready(data_frame) + data_frame = _calc_overflow_info(data_frame, column, limit) + return _reduce_overflow(data_frame) + + +def normalize_values(data_frame, normalized_time=None): + """add exposure time normalized values to a data frame + + will use the maximum exposure time, if none is provided + """ + if normalized_time is None: + normalized_time = data_frame[SETTINGS_EXPOSURE_TIME].max() + print(normalized_time) + + data_frame[SETTINGS_NORMALIZED_EXPOSURE_TIME] = normalized_time + + for original_col, normalized_col in RAW_DATA_NORMALIZATION_MAP.items(): + data_frame[normalized_col] = ( + data_frame[original_col] / data_frame[SETTINGS_EXPOSURE_TIME] + ) * data_frame[SETTINGS_NORMALIZED_EXPOSURE_TIME] + + return data_frame + + +def create_xdr( + data_frame, + normalized_time=None, + overflow_column=RAW_DATA_SPOT_MEAN, + overflow_limit=0.5, +): + """normalize measurement exposures + + normalized_time: + if it is None, the max exposure time is used for normalization. + """ + data_frame = blend(data_frame, overflow_column, overflow_limit) + return normalize_values(data_frame, normalized_time) diff --git a/sensospot_data/normalisation.py b/sensospot_data/normalisation.py deleted file mode 100644 index 4e066be..0000000 --- a/sensospot_data/normalisation.py +++ /dev/null @@ -1,116 +0,0 @@ -from .columns import ( - RAW_DATA_POS_ID, - CALC_SPOT_OVERFLOW, - META_DATA_WELL_ROW, - RAW_DATA_SPOT_MEAN, - META_DATA_WELL_COLUMN, - SETTINGS_EXPOSURE_TIME, - SETTINGS_EXPOSURE_CHANNEL, - RAW_DATA_NORMALIZATION_MAP, - SETTINGS_NORMALIZED_EXPOSURE_TIME, -) - -PROBE_MULTI_INDEX = [ - META_DATA_WELL_ROW, - META_DATA_WELL_COLUMN, - RAW_DATA_POS_ID, -] - -from .utils import split_data_frame, apply_exposure_map - - -def _check_overflow_limit(data_frame, column=RAW_DATA_SPOT_MEAN, limit=0.5): - """ add overflow info, based on column and limit """ - data_frame[CALC_SPOT_OVERFLOW] = data_frame[column] > limit - return data_frame - - -def reduce_overflow(data_frame, column=RAW_DATA_SPOT_MEAN, limit=0.5): - """ reduces the data set per channel, eliminating overflowing spots """ - data_frame = _check_overflow_limit(data_frame, column, limit) - - split_frames = split_data_frame(data_frame, SETTINGS_EXPOSURE_CHANNEL) - - return { - channel_id: _reduce_overflow_in_channel(channel_frame) - for channel_id, channel_frame in split_frames.items() - } - - -def _reduce_overflow_in_channel(channel_frame): - """ does the heavy lifting for reduce_overflow """ - - split_frames = split_data_frame(channel_frame, SETTINGS_EXPOSURE_TIME) - - if len(split_frames) == 1: - # shortcut, if there is only one exposure in the channel - return channel_frame - - exposure_times = sorted(split_frames.keys(), reverse=True) - max_time, *rest_times = exposure_times - - result_frame = split_frames[max_time].set_index(PROBE_MULTI_INDEX) - - for next_time in rest_times: - mask = result_frame[CALC_SPOT_OVERFLOW] == True # noqa: E712 - next_frame = split_frames[next_time].set_index(PROBE_MULTI_INDEX) - result_frame.loc[mask] = next_frame.loc[mask] - - return result_frame.reset_index() - - -def _infer_normalization_map(split_data_frames): - """ extract a time normalization map from split data frames """ - return { - key: frame[SETTINGS_EXPOSURE_TIME].max() - for key, frame in split_data_frames.items() - } - - -def normalize_exposure_time(split_data_frames): - """add time normalized values to the split data frames - - The max exposure time per channel is used for normalization. - """ - normalization_map = _infer_normalization_map(split_data_frames) - return { - key: normalize_channel(frame, normalization_map[key]) - for key, frame in split_data_frames.items() - } - - -def normalize_channel(channel_frame, normalized_time): - """ add time normalized values to a channel data frames """ - channel_frame = channel_frame.copy() - channel_frame[SETTINGS_NORMALIZED_EXPOSURE_TIME] = normalized_time - - for original_col, normalized_col in RAW_DATA_NORMALIZATION_MAP.items(): - channel_frame[normalized_col] = ( - channel_frame[original_col] / channel_frame[SETTINGS_EXPOSURE_TIME] - ) * channel_frame[SETTINGS_NORMALIZED_EXPOSURE_TIME] - - return channel_frame - - -def split_channels( - data_frame, - exposure_map=None, - overflow_column=RAW_DATA_SPOT_MEAN, - overflow_limit=0.5, -): - """augment normalize the measurement exposures - - exposure map: - keys: must be the same as the exposure ids, - values: objects with at least time and channel attributes - if the exposure map is None, the values from the optionally parsed - measurement parameters are used. - - The max exposure time per channel is used for normalization. - """ - - exposure_data_frame = apply_exposure_map(data_frame, exposure_map) - split_data_frames = reduce_overflow( - exposure_data_frame, overflow_column, overflow_limit - ) - return normalize_exposure_time(split_data_frames) diff --git a/tests/test_dynamic_range.py b/tests/test_dynamic_range.py new file mode 100644 index 0000000..ade94f7 --- /dev/null +++ b/tests/test_dynamic_range.py @@ -0,0 +1,226 @@ +import numpy +import pandas +import pytest + + +def test_check_if_xdr_ready_ok(exposure_df): + from sensospot_data.columns import ( + SETTINGS_EXPOSURE_TIME, + SETTINGS_EXPOSURE_CHANNEL, + ) + from sensospot_data.dynamic_range import _check_if_xdr_ready + + exposure_df[SETTINGS_EXPOSURE_TIME] = 1 + exposure_df[SETTINGS_EXPOSURE_CHANNEL] = 2 + + result = _check_if_xdr_ready(exposure_df) + + assert result is None + + +@pytest.mark.parametrize(["run"], [[0], [1], [2]]) +def test_check_if_xdr_ready_raises_error_missing_column(exposure_df, run): + from sensospot_data.columns import ( + SETTINGS_EXPOSURE_TIME, + SETTINGS_EXPOSURE_CHANNEL, + ) + from sensospot_data.dynamic_range import _check_if_xdr_ready + + columns = [SETTINGS_EXPOSURE_TIME, SETTINGS_EXPOSURE_CHANNEL, "X"] + extra_col = columns[run] + + exposure_df[extra_col] = 1 + + with pytest.raises(ValueError): + _check_if_xdr_ready(exposure_df) + + +def test_check_if_xdr_ready_raises_error_mixed_channels(exposure_df): + from sensospot_data.columns import ( + META_DATA_EXPOSURE_ID, + SETTINGS_EXPOSURE_TIME, + SETTINGS_EXPOSURE_CHANNEL, + ) + from sensospot_data.dynamic_range import _check_if_xdr_ready + + exposure_df[SETTINGS_EXPOSURE_TIME] = 1 + exposure_df[SETTINGS_EXPOSURE_CHANNEL] = exposure_df[META_DATA_EXPOSURE_ID] + + with pytest.raises(ValueError): + _check_if_xdr_ready(exposure_df) + + +def test_check_if_xdr_ready_raises_error_non_numeric_time(exposure_df): + from sensospot_data.columns import ( + SETTINGS_EXPOSURE_TIME, + SETTINGS_EXPOSURE_CHANNEL, + ) + from sensospot_data.dynamic_range import _check_if_xdr_ready + + exposure_df[SETTINGS_EXPOSURE_TIME] = "X" + exposure_df[SETTINGS_EXPOSURE_CHANNEL] = 2 + + with pytest.raises(ValueError): + _check_if_xdr_ready(exposure_df) + + +def test_check_if_xdr_ready_raises_error_on_nan(exposure_df): + from sensospot_data.columns import ( + SETTINGS_EXPOSURE_TIME, + SETTINGS_EXPOSURE_CHANNEL, + ) + from sensospot_data.dynamic_range import _check_if_xdr_ready + + exposure_df[SETTINGS_EXPOSURE_TIME] = numpy.nan + exposure_df[SETTINGS_EXPOSURE_CHANNEL] = 2 + + with pytest.raises(ValueError): + _check_if_xdr_ready(exposure_df) + + +def test_check_overflow_limit_defaults(): + from sensospot_data.columns import CALC_SPOT_OVERFLOW, RAW_DATA_SPOT_MEAN + from sensospot_data.dynamic_range import _calc_overflow_info + + data_frame = pandas.DataFrame(data={RAW_DATA_SPOT_MEAN: [0.1, 0.5, 0.6]}) + + result = _calc_overflow_info(data_frame) + + assert list(result[CALC_SPOT_OVERFLOW]) == [False, False, True] + + +def test_check_overflow_limit_custom_limit(): + from sensospot_data.columns import CALC_SPOT_OVERFLOW + from sensospot_data.dynamic_range import _calc_overflow_info + + data_frame = pandas.DataFrame(data={"X": [4, 2, 3, 4]}) + + result = _calc_overflow_info(data_frame, "X", 2) + + assert list(result[CALC_SPOT_OVERFLOW]) == [True, False, True, True] + + +def test_reduce_overflow_multiple_times(normalization_data_frame): + from sensospot_data.dynamic_range import ( + PROBE_MULTI_INDEX, + _reduce_overflow, + _calc_overflow_info, + ) + + data_frame = _calc_overflow_info(normalization_data_frame, "Saturation", 1) + result = _reduce_overflow(data_frame) + + sorted_results = result.sort_values(by=PROBE_MULTI_INDEX) + + assert list(sorted_results["Value"]) == [ + 1, + 2, + 3, + 1, + 10, + 10, + 10, + 10, + 100, + 100, + 100, + 100, + ] + + +def test_reduce_overflow_only_one_exposure_time(normalization_data_frame): + from sensospot_data.dynamic_range import ( + SETTINGS_EXPOSURE_TIME, + _reduce_overflow, + _calc_overflow_info, + ) + + normalization_data_frame[SETTINGS_EXPOSURE_TIME] = 1 + + data_frame = _calc_overflow_info(normalization_data_frame, "Saturation", 1) + result = _reduce_overflow(data_frame) + + assert list(result["Value"]) == list(normalization_data_frame["Value"]) + + +def test_blend(normalization_data_frame): + from sensospot_data.dynamic_range import PROBE_MULTI_INDEX, blend + + result = blend(normalization_data_frame, "Saturation", 1) + + sorted_results = result.sort_values(by=PROBE_MULTI_INDEX) + + assert list(sorted_results["Value"]) == [ + 1, + 2, + 3, + 1, + 10, + 10, + 10, + 10, + 100, + 100, + 100, + 100, + ] + + +def test_blend_raises_error(normalization_data_frame): + from sensospot_data.dynamic_range import SETTINGS_EXPOSURE_TIME, blend + + normalization_data_frame[SETTINGS_EXPOSURE_TIME] = "A" + + with pytest.raises(ValueError): + blend(normalization_data_frame, "Saturation", 1) + + +def test_normalize_values_no_param(normalization_data_frame): + from sensospot_data.columns import RAW_DATA_NORMALIZATION_MAP + from sensospot_data.dynamic_range import ( + PROBE_MULTI_INDEX, + blend, + normalize_values, + ) + + reduced = blend(normalization_data_frame, "Saturation", 1) + + result = normalize_values(reduced) + + sorted_results = result.sort_values(by=PROBE_MULTI_INDEX) + expected_values = [1, 4, 15, 1, 10, 10, 10, 10, 100, 100, 100, 100] + + for normalized_col in RAW_DATA_NORMALIZATION_MAP.values(): + assert list(sorted_results[normalized_col]) == expected_values + + +def test_normalize_values_custom_param(normalization_data_frame): + from sensospot_data.columns import RAW_DATA_NORMALIZATION_MAP + from sensospot_data.dynamic_range import ( + PROBE_MULTI_INDEX, + blend, + normalize_values, + ) + + reduced = blend(normalization_data_frame, "Saturation", 1) + + result = normalize_values(reduced, 100) + + sorted_results = result.sort_values(by=PROBE_MULTI_INDEX) + expected_values = [2, 8, 30, 2, 20, 20, 20, 20, 200, 200, 200, 200] + + for normalized_col in RAW_DATA_NORMALIZATION_MAP.values(): + assert list(sorted_results[normalized_col]) == expected_values + + +def test_create_xdr(normalization_data_frame): + from sensospot_data.columns import RAW_DATA_NORMALIZATION_MAP + from sensospot_data.dynamic_range import PROBE_MULTI_INDEX, create_xdr + + result = create_xdr(normalization_data_frame, 100, "Saturation", 1) + + sorted_results = result.sort_values(by=PROBE_MULTI_INDEX) + expected_values = [2, 8, 30, 2, 20, 20, 20, 20, 200, 200, 200, 200] + + for normalized_col in RAW_DATA_NORMALIZATION_MAP.values(): + assert list(sorted_results[normalized_col]) == expected_values diff --git a/tests/test_normalisation.py b/tests/test_normalisation.py deleted file mode 100644 index a38c662..0000000 --- a/tests/test_normalisation.py +++ /dev/null @@ -1,184 +0,0 @@ -from collections import namedtuple - -import pandas - -ExposureSetting = namedtuple("ExposureSetting", ["channel", "time"]) - - -def test_check_overflow_limit_defaults(): - from sensospot_data.normalisation import _check_overflow_limit - - data_frame = pandas.DataFrame(data={"Spot.Mean": [0.1, 0.5, 0.6]}) - - result = _check_overflow_limit(data_frame) - - assert list(result["Calc.Spot.Overflow"]) == [False, False, True] - - -def test_check_overflow_limit_custom_limit(): - from sensospot_data.normalisation import _check_overflow_limit - - data_frame = pandas.DataFrame(data={"Spot.Saturation": [4, 2, 3, 4]}) - - result = _check_overflow_limit(data_frame, "Spot.Saturation", 2) - - assert list(result["Calc.Spot.Overflow"]) == [True, False, True, True] - - -def test_reduce_overflow_in_channel(normalization_data_frame): - from sensospot_data.normalisation import ( - _check_overflow_limit, - _reduce_overflow_in_channel, - ) - - data_frame = _check_overflow_limit( - normalization_data_frame, "Saturation", 1 - ) - result = _reduce_overflow_in_channel(data_frame) - - sorted_results = result.sort_values( - by=["Well.Row", "Well.Column", "Pos.Id"] - ) - - assert list(sorted_results["Value"]) == [ - 1, - 2, - 3, - 1, - 10, - 10, - 10, - 10, - 100, - 100, - 100, - 100, - ] - - -def test_reduce_overflow_in_channel_shortcut(normalization_data_frame): - from sensospot_data.normalisation import ( - _check_overflow_limit, - _reduce_overflow_in_channel, - ) - - normalization_data_frame["Exposure.Time"] = 1 - - data_frame = _check_overflow_limit( - normalization_data_frame, "Saturation", 1 - ) - result = _reduce_overflow_in_channel(data_frame) - - assert result is data_frame - - -def test_reduce_overflow(normalization_data_frame): - from sensospot_data.normalisation import reduce_overflow - - result = reduce_overflow(normalization_data_frame, "Saturation", 1) - - assert "Cy5" in result - - sorted_results = result["Cy5"].sort_values( - by=["Well.Row", "Well.Column", "Pos.Id"] - ) - - assert list(sorted_results["Value"]) == [ - 1, - 2, - 3, - 1, - 10, - 10, - 10, - 10, - 100, - 100, - 100, - 100, - ] - - -def test_infer_normalization_map(normalization_data_frame): - from sensospot_data.utils import split_data_frame - from sensospot_data.normalisation import _infer_normalization_map - - normalization_data_frame.loc[5, "Exposure.Channel"] = "Cy3" - split_frames = split_data_frame( - normalization_data_frame, "Exposure.Channel" - ) - - result = _infer_normalization_map(split_frames) - - assert result == {"Cy3": 25, "Cy5": 50} - - -def test_normalize_channel(normalization_data_frame): - from sensospot_data.columns import RAW_DATA_NORMALIZATION_MAP - from sensospot_data.normalisation import reduce_overflow, normalize_channel - - reduced = reduce_overflow(normalization_data_frame, "Saturation", 1) - result = normalize_channel(reduced["Cy5"], 50) - - sorted_results = result.sort_values( - by=["Well.Row", "Well.Column", "Pos.Id"] - ) - expected_values = [2, 8, 30, 2, 20, 20, 20, 20, 200, 200, 200, 200] - - for normalized_col in RAW_DATA_NORMALIZATION_MAP.values(): - list(sorted_results[normalized_col]) == expected_values - - -def test_normalize_exposure_time(normalization_data_frame): - from sensospot_data.normalisation import ( - reduce_overflow, - normalize_exposure_time, - ) - - reduced = reduce_overflow(normalization_data_frame, "Saturation", 1) - result = normalize_exposure_time(reduced) - - assert "Cy5" in result - - sorted_results = result["Cy5"].sort_values( - by=["Well.Row", "Well.Column", "Pos.Id"] - ) - expected_values = [1, 4, 15, 1, 10, 10, 10, 10, 100, 100, 100, 100] - - assert list(sorted_results["Calc.Normalized.Spot.Mean"]) == expected_values - - -def test_normalize_exposure_time_infered_map(normalization_data_frame): - from sensospot_data.normalisation import ( - reduce_overflow, - normalize_exposure_time, - ) - - reduced = reduce_overflow(normalization_data_frame, "Saturation", 1) - result = normalize_exposure_time(reduced) - - assert "Cy5" in result - - sorted_results = result["Cy5"].sort_values( - by=["Well.Row", "Well.Column", "Pos.Id"] - ) - expected_values = [1, 4, 15, 1, 10, 10, 10, 10, 100, 100, 100, 100] - - assert list(sorted_results["Calc.Normalized.Spot.Mean"]) == expected_values - - -def test_normalize_measurement(data_frame_with_params): - from sensospot_data.normalisation import split_channels - - exposure_map = { - 1: ExposureSetting("Cy3", 100), - 2: ExposureSetting("Cy5", 15), - 3: ExposureSetting("Cy5", 150), - } - - result = split_channels(data_frame_with_params, exposure_map) - cy3_df, cy5_df = result["Cy3"], result["Cy5"] - - assert set(result.keys()) == {"Cy3", "Cy5"} - assert cy3_df["Settings.Normalized.Exposure.Time"].unique() == 100 - assert cy5_df["Settings.Normalized.Exposure.Time"].unique() == 150 diff --git a/tests/test_sensovation_data.py b/tests/test_sensovation_data.py index 5ff25dc..65d36be 100644 --- a/tests/test_sensovation_data.py +++ b/tests/test_sensovation_data.py @@ -4,8 +4,11 @@ def test_import_api(): from sensospot_data import ExposureInfo # noqa: F401 from sensospot_data import run # noqa: F401 + from sensospot_data import blend # noqa: F401 + from sensospot_data import create_xdr # noqa: F401 from sensospot_data import parse_file # noqa: F401 from sensospot_data import parse_folder # noqa: F401 + from sensospot_data import normalize_values # noqa: F401 from sensospot_data import split_data_frame # noqa: F401 from sensospot_data import apply_exposure_map # noqa: F401 from sensospot_data import get_measurement_params # noqa: F401