Browse Source

removed "utils" and "dynamic_range"

this two modules should be added to a new project. This project should concentrate on  on just parsing the data.
xmlparsing
Holger Frey 3 years ago
parent
commit
0290c0a9ba
  1. 6
      CHANGES.md
  2. 54
      README.md
  3. 10
      sensospot_data/__init__.py
  4. 110
      sensospot_data/dynamic_range.py
  5. 34
      sensospot_data/parameters.py
  6. 140
      sensospot_data/utils.py
  7. 249
      tests/test_dynamic_range.py
  8. 59
      tests/test_parameters.py
  9. 6
      tests/test_sensovation_data.py
  10. 238
      tests/test_utils.py

6
CHANGES.md

@ -1,3 +1,9 @@ @@ -1,3 +1,9 @@
0.6.0 - doing splits
--------------------
- the modules `utils` and `dynamic_range` were deleted and will be moved into a separate project
- the resulting output file format is now a tab-delimered csv for more compability
0.5.0 - real life fixes
-----------------------

54
README.md

@ -1,8 +1,7 @@ @@ -1,8 +1,7 @@
Sensospot Data Parser
=====================
Parsing the numerical output from Sensovation Sensospot image analysis and some
other useful functions for working with the data.
Parsing the numerical output from Sensovation Sensospot image analysis.
## Example:
@ -13,21 +12,14 @@ other useful functions for working with the data. @@ -13,21 +12,14 @@ other useful functions for working with the data.
# read the raw data of a folder
raw_data = sensospot_data.parse_folder(<path to results directory>)
# apply an exposure map to add more data:
# key relates to column "Exposure.Id"
# values are (Exposure.Channel, Exposure.Time)
exposure_map = {
1: ("Cy3", 100),
2: ("Cy5", 150),
3: ("Cy5", 15),
}
enhanced_data = sensospot_data.apply_exposure_map(raw_data, exposure_map)
# split the measurement according to channels
channels = sensospot_data.split(enhanced_data "Exposure.Channel")
# merge the two cy5 measurements together, creating an extended dynamic range
cy5_xdr = sensospot_data.create_xdr(channels["cy5"], normalized_time=25)
sorted(raw_data.columns) == [
'Bkg.Area', 'Bkg.Mean', 'Bkg.Median', 'Bkg.StdDev', 'Bkg.Sum',
'Exposure.Id',
'Parameters.Channel', 'Parameters.Time',
'Pos.Id', 'Pos.Nom.X', 'Pos.Nom.Y', 'Pos.X', 'Pos.Y',
'Spot.Area', 'Spot.Diameter', 'Spot.Found', 'Spot.Mean', 'Spot.Median', 'Spot.Saturation', 'Spot.StdDev', 'Spot.Sum',
'Well.Column', 'Well.Name', 'Well.Row']
]
```
## Avaliable functions:
@ -40,31 +32,7 @@ from .parser import parse_file, parse_folder # noqa: F401 @@ -40,31 +32,7 @@ from .parser import parse_file, parse_folder # noqa: F401
- **parse_file(path_to_csv_file)**
Parses the csv file into a pandas data frame and will add additional some
meta data from the file name. Is internally also used by `parse_folder()`
- **split(data_frame, column)**
Splits a data frame based on the unique values of a column. Will return a
dict, with the unique values as keys and the corresponding data frame as
value
- **apply_map(data_frame, map, index_col)**
Adds information provided in the nested dictionary `map` to a data frame,
based on the values in the data_frame column `index_col`.
- **apply_exposure_map(data_frame, exposure_map)**
Adds information about the channel and exposure time to a data frame, based
on the exposure id. Will get bonus karma points, if the named tuple
`ExposureInfo` is used:
`{1:ExposureInfo("Cy3", 100), 2:ExposureInfo("Cy3", 100), }`
- **ExposureInfo(exposure_channel, exposure_time)**
A named tuple for defining an exposure map. Usage will increase readability
and karma points.
- **blend(data_frame, [column="Spot.Saturation", limit=2])**
If provided with a data frame with multiple exposure times for the same
exposure channel, the function will blend theese two times together based
on given column and limit.
- **normalize_values(data_frame, [normalized_time=None])**
Adds new columns to the data frame with intensity values recalculated to the
normalized exposure time. If no time is given, the max exposure time is used.
- **create_xdr(data_frame, [normalized_time=None, column="Spot.Saturation", limit=2])**
This combines the methods `blend()` and `normalize_values()` into one call.
What a joy!
## CLI
@ -76,7 +44,7 @@ Arguments: @@ -76,7 +44,7 @@ Arguments:
SOURCE: Folder with Sensospot measurement
Options:
-o, --outfile TEXT Output file name, relative to SOURCE, defaults to 'raw_data.h5'
-o, --outfile TEXT Output file name, relative to SOURCE, defaults to 'collected_data.csv'
--help Show this message and exit.
```

10
sensospot_data/__init__.py

@ -3,17 +3,15 @@ @@ -3,17 +3,15 @@
Parsing the numerical output from Sensovations Sensospot image analysis.
"""
__version__ = "0.5.4"
__version__ = "0.6.0"
from pathlib import Path
import click
from .utils import split, apply_map, apply_exposure_map # noqa: F401
from .parser import parse_file, parse_folder # noqa: F401
from .parameters import ExposureInfo # noqa: F401
from .dynamic_range import blend, create_xdr, normalize_values # noqa: F401
@click.command()
@ -30,7 +28,7 @@ from .dynamic_range import blend, create_xdr, normalize_values # noqa: F401 @@ -30,7 +28,7 @@ from .dynamic_range import blend, create_xdr, normalize_values # noqa: F401
@click.option(
"-o",
"--outfile",
default="raw_data.h5",
default="collected_data.csv",
help="Output file name",
)
@click.option(
@ -44,5 +42,5 @@ def run(source, outfile, quiet=False): @@ -44,5 +42,5 @@ def run(source, outfile, quiet=False):
source_path = Path(source)
# read the raw data of a folder
raw_data = parse_folder(source_path, quiet=quiet)
hdf5_path = source_path / outfile
raw_data.to_hdf(hdf5_path, key="raw_data", format="table")
csv_file = source_path / outfile
raw_data.to_csv(csv_file, sep="\t")

110
sensospot_data/dynamic_range.py

@ -1,110 +0,0 @@ @@ -1,110 +0,0 @@
from pandas.api.types import is_numeric_dtype
from .utils import split
from .columns import (
RAW_DATA_POS_ID,
RAW_DATA_SPOT_SAT,
CALC_SPOT_OVERFLOW,
META_DATA_WELL_ROW,
META_DATA_WELL_COLUMN,
SETTINGS_EXPOSURE_TIME,
SETTINGS_EXPOSURE_CHANNEL,
RAW_DATA_NORMALIZATION_MAP,
SETTINGS_NORMALIZED_EXPOSURE_TIME,
)
PROBE_MULTI_INDEX = [
META_DATA_WELL_ROW,
META_DATA_WELL_COLUMN,
RAW_DATA_POS_ID,
]
def _check_if_xdr_ready(data_frame):
"""check if a data frame meets the constraints for xdr"""
required_columns = {SETTINGS_EXPOSURE_CHANNEL, SETTINGS_EXPOSURE_TIME}
if not required_columns.issubset(data_frame.columns):
raise ValueError("XDR: Apply an exposure map first")
if len(data_frame[SETTINGS_EXPOSURE_CHANNEL].unique()) != 1:
raise ValueError("XDR: Mixed Exposure Channels")
if not is_numeric_dtype(data_frame[SETTINGS_EXPOSURE_TIME]):
raise ValueError("XDR: Exposure time is not numerical")
if data_frame[SETTINGS_EXPOSURE_TIME].hasnans:
raise ValueError("XDR: Exposure time contains NaNs")
def _calc_overflow_info(data_frame, column=RAW_DATA_SPOT_SAT, limit=2):
"""add overflow info, based on column and limit"""
data_frame.loc[:, CALC_SPOT_OVERFLOW] = data_frame[column] > limit
return data_frame
def _reduce_overflow(data_frame):
"""the heavy lifting for creating an extended dynamic range"""
split_frames = split(data_frame, SETTINGS_EXPOSURE_TIME)
# get the exposure times, longest first
exposure_times = sorted(split_frames.keys(), reverse=True)
max_time, *rest_times = exposure_times
result_frame = split_frames[max_time].set_index(PROBE_MULTI_INDEX)
for next_time in rest_times:
mask = result_frame[CALC_SPOT_OVERFLOW] == True # noqa: E712
next_frame = split_frames[next_time].set_index(PROBE_MULTI_INDEX)
rf_index = set(result_frame.index)
nf_index = set(next_frame.index)
diff = rf_index - nf_index | nf_index - rf_index
if diff:
num = len(diff)
raise ValueError(
f"XDR: Scan Data is incomplete, differs on {num} probes"
)
result_frame.loc[mask] = next_frame.loc[mask]
return result_frame.reset_index()
def blend(data_frame, column=RAW_DATA_SPOT_SAT, limit=2):
"""creates an extended dynamic range, eliminating overflowing spots"""
_check_if_xdr_ready(data_frame)
if CALC_SPOT_OVERFLOW not in data_frame.columns:
data_frame = _calc_overflow_info(data_frame, column, limit)
return _reduce_overflow(data_frame)
def normalize_values(data_frame, normalized_time=None):
"""add exposure time normalized values to a data frame
will use the maximum exposure time, if none is provided
and the column SETTINGS_NORMALIZED_EXPOSURE_TIME was not
set before.
"""
if normalized_time:
data_frame[SETTINGS_NORMALIZED_EXPOSURE_TIME] = normalized_time
elif SETTINGS_NORMALIZED_EXPOSURE_TIME not in data_frame.columns:
normalized_time = data_frame[SETTINGS_EXPOSURE_TIME].max()
data_frame[SETTINGS_NORMALIZED_EXPOSURE_TIME] = normalized_time
for original_col, normalized_col in RAW_DATA_NORMALIZATION_MAP.items():
data_frame[normalized_col] = (
data_frame[original_col] / data_frame[SETTINGS_EXPOSURE_TIME]
) * data_frame[SETTINGS_NORMALIZED_EXPOSURE_TIME]
return data_frame
def create_xdr(
data_frame,
normalized_time=None,
column=RAW_DATA_SPOT_SAT,
limit=2,
):
"""normalize measurement exposures
normalized_time:
if it is None, the max exposure time is used for normalization.
"""
data_frame = blend(data_frame, column, limit)
return normalize_values(data_frame, normalized_time)

34
sensospot_data/parameters.py

@ -7,9 +7,9 @@ from pathlib import Path @@ -7,9 +7,9 @@ from pathlib import Path
from collections import namedtuple
import numpy
import pandas
from defusedxml import ElementTree
from .utils import apply_map
from .columns import (
META_DATA_EXPOSURE_ID,
META_DATA_PARAMETERS_TIME,
@ -61,7 +61,37 @@ def _add_measurement_params(data_frame, params): @@ -61,7 +61,37 @@ def _add_measurement_params(data_frame, params):
"""adds measurement parameters to a data frame"""
columns = [META_DATA_PARAMETERS_CHANNEL, META_DATA_PARAMETERS_TIME]
map = {k: dict(zip(columns, v)) for k, v in params.items()}
return apply_map(data_frame, map, META_DATA_EXPOSURE_ID)
return _apply_map(data_frame, map, META_DATA_EXPOSURE_ID)
def _apply_map(data_frame, map, index_col):
"""adds a nested dictionary to a data frame on a specific index column
map:
keys: must be the same as the values in the index column,
values: dictionary with new column names as keys and the values
example:
>>> df = DataFrame(data={"MyIndex": [10, 10, 20]})
>>> map = {
... 10: {"NewCol": "foo"},
... 20: {"NewCol": "Bar"},
... }
>>> apply_map(df, map, "MyIndex")
MyIndex NewCol
0 10 foo
1 10 foo
2 20 bar
"""
map_df = pandas.DataFrame.from_dict(map, orient="index")
return data_frame.merge(
map_df,
how="left",
left_on=index_col,
right_index=True,
)
def add_optional_measurement_parameters(data_frame, folder):

140
sensospot_data/utils.py

@ -1,140 +0,0 @@ @@ -1,140 +0,0 @@
from collections.abc import Mapping, Sequence
import pandas
from .columns import (
META_DATA_WELL_ROW,
META_DATA_EXPOSURE_ID,
META_DATA_WELL_COLUMN,
SETTINGS_EXPOSURE_TIME,
META_DATA_PARAMETERS_TIME,
SETTINGS_EXPOSURE_CHANNEL,
META_DATA_PARAMETERS_CHANNEL,
)
DEFAULT_AGGREGATION_INDEX = [
META_DATA_EXPOSURE_ID,
META_DATA_WELL_ROW,
META_DATA_WELL_COLUMN,
]
def split(data_frame, column):
"""splits a data frame on unique column values"""
values = data_frame[column].unique()
masks = {value: (data_frame[column] == value) for value in values}
return {value: data_frame[mask] for value, mask in masks.items()}
def _is_list_or_tuple(something):
"""returns true if something is a list or tuple"""
if isinstance(something, Sequence):
return not isinstance(something, str)
return False
def _is_numerical(something):
"""returns true if something is an int or float"""
return isinstance(something, int) or isinstance(something, float)
def _check_valid_exposure_map_entry(entry):
"""raises a ValueError, if an exposure map entry is not suitable"""
if not _is_list_or_tuple(entry):
raise ValueError("Eposure Map: entries must be tuples or lists")
if not len(entry) == 2:
raise ValueError("Eposure Map: entries must consist of two items")
if not _is_numerical(entry[1]):
raise ValueError("Exposure Map: second entry must be numerical")
def _check_exposure_map(data_frame, exposure_map):
"""checks if an exposure maps fits the requirements
Will raise an ValueError if requirements are not met
"""
if not isinstance(exposure_map, Mapping):
raise ValueError("Exposure Map: map must be a dict")
exposure_ids_in_df = set(data_frame[META_DATA_EXPOSURE_ID].unique())
exposure_ids_in_map = set(exposure_map.keys())
if exposure_ids_in_df != exposure_ids_in_map:
msg = (
f"Exposure Ids {exposure_ids_in_df} don't match "
f"provided map {exposure_ids_in_map}"
)
raise ValueError(msg)
for entry in exposure_map.values():
_check_valid_exposure_map_entry(entry)
def _set_exposure_data_from_parameters(data_frame):
"""infer the exposures from measurement parameters
will raise a ValueError if the parameters contain NaNs
"""
df = data_frame # shorthand for cleaner code
if (
df[META_DATA_PARAMETERS_CHANNEL].hasnans
or df[META_DATA_PARAMETERS_TIME].hasnans
):
raise ValueError("Exposure Map: measurement parameters incomplete")
df[SETTINGS_EXPOSURE_CHANNEL] = df[META_DATA_PARAMETERS_CHANNEL]
df[SETTINGS_EXPOSURE_TIME] = df[META_DATA_PARAMETERS_TIME]
return df
def apply_exposure_map(data_frame, exposure_map=None):
"""applies the parameters of a exposure map to the data frame
exposure map:
keys: must be the same as the exposure ids,
values: objects with at least time and channel attributes
if the exposure map is None, the values from the optionally parsed
measurement parameters are used.
will raise an ValueError, if the provided exposure map does not map to the
exposure ids.
"""
if exposure_map is None:
return _set_exposure_data_from_parameters(data_frame)
_check_exposure_map(data_frame, exposure_map)
columns = [SETTINGS_EXPOSURE_CHANNEL, SETTINGS_EXPOSURE_TIME]
map = {k: dict(zip(columns, v)) for k, v in exposure_map.items()}
return apply_map(data_frame, map, META_DATA_EXPOSURE_ID)
def apply_map(data_frame, map, index_col):
"""adds a nested dictionary to a data frame on a specific index column
map:
keys: must be the same as the values in the index column,
values: dictionary with new column names as keys and the values
example:
>>> df = DataFrame(data={"MyIndex": [10, 10, 20]})
>>> map = {
... 10: {"NewCol": "foo"},
... 20: {"NewCol": "Bar"},
... }
>>> apply_map(df, map, "MyIndex")
MyIndex NewCol
0 10 foo
1 10 foo
2 20 bar
"""
map_df = pandas.DataFrame.from_dict(map, orient="index")
return data_frame.merge(
map_df,
how="left",
left_on=index_col,
right_index=True,
)

249
tests/test_dynamic_range.py

@ -1,249 +0,0 @@ @@ -1,249 +0,0 @@
import numpy
import pandas
import pytest
def test_check_if_xdr_ready_ok(exposure_df):
from sensospot_data.columns import (
SETTINGS_EXPOSURE_TIME,
SETTINGS_EXPOSURE_CHANNEL,
)
from sensospot_data.dynamic_range import _check_if_xdr_ready
exposure_df[SETTINGS_EXPOSURE_TIME] = 1
exposure_df[SETTINGS_EXPOSURE_CHANNEL] = 2
result = _check_if_xdr_ready(exposure_df)
assert result is None
@pytest.mark.parametrize(["run"], [[0], [1], [2]])
def test_check_if_xdr_ready_raises_error_missing_column(exposure_df, run):
from sensospot_data.columns import (
SETTINGS_EXPOSURE_TIME,
SETTINGS_EXPOSURE_CHANNEL,
)
from sensospot_data.dynamic_range import _check_if_xdr_ready
columns = [SETTINGS_EXPOSURE_TIME, SETTINGS_EXPOSURE_CHANNEL, "X"]
extra_col = columns[run]
exposure_df[extra_col] = 1
with pytest.raises(ValueError):
_check_if_xdr_ready(exposure_df)
def test_check_if_xdr_ready_raises_error_mixed_channels(exposure_df):
from sensospot_data.columns import (
META_DATA_EXPOSURE_ID,
SETTINGS_EXPOSURE_TIME,
SETTINGS_EXPOSURE_CHANNEL,
)
from sensospot_data.dynamic_range import _check_if_xdr_ready
exposure_df[SETTINGS_EXPOSURE_TIME] = 1
exposure_df[SETTINGS_EXPOSURE_CHANNEL] = exposure_df[META_DATA_EXPOSURE_ID]
with pytest.raises(ValueError):
_check_if_xdr_ready(exposure_df)
def test_check_if_xdr_ready_raises_error_non_numeric_time(exposure_df):
from sensospot_data.columns import (
SETTINGS_EXPOSURE_TIME,
SETTINGS_EXPOSURE_CHANNEL,
)
from sensospot_data.dynamic_range import _check_if_xdr_ready
exposure_df[SETTINGS_EXPOSURE_TIME] = "X"
exposure_df[SETTINGS_EXPOSURE_CHANNEL] = 2
with pytest.raises(ValueError):
_check_if_xdr_ready(exposure_df)
def test_check_if_xdr_ready_raises_error_on_nan(exposure_df):
from sensospot_data.columns import (
SETTINGS_EXPOSURE_TIME,
SETTINGS_EXPOSURE_CHANNEL,
)
from sensospot_data.dynamic_range import _check_if_xdr_ready
exposure_df[SETTINGS_EXPOSURE_TIME] = numpy.nan
exposure_df[SETTINGS_EXPOSURE_CHANNEL] = 2
with pytest.raises(ValueError):
_check_if_xdr_ready(exposure_df)
def test_check_overflow_limit_defaults():
from sensospot_data.columns import RAW_DATA_SPOT_SAT, CALC_SPOT_OVERFLOW
from sensospot_data.dynamic_range import _calc_overflow_info
data_frame = pandas.DataFrame(data={RAW_DATA_SPOT_SAT: [1, 2, 3]})
result = _calc_overflow_info(data_frame)
assert list(result[CALC_SPOT_OVERFLOW]) == [False, False, True]
def test_check_overflow_limit_custom_limit():
from sensospot_data.columns import CALC_SPOT_OVERFLOW
from sensospot_data.dynamic_range import _calc_overflow_info
data_frame = pandas.DataFrame(data={"X": [4, 2, 3, 4]})
result = _calc_overflow_info(data_frame, "X", 2)
assert list(result[CALC_SPOT_OVERFLOW]) == [True, False, True, True]
def test_reduce_overflow_multiple_times(normalization_data_frame):
from sensospot_data.dynamic_range import (
PROBE_MULTI_INDEX,
_reduce_overflow,
_calc_overflow_info,
)
data_frame = _calc_overflow_info(normalization_data_frame, "Saturation", 1)
result = _reduce_overflow(data_frame)
sorted_results = result.sort_values(by=PROBE_MULTI_INDEX)
assert list(sorted_results["Value"]) == [
1,
2,
3,
1,
10,
10,
10,
10,
100,
100,
100,
100,
]
def test_reduce_overflow_only_one_exposure_time(normalization_data_frame):
from sensospot_data.dynamic_range import (
SETTINGS_EXPOSURE_TIME,
_reduce_overflow,
_calc_overflow_info,
)
normalization_data_frame[SETTINGS_EXPOSURE_TIME] = 1
data_frame = _calc_overflow_info(normalization_data_frame, "Saturation", 1)
result = _reduce_overflow(data_frame)
assert list(result["Value"]) == list(normalization_data_frame["Value"])
def test_blend(normalization_data_frame):
from sensospot_data.dynamic_range import PROBE_MULTI_INDEX, blend
result = blend(normalization_data_frame, "Saturation", 1)
sorted_results = result.sort_values(by=PROBE_MULTI_INDEX)
assert list(sorted_results["Value"]) == [
1,
2,
3,
1,
10,
10,
10,
10,
100,
100,
100,
100,
]
def test_blend_raises_error(normalization_data_frame):
from sensospot_data.dynamic_range import SETTINGS_EXPOSURE_TIME, blend
normalization_data_frame[SETTINGS_EXPOSURE_TIME] = "A"
with pytest.raises(ValueError):
blend(normalization_data_frame, "Saturation", 1)
def test_normalize_values_no_param(normalization_data_frame):
from sensospot_data.columns import RAW_DATA_NORMALIZATION_MAP
from sensospot_data.dynamic_range import (
PROBE_MULTI_INDEX,
blend,
normalize_values,
)
reduced = blend(normalization_data_frame, "Saturation", 1)
result = normalize_values(reduced)
sorted_results = result.sort_values(by=PROBE_MULTI_INDEX)
expected_values = [1, 4, 15, 1, 10, 10, 10, 10, 100, 100, 100, 100]
for normalized_col in RAW_DATA_NORMALIZATION_MAP.values():
assert list(sorted_results[normalized_col]) == expected_values
def test_normalize_values_custom_param(normalization_data_frame):
from sensospot_data.columns import RAW_DATA_NORMALIZATION_MAP
from sensospot_data.dynamic_range import (
PROBE_MULTI_INDEX,
blend,
normalize_values,
)
reduced = blend(normalization_data_frame, "Saturation", 1)
result = normalize_values(reduced, 100)
sorted_results = result.sort_values(by=PROBE_MULTI_INDEX)
expected_values = [2, 8, 30, 2, 20, 20, 20, 20, 200, 200, 200, 200]
for normalized_col in RAW_DATA_NORMALIZATION_MAP.values():
assert list(sorted_results[normalized_col]) == expected_values
def test_normalize_values_preset_param(normalization_data_frame):
from sensospot_data.columns import (
RAW_DATA_NORMALIZATION_MAP,
SETTINGS_NORMALIZED_EXPOSURE_TIME,
)
from sensospot_data.dynamic_range import (
PROBE_MULTI_INDEX,
blend,
normalize_values,
)
reduced = blend(normalization_data_frame, "Saturation", 1)
reduced[SETTINGS_NORMALIZED_EXPOSURE_TIME] = 100
result = normalize_values(reduced)
sorted_results = result.sort_values(by=PROBE_MULTI_INDEX)
expected_values = [2, 8, 30, 2, 20, 20, 20, 20, 200, 200, 200, 200]
for normalized_col in RAW_DATA_NORMALIZATION_MAP.values():
assert list(sorted_results[normalized_col]) == expected_values
def test_create_xdr(normalization_data_frame):
from sensospot_data.columns import RAW_DATA_NORMALIZATION_MAP
from sensospot_data.dynamic_range import PROBE_MULTI_INDEX, create_xdr
result = create_xdr(normalization_data_frame, 100, "Saturation", 1)
sorted_results = result.sort_values(by=PROBE_MULTI_INDEX)
expected_values = [2, 8, 30, 2, 20, 20, 20, 20, 200, 200, 200, 200]
for normalized_col in RAW_DATA_NORMALIZATION_MAP.values():
assert list(sorted_results[normalized_col]) == expected_values

59
tests/test_parameters.py

@ -119,3 +119,62 @@ def test_add_optional_measurement_parameters_without_params_file( @@ -119,3 +119,62 @@ def test_add_optional_measurement_parameters_without_params_file(
one_exposure_data_frame = exposure_df.loc[mask]
assert one_exposure_data_frame["Parameters.Channel"].hasnans
assert one_exposure_data_frame["Parameters.Time"].hasnans
def test_apply_map(exposure_df):
from sensospot_data.parameters import _apply_map
map = {
1: {"SomeColumn": "A", "OtherColumn": 9},
2: {"SomeColumn": "B", "OtherColumn": 8},
3: {"SomeColumn": "C", "OtherColumn": 7},
}
result = _apply_map(exposure_df, map, "Exposure.Id")
for key, value in map.items():
mask = result["Exposure.Id"] == key
partial = result.loc[mask]
assert set(partial["SomeColumn"].unique()) == {value["SomeColumn"]}
assert set(partial["OtherColumn"].unique()) == {value["OtherColumn"]}
def test_apply_map_keys_not_in_df(exposure_df):
from sensospot_data.parameters import _apply_map
map = {
1: {"some_col": "A", "other_col": 9},
2: {"some_col": "B", "other_col": 8},
3: {"some_col": "C", "other_col": 7},
4: {"some_col": "D", "other_col": 6},
}
result = _apply_map(exposure_df, map, "Exposure.Id")
for key in (1, 2, 3):
value = map[key]
mask = result["Exposure.Id"] == key
partial = result.loc[mask]
assert set(partial["some_col"].unique()) == {value["some_col"]}
assert set(partial["other_col"].unique()) == {value["other_col"]}
assert "D" not in set(result["some_col"].unique())
assert "6" not in set(result["other_col"].unique())
def test_apply_map_not_all_keys_map_to_df(exposure_df):
from sensospot_data.parameters import _apply_map
map = {
1: {"some_col": "A", "other_col": 9},
3: {"some_col": "C", "other_col": 7},
}
result = _apply_map(exposure_df, map, "Exposure.Id")
assert not result.iloc[0].hasnans
assert result.iloc[1].hasnans
assert not result.iloc[2].hasnans
assert result["some_col"].hasnans
assert result["other_col"].hasnans

6
tests/test_sensovation_data.py

@ -4,11 +4,5 @@ @@ -4,11 +4,5 @@
def test_import_api():
from sensospot_data import ExposureInfo # noqa: F401
from sensospot_data import run # noqa: F401
from sensospot_data import blend # noqa: F401
from sensospot_data import split # noqa: F401
from sensospot_data import apply_map # noqa: F401
from sensospot_data import create_xdr # noqa: F401
from sensospot_data import parse_file # noqa: F401
from sensospot_data import parse_folder # noqa: F401
from sensospot_data import normalize_values # noqa: F401
from sensospot_data import apply_exposure_map # noqa: F401

238
tests/test_utils.py

@ -1,238 +0,0 @@ @@ -1,238 +0,0 @@
from collections import namedtuple
import pytest
ExposureSetting = namedtuple("ExposureSetting", ["channel", "time"])
def test_split(data_frame_with_params):
from sensospot_data.utils import split
result = split(data_frame_with_params, "Well.Row")
assert set(result.keys()) == set("ABC")
for key, value_df in result.items():
assert set(value_df["Well.Row"].unique()) == {key}
@pytest.mark.parametrize(
"value,expected",
[
[[1, 2], True],
[(1, 2), True],
[{1, 2}, False],
[{1: 2}, False],
["1, 2", False],
[None, False],
],
)
def test_is_list_or_tuple(value, expected):
from sensospot_data.utils import _is_list_or_tuple
result = _is_list_or_tuple(value)
assert result is expected
@pytest.mark.parametrize(
"value,expected",
[
[1, True],
[1.2, True],
[{1, 2}, False],
[{1: 2}, False],
["1", False],
[None, False],
],
)
def test_is_numerical(value, expected):
from sensospot_data.utils import _is_numerical
result = _is_numerical(value)
assert result is expected
def test_check_valid_exposure_map_entry_ok():
from sensospot_data.utils import _check_valid_exposure_map_entry
result = _check_valid_exposure_map_entry((2, 1))
assert result is None
@pytest.mark.parametrize(
"value", [[], [1], (1, 2, 3), {"a": 1, "b": 2}, ("A", "B")]
)
def test_check_valid_exposure_map_entry_raises_error(value):
from sensospot_data.utils import _check_valid_exposure_map_entry
with pytest.raises(ValueError):
_check_valid_exposure_map_entry(value)
def test_check_exposure_map_ok(exposure_df):
from sensospot_data.utils import _check_exposure_map
exposure_map = {1: ("A", 10), 2: ("B", 20), 3: ("C", 30)}
result = _check_exposure_map(exposure_df, exposure_map)
assert result is None
def test_check_exposure_map_wrong_type(exposure_df):
from sensospot_data.utils import _check_exposure_map
exposure_map = []
with pytest.raises(ValueError):
_check_exposure_map(exposure_df, exposure_map)
def test_check_exposure_map_wrong_ids(exposure_df):
from sensospot_data.utils import _check_exposure_map
exposure_map = {1: ("A", 10), 2: ("B", 20), 4: ("D", 40)}
with pytest.raises(ValueError):
_check_exposure_map(exposure_df, exposure_map)
def test_check_exposure_map_invalid_entries(exposure_df):
from sensospot_data.utils import _check_exposure_map
exposure_map = {1: ("A", 10), 2: ("B", 20), 3: "ERROR"}
with pytest.raises(ValueError):
_check_exposure_map(exposure_df, exposure_map)
def test_infer_exposure_from_parameters(data_frame_with_params):
from sensospot_data.utils import _set_exposure_data_from_parameters
result = _set_exposure_data_from_parameters(data_frame_with_params)
assert all(result["Exposure.Channel"] == result["Parameters.Channel"])
assert all(result["Exposure.Time"] == result["Parameters.Time"])
def test_infer_exposure_from_parameters_raises_error(
data_frame_without_params,
):
from sensospot_data.utils import _set_exposure_data_from_parameters
with pytest.raises(ValueError) as excinfo:
_set_exposure_data_from_parameters(data_frame_without_params)
assert str(excinfo.value).startswith("Exposure Map: measurement")
def test_apply_exposure_map(data_frame_with_params):
from sensospot_data.utils import apply_exposure_map
exposure_map = {
1: ExposureSetting("Cy3", 100),
2: ExposureSetting("Cy5", 15),
3: ExposureSetting("Cy5", 150),
}
result = apply_exposure_map(data_frame_with_params, exposure_map)
for key, value in exposure_map.items():
mask = result["Exposure.Id"] == key
partial = result.loc[mask]
assert set(partial["Exposure.Channel"].unique()) == {value.channel}
assert set(partial["Exposure.Time"].unique()) == {value.time}
def test_apply_exposure_map_raises_error(data_frame_with_params):
from sensospot_data.utils import apply_exposure_map
exposure_map = {
1: ExposureSetting("Cy3", 100),
2: ExposureSetting("Cy5", 15),
"X": ExposureSetting("Cy5", 150),
}
with pytest.raises(ValueError):
apply_exposure_map(data_frame_with_params, exposure_map)
def test_apply_exposure_map_from_parameters(data_frame_with_params):
from sensospot_data.utils import apply_exposure_map
result = apply_exposure_map(data_frame_with_params, None)
assert all(result["Exposure.Channel"] == result["Parameters.Channel"])
assert all(result["Exposure.Time"] == result["Parameters.Time"])
def test_apply_exposure_map_from_parameters_raises_error(
data_frame_without_params,
):
from sensospot_data.utils import apply_exposure_map
with pytest.raises(ValueError) as excinfo:
apply_exposure_map(data_frame_without_params, None)
assert str(excinfo.value).startswith("Exposure Map: measurement")
def test_apply_map(exposure_df):
from sensospot_data.utils import apply_map
map = {
1: {"SomeColumn": "A", "OtherColumn": 9},
2: {"SomeColumn": "B", "OtherColumn": 8},
3: {"SomeColumn": "C", "OtherColumn": 7},
}
result = apply_map(exposure_df, map, "Exposure.Id")
for key, value in map.items():
mask = result["Exposure.Id"] == key
partial = result.loc[mask]
assert set(partial["SomeColumn"].unique()) == {value["SomeColumn"]}
assert set(partial["OtherColumn"].unique()) == {value["OtherColumn"]}
def test_apply_map_keys_not_in_df(exposure_df):
from sensospot_data.utils import apply_map
map = {
1: {"some_col": "A", "other_col": 9},
2: {"some_col": "B", "other_col": 8},
3: {"some_col": "C", "other_col": 7},
4: {"some_col": "D", "other_col": 6},
}
result = apply_map(exposure_df, map, "Exposure.Id")
for key in (1, 2, 3):
value = map[key]
mask = result["Exposure.Id"] == key
partial = result.loc[mask]
assert set(partial["some_col"].unique()) == {value["some_col"]}
assert set(partial["other_col"].unique()) == {value["other_col"]}
assert "D" not in set(result["some_col"].unique())
assert "6" not in set(result["other_col"].unique())
def test_apply_map_not_all_keys_map_to_df(exposure_df):
from sensospot_data.utils import apply_map
map = {
1: {"some_col": "A", "other_col": 9},
3: {"some_col": "C", "other_col": 7},
}
result = apply_map(exposure_df, map, "Exposure.Id")
assert not result.iloc[0].hasnans
assert result.iloc[1].hasnans
assert not result.iloc[2].hasnans
assert result["some_col"].hasnans
assert result["other_col"].hasnans
Loading…
Cancel
Save