Browse Source

normalization is working prior to refactoring

xmlparsing
Holger Frey 4 years ago
parent
commit
ee6d7ff750
  1. 6
      sensospot_data/columns.py
  2. 177
      sensospot_data/normalisation.py
  3. 6
      tests/conftest.py
  4. 292
      tests/test_normailsation.py

6
sensospot_data/columns.py

@ -72,9 +72,11 @@ SETTINGS_EXPOSURE_TIME = "Exposure.Time"
# calculated value for dynamic range normalization # calculated value for dynamic range normalization
CALC_SPOT_OVERFLOW = "Calc.Spot.Overflow" CALC_SPOT_OVERFLOW = "Calc.Spot.Overflow"
# settings for normalized exposure time
SETTINGS_NORMALIZED_EXPOSURE_TIME = "Settings.Normalized.Exposure.Time"
# normalized columns # normalized columns
n_prefix = "Calc.Normalized." n_prefix = "Calc.Normalized."
CALC_NORMALIZED_EXPOSURE_TIME = f"{n_prefix}{SETTINGS_EXPOSURE_TIME}"
CALC_NORMALIZED_BKG_MEAN = f"{n_prefix}{RAW_DATA_BKG_MEAN}" CALC_NORMALIZED_BKG_MEAN = f"{n_prefix}{RAW_DATA_BKG_MEAN}"
CALC_NORMALIZED_SPOT_MEAN = f"{n_prefix}{RAW_DATA_SPOT_MEAN}" CALC_NORMALIZED_SPOT_MEAN = f"{n_prefix}{RAW_DATA_SPOT_MEAN}"
CALC_NORMALIZED_BKG_MEDIAN = f"{n_prefix}{RAW_DATA_BKG_MEDIAN}" CALC_NORMALIZED_BKG_MEDIAN = f"{n_prefix}{RAW_DATA_BKG_MEDIAN}"
@ -85,7 +87,7 @@ CALC_NORMALIZED_BKG_SUM = f"{n_prefix}{RAW_DATA_BKG_SUM}"
CALC_NORMALIZED_SPOT_SUM = f"{n_prefix}{RAW_DATA_SPOT_SUM}" CALC_NORMALIZED_SPOT_SUM = f"{n_prefix}{RAW_DATA_SPOT_SUM}"
# what columns to convert for normalization # what columns to convert for normalization
COLUMN_NORMALIZATION_MAP = { RAW_DATA_NORMALIZATION_MAP = {
RAW_DATA_BKG_MEAN: CALC_NORMALIZED_BKG_MEAN, RAW_DATA_BKG_MEAN: CALC_NORMALIZED_BKG_MEAN,
RAW_DATA_SPOT_MEAN: CALC_NORMALIZED_SPOT_MEAN, RAW_DATA_SPOT_MEAN: CALC_NORMALIZED_SPOT_MEAN,
RAW_DATA_BKG_MEDIAN: CALC_NORMALIZED_BKG_MEDIAN, RAW_DATA_BKG_MEDIAN: CALC_NORMALIZED_BKG_MEDIAN,

177
sensospot_data/normalisation.py

@ -0,0 +1,177 @@
import numpy
from .columns import (
RAW_DATA_POS_ID,
CALC_SPOT_OVERFLOW,
META_DATA_WELL_ROW,
RAW_DATA_SPOT_MEAN,
META_DATA_EXPOSURE_ID,
META_DATA_WELL_COLUMN,
SETTINGS_EXPOSURE_TIME,
META_DATA_PARAMETERS_TIME,
SETTINGS_EXPOSURE_CHANNEL,
RAW_DATA_NORMALIZATION_MAP,
META_DATA_PARAMETERS_CHANNEL,
SETTINGS_NORMALIZED_EXPOSURE_TIME,
)
PROBE_MULTI_INDEX = [
META_DATA_WELL_ROW,
META_DATA_WELL_COLUMN,
RAW_DATA_POS_ID,
]
def _split_data_frame(data_frame, column):
""" splits a data frame on unique column values """
values = data_frame[column].unique()
masks = {value: (data_frame[column] == value) for value in values}
return {value: data_frame[mask] for value, mask in masks.items()}
def _infer_exposure_from_parameters(data_frame):
"""infer the exposures from measurement parameters
will raise a ValueError if the parameters contain NaNs
"""
df = data_frame # shorthand for cleaner code
if (
df[META_DATA_PARAMETERS_CHANNEL].hasnans
or df[META_DATA_PARAMETERS_TIME].hasnans
):
raise ValueError("Exposure Map: measurement parameters incomplete")
df[SETTINGS_EXPOSURE_CHANNEL] = df[META_DATA_PARAMETERS_CHANNEL]
df[SETTINGS_EXPOSURE_TIME] = df[META_DATA_PARAMETERS_TIME]
return df
def apply_exposure_map(data_frame, exposure_map=None):
"""applies the parameters of a exposure map to the data frame
exposure map:
keys: must be the same as the exposure ids,
values: objects with at least time and channel attributes
if the exposure map is None, the values from the optionally parsed
measurement parameters are used.
will raise an ValueError, if the provided exposure map does not map to the
exposure ids.
"""
if exposure_map is None:
return _infer_exposure_from_parameters(data_frame)
existing = set(data_frame[META_DATA_EXPOSURE_ID].unique())
provided = set(exposure_map.keys())
if existing != provided:
raise ValueError(
f"Exposure Map differs from data frame: {provided} != {existing}"
)
data_frame[SETTINGS_EXPOSURE_CHANNEL] = numpy.nan
data_frame[SETTINGS_EXPOSURE_TIME] = numpy.nan
for exposure_id, exposure_info in exposure_map.items():
mask = data_frame[META_DATA_EXPOSURE_ID] == exposure_id
data_frame.loc[mask, SETTINGS_EXPOSURE_CHANNEL] = exposure_info.channel
data_frame.loc[mask, SETTINGS_EXPOSURE_TIME] = exposure_info.time
return data_frame
def _check_overflow_limit(data_frame, column=RAW_DATA_SPOT_MEAN, limit=0.5):
""" add overflow info, based on column and limit """
data_frame[CALC_SPOT_OVERFLOW] = data_frame[column] > limit
return data_frame
def reduce_overflow(data_frame, column=RAW_DATA_SPOT_MEAN, limit=0.5):
""" reduces the data set per channel, eliminating overflowing spots """
data_frame = _check_overflow_limit(data_frame, column, limit)
split_frames = _split_data_frame(data_frame, SETTINGS_EXPOSURE_CHANNEL)
return {
channel_id: _reduce_overflow_in_channel(channel_frame)
for channel_id, channel_frame in split_frames.items()
}
def _reduce_overflow_in_channel(channel_frame):
""" does the heavy lifting for reduce_overflow """
split_frames = _split_data_frame(channel_frame, SETTINGS_EXPOSURE_TIME)
if len(split_frames) == 1:
# shortcut, if there is only one exposure in the channel
return channel_frame
exposure_times = sorted(split_frames.keys(), reverse=True)
max_time, *rest_times = exposure_times
result_frame = split_frames[max_time].set_index(PROBE_MULTI_INDEX)
for next_time in rest_times:
mask = result_frame[CALC_SPOT_OVERFLOW] == True # noqa: E712
next_frame = split_frames[next_time].set_index(PROBE_MULTI_INDEX)
result_frame.loc[mask] = next_frame.loc[mask]
return result_frame.reset_index()
def _infer_normalization_map(split_data_frames):
""" extract a time normalization map from split data frames """
return {
key: frame[SETTINGS_EXPOSURE_TIME].max()
for key, frame in split_data_frames.items()
}
def normalize_exposure_time(split_data_frames):
"""add time normalized values to the split data frames
The max exposure time per channel is used for normalization.
"""
normalization_map = _infer_normalization_map(split_data_frames)
return {
key: normalize_channel(frame, normalization_map[key])
for key, frame in split_data_frames.items()
}
def normalize_channel(channel_frame, normalized_time):
""" add time normalized values to a channel data frames """
channel_frame = channel_frame.copy()
channel_frame[SETTINGS_NORMALIZED_EXPOSURE_TIME] = normalized_time
for original_col, normalized_col in RAW_DATA_NORMALIZATION_MAP.items():
channel_frame[normalized_col] = (
channel_frame[original_col] / channel_frame[SETTINGS_EXPOSURE_TIME]
) * channel_frame[SETTINGS_NORMALIZED_EXPOSURE_TIME]
return channel_frame
def split_channels(
data_frame,
exposure_map=None,
overflow_column=RAW_DATA_SPOT_MEAN,
overflow_limit=0.5,
):
"""augment normalize the measurement exposures
exposure map:
keys: must be the same as the exposure ids,
values: objects with at least time and channel attributes
if the exposure map is None, the values from the optionally parsed
measurement parameters are used.
The max exposure time per channel is used for normalization.
"""
exposure_data_frame = apply_exposure_map(data_frame, exposure_map)
split_data_frames = reduce_overflow(
exposure_data_frame, overflow_column, overflow_limit
)
return normalize_exposure_time(split_data_frames)

6
tests/conftest.py

@ -9,7 +9,7 @@ EXAMPLE_DIR_WO_PARAMS = "mtp_wo_parameters"
EXAMPLE_DIR_WITH_PARAMS = "mtp_with_parameters" EXAMPLE_DIR_WITH_PARAMS = "mtp_with_parameters"
@pytest.fixture @pytest.fixture(scope="session")
def example_dir(request): def example_dir(request):
root_dir = Path(request.config.rootdir) root_dir = Path(request.config.rootdir)
yield root_dir / "example_data" yield root_dir / "example_data"
@ -40,7 +40,7 @@ def dir_for_caching(tmpdir, example_file):
@pytest.fixture @pytest.fixture
def normalization_data_frame(): def normalization_data_frame():
from sensospot_data.columns import COLUMN_NORMALIZATION from sensospot_data.columns import RAW_DATA_NORMALIZATION_MAP
overflow_test_values = [ overflow_test_values = [
(1, 1, 1, 50, 1, 0), (1, 1, 1, 50, 1, 0),
@ -94,7 +94,7 @@ def normalization_data_frame():
data_frame = pandas.DataFrame(overflow_test_data) data_frame = pandas.DataFrame(overflow_test_data)
data_frame["Exposure.Channel"] = "Cy5" data_frame["Exposure.Channel"] = "Cy5"
for value_column in COLUMN_NORMALIZATION.keys(): for value_column in RAW_DATA_NORMALIZATION_MAP.keys():
data_frame[value_column] = data_frame["Value"] data_frame[value_column] = data_frame["Value"]
yield data_frame yield data_frame

292
tests/test_normailsation.py

@ -0,0 +1,292 @@
from collections import namedtuple
import pandas
import pytest
from .conftest import EXAMPLE_DIR_WO_PARAMS, EXAMPLE_DIR_WITH_PARAMS
ExposureSetting = namedtuple("ExposureSetting", ["channel", "time"])
@pytest.fixture(scope="session")
def data_frame_with_params(example_dir):
from sensospot_data.parser import parse_folder
return parse_folder(example_dir / EXAMPLE_DIR_WITH_PARAMS)
@pytest.fixture(scope="session")
def data_frame_without_params(example_dir):
from sensospot_data.parser import parse_folder
return parse_folder(example_dir / EXAMPLE_DIR_WO_PARAMS)
@pytest.fixture
def df_wp(data_frame_with_params):
return data_frame_with_params.copy()
@pytest.fixture
def df_wop(data_frame_without_params):
return data_frame_without_params.copy()
def test_split_data_frame(df_wp):
from sensospot_data.normalisation import _split_data_frame
result = _split_data_frame(df_wp, "Well.Row")
assert set(result.keys()) == set("ABC")
for key, value_df in result.items():
assert set(value_df["Well.Row"].unique()) == {key}
def test_infer_exposure_from_parameters(df_wp):
from sensospot_data.normalisation import _infer_exposure_from_parameters
result = _infer_exposure_from_parameters(df_wp)
assert all(result["Exposure.Channel"] == result["Parameters.Channel"])
assert all(result["Exposure.Time"] == result["Parameters.Time"])
def test_infer_exposure_from_parameters_raises_error(df_wop):
from sensospot_data.normalisation import _infer_exposure_from_parameters
with pytest.raises(ValueError) as excinfo:
_infer_exposure_from_parameters(df_wop)
assert str(excinfo.value).startswith("Exposure Map: measurement")
def test_apply_exposure_map(df_wp):
from sensospot_data.normalisation import apply_exposure_map
exposure_map = {
1: ExposureSetting("Cy3", 100),
2: ExposureSetting("Cy5", 15),
3: ExposureSetting("Cy5", 150),
}
result = apply_exposure_map(df_wp, exposure_map)
for key, value in exposure_map.items():
mask = result["Exposure.Id"] == key
partial = result.loc[mask]
assert set(partial["Exposure.Channel"].unique()) == {value.channel}
assert set(partial["Exposure.Time"].unique()) == {value.time}
def test_apply_exposure_map_raises_error(df_wp):
from sensospot_data.normalisation import apply_exposure_map
exposure_map = {
1: ExposureSetting("Cy3", 100),
2: ExposureSetting("Cy5", 15),
"X": ExposureSetting("Cy5", 150),
}
with pytest.raises(ValueError) as excinfo:
apply_exposure_map(df_wp, exposure_map)
assert str(excinfo.value).startswith("Exposure Map differs")
def test_apply_exposure_map_from_parameters(df_wp):
from sensospot_data.normalisation import apply_exposure_map
result = apply_exposure_map(df_wp, None)
assert all(result["Exposure.Channel"] == result["Parameters.Channel"])
assert all(result["Exposure.Time"] == result["Parameters.Time"])
def test_apply_exposure_map_from_parameters_raises_error(df_wop):
from sensospot_data.normalisation import apply_exposure_map
with pytest.raises(ValueError) as excinfo:
apply_exposure_map(df_wop, None)
assert str(excinfo.value).startswith("Exposure Map: measurement")
def test_check_overflow_limit_defaults():
from sensospot_data.normalisation import _check_overflow_limit
data_frame = pandas.DataFrame(data={"Spot.Mean": [0.1, 0.5, 0.6]})
result = _check_overflow_limit(data_frame)
assert list(result["Calc.Spot.Overflow"]) == [False, False, True]
def test_check_overflow_limit_custom_limit():
from sensospot_data.normalisation import _check_overflow_limit
data_frame = pandas.DataFrame(data={"Spot.Saturation": [4, 2, 3, 4]})
result = _check_overflow_limit(data_frame, "Spot.Saturation", 2)
assert list(result["Calc.Spot.Overflow"]) == [True, False, True, True]
def test_reduce_overflow_in_channel(normalization_data_frame):
from sensospot_data.normalisation import (
_check_overflow_limit,
_reduce_overflow_in_channel,
)
data_frame = _check_overflow_limit(
normalization_data_frame, "Saturation", 1
)
result = _reduce_overflow_in_channel(data_frame)
sorted_results = result.sort_values(
by=["Well.Row", "Well.Column", "Pos.Id"]
)
assert list(sorted_results["Value"]) == [
1,
2,
3,
1,
10,
10,
10,
10,
100,
100,
100,
100,
]
def test_reduce_overflow_in_channel_shortcut(normalization_data_frame):
from sensospot_data.normalisation import (
_check_overflow_limit,
_reduce_overflow_in_channel,
)
normalization_data_frame["Exposure.Time"] = 1
data_frame = _check_overflow_limit(
normalization_data_frame, "Saturation", 1
)
result = _reduce_overflow_in_channel(data_frame)
assert result is data_frame
def test_reduce_overflow(normalization_data_frame):
from sensospot_data.normalisation import reduce_overflow
result = reduce_overflow(normalization_data_frame, "Saturation", 1)
assert "Cy5" in result
sorted_results = result["Cy5"].sort_values(
by=["Well.Row", "Well.Column", "Pos.Id"]
)
assert list(sorted_results["Value"]) == [
1,
2,
3,
1,
10,
10,
10,
10,
100,
100,
100,
100,
]
def test_infer_normalization_map(normalization_data_frame):
from sensospot_data.normalisation import (
_split_data_frame,
_infer_normalization_map,
)
normalization_data_frame.loc[5, "Exposure.Channel"] = "Cy3"
split_frames = _split_data_frame(
normalization_data_frame, "Exposure.Channel"
)
result = _infer_normalization_map(split_frames)
assert result == {"Cy3": 25, "Cy5": 50}
def test_normalize_channel(normalization_data_frame):
from sensospot_data.columns import RAW_DATA_NORMALIZATION_MAP
from sensospot_data.normalisation import reduce_overflow, normalize_channel
reduced = reduce_overflow(normalization_data_frame, "Saturation", 1)
result = normalize_channel(reduced["Cy5"], 50)
sorted_results = result.sort_values(
by=["Well.Row", "Well.Column", "Pos.Id"]
)
expected_values = [2, 8, 30, 2, 20, 20, 20, 20, 200, 200, 200, 200]
for normalized_col in RAW_DATA_NORMALIZATION_MAP.values():
list(sorted_results[normalized_col]) == expected_values
def test_normalize_exposure_time(normalization_data_frame):
from sensospot_data.normalisation import (
reduce_overflow,
normalize_exposure_time,
)
reduced = reduce_overflow(normalization_data_frame, "Saturation", 1)
result = normalize_exposure_time(reduced)
assert "Cy5" in result
sorted_results = result["Cy5"].sort_values(
by=["Well.Row", "Well.Column", "Pos.Id"]
)
expected_values = [1, 4, 15, 1, 10, 10, 10, 10, 100, 100, 100, 100]
assert list(sorted_results["Calc.Normalized.Spot.Mean"]) == expected_values
def test_normalize_exposure_time_infered_map(normalization_data_frame):
from sensospot_data.normalisation import (
reduce_overflow,
normalize_exposure_time,
)
reduced = reduce_overflow(normalization_data_frame, "Saturation", 1)
result = normalize_exposure_time(reduced)
assert "Cy5" in result
sorted_results = result["Cy5"].sort_values(
by=["Well.Row", "Well.Column", "Pos.Id"]
)
expected_values = [1, 4, 15, 1, 10, 10, 10, 10, 100, 100, 100, 100]
assert list(sorted_results["Calc.Normalized.Spot.Mean"]) == expected_values
def test_normalize_measurement(df_wp):
from sensospot_data.normalisation import split_channels
exposure_map = {
1: ExposureSetting("Cy3", 100),
2: ExposureSetting("Cy5", 15),
3: ExposureSetting("Cy5", 150),
}
result = split_channels(df_wp, exposure_map)
cy3_df, cy5_df = result["Cy3"], result["Cy5"]
assert set(result.keys()) == {"Cy3", "Cy5"}
assert cy3_df["Settings.Normalized.Exposure.Time"].unique() == 100
assert cy5_df["Settings.Normalized.Exposure.Time"].unique() == 150
Loading…
Cancel
Save