Browse Source

moved exposure maps out of normalization module

xmlparsing
Holger Frey 4 years ago
parent
commit
89c04e9b93
  1. 2
      sensospot_data/__init__.py
  2. 57
      sensospot_data/normalisation.py
  3. 61
      sensospot_data/utils.py
  4. 34
      tests/conftest.py
  5. 105
      tests/test_normalisation.py
  6. 3
      tests/test_sensovation_data.py
  7. 86
      tests/test_utils.py

2
sensospot_data/__init__.py

@ -10,9 +10,9 @@ from pathlib import Path
import click import click
from .utils import split_data_frame, apply_exposure_map # noqa: F401
from .parser import parse_file, parse_folder # noqa: F401 from .parser import parse_file, parse_folder # noqa: F401
from .parameters import ExposureInfo, get_measurement_params # noqa: F401 from .parameters import ExposureInfo, get_measurement_params # noqa: F401
from .utils import split_data_frame
@click.command() @click.command()

57
sensospot_data/normalisation.py

@ -1,17 +1,12 @@
import numpy
from .columns import ( from .columns import (
RAW_DATA_POS_ID, RAW_DATA_POS_ID,
CALC_SPOT_OVERFLOW, CALC_SPOT_OVERFLOW,
META_DATA_WELL_ROW, META_DATA_WELL_ROW,
RAW_DATA_SPOT_MEAN, RAW_DATA_SPOT_MEAN,
META_DATA_EXPOSURE_ID,
META_DATA_WELL_COLUMN, META_DATA_WELL_COLUMN,
SETTINGS_EXPOSURE_TIME, SETTINGS_EXPOSURE_TIME,
META_DATA_PARAMETERS_TIME,
SETTINGS_EXPOSURE_CHANNEL, SETTINGS_EXPOSURE_CHANNEL,
RAW_DATA_NORMALIZATION_MAP, RAW_DATA_NORMALIZATION_MAP,
META_DATA_PARAMETERS_CHANNEL,
SETTINGS_NORMALIZED_EXPOSURE_TIME, SETTINGS_NORMALIZED_EXPOSURE_TIME,
) )
@ -21,57 +16,7 @@ PROBE_MULTI_INDEX = [
RAW_DATA_POS_ID, RAW_DATA_POS_ID,
] ]
from .utils import split_data_frame from .utils import split_data_frame, apply_exposure_map
def _infer_exposure_from_parameters(data_frame):
    """Copy the measurement parameter columns into the exposure columns.

    The data frame is modified in place and returned.

    Raises a ValueError if the channel or time parameter column
    contains NaNs.
    """
    parameter_columns = (
        META_DATA_PARAMETERS_CHANNEL,
        META_DATA_PARAMETERS_TIME,
    )
    # any() stops at the first column with NaNs, like the original `or`
    if any(data_frame[name].hasnans for name in parameter_columns):
        raise ValueError("Exposure Map: measurement parameters incomplete")

    data_frame[SETTINGS_EXPOSURE_CHANNEL] = data_frame[
        META_DATA_PARAMETERS_CHANNEL
    ]
    data_frame[SETTINGS_EXPOSURE_TIME] = data_frame[META_DATA_PARAMETERS_TIME]
    return data_frame
def apply_exposure_map(data_frame, exposure_map=None):
    """Apply the parameters of an exposure map to the data frame.

    exposure map:
        keys: must be the same as the exposure ids,
        values: objects with at least time and channel attributes

    If the exposure map is None, the values from the optionally parsed
    measurement parameters are used.

    The data frame is modified in place and returned.

    Raises a ValueError if the provided exposure map does not map to the
    exposure ids.
    """
    if exposure_map is None:
        return _infer_exposure_from_parameters(data_frame)

    exposure_ids = data_frame[META_DATA_EXPOSURE_ID]
    existing = set(exposure_ids.unique())
    provided = set(exposure_map.keys())
    if existing != provided:
        raise ValueError(
            f"Exposure Map differs from data frame: {provided} != {existing}"
        )

    # Build each column in a single step with Series.map. Pre-filling the
    # columns with float NaN and then writing channel strings through .loc
    # relies on silent dtype upcasting, which pandas deprecated in 2.1.
    # The check above guarantees that every exposure id has a map entry,
    # so no row is left unmapped; column dtypes follow the mapped values.
    channels = {key: info.channel for key, info in exposure_map.items()}
    times = {key: info.time for key, info in exposure_map.items()}
    data_frame[SETTINGS_EXPOSURE_CHANNEL] = exposure_ids.map(channels)
    data_frame[SETTINGS_EXPOSURE_TIME] = exposure_ids.map(times)
    return data_frame
def _check_overflow_limit(data_frame, column=RAW_DATA_SPOT_MEAN, limit=0.5): def _check_overflow_limit(data_frame, column=RAW_DATA_SPOT_MEAN, limit=0.5):

61
sensospot_data/utils.py

@ -1,6 +1,67 @@
import numpy
from .columns import (
META_DATA_EXPOSURE_ID,
SETTINGS_EXPOSURE_TIME,
META_DATA_PARAMETERS_TIME,
SETTINGS_EXPOSURE_CHANNEL,
META_DATA_PARAMETERS_CHANNEL,
)
def split_data_frame(data_frame, column):
    """Split a data frame into one sub frame per unique column value.

    Returns a dict that maps each unique value found in ``column`` to the
    rows of ``data_frame`` whose ``column`` entry equals that value.
    """
    series = data_frame[column]
    return {value: data_frame[series == value] for value in series.unique()}
def _set_exposure_data_from_parameters(data_frame):
    """Copy the measurement parameter columns into the exposure columns.

    The data frame is modified in place and returned.

    Raises a ValueError if the channel or time parameter column
    contains NaNs.
    """
    parameter_columns = (
        META_DATA_PARAMETERS_CHANNEL,
        META_DATA_PARAMETERS_TIME,
    )
    # any() stops at the first column with NaNs, like the original `or`
    if any(data_frame[name].hasnans for name in parameter_columns):
        raise ValueError("Exposure Map: measurement parameters incomplete")

    data_frame[SETTINGS_EXPOSURE_CHANNEL] = data_frame[
        META_DATA_PARAMETERS_CHANNEL
    ]
    data_frame[SETTINGS_EXPOSURE_TIME] = data_frame[META_DATA_PARAMETERS_TIME]
    return data_frame
def apply_exposure_map(data_frame, exposure_map=None):
    """Apply the parameters of an exposure map to the data frame.

    exposure map:
        keys: must be the same as the exposure ids,
        values: objects with at least time and channel attributes

    If the exposure map is None, the values from the optionally parsed
    measurement parameters are used.

    The data frame is modified in place and returned.

    Raises a ValueError if the provided exposure map does not map to the
    exposure ids.
    """
    if exposure_map is None:
        return _set_exposure_data_from_parameters(data_frame)

    exposure_ids = data_frame[META_DATA_EXPOSURE_ID]
    existing = set(exposure_ids.unique())
    provided = set(exposure_map.keys())
    if existing != provided:
        raise ValueError(
            f"Exposure Map differs from data frame: {provided} != {existing}"
        )

    # Build each column in a single step with Series.map. Pre-filling the
    # columns with float NaN and then writing channel strings through .loc
    # relies on silent dtype upcasting, which pandas deprecated in 2.1.
    # The check above guarantees that every exposure id has a map entry,
    # so no row is left unmapped; column dtypes follow the mapped values.
    channels = {key: info.channel for key, info in exposure_map.items()}
    times = {key: info.time for key, info in exposure_map.items()}
    data_frame[SETTINGS_EXPOSURE_CHANNEL] = exposure_ids.map(channels)
    data_frame[SETTINGS_EXPOSURE_TIME] = exposure_ids.map(times)
    return data_frame

34
tests/conftest.py

@ -28,16 +28,6 @@ def exposure_df():
yield DataFrame(data={"Exposure.Id": [1, 2, 3]}) yield DataFrame(data={"Exposure.Id": [1, 2, 3]})
@pytest.fixture
def dir_for_caching(tmpdir, example_file):
    """Yield a temporary directory that holds a copy of the example file."""
    from shutil import copy

    cache_dir = Path(tmpdir)
    copy(example_file, cache_dir / example_file.name)
    yield cache_dir
@pytest.fixture @pytest.fixture
def normalization_data_frame(): def normalization_data_frame():
from sensospot_data.columns import RAW_DATA_NORMALIZATION_MAP from sensospot_data.columns import RAW_DATA_NORMALIZATION_MAP
@ -98,3 +88,27 @@ def normalization_data_frame():
data_frame[value_column] = data_frame["Value"] data_frame[value_column] = data_frame["Value"]
yield data_frame yield data_frame
@pytest.fixture(scope="session")
def parsed_data_frame_with_params(example_dir):
    """Parse the EXAMPLE_DIR_WITH_PARAMS folder once per test session."""
    from sensospot_data.parser import parse_folder

    folder = example_dir / EXAMPLE_DIR_WITH_PARAMS
    return parse_folder(folder)
@pytest.fixture(scope="session")
def parsed_data_frame_without_params(example_dir):
    """Parse the EXAMPLE_DIR_WO_PARAMS folder once per test session."""
    from sensospot_data.parser import parse_folder

    folder = example_dir / EXAMPLE_DIR_WO_PARAMS
    return parse_folder(folder)
@pytest.fixture
def data_frame_with_params(parsed_data_frame_with_params):
    """Return a copy of the parsed data frame, so each test gets its own."""
    fresh_copy = parsed_data_frame_with_params.copy()
    return fresh_copy
@pytest.fixture
def data_frame_without_params(parsed_data_frame_without_params):
    """Return a copy of the parsed data frame, so each test gets its own."""
    fresh_copy = parsed_data_frame_without_params.copy()
    return fresh_copy

105
tests/test_normalisation.py

@ -1,107 +1,10 @@
from collections import namedtuple from collections import namedtuple
import pandas import pandas
import pytest
from .conftest import EXAMPLE_DIR_WO_PARAMS, EXAMPLE_DIR_WITH_PARAMS
ExposureSetting = namedtuple("ExposureSetting", ["channel", "time"]) ExposureSetting = namedtuple("ExposureSetting", ["channel", "time"])
@pytest.fixture(scope="session")
def data_frame_with_params(example_dir):
    """Parse the EXAMPLE_DIR_WITH_PARAMS folder once per test session."""
    from sensospot_data.parser import parse_folder

    folder = example_dir / EXAMPLE_DIR_WITH_PARAMS
    return parse_folder(folder)
@pytest.fixture(scope="session")
def data_frame_without_params(example_dir):
    """Parse the EXAMPLE_DIR_WO_PARAMS folder once per test session."""
    from sensospot_data.parser import parse_folder

    folder = example_dir / EXAMPLE_DIR_WO_PARAMS
    return parse_folder(folder)
@pytest.fixture
def df_wp(data_frame_with_params):
    """Return a copy of the parsed data frame, so each test gets its own."""
    fresh_copy = data_frame_with_params.copy()
    return fresh_copy
@pytest.fixture
def df_wop(data_frame_without_params):
    """Return a copy of the parsed data frame, so each test gets its own."""
    fresh_copy = data_frame_without_params.copy()
    return fresh_copy
def test_infer_exposure_from_parameters(df_wp):
    """The exposure columns must equal the parameter columns."""
    from sensospot_data.normalisation import _infer_exposure_from_parameters

    frame = _infer_exposure_from_parameters(df_wp)

    for exposure_column, parameter_column in (
        ("Exposure.Channel", "Parameters.Channel"),
        ("Exposure.Time", "Parameters.Time"),
    ):
        assert all(frame[exposure_column] == frame[parameter_column])
def test_infer_exposure_from_parameters_raises_error(df_wop):
    """A ValueError is expected for the frame parsed without parameters."""
    from sensospot_data.normalisation import _infer_exposure_from_parameters

    with pytest.raises(ValueError) as error_info:
        _infer_exposure_from_parameters(df_wop)

    message = str(error_info.value)
    assert message.startswith("Exposure Map: measurement")
def test_apply_exposure_map(df_wp):
    """Rows of each exposure id carry the channel and time of its entry."""
    from sensospot_data.normalisation import apply_exposure_map

    settings_by_id = {
        1: ExposureSetting("Cy3", 100),
        2: ExposureSetting("Cy5", 15),
        3: ExposureSetting("Cy5", 150),
    }

    frame = apply_exposure_map(df_wp, settings_by_id)

    for exposure_id, setting in settings_by_id.items():
        rows = frame[frame["Exposure.Id"] == exposure_id]
        assert set(rows["Exposure.Channel"].unique()) == {setting.channel}
        assert set(rows["Exposure.Time"].unique()) == {setting.time}
def test_apply_exposure_map_raises_error(df_wp):
    """A map whose keys differ from the exposure ids raises a ValueError."""
    from sensospot_data.normalisation import apply_exposure_map

    mismatching_map = {
        1: ExposureSetting("Cy3", 100),
        2: ExposureSetting("Cy5", 15),
        "X": ExposureSetting("Cy5", 150),
    }

    with pytest.raises(ValueError) as error_info:
        apply_exposure_map(df_wp, mismatching_map)

    message = str(error_info.value)
    assert message.startswith("Exposure Map differs")
def test_apply_exposure_map_from_parameters(df_wp):
    """Without a map the exposure columns equal the parameter columns."""
    from sensospot_data.normalisation import apply_exposure_map

    frame = apply_exposure_map(df_wp, None)

    for exposure_column, parameter_column in (
        ("Exposure.Channel", "Parameters.Channel"),
        ("Exposure.Time", "Parameters.Time"),
    ):
        assert all(frame[exposure_column] == frame[parameter_column])
def test_apply_exposure_map_from_parameters_raises_error(df_wop):
    """Without a map, the frame parsed without parameters must be rejected."""
    from sensospot_data.normalisation import apply_exposure_map

    with pytest.raises(ValueError) as error_info:
        apply_exposure_map(df_wop, None)

    message = str(error_info.value)
    assert message.startswith("Exposure Map: measurement")
def test_check_overflow_limit_defaults(): def test_check_overflow_limit_defaults():
from sensospot_data.normalisation import _check_overflow_limit from sensospot_data.normalisation import _check_overflow_limit
@ -198,9 +101,7 @@ def test_reduce_overflow(normalization_data_frame):
def test_infer_normalization_map(normalization_data_frame): def test_infer_normalization_map(normalization_data_frame):
from sensospot_data.utils import split_data_frame from sensospot_data.utils import split_data_frame
from sensospot_data.normalisation import ( from sensospot_data.normalisation import _infer_normalization_map
_infer_normalization_map,
)
normalization_data_frame.loc[5, "Exposure.Channel"] = "Cy3" normalization_data_frame.loc[5, "Exposure.Channel"] = "Cy3"
split_frames = split_data_frame( split_frames = split_data_frame(
@ -266,7 +167,7 @@ def test_normalize_exposure_time_infered_map(normalization_data_frame):
assert list(sorted_results["Calc.Normalized.Spot.Mean"]) == expected_values assert list(sorted_results["Calc.Normalized.Spot.Mean"]) == expected_values
def test_normalize_measurement(df_wp): def test_normalize_measurement(data_frame_with_params):
from sensospot_data.normalisation import split_channels from sensospot_data.normalisation import split_channels
exposure_map = { exposure_map = {
@ -275,7 +176,7 @@ def test_normalize_measurement(df_wp):
3: ExposureSetting("Cy5", 150), 3: ExposureSetting("Cy5", 150),
} }
result = split_channels(df_wp, exposure_map) result = split_channels(data_frame_with_params, exposure_map)
cy3_df, cy5_df = result["Cy3"], result["Cy5"] cy3_df, cy5_df = result["Cy3"], result["Cy5"]
assert set(result.keys()) == {"Cy3", "Cy5"} assert set(result.keys()) == {"Cy3", "Cy5"}

3
tests/test_sensovation_data.py

@ -6,5 +6,6 @@ def test_import_api():
from sensospot_data import run # noqa: F401 from sensospot_data import run # noqa: F401
from sensospot_data import parse_file # noqa: F401 from sensospot_data import parse_file # noqa: F401
from sensospot_data import parse_folder # noqa: F401 from sensospot_data import parse_folder # noqa: F401
from sensospot_data import get_measurement_params # noqa: F401
from sensospot_data import split_data_frame # noqa: F401 from sensospot_data import split_data_frame # noqa: F401
from sensospot_data import apply_exposure_map # noqa: F401
from sensospot_data import get_measurement_params # noqa: F401

86
tests/test_utils.py

@ -1,14 +1,88 @@
from collections import namedtuple
from .conftest import EXAMPLE_DIR_WITH_PARAMS import pytest
ExposureSetting = namedtuple("ExposureSetting", ["channel", "time"])
def test_split_data_frame(example_dir):
from sensospot_data.parser import parse_folder
from sensospot_data.utils import split_data_frame
data_frame = parse_folder(example_dir / EXAMPLE_DIR_WITH_PARAMS) def test_split_data_frame(data_frame_with_params):
from sensospot_data.utils import split_data_frame
result = split_data_frame(data_frame, "Well.Row") result = split_data_frame(data_frame_with_params, "Well.Row")
assert set(result.keys()) == set("ABC") assert set(result.keys()) == set("ABC")
for key, value_df in result.items(): for key, value_df in result.items():
assert set(value_df["Well.Row"].unique()) == {key} assert set(value_df["Well.Row"].unique()) == {key}
def test_infer_exposure_from_parameters(data_frame_with_params):
    """The exposure columns must equal the parameter columns."""
    from sensospot_data.utils import _set_exposure_data_from_parameters

    frame = _set_exposure_data_from_parameters(data_frame_with_params)

    for exposure_column, parameter_column in (
        ("Exposure.Channel", "Parameters.Channel"),
        ("Exposure.Time", "Parameters.Time"),
    ):
        assert all(frame[exposure_column] == frame[parameter_column])
def test_infer_exposure_from_parameters_raises_error(
    data_frame_without_params,
):
    """A ValueError is expected for the frame parsed without parameters."""
    from sensospot_data.utils import _set_exposure_data_from_parameters

    with pytest.raises(ValueError) as error_info:
        _set_exposure_data_from_parameters(data_frame_without_params)

    message = str(error_info.value)
    assert message.startswith("Exposure Map: measurement")
def test_apply_exposure_map(data_frame_with_params):
    """Rows of each exposure id carry the channel and time of its entry."""
    from sensospot_data.utils import apply_exposure_map

    settings_by_id = {
        1: ExposureSetting("Cy3", 100),
        2: ExposureSetting("Cy5", 15),
        3: ExposureSetting("Cy5", 150),
    }

    frame = apply_exposure_map(data_frame_with_params, settings_by_id)

    for exposure_id, setting in settings_by_id.items():
        rows = frame[frame["Exposure.Id"] == exposure_id]
        assert set(rows["Exposure.Channel"].unique()) == {setting.channel}
        assert set(rows["Exposure.Time"].unique()) == {setting.time}
def test_apply_exposure_map_raises_error(data_frame_with_params):
    """A map whose keys differ from the exposure ids raises a ValueError."""
    from sensospot_data.utils import apply_exposure_map

    mismatching_map = {
        1: ExposureSetting("Cy3", 100),
        2: ExposureSetting("Cy5", 15),
        "X": ExposureSetting("Cy5", 150),
    }

    with pytest.raises(ValueError) as error_info:
        apply_exposure_map(data_frame_with_params, mismatching_map)

    message = str(error_info.value)
    assert message.startswith("Exposure Map differs")
def test_apply_exposure_map_from_parameters(data_frame_with_params):
    """Without a map the exposure columns equal the parameter columns."""
    from sensospot_data.utils import apply_exposure_map

    frame = apply_exposure_map(data_frame_with_params, None)

    for exposure_column, parameter_column in (
        ("Exposure.Channel", "Parameters.Channel"),
        ("Exposure.Time", "Parameters.Time"),
    ):
        assert all(frame[exposure_column] == frame[parameter_column])
def test_apply_exposure_map_from_parameters_raises_error(
    data_frame_without_params,
):
    """Without a map, the frame parsed without parameters must be rejected."""
    from sensospot_data.utils import apply_exposure_map

    with pytest.raises(ValueError) as error_info:
        apply_exposure_map(data_frame_without_params, None)

    message = str(error_info.value)
    assert message.startswith("Exposure Map: measurement")

Loading…
Cancel
Save