Browse Source

added measurement normalization

xmlparsing
Holger Frey 4 years ago
parent
commit
9acf7d9c0a
  1. 2
      .pre-commit-config.yaml
  2. 4
      Makefile
  3. 1
      sensospot_data/__init__.py
  4. 61
      sensospot_data/columns.py
  5. 182
      sensospot_data/normalisation.py
  6. 24
      sensospot_data/parameters.py
  7. 34
      sensospot_data/parser.py
  8. 63
      tests/conftest.py
  9. 294
      tests/test_normailsation.py
  10. 7
      tests/test_parameters.py
  11. 47
      tests/test_parser.py
  12. 1
      tests/test_sensovation_data_parser.py

2
.pre-commit-config.yaml

@@ -27,7 +27,7 @@ repos:
pass_filenames: false
- id: flake8
name: flake8
entry: flake8 --ignore E231 sensospot_data tests
entry: flake8 --ignore E231,W503 sensospot_data tests
language: system
pass_filenames: false
- id: pytest

4
Makefile

@@ -54,10 +54,10 @@ lint: ## reformat with black and check style with flake8
isort -rc sensospot_data
isort -rc tests
black sensospot_data tests
flake8 --ignore E231 sensospot_data tests
flake8 --ignore E231,W503 sensospot_data tests
test: ## run tests quickly with the default Python
pytest tests -x --disable-warnings -k "not app"
pytest tests -x --disable-warnings
coverage: ## full test suite, check code coverage and open coverage report
pytest tests --cov=sensospot_data

1
sensospot_data/__init__.py

@ -15,3 +15,4 @@ from .parser import ( # noqa: F401 @@ -15,3 +15,4 @@ from .parser import ( # noqa: F401
process_folder,
parse_multiple_files,
)
from .normalisation import normalize_measurement # noqa: F401

61
sensospot_data/columns.py

@ -0,0 +1,61 @@ @@ -0,0 +1,61 @@
""" Column name definitions """
# original, unmodified column names
COL_NAME_POS_X = "Pos.X"
COL_NAME_POS_Y = "Pos.Y"
COL_NAME_BKG_MEAN = "Bkg.Mean"
COL_NAME_SPOT_MEAN = "Spot.Mean"
COL_NAME_BKG_MEDIAN = "Bkg.Median"
COL_NAME_SPOT_MEDIAN = "Spot.Median"
COL_NAME_BKG_STDDEV = "Bkg.StdDev"
COL_NAME_SPOT_STDDEV = "Spot.StdDev"
COL_NAME_BKG_SUM = "Bkg.Sum"
COL_NAME_SPOT_SUM = "Spot.Sum"
COL_NAME_BKG_AREA = "Bkg.Area"
COL_NAME_SPOT_AREA = "Spot.Area"
COL_NAME_SPOT_SAT = "Spot.Sat. (%)"
COL_NAME_POS_NOM_X = "Pos.Nom.X"
COL_NAME_POS_NOM_Y = "Pos.Nom.Y"
# replacement column names
COL_NAME_POS_ID = "Pos.Id"
COL_NAME_SPOT_FOUND = "Spot.Found"
COL_NAME_SPOT_DIAMETER = "Spot.Diameter"
# additional column
COL_NAME_SPOT_OVERFLOW = "Spot.Overflow"
# well information
COL_NAME_WELL_ROW = "Well.Row"
COL_NAME_WELL_COLUMN = "Well.Column"
# parsed measurement parameter information
COL_NAME_PARAMETERS_CHANNEL = "Parameters.Channel"
COL_NAME_PARAMETERS_TIME = "Parameters.Time"
# applied exposure info
COL_NAME_EXPOSURE_ID = "Exposure.Id"
COL_NAME_EXPOSURE_CHANNEL = "Exposure.Channel"
COL_NAME_EXPOSURE_TIME = "Exposure.Time"
# normalized columns
COL_NAME_NORMALIZED_EXPOSURE_TIME = f"Normalized.{COL_NAME_EXPOSURE_TIME}"
COL_NAME_NORMALIZED_BKG_MEAN = f"Normalized.{COL_NAME_BKG_MEAN}"
COL_NAME_NORMALIZED_SPOT_MEAN = f"Normalized.{COL_NAME_SPOT_MEAN}"
COL_NAME_NORMALIZED_BKG_MEDIAN = f"Normalized.{COL_NAME_BKG_MEDIAN}"
COL_NAME_NORMALIZED_SPOT_MEDIAN = f"Normalized.{COL_NAME_SPOT_MEDIAN}"
COL_NAME_NORMALIZED_BKG_STDDEV = f"Normalized.{COL_NAME_BKG_STDDEV}"
COL_NAME_NORMALIZED_SPOT_STDDEV = f"Normalized.{COL_NAME_SPOT_STDDEV}"
COL_NAME_NORMALIZED_BKG_SUM = f"Normalized.{COL_NAME_BKG_SUM}"
COL_NAME_NORMALIZED_SPOT_SUM = f"Normalized.{COL_NAME_SPOT_SUM}"
COLUMN_NORMALIZATION = {
COL_NAME_BKG_MEAN: COL_NAME_NORMALIZED_BKG_MEAN,
COL_NAME_SPOT_MEAN: COL_NAME_NORMALIZED_SPOT_MEAN,
COL_NAME_BKG_MEDIAN: COL_NAME_NORMALIZED_BKG_MEDIAN,
COL_NAME_SPOT_MEDIAN: COL_NAME_NORMALIZED_SPOT_MEDIAN,
COL_NAME_BKG_STDDEV: COL_NAME_NORMALIZED_BKG_STDDEV,
COL_NAME_SPOT_STDDEV: COL_NAME_NORMALIZED_SPOT_STDDEV,
COL_NAME_BKG_SUM: COL_NAME_NORMALIZED_BKG_SUM,
COL_NAME_SPOT_SUM: COL_NAME_NORMALIZED_SPOT_SUM,
}

182
sensospot_data/normalisation.py

@ -0,0 +1,182 @@ @@ -0,0 +1,182 @@
import numpy
from .columns import (
COL_NAME_POS_ID,
COL_NAME_WELL_ROW,
COL_NAME_SPOT_MEAN,
COL_NAME_EXPOSURE_ID,
COL_NAME_WELL_COLUMN,
COLUMN_NORMALIZATION,
COL_NAME_EXPOSURE_TIME,
COL_NAME_SPOT_OVERFLOW,
COL_NAME_PARAMETERS_TIME,
COL_NAME_EXPOSURE_CHANNEL,
COL_NAME_PARAMETERS_CHANNEL,
COL_NAME_NORMALIZED_EXPOSURE_TIME,
)
def _split_data_frame(data_frame, column):
""" splits a data frame on unique column values """
values = data_frame[column].unique()
masks = {value: (data_frame[column] == value) for value in values}
return {value: data_frame[mask] for value, mask in masks.items()}
def _infer_exposure_from_parameters(data_frame):
    """ copies the parsed measurement parameters into the exposure columns

    raises a ValueError if either parameter column contains missing
    values (NaN), since an incomplete exposure map would be useless
    """
    channels = data_frame[COL_NAME_PARAMETERS_CHANNEL]
    times = data_frame[COL_NAME_PARAMETERS_TIME]
    if channels.hasnans or times.hasnans:
        raise ValueError("Exposure Map: measurement parameters incomplete")
    data_frame[COL_NAME_EXPOSURE_CHANNEL] = channels
    data_frame[COL_NAME_EXPOSURE_TIME] = times
    return data_frame
def apply_exposure_map(data_frame, exposure_map=None):
    """ annotates the data frame with exposure channel and time

    exposure map:
        keys: must be the same as the exposure ids,
        values: objects with at least time and channel attributes
    if the exposure map is None, the values from the optionally parsed
    measurement parameters are used.
    will raise an ValueError, if the provided exposure map does not map to the
    exposure ids.
    """
    if exposure_map is None:
        # fall back to the parsed xml measurement parameters
        return _infer_exposure_from_parameters(data_frame)
    expected = set(data_frame[COL_NAME_EXPOSURE_ID].unique())
    mapped = set(exposure_map.keys())
    if expected != mapped:
        raise ValueError(
            f"Exposure Map differs from data frame: {mapped} != {expected}"
        )
    # initialize both columns, then fill them per exposure id
    data_frame[COL_NAME_EXPOSURE_CHANNEL] = numpy.nan
    data_frame[COL_NAME_EXPOSURE_TIME] = numpy.nan
    for current_id, settings in exposure_map.items():
        selector = data_frame[COL_NAME_EXPOSURE_ID] == current_id
        data_frame.loc[selector, COL_NAME_EXPOSURE_CHANNEL] = settings.channel
        data_frame.loc[selector, COL_NAME_EXPOSURE_TIME] = settings.time
    return data_frame
def _check_overflow_limit(data_frame, column=COL_NAME_SPOT_MEAN, limit=0.5):
    """ flags overflowing spots

    adds a boolean column that is True where the value in `column`
    is strictly greater than `limit`
    """
    is_overflow = data_frame[column] > limit
    data_frame[COL_NAME_SPOT_OVERFLOW] = is_overflow
    return data_frame
def reduce_overflow(data_frame, column=COL_NAME_SPOT_MEAN, limit=0.5):
    """ reduces the data set per channel, eliminating overflowing spots

    returns a dict of channel id to reduced channel data frame
    """
    flagged_frame = _check_overflow_limit(data_frame, column, limit)
    per_channel = _split_data_frame(flagged_frame, COL_NAME_EXPOSURE_CHANNEL)
    reduced = {}
    for channel_id, channel_frame in per_channel.items():
        reduced[channel_id] = _reduce_overflow_in_channel(channel_frame)
    return reduced
def _reduce_overflow_in_channel(channel_frame):
    """ does the heavy lifting for reduce_overflow

    starts from the longest exposure and, spot by spot, substitutes
    overflowing entries with the values from the next shorter exposure
    """
    split_frames = _split_data_frame(channel_frame, COL_NAME_EXPOSURE_TIME)
    if len(split_frames) == 1:
        # shortcut, if there is only one exposure in the channel
        return channel_frame
    exposure_times = sorted(split_frames.keys(), reverse=True)
    max_time, *rest_times = exposure_times
    # well plus spot position identifies one spot across all exposures,
    # so .loc assignments below align rows between exposure frames
    multi_index = [COL_NAME_WELL_ROW, COL_NAME_WELL_COLUMN, COL_NAME_POS_ID]
    result_frame = split_frames[max_time].set_index(multi_index)
    for next_time in rest_times:
        # explicit "== True" keeps the mask strictly boolean (hence the noqa)
        mask = result_frame[COL_NAME_SPOT_OVERFLOW] == True  # noqa: E712
        next_frame = split_frames[next_time].set_index(multi_index)
        result_frame.loc[mask] = next_frame.loc[mask]
    return result_frame.reset_index()
def _infer_normalization_map(split_data_frames):
    """ derives a default normalization target per channel

    the longest exposure time found in each channel frame is used
    """
    normalization_map = {}
    for channel_id, channel_frame in split_data_frames.items():
        normalization_map[channel_id] = channel_frame[
            COL_NAME_EXPOSURE_TIME
        ].max()
    return normalization_map
def normalize_exposure_time(split_data_frames, normalization_map=None):
    """ add time normalized values to the split data frames

    normalization_map:
        keys: channel identifier (e.g. "Cy5")
        values: target exposure time for normalization
    If normalization_map is None, the max exposure time per channel is used
    """
    # start from the inferred defaults, then let the caller's map override
    target_times = _infer_normalization_map(split_data_frames)
    if normalization_map is not None:
        target_times.update(normalization_map)
    normalized = {}
    for channel_id, channel_frame in split_data_frames.items():
        normalized[channel_id] = _normalize_exposure(
            channel_frame, target_times[channel_id]
        )
    return normalized
def _normalize_exposure(channel_frame, normalized_time):
    """ rescales the value columns of one channel to a common exposure time

    each value is divided by its actual exposure time and multiplied by
    the normalization target time
    """
    channel_frame[COL_NAME_NORMALIZED_EXPOSURE_TIME] = normalized_time
    for source_col, target_col in COLUMN_NORMALIZATION.items():
        per_time_unit = (
            channel_frame[source_col] / channel_frame[COL_NAME_EXPOSURE_TIME]
        )
        channel_frame[target_col] = (
            per_time_unit * channel_frame[COL_NAME_NORMALIZED_EXPOSURE_TIME]
        )
    return channel_frame
def normalize_measurement(
    data_frame,
    exposure_map=None,
    normalization_map=None,
    overflow_column=COL_NAME_SPOT_MEAN,
    overflow_limit=0.5,
):
    """ augment normalize the measurement exposures

    exposure map:
        keys: must be the same as the exposure ids,
        values: objects with at least time and channel attributes
    if the exposure map is None, the values from the optionally parsed
    measurement parameters are used.
    normalization_map:
        keys: channel identifier (e.g. "Cy5")
        values: target exposure time for normalization
    If normalization_map is None, the max exposure time per channel is used
    """
    # 1) annotate rows with channel / time, 2) drop overflowing spots
    # per channel, 3) rescale values to the target exposure time
    annotated = apply_exposure_map(data_frame, exposure_map)
    per_channel = reduce_overflow(annotated, overflow_column, overflow_limit)
    return normalize_exposure_time(per_channel, normalization_map)

24
sensospot_data/parameters.py

@ -9,6 +9,12 @@ from collections import namedtuple @@ -9,6 +9,12 @@ from collections import namedtuple
import numpy
from defusedxml import ElementTree
from .columns import (
COL_NAME_EXPOSURE_ID,
COL_NAME_PARAMETERS_TIME,
COL_NAME_PARAMETERS_CHANNEL,
)
MeasurementParams = namedtuple("MeasurementParams", ["channel", "time"])
@ -53,22 +59,22 @@ def _get_measurement_params(folder): @@ -53,22 +59,22 @@ def _get_measurement_params(folder):
def _add_measurement_params(data_frame, params):
""" adds measurement parameters to a data frame """
for exposure_id, info in params.items():
mask = data_frame["Exposure.Id"] == exposure_id
data_frame.loc[mask, "Parameters.Channel"] = info.channel
data_frame.loc[mask, "Parameters.Time"] = info.time
data_frame["Parameters.Channel"] = data_frame["Parameters.Channel"].astype(
"category"
)
mask = data_frame[COL_NAME_EXPOSURE_ID] == exposure_id
data_frame.loc[mask, COL_NAME_PARAMETERS_CHANNEL] = info.channel
data_frame.loc[mask, COL_NAME_PARAMETERS_TIME] = info.time
data_frame[COL_NAME_PARAMETERS_CHANNEL] = data_frame[
COL_NAME_PARAMETERS_CHANNEL
].astype("category")
return data_frame
def add_optional_measurement_parameters(data_frame, folder):
""" adds measurement params to the data frame, if they could be parsed """
data_frame["Parameters.Channel"] = numpy.nan
data_frame["Parameters.Time"] = numpy.nan
data_frame[COL_NAME_PARAMETERS_CHANNEL] = numpy.nan
data_frame[COL_NAME_PARAMETERS_TIME] = numpy.nan
params = _get_measurement_params(folder)
if params:
available_exposures = set(data_frame["Exposure.Id"].unique())
available_exposures = set(data_frame[COL_NAME_EXPOSURE_ID].unique())
if available_exposures == set(params.keys()):
return _add_measurement_params(data_frame, params)
return data_frame

34
sensospot_data/parser.py

@ -9,6 +9,14 @@ from collections import namedtuple @@ -9,6 +9,14 @@ from collections import namedtuple
import pandas
from .columns import (
COL_NAME_POS_ID,
COL_NAME_WELL_ROW,
COL_NAME_SPOT_FOUND,
COL_NAME_EXPOSURE_ID,
COL_NAME_WELL_COLUMN,
COL_NAME_SPOT_DIAMETER,
)
from .parameters import add_optional_measurement_parameters
REGEX_WELL = re.compile(
@ -21,9 +29,9 @@ REGEX_WELL = re.compile( @@ -21,9 +29,9 @@ REGEX_WELL = re.compile(
COLUMNS_TO_DROP = ["Rect.", "Contour"]
COLUMNS_RENAME_MAP = {
" ID ": "Pos.Id",
"Found": "Spot.Found",
"Dia.": "Spot.Diameter",
" ID ": COL_NAME_POS_ID,
"Found": COL_NAME_SPOT_FOUND,
"Dia.": COL_NAME_SPOT_DIAMETER,
}
CACHE_FILE_NAME = "raw_data.h5"
@ -79,9 +87,9 @@ def parse_file(data_file): @@ -79,9 +87,9 @@ def parse_file(data_file):
""" parses one data file and adds metadata to result """
measurement_info = _extract_measurement_info(data_file)
data_frame = _parse_csv(data_file)
data_frame["Well.Row"] = measurement_info.row
data_frame["Well.Column"] = measurement_info.column
data_frame["Exposure.Id"] = measurement_info.exposure
data_frame[COL_NAME_WELL_ROW] = measurement_info.row
data_frame[COL_NAME_WELL_COLUMN] = measurement_info.column
data_frame[COL_NAME_EXPOSURE_ID] = measurement_info.exposure
return _cleanup_data_columns(data_frame)
@ -93,7 +101,9 @@ def parse_multiple_files(file_list): @@ -93,7 +101,9 @@ def parse_multiple_files(file_list):
data_frame = next(collection)
for next_frame in collection:
data_frame = data_frame.append(next_frame, ignore_index=True)
data_frame["Well.Row"] = data_frame["Well.Row"].astype("category")
data_frame[COL_NAME_WELL_ROW] = data_frame[COL_NAME_WELL_ROW].astype(
"category"
)
return data_frame
@ -107,10 +117,10 @@ def _list_csv_files(folder): @@ -107,10 +117,10 @@ def _list_csv_files(folder):
def _sanity_check(data_frame):
""" checks some basic constrains of a combined data frame """
field_rows = len(data_frame["Well.Row"].unique())
field_cols = len(data_frame["Well.Column"].unique())
exposures = len(data_frame["Exposure.Id"].unique())
spot_positions = len(data_frame["Pos.Id"].unique())
field_rows = len(data_frame[COL_NAME_WELL_ROW].unique())
field_cols = len(data_frame[COL_NAME_WELL_COLUMN].unique())
exposures = len(data_frame[COL_NAME_EXPOSURE_ID].unique())
spot_positions = len(data_frame[COL_NAME_POS_ID].unique())
expected_rows = field_rows * field_cols * exposures * spot_positions
if expected_rows != len(data_frame):
raise ValueError("Measurements are missing")
@ -125,7 +135,7 @@ def parse_folder(folder): @@ -125,7 +135,7 @@ def parse_folder(folder):
return _sanity_check(data_frame)
def process_folder(folder, exposures=None, use_cache=True):
def process_folder(folder, use_cache=True):
""" parses all csv files in a folder, adds some checks and more data """
hdf5_path = folder / CACHE_FILE_NAME
if use_cache:

63
tests/conftest.py

@ -2,6 +2,7 @@ @@ -2,6 +2,7 @@
from pathlib import Path
import pandas
import pytest
EXAMPLE_DIR_WO_PARAMS = "mtp_wo_parameters"
@ -35,3 +36,65 @@ def dir_for_caching(tmpdir, example_file): @@ -35,3 +36,65 @@ def dir_for_caching(tmpdir, example_file):
dest = temp_path / example_file.name
shutil.copy(example_file, dest)
yield temp_path
@pytest.fixture
def normalization_data_frame():
    """ synthetic data frame for the normalization tests

    each tuple is one spot measurement; the columns are listed in
    overflow_test_keys below: well row, well column, spot position,
    exposure time, measured value and a saturation marker (2 = overflow)
    """
    from sensospot_data.columns import COLUMN_NORMALIZATION
    overflow_test_values = [
        (1, 1, 1, 50, 1, 0),
        (1, 1, 2, 50, 1, 2),
        (1, 1, 3, 50, 1, 2),
        (1, 1, 4, 50, 1, 0),
        (1, 1, 1, 25, 2, 0),
        (1, 1, 2, 25, 2, 0),
        (1, 1, 3, 25, 2, 2),
        (1, 1, 4, 25, 2, 2),
        (1, 1, 1, 10, 3, 0),
        (1, 1, 2, 10, 3, 0),
        (1, 1, 3, 10, 3, 2),
        (1, 1, 4, 10, 3, 0),
        (1, 2, 1, 50, 10, 0),
        (1, 2, 2, 50, 10, 0),
        (1, 2, 3, 50, 10, 0),
        (1, 2, 4, 50, 10, 0),
        (1, 2, 1, 25, 20, 0),
        (1, 2, 2, 25, 20, 0),
        (1, 2, 3, 25, 20, 2),
        (1, 2, 4, 25, 20, 2),
        (1, 2, 1, 10, 30, 0),
        (1, 2, 2, 10, 30, 0),
        (1, 2, 3, 10, 30, 2),
        (1, 2, 4, 10, 30, 0),
        (2, 1, 1, 50, 100, 0),
        (2, 1, 2, 50, 100, 0),
        (2, 1, 3, 50, 100, 0),
        (2, 1, 4, 50, 100, 0),
        (2, 1, 1, 25, 200, 0),
        (2, 1, 2, 25, 200, 0),
        (2, 1, 3, 25, 200, 2),
        (2, 1, 4, 25, 200, 2),
        (2, 1, 1, 10, 300, 0),
        (2, 1, 2, 10, 300, 0),
        (2, 1, 3, 10, 300, 2),
        (2, 1, 4, 10, 300, 0),
    ]
    overflow_test_keys = [
        "Well.Row",
        "Well.Column",
        "Pos.Id",
        "Exposure.Time",
        "Value",
        "Saturation",
    ]
    overflow_test_data = [
        dict(zip(overflow_test_keys, v)) for v in overflow_test_values
    ]
    data_frame = pandas.DataFrame(overflow_test_data)
    # a single channel is enough for the normalization tests
    data_frame["Exposure.Channel"] = "Cy5"
    # fill every normalizable column with the same test values
    for value_column in COLUMN_NORMALIZATION.keys():
        data_frame[value_column] = data_frame["Value"]
    yield data_frame

294
tests/test_normailsation.py

@ -0,0 +1,294 @@ @@ -0,0 +1,294 @@
from collections import namedtuple
import pandas
import pytest
from .conftest import EXAMPLE_DIR_WO_PARAMS, EXAMPLE_DIR_WITH_PARAMS
# lightweight stand-in for exposure map values: any object with
# `channel` and `time` attributes is accepted by apply_exposure_map
ExposureSetting = namedtuple("ExposureSetting", ["channel", "time"])
def test_split_data_frame(example_dir):
    """ splitting on the well row yields one sub-frame per row value """
    from sensospot_data.parser import process_folder
    from sensospot_data.normalisation import _split_data_frame
    data_frame = process_folder(example_dir / EXAMPLE_DIR_WITH_PARAMS)
    result = _split_data_frame(data_frame, "Well.Row")
    assert set(result.keys()) == set("ABC")
    for key, value_df in result.items():
        assert set(value_df["Well.Row"].unique()) == {key}
def test_infer_exposure_from_parameters(example_dir):
    """ exposure columns are copied from the parsed parameters """
    from sensospot_data.parser import process_folder
    from sensospot_data.normalisation import _infer_exposure_from_parameters
    data_frame = process_folder(example_dir / EXAMPLE_DIR_WITH_PARAMS)
    result = _infer_exposure_from_parameters(data_frame)
    assert all(result["Exposure.Channel"] == result["Parameters.Channel"])
    assert all(result["Exposure.Time"] == result["Parameters.Time"])
def test_infer_exposure_from_parameters_raises_error(example_dir):
    """ missing measurement parameters raise a ValueError """
    from sensospot_data.parser import process_folder
    from sensospot_data.normalisation import _infer_exposure_from_parameters
    data_frame = process_folder(example_dir / EXAMPLE_DIR_WO_PARAMS)
    with pytest.raises(ValueError) as excinfo:
        _infer_exposure_from_parameters(data_frame)
    assert str(excinfo.value).startswith("Exposure Map: measurement")
def test_apply_exposure_map(example_dir):
    """ an explicit exposure map sets channel and time per exposure id """
    from sensospot_data.parser import process_folder
    from sensospot_data.normalisation import apply_exposure_map
    exposure_map = {
        1: ExposureSetting("Cy3", 100),
        2: ExposureSetting("Cy5", 15),
        3: ExposureSetting("Cy5", 150),
    }
    data_frame = process_folder(example_dir / EXAMPLE_DIR_WITH_PARAMS)
    result = apply_exposure_map(data_frame, exposure_map)
    for key, value in exposure_map.items():
        mask = result["Exposure.Id"] == key
        partial = result.loc[mask]
        assert set(partial["Exposure.Channel"].unique()) == {value.channel}
        assert set(partial["Exposure.Time"].unique()) == {value.time}
def test_apply_exposure_map_raises_error(example_dir):
    """ an exposure map with mismatched keys raises a ValueError """
    from sensospot_data.parser import process_folder
    from sensospot_data.normalisation import apply_exposure_map
    exposure_map = {
        1: ExposureSetting("Cy3", 100),
        2: ExposureSetting("Cy5", 15),
        "X": ExposureSetting("Cy5", 150),
    }
    data_frame = process_folder(example_dir / EXAMPLE_DIR_WITH_PARAMS)
    with pytest.raises(ValueError) as excinfo:
        apply_exposure_map(data_frame, exposure_map)
    assert str(excinfo.value).startswith("Exposure Map differs")
def test_apply_exposure_map_from_parameters(example_dir):
    """ with no map given, values come from the parsed parameters """
    from sensospot_data.parser import process_folder
    from sensospot_data.normalisation import apply_exposure_map
    data_frame = process_folder(example_dir / EXAMPLE_DIR_WITH_PARAMS)
    result = apply_exposure_map(data_frame, None)
    assert all(result["Exposure.Channel"] == result["Parameters.Channel"])
    assert all(result["Exposure.Time"] == result["Parameters.Time"])
def test_apply_exposure_map_from_parameters_raises_error(example_dir):
    """ no map and no parsed parameters raises a ValueError """
    from sensospot_data.parser import process_folder
    from sensospot_data.normalisation import apply_exposure_map
    data_frame = process_folder(example_dir / EXAMPLE_DIR_WO_PARAMS)
    with pytest.raises(ValueError) as excinfo:
        apply_exposure_map(data_frame, None)
    assert str(excinfo.value).startswith("Exposure Map: measurement")
def test_check_overflow_limit_defaults():
    """ default column Spot.Mean and limit 0.5 flag values above 0.5 """
    from sensospot_data.normalisation import _check_overflow_limit
    data_frame = pandas.DataFrame(data={"Spot.Mean": [0.1, 0.5, 0.6]})
    result = _check_overflow_limit(data_frame)
    assert list(result["Spot.Overflow"]) == [False, False, True]
def test_check_overflow_limit_custom_limit():
    """ custom column and limit are honored """
    from sensospot_data.normalisation import _check_overflow_limit
    data_frame = pandas.DataFrame(data={"Spot.Sat": [4, 2, 3, 4]})
    result = _check_overflow_limit(data_frame, "Spot.Sat", 2)
    assert list(result["Spot.Overflow"]) == [True, False, True, True]
def test_reduce_overflow_in_channel(normalization_data_frame):
    """ overflowing spots fall back to values from shorter exposures """
    from sensospot_data.normalisation import (
        _reduce_overflow_in_channel,
        _check_overflow_limit,
    )
    data_frame = _check_overflow_limit(
        normalization_data_frame, "Saturation", 1
    )
    result = _reduce_overflow_in_channel(data_frame)
    sorted_results = result.sort_values(
        by=["Well.Row", "Well.Column", "Pos.Id"]
    )
    assert list(sorted_results["Value"]) == [
        1,
        2,
        3,
        1,
        10,
        10,
        10,
        10,
        100,
        100,
        100,
        100,
    ]
def test_reduce_overflow_in_channel_shortcut(normalization_data_frame):
    """ a single exposure time returns the input frame unchanged """
    from sensospot_data.normalisation import (
        _reduce_overflow_in_channel,
        _check_overflow_limit,
    )
    normalization_data_frame["Exposure.Time"] = 1
    data_frame = _check_overflow_limit(
        normalization_data_frame, "Saturation", 1
    )
    result = _reduce_overflow_in_channel(data_frame)
    # identity check: the shortcut must not copy the frame
    assert result is data_frame
def test_reduce_overflow(normalization_data_frame):
    """ reduction is applied per channel and keyed by channel id """
    from sensospot_data.normalisation import reduce_overflow
    result = reduce_overflow(normalization_data_frame, "Saturation", 1)
    assert "Cy5" in result
    sorted_results = result["Cy5"].sort_values(
        by=["Well.Row", "Well.Column", "Pos.Id"]
    )
    assert list(sorted_results["Value"]) == [
        1,
        2,
        3,
        1,
        10,
        10,
        10,
        10,
        100,
        100,
        100,
        100,
    ]
def test_infer_normalization_map(normalization_data_frame):
    """ the max exposure time per channel becomes the default target """
    from sensospot_data.normalisation import (
        _infer_normalization_map,
        _split_data_frame,
    )
    # move one row to a second channel to get a two-channel map
    normalization_data_frame.loc[5, "Exposure.Channel"] = "Cy3"
    split_frames = _split_data_frame(
        normalization_data_frame, "Exposure.Channel"
    )
    result = _infer_normalization_map(split_frames)
    assert result == {"Cy3": 25, "Cy5": 50}
def test_normalize_exposure(normalization_data_frame):
    """ every normalized column is rescaled to the target exposure time """
    from sensospot_data.normalisation import (
        _normalize_exposure,
        reduce_overflow,
    )
    from sensospot_data.columns import COLUMN_NORMALIZATION
    reduced = reduce_overflow(normalization_data_frame, "Saturation", 1)
    result = _normalize_exposure(reduced["Cy5"], 100)
    sorted_results = result.sort_values(
        by=["Well.Row", "Well.Column", "Pos.Id"]
    )
    expected_values = [2, 8, 30, 2, 20, 20, 20, 20, 200, 200, 200, 200]
    for normalized_col in COLUMN_NORMALIZATION.values():
        # BUG FIX: the comparison was a bare expression before, so the
        # test silently passed without checking anything
        assert list(sorted_results[normalized_col]) == expected_values
def test_normalize_exposure_time(normalization_data_frame):
    """ an explicit normalization map overrides the inferred targets """
    from sensospot_data.normalisation import (
        normalize_exposure_time,
        reduce_overflow,
    )
    reduced = reduce_overflow(normalization_data_frame, "Saturation", 1)
    result = normalize_exposure_time(reduced, {"Cy5": 100, "Cy3": 0})
    assert "Cy5" in result
    sorted_results = result["Cy5"].sort_values(
        by=["Well.Row", "Well.Column", "Pos.Id"]
    )
    expected_values = [2, 8, 30, 2, 20, 20, 20, 20, 200, 200, 200, 200]
    assert list(sorted_results["Normalized.Spot.Mean"]) == expected_values
def test_normalize_exposure_time_infered_map(normalization_data_frame):
    """ without a map, values are normalized to the max exposure time """
    from sensospot_data.normalisation import (
        normalize_exposure_time,
        reduce_overflow,
    )
    reduced = reduce_overflow(normalization_data_frame, "Saturation", 1)
    result = normalize_exposure_time(reduced)
    assert "Cy5" in result
    sorted_results = result["Cy5"].sort_values(
        by=["Well.Row", "Well.Column", "Pos.Id"]
    )
    expected_values = [1, 4, 15, 1, 10, 10, 10, 10, 100, 100, 100, 100]
    assert list(sorted_results["Normalized.Spot.Mean"]) == expected_values
def test_normalize_measurement(example_dir):
    """ end to end: exposure map, overflow reduction and normalization """
    from sensospot_data.normalisation import normalize_measurement
    from sensospot_data.parser import process_folder
    sub_dir = example_dir / EXAMPLE_DIR_WITH_PARAMS
    data_frame = process_folder(sub_dir)
    exposure_map = {
        1: ExposureSetting("Cy3", 100),
        2: ExposureSetting("Cy5", 15),
        3: ExposureSetting("Cy5", 150),
    }
    normalization_map = {"Cy5": 25}
    result = normalize_measurement(data_frame, exposure_map, normalization_map)
    cy3_df, cy5_df = result["Cy3"], result["Cy5"]
    assert set(result.keys()) == {"Cy3", "Cy5"}
    assert cy3_df["Normalized.Exposure.Time"].unique() == 100
    assert cy5_df["Normalized.Exposure.Time"].unique() == 25

7
tests/test_parameters.py

@ -113,13 +113,12 @@ def test_add_optional_measurement_parameters_without_params_file( @@ -113,13 +113,12 @@ def test_add_optional_measurement_parameters_without_params_file(
exposure_df, example_dir
):
from sensospot_data.parameters import add_optional_measurement_parameters
from pandas import isnull
folder = example_dir / EXAMPLE_DIR_WO_PARAMS
add_optional_measurement_parameters(exposure_df, folder)
for exposure_id in range(1, 4):
mask = exposure_df["Exposure.Id"] == exposure_id
example_row = exposure_df.loc[mask].iloc[0]
assert isnull(example_row["Parameters.Channel"])
assert isnull(example_row["Parameters.Time"])
one_exposure_data_frame = exposure_df.loc[mask]
assert one_exposure_data_frame["Parameters.Channel"].hasnans
assert one_exposure_data_frame["Parameters.Time"].hasnans

47
tests/test_parser.py

@ -65,7 +65,7 @@ def test_parse_csv_no_array(example_dir): @@ -65,7 +65,7 @@ def test_parse_csv_no_array(example_dir):
@pytest.mark.parametrize(
"input, expected", [("", "."), ("..,", "."), (".,,", ","), ("..,,", "."),]
"input, expected", [("", "."), ("..,", "."), (".,,", ","), ("..,,", ".")]
)
def test_guess_decimal_separator_returns_correct_separator(input, expected):
from sensospot_data.parser import _guess_decimal_separator
@ -107,7 +107,7 @@ def test_well_regex_no_match(input): @@ -107,7 +107,7 @@ def test_well_regex_no_match(input):
@pytest.mark.parametrize(
"filename, expected",
[("A1_1.csv", ("A", 1, 1)), ("test/measurement_1_H12_2", ("H", 12, 2)),],
[("A1_1.csv", ("A", 1, 1)), ("test/measurement_1_H12_2", ("H", 12, 2))],
)
def test_extract_measurement_info_ok(filename, expected):
from sensospot_data.parser import _extract_measurement_info
@ -242,10 +242,7 @@ def test_parse_folder(example_dir): @@ -242,10 +242,7 @@ def test_parse_folder(example_dir):
def test_sanity_check_ok(example_dir):
from sensospot_data.parser import (
_sanity_check,
parse_multiple_files,
)
from sensospot_data.parser import _sanity_check, parse_multiple_files
sub_dir = example_dir / EXAMPLE_DIR_WO_PARAMS
file_list = [
@ -261,10 +258,7 @@ def test_sanity_check_ok(example_dir): @@ -261,10 +258,7 @@ def test_sanity_check_ok(example_dir):
def test_sanity_check_raises_value_error(example_dir):
from sensospot_data.parser import (
_sanity_check,
parse_multiple_files,
)
from sensospot_data.parser import _sanity_check, parse_multiple_files
sub_dir = example_dir / EXAMPLE_DIR_WO_PARAMS
file_list = [
@ -279,11 +273,17 @@ def test_sanity_check_raises_value_error(example_dir): @@ -279,11 +273,17 @@ def test_sanity_check_raises_value_error(example_dir):
_sanity_check(data_frame)
def test_get_cache_table_name():
from sensospot_data.parser import _get_cache_table_name
from sensospot_data import VERSION_TABLE_NAME
result = _get_cache_table_name()
assert result == VERSION_TABLE_NAME
def test_process_folder_creates_cache(dir_for_caching):
from sensospot_data.parser import (
process_folder,
CACHE_FILE_NAME,
)
from sensospot_data.parser import process_folder, CACHE_FILE_NAME
cache_path = dir_for_caching / CACHE_FILE_NAME
assert not cache_path.is_file()
@ -309,10 +309,7 @@ def test_process_folder_reads_from_cache(dir_for_caching, example_file): @@ -309,10 +309,7 @@ def test_process_folder_reads_from_cache(dir_for_caching, example_file):
def test_process_folder_read_cache_fails_silently(
dir_for_caching, exposure_df
):
from sensospot_data.parser import (
process_folder,
CACHE_FILE_NAME,
)
from sensospot_data.parser import process_folder, CACHE_FILE_NAME
cache_path = dir_for_caching / CACHE_FILE_NAME
exposure_df.to_hdf(cache_path, "unknown table")
@ -322,15 +319,6 @@ def test_process_folder_read_cache_fails_silently( @@ -322,15 +319,6 @@ def test_process_folder_read_cache_fails_silently(
assert result["Well.Row"][0] == "A"
def test_get_cache_table_name():
from sensospot_data.parser import _get_cache_table_name
from sensospot_data import VERSION_TABLE_NAME
result = _get_cache_table_name()
assert result == VERSION_TABLE_NAME
def test_process_folder_read_cache_no_cache_arg(dir_for_caching, exposure_df):
from sensospot_data.parser import (
process_folder,
@ -347,10 +335,7 @@ def test_process_folder_read_cache_no_cache_arg(dir_for_caching, exposure_df): @@ -347,10 +335,7 @@ def test_process_folder_read_cache_no_cache_arg(dir_for_caching, exposure_df):
def test_process_folder_writes_cache(dir_for_caching):
from sensospot_data.parser import (
process_folder,
CACHE_FILE_NAME,
)
from sensospot_data.parser import process_folder, CACHE_FILE_NAME
process_folder(dir_for_caching, use_cache=True)

1
tests/test_sensovation_data_parser.py

@ -7,3 +7,4 @@ def test_import_api(): @@ -7,3 +7,4 @@ def test_import_api():
from sensospot_data import parse_folder # noqa: F401
from sensospot_data import parse_multiple_files # noqa: F401
from sensospot_data import process_folder # noqa: F401
from sensospot_data import normalize_measurement # noqa: F401

Loading…
Cancel
Save