
removed module 'normalisation' in favor of 'dynamic_range'
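The new module replaces the removed split_channels() entry point: applying the exposure map, splitting by channel and normalizing the exposure times are now separate, explicit steps. A rough usage sketch of the intended workflow follows; the folder path, the None exposure map and the per-channel loop are illustrative assumptions, not part of this commit.

# illustrative sketch, not committed code
from sensospot_data import (
    parse_folder,
    apply_exposure_map,
    split_data_frame,
    create_xdr,
)
from sensospot_data.columns import SETTINGS_EXPOSURE_CHANNEL

raw = parse_folder("path/to/measurement")   # illustrative path
mapped = apply_exposure_map(raw, None)      # None: fall back to parsed measurement parameters
per_channel = split_data_frame(mapped, SETTINGS_EXPOSURE_CHANNEL)

# create_xdr() expects a single exposure channel, so it is applied per channel;
# it blends the exposures and normalizes to that channel's longest exposure time.
xdr = {channel: create_xdr(frame) for channel, frame in per_channel.items()}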

Branch: xmlparsing
Author: Holger Frey, 4 years ago
Parent commit: 4a3cecbe7b
Changed files:
1. sensospot_data/__init__.py (+1)
2. sensospot_data/dynamic_range.py (+99)
3. sensospot_data/normalisation.py (-116)
4. tests/test_dynamic_range.py (+226)
5. tests/test_normalisation.py (-184)
6. tests/test_sensovation_data.py (+3)

sensospot_data/__init__.py (+1)

@@ -13,6 +13,7 @@ import click
from .utils import split_data_frame, apply_exposure_map # noqa: F401
from .parser import parse_file, parse_folder # noqa: F401
from .parameters import ExposureInfo, get_measurement_params # noqa: F401
from .dynamic_range import blend, create_xdr, normalize_values # noqa: F401
@click.command()

sensospot_data/dynamic_range.py (+99, new file)

@@ -0,0 +1,99 @@
from pandas.api.types import is_numeric_dtype
from .utils import split_data_frame
from .columns import (
RAW_DATA_POS_ID,
CALC_SPOT_OVERFLOW,
META_DATA_WELL_ROW,
RAW_DATA_SPOT_MEAN,
META_DATA_WELL_COLUMN,
SETTINGS_EXPOSURE_TIME,
SETTINGS_EXPOSURE_CHANNEL,
RAW_DATA_NORMALIZATION_MAP,
SETTINGS_NORMALIZED_EXPOSURE_TIME,
)
PROBE_MULTI_INDEX = [
META_DATA_WELL_ROW,
META_DATA_WELL_COLUMN,
RAW_DATA_POS_ID,
]
def _check_if_xdr_ready(data_frame):
""" check if a data frame meets the constraints for xdr """
required_columns = {SETTINGS_EXPOSURE_CHANNEL, SETTINGS_EXPOSURE_TIME}
if not required_columns.issubset(data_frame.columns):
raise ValueError("XDR: Apply an exposure map first")
if len(data_frame[SETTINGS_EXPOSURE_CHANNEL].unique()) != 1:
raise ValueError("XDR: Mixed Exposure Channels")
if not is_numeric_dtype(data_frame[SETTINGS_EXPOSURE_TIME]):
raise ValueError("XDR: Exposure time is not numerical")
if data_frame[SETTINGS_EXPOSURE_TIME].hasnans:
raise ValueError("XDR: Exposure time contains NaNs")
def _calc_overflow_info(data_frame, column=RAW_DATA_SPOT_MEAN, limit=0.5):
""" add overflow info, based on column and limit """
data_frame[CALC_SPOT_OVERFLOW] = data_frame[column] > limit
return data_frame
def _reduce_overflow(data_frame):
""" the heavy lifting for creating an extended dynamic range """
split_frames = split_data_frame(data_frame, SETTINGS_EXPOSURE_TIME)
# get the exposure times, longest first
exposure_times = sorted(split_frames.keys(), reverse=True)
max_time, *rest_times = exposure_times
result_frame = split_frames[max_time].set_index(PROBE_MULTI_INDEX)
for next_time in rest_times:
mask = result_frame[CALC_SPOT_OVERFLOW] == True # noqa: E712
next_frame = split_frames[next_time].set_index(PROBE_MULTI_INDEX)
result_frame.loc[mask] = next_frame.loc[mask]
return result_frame.reset_index()
def blend(data_frame, column=RAW_DATA_SPOT_MEAN, limit=0.5):
""" creates an extended dynamic range, eliminating overflowing spots """
_check_if_xdr_ready(data_frame)
data_frame = _calc_overflow_info(data_frame, column, limit)
return _reduce_overflow(data_frame)
def normalize_values(data_frame, normalized_time=None):
"""add exposure time normalized values to a data frame
will use the maximum exposure time, if none is provided
"""
if normalized_time is None:
normalized_time = data_frame[SETTINGS_EXPOSURE_TIME].max()
data_frame[SETTINGS_NORMALIZED_EXPOSURE_TIME] = normalized_time
for original_col, normalized_col in RAW_DATA_NORMALIZATION_MAP.items():
data_frame[normalized_col] = (
data_frame[original_col] / data_frame[SETTINGS_EXPOSURE_TIME]
) * data_frame[SETTINGS_NORMALIZED_EXPOSURE_TIME]
return data_frame
def create_xdr(
data_frame,
normalized_time=None,
overflow_column=RAW_DATA_SPOT_MEAN,
overflow_limit=0.5,
):
"""normalize measurement exposures
normalized_time:
if it is None, the max exposure time is used for normalization.
"""
data_frame = blend(data_frame, overflow_column, overflow_limit)
return normalize_values(data_frame, normalized_time)
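For reference, a minimal sketch of how blend() and normalize_values() behave on a toy single-channel frame; the literal column names are taken from the deleted tests and are assumed to match the constants in sensospot_data.columns.

# illustrative sketch, not committed code
import pandas

from sensospot_data.dynamic_range import blend, normalize_values

# two spots, each measured at 50 ms and again at 25 ms exposure time
frame = pandas.DataFrame({
    "Well.Row":         ["A", "A", "A", "A"],
    "Well.Column":      [1, 1, 1, 1],
    "Pos.Id":           [1, 2, 1, 2],
    "Exposure.Channel": ["Cy5", "Cy5", "Cy5", "Cy5"],
    "Exposure.Time":    [50, 50, 25, 25],
    "Spot.Mean":        [0.9, 0.3, 0.4, 0.2],
})

# spot 1 overflows at 50 ms (0.9 > default limit 0.5), so its 25 ms
# measurement (0.4) is kept instead; spot 2 keeps its 50 ms value (0.3)
blended = blend(frame)

# normalized value = value / exposure time * normalized time (default: max time, 50 ms)
# spot 1: 0.4 / 25 * 50 = 0.8, spot 2: 0.3 / 50 * 50 = 0.3
result = normalize_values(blended)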

sensospot_data/normalisation.py (-116, file removed)

@@ -1,116 +0,0 @@
from .columns import (
RAW_DATA_POS_ID,
CALC_SPOT_OVERFLOW,
META_DATA_WELL_ROW,
RAW_DATA_SPOT_MEAN,
META_DATA_WELL_COLUMN,
SETTINGS_EXPOSURE_TIME,
SETTINGS_EXPOSURE_CHANNEL,
RAW_DATA_NORMALIZATION_MAP,
SETTINGS_NORMALIZED_EXPOSURE_TIME,
)
PROBE_MULTI_INDEX = [
META_DATA_WELL_ROW,
META_DATA_WELL_COLUMN,
RAW_DATA_POS_ID,
]
from .utils import split_data_frame, apply_exposure_map
def _check_overflow_limit(data_frame, column=RAW_DATA_SPOT_MEAN, limit=0.5):
""" add overflow info, based on column and limit """
data_frame[CALC_SPOT_OVERFLOW] = data_frame[column] > limit
return data_frame
def reduce_overflow(data_frame, column=RAW_DATA_SPOT_MEAN, limit=0.5):
""" reduces the data set per channel, eliminating overflowing spots """
data_frame = _check_overflow_limit(data_frame, column, limit)
split_frames = split_data_frame(data_frame, SETTINGS_EXPOSURE_CHANNEL)
return {
channel_id: _reduce_overflow_in_channel(channel_frame)
for channel_id, channel_frame in split_frames.items()
}
def _reduce_overflow_in_channel(channel_frame):
""" does the heavy lifting for reduce_overflow """
split_frames = split_data_frame(channel_frame, SETTINGS_EXPOSURE_TIME)
if len(split_frames) == 1:
# shortcut, if there is only one exposure in the channel
return channel_frame
exposure_times = sorted(split_frames.keys(), reverse=True)
max_time, *rest_times = exposure_times
result_frame = split_frames[max_time].set_index(PROBE_MULTI_INDEX)
for next_time in rest_times:
mask = result_frame[CALC_SPOT_OVERFLOW] == True # noqa: E712
next_frame = split_frames[next_time].set_index(PROBE_MULTI_INDEX)
result_frame.loc[mask] = next_frame.loc[mask]
return result_frame.reset_index()
def _infer_normalization_map(split_data_frames):
""" extract a time normalization map from split data frames """
return {
key: frame[SETTINGS_EXPOSURE_TIME].max()
for key, frame in split_data_frames.items()
}
def normalize_exposure_time(split_data_frames):
"""add time normalized values to the split data frames
The max exposure time per channel is used for normalization.
"""
normalization_map = _infer_normalization_map(split_data_frames)
return {
key: normalize_channel(frame, normalization_map[key])
for key, frame in split_data_frames.items()
}
def normalize_channel(channel_frame, normalized_time):
""" add time normalized values to a channel data frames """
channel_frame = channel_frame.copy()
channel_frame[SETTINGS_NORMALIZED_EXPOSURE_TIME] = normalized_time
for original_col, normalized_col in RAW_DATA_NORMALIZATION_MAP.items():
channel_frame[normalized_col] = (
channel_frame[original_col] / channel_frame[SETTINGS_EXPOSURE_TIME]
) * channel_frame[SETTINGS_NORMALIZED_EXPOSURE_TIME]
return channel_frame
def split_channels(
data_frame,
exposure_map=None,
overflow_column=RAW_DATA_SPOT_MEAN,
overflow_limit=0.5,
):
"""augment normalize the measurement exposures
exposure map:
keys: must be the same as the exposure ids,
values: objects with at least time and channel attributes
if the exposure map is None, the values from the optionally parsed
measurement parameters are used.
The max exposure time per channel is used for normalization.
"""
exposure_data_frame = apply_exposure_map(data_frame, exposure_map)
split_data_frames = reduce_overflow(
exposure_data_frame, overflow_column, overflow_limit
)
return normalize_exposure_time(split_data_frames)

tests/test_dynamic_range.py (+226, new file)

@@ -0,0 +1,226 @@
import numpy
import pandas
import pytest
def test_check_if_xdr_ready_ok(exposure_df):
from sensospot_data.columns import (
SETTINGS_EXPOSURE_TIME,
SETTINGS_EXPOSURE_CHANNEL,
)
from sensospot_data.dynamic_range import _check_if_xdr_ready
exposure_df[SETTINGS_EXPOSURE_TIME] = 1
exposure_df[SETTINGS_EXPOSURE_CHANNEL] = 2
result = _check_if_xdr_ready(exposure_df)
assert result is None
@pytest.mark.parametrize(["run"], [[0], [1], [2]])
def test_check_if_xdr_ready_raises_error_missing_column(exposure_df, run):
from sensospot_data.columns import (
SETTINGS_EXPOSURE_TIME,
SETTINGS_EXPOSURE_CHANNEL,
)
from sensospot_data.dynamic_range import _check_if_xdr_ready
columns = [SETTINGS_EXPOSURE_TIME, SETTINGS_EXPOSURE_CHANNEL, "X"]
extra_col = columns[run]
exposure_df[extra_col] = 1
with pytest.raises(ValueError):
_check_if_xdr_ready(exposure_df)
def test_check_if_xdr_ready_raises_error_mixed_channels(exposure_df):
from sensospot_data.columns import (
META_DATA_EXPOSURE_ID,
SETTINGS_EXPOSURE_TIME,
SETTINGS_EXPOSURE_CHANNEL,
)
from sensospot_data.dynamic_range import _check_if_xdr_ready
exposure_df[SETTINGS_EXPOSURE_TIME] = 1
exposure_df[SETTINGS_EXPOSURE_CHANNEL] = exposure_df[META_DATA_EXPOSURE_ID]
with pytest.raises(ValueError):
_check_if_xdr_ready(exposure_df)
def test_check_if_xdr_ready_raises_error_non_numeric_time(exposure_df):
from sensospot_data.columns import (
SETTINGS_EXPOSURE_TIME,
SETTINGS_EXPOSURE_CHANNEL,
)
from sensospot_data.dynamic_range import _check_if_xdr_ready
exposure_df[SETTINGS_EXPOSURE_TIME] = "X"
exposure_df[SETTINGS_EXPOSURE_CHANNEL] = 2
with pytest.raises(ValueError):
_check_if_xdr_ready(exposure_df)
def test_check_if_xdr_ready_raises_error_on_nan(exposure_df):
from sensospot_data.columns import (
SETTINGS_EXPOSURE_TIME,
SETTINGS_EXPOSURE_CHANNEL,
)
from sensospot_data.dynamic_range import _check_if_xdr_ready
exposure_df[SETTINGS_EXPOSURE_TIME] = numpy.nan
exposure_df[SETTINGS_EXPOSURE_CHANNEL] = 2
with pytest.raises(ValueError):
_check_if_xdr_ready(exposure_df)
def test_check_overflow_limit_defaults():
from sensospot_data.columns import CALC_SPOT_OVERFLOW, RAW_DATA_SPOT_MEAN
from sensospot_data.dynamic_range import _calc_overflow_info
data_frame = pandas.DataFrame(data={RAW_DATA_SPOT_MEAN: [0.1, 0.5, 0.6]})
result = _calc_overflow_info(data_frame)
assert list(result[CALC_SPOT_OVERFLOW]) == [False, False, True]
def test_check_overflow_limit_custom_limit():
from sensospot_data.columns import CALC_SPOT_OVERFLOW
from sensospot_data.dynamic_range import _calc_overflow_info
data_frame = pandas.DataFrame(data={"X": [4, 2, 3, 4]})
result = _calc_overflow_info(data_frame, "X", 2)
assert list(result[CALC_SPOT_OVERFLOW]) == [True, False, True, True]
def test_reduce_overflow_multiple_times(normalization_data_frame):
from sensospot_data.dynamic_range import (
PROBE_MULTI_INDEX,
_reduce_overflow,
_calc_overflow_info,
)
data_frame = _calc_overflow_info(normalization_data_frame, "Saturation", 1)
result = _reduce_overflow(data_frame)
sorted_results = result.sort_values(by=PROBE_MULTI_INDEX)
assert list(sorted_results["Value"]) == [
1,
2,
3,
1,
10,
10,
10,
10,
100,
100,
100,
100,
]
def test_reduce_overflow_only_one_exposure_time(normalization_data_frame):
from sensospot_data.dynamic_range import (
SETTINGS_EXPOSURE_TIME,
_reduce_overflow,
_calc_overflow_info,
)
normalization_data_frame[SETTINGS_EXPOSURE_TIME] = 1
data_frame = _calc_overflow_info(normalization_data_frame, "Saturation", 1)
result = _reduce_overflow(data_frame)
assert list(result["Value"]) == list(normalization_data_frame["Value"])
def test_blend(normalization_data_frame):
from sensospot_data.dynamic_range import PROBE_MULTI_INDEX, blend
result = blend(normalization_data_frame, "Saturation", 1)
sorted_results = result.sort_values(by=PROBE_MULTI_INDEX)
assert list(sorted_results["Value"]) == [
1,
2,
3,
1,
10,
10,
10,
10,
100,
100,
100,
100,
]
def test_blend_raises_error(normalization_data_frame):
from sensospot_data.dynamic_range import SETTINGS_EXPOSURE_TIME, blend
normalization_data_frame[SETTINGS_EXPOSURE_TIME] = "A"
with pytest.raises(ValueError):
blend(normalization_data_frame, "Saturation", 1)
def test_normalize_values_no_param(normalization_data_frame):
from sensospot_data.columns import RAW_DATA_NORMALIZATION_MAP
from sensospot_data.dynamic_range import (
PROBE_MULTI_INDEX,
blend,
normalize_values,
)
reduced = blend(normalization_data_frame, "Saturation", 1)
result = normalize_values(reduced)
sorted_results = result.sort_values(by=PROBE_MULTI_INDEX)
expected_values = [1, 4, 15, 1, 10, 10, 10, 10, 100, 100, 100, 100]
for normalized_col in RAW_DATA_NORMALIZATION_MAP.values():
assert list(sorted_results[normalized_col]) == expected_values
def test_normalize_values_custom_param(normalization_data_frame):
from sensospot_data.columns import RAW_DATA_NORMALIZATION_MAP
from sensospot_data.dynamic_range import (
PROBE_MULTI_INDEX,
blend,
normalize_values,
)
reduced = blend(normalization_data_frame, "Saturation", 1)
result = normalize_values(reduced, 100)
sorted_results = result.sort_values(by=PROBE_MULTI_INDEX)
expected_values = [2, 8, 30, 2, 20, 20, 20, 20, 200, 200, 200, 200]
for normalized_col in RAW_DATA_NORMALIZATION_MAP.values():
assert list(sorted_results[normalized_col]) == expected_values
def test_create_xdr(normalization_data_frame):
from sensospot_data.columns import RAW_DATA_NORMALIZATION_MAP
from sensospot_data.dynamic_range import PROBE_MULTI_INDEX, create_xdr
result = create_xdr(normalization_data_frame, 100, "Saturation", 1)
sorted_results = result.sort_values(by=PROBE_MULTI_INDEX)
expected_values = [2, 8, 30, 2, 20, 20, 20, 20, 200, 200, 200, 200]
for normalized_col in RAW_DATA_NORMALIZATION_MAP.values():
assert list(sorted_results[normalized_col]) == expected_values

tests/test_normalisation.py (-184, file removed)

@@ -1,184 +0,0 @@
from collections import namedtuple
import pandas
ExposureSetting = namedtuple("ExposureSetting", ["channel", "time"])
def test_check_overflow_limit_defaults():
from sensospot_data.normalisation import _check_overflow_limit
data_frame = pandas.DataFrame(data={"Spot.Mean": [0.1, 0.5, 0.6]})
result = _check_overflow_limit(data_frame)
assert list(result["Calc.Spot.Overflow"]) == [False, False, True]
def test_check_overflow_limit_custom_limit():
from sensospot_data.normalisation import _check_overflow_limit
data_frame = pandas.DataFrame(data={"Spot.Saturation": [4, 2, 3, 4]})
result = _check_overflow_limit(data_frame, "Spot.Saturation", 2)
assert list(result["Calc.Spot.Overflow"]) == [True, False, True, True]
def test_reduce_overflow_in_channel(normalization_data_frame):
from sensospot_data.normalisation import (
_check_overflow_limit,
_reduce_overflow_in_channel,
)
data_frame = _check_overflow_limit(
normalization_data_frame, "Saturation", 1
)
result = _reduce_overflow_in_channel(data_frame)
sorted_results = result.sort_values(
by=["Well.Row", "Well.Column", "Pos.Id"]
)
assert list(sorted_results["Value"]) == [
1,
2,
3,
1,
10,
10,
10,
10,
100,
100,
100,
100,
]
def test_reduce_overflow_in_channel_shortcut(normalization_data_frame):
from sensospot_data.normalisation import (
_check_overflow_limit,
_reduce_overflow_in_channel,
)
normalization_data_frame["Exposure.Time"] = 1
data_frame = _check_overflow_limit(
normalization_data_frame, "Saturation", 1
)
result = _reduce_overflow_in_channel(data_frame)
assert result is data_frame
def test_reduce_overflow(normalization_data_frame):
from sensospot_data.normalisation import reduce_overflow
result = reduce_overflow(normalization_data_frame, "Saturation", 1)
assert "Cy5" in result
sorted_results = result["Cy5"].sort_values(
by=["Well.Row", "Well.Column", "Pos.Id"]
)
assert list(sorted_results["Value"]) == [
1,
2,
3,
1,
10,
10,
10,
10,
100,
100,
100,
100,
]
def test_infer_normalization_map(normalization_data_frame):
from sensospot_data.utils import split_data_frame
from sensospot_data.normalisation import _infer_normalization_map
normalization_data_frame.loc[5, "Exposure.Channel"] = "Cy3"
split_frames = split_data_frame(
normalization_data_frame, "Exposure.Channel"
)
result = _infer_normalization_map(split_frames)
assert result == {"Cy3": 25, "Cy5": 50}
def test_normalize_channel(normalization_data_frame):
from sensospot_data.columns import RAW_DATA_NORMALIZATION_MAP
from sensospot_data.normalisation import reduce_overflow, normalize_channel
reduced = reduce_overflow(normalization_data_frame, "Saturation", 1)
result = normalize_channel(reduced["Cy5"], 50)
sorted_results = result.sort_values(
by=["Well.Row", "Well.Column", "Pos.Id"]
)
expected_values = [2, 8, 30, 2, 20, 20, 20, 20, 200, 200, 200, 200]
for normalized_col in RAW_DATA_NORMALIZATION_MAP.values():
assert list(sorted_results[normalized_col]) == expected_values
def test_normalize_exposure_time(normalization_data_frame):
from sensospot_data.normalisation import (
reduce_overflow,
normalize_exposure_time,
)
reduced = reduce_overflow(normalization_data_frame, "Saturation", 1)
result = normalize_exposure_time(reduced)
assert "Cy5" in result
sorted_results = result["Cy5"].sort_values(
by=["Well.Row", "Well.Column", "Pos.Id"]
)
expected_values = [1, 4, 15, 1, 10, 10, 10, 10, 100, 100, 100, 100]
assert list(sorted_results["Calc.Normalized.Spot.Mean"]) == expected_values
def test_normalize_exposure_time_infered_map(normalization_data_frame):
from sensospot_data.normalisation import (
reduce_overflow,
normalize_exposure_time,
)
reduced = reduce_overflow(normalization_data_frame, "Saturation", 1)
result = normalize_exposure_time(reduced)
assert "Cy5" in result
sorted_results = result["Cy5"].sort_values(
by=["Well.Row", "Well.Column", "Pos.Id"]
)
expected_values = [1, 4, 15, 1, 10, 10, 10, 10, 100, 100, 100, 100]
assert list(sorted_results["Calc.Normalized.Spot.Mean"]) == expected_values
def test_normalize_measurement(data_frame_with_params):
from sensospot_data.normalisation import split_channels
exposure_map = {
1: ExposureSetting("Cy3", 100),
2: ExposureSetting("Cy5", 15),
3: ExposureSetting("Cy5", 150),
}
result = split_channels(data_frame_with_params, exposure_map)
cy3_df, cy5_df = result["Cy3"], result["Cy5"]
assert set(result.keys()) == {"Cy3", "Cy5"}
assert cy3_df["Settings.Normalized.Exposure.Time"].unique() == 100
assert cy5_df["Settings.Normalized.Exposure.Time"].unique() == 150

tests/test_sensovation_data.py (+3)

@@ -4,8 +4,11 @@
def test_import_api():
from sensospot_data import ExposureInfo # noqa: F401
from sensospot_data import run # noqa: F401
from sensospot_data import blend # noqa: F401
from sensospot_data import create_xdr # noqa: F401
from sensospot_data import parse_file # noqa: F401
from sensospot_data import parse_folder # noqa: F401
from sensospot_data import normalize_values # noqa: F401
from sensospot_data import split_data_frame # noqa: F401
from sensospot_data import apply_exposure_map # noqa: F401
from sensospot_data import get_measurement_params # noqa: F401
