"""Parsing the numerical output from Sensovation SensoSpot image analysis."""
from collections.abc import Mapping, Sequence
import pandas
from .columns import (
AGGREGATION_PREFIX,
META_DATA_WELL_ROW,
META_DATA_EXPOSURE_ID,
META_DATA_WELL_COLUMN,
SETTINGS_EXPOSURE_TIME,
META_DATA_PARAMETERS_TIME,
SETTINGS_EXPOSURE_CHANNEL,
META_DATA_PARAMETERS_CHANNEL,
)
# Default grouping columns for aggregations: the exposure id plus the
# well position (row and column) meta data columns.
DEFAULT_AGGREGATION_INDEX = [
    META_DATA_EXPOSURE_ID,
    META_DATA_WELL_ROW,
    META_DATA_WELL_COLUMN,
]
def split(data_frame, column):
    """Split a data frame into sub-frames, one per unique value of a column.

    data_frame: pandas data frame to split
    column: name of the column whose unique values define the groups

    Returns a dict mapping each unique column value to the rows holding it.
    """
    return {
        value: data_frame[data_frame[column] == value]
        for value in data_frame[column].unique()
    }
def _is_list_or_tuple(something):
""" returns true if something is a list or tuple """
if isinstance(something, Sequence):
return not isinstance(something, str)
return False
def _is_numerical(something):
""" returns true if something is an int or float """
return isinstance(something, int) or isinstance(something, float)
def _check_valid_exposure_map_entry(entry):
    """Raise a ValueError if an exposure map entry is not suitable.

    A valid entry is a two-item list or tuple of (channel, exposure time),
    where the exposure time is numerical.
    """
    # fix: the first two messages misspelled "Exposure" as "Eposure",
    # inconsistent with the third message and the rest of the module
    if not _is_list_or_tuple(entry):
        raise ValueError("Exposure Map: entries must be tuples or lists")
    if not len(entry) == 2:
        raise ValueError("Exposure Map: entries must consist of two items")
    if not _is_numerical(entry[1]):
        raise ValueError("Exposure Map: second entry must be numerical")
def _check_exposure_map(data_frame, exposure_map):
    """Validate an exposure map against a data frame.

    The map must be a dict whose keys are exactly the exposure ids present
    in the data frame; every value must be a valid exposure map entry.

    Will raise a ValueError if the requirements are not met.
    """
    if not isinstance(exposure_map, Mapping):
        raise ValueError("Exposure Map: map must be a dict")

    ids_in_frame = set(data_frame[META_DATA_EXPOSURE_ID].unique())
    ids_in_map = set(exposure_map)
    if ids_in_frame != ids_in_map:
        raise ValueError(
            f"Exposure Ids {ids_in_frame} don't match "
            f"provided map {ids_in_map}"
        )

    for entry in exposure_map.values():
        _check_valid_exposure_map_entry(entry)
def _set_exposure_data_from_parameters(data_frame):
    """Infer the exposures from measurement parameters.

    Copies the parsed measurement parameter columns (channel, time) into
    the exposure settings columns and returns the resulting data frame.

    Will raise a ValueError if the parameters contain NaNs.
    """
    # work on a copy so the caller's frame is not mutated in place; this
    # matches the merge-based branch of apply_exposure_map(), which also
    # returns a new frame
    df = data_frame.copy()
    if (
        df[META_DATA_PARAMETERS_CHANNEL].hasnans
        or df[META_DATA_PARAMETERS_TIME].hasnans
    ):
        raise ValueError("Exposure Map: measurement parameters incomplete")
    df[SETTINGS_EXPOSURE_CHANNEL] = df[META_DATA_PARAMETERS_CHANNEL]
    df[SETTINGS_EXPOSURE_TIME] = df[META_DATA_PARAMETERS_TIME]
    return df
def apply_exposure_map(data_frame, exposure_map=None):
    """Apply the parameters of an exposure map to the data frame.

    exposure map:
        keys: must be the same as the exposure ids,
        values: objects with at least time and channel attributes

    If the exposure map is None, the values from the optionally parsed
    measurement parameters are used.

    Will raise a ValueError if the provided exposure map does not map to
    the exposure ids.
    """
    if exposure_map is None:
        # no explicit map: fall back to the parsed measurement parameters
        return _set_exposure_data_from_parameters(data_frame)

    _check_exposure_map(data_frame, exposure_map)

    settings_columns = [SETTINGS_EXPOSURE_CHANNEL, SETTINGS_EXPOSURE_TIME]
    map_frame = pandas.DataFrame.from_dict(
        exposure_map,
        orient="index",
        columns=settings_columns,
    )
    # left join: every measurement row receives the channel / time settings
    # for its exposure id
    merged = data_frame.merge(
        map_frame,
        how="left",
        left_on=META_DATA_EXPOSURE_ID,
        right_index=True,
    )
    return merged
def aggregate(
    data_frame, column, method, on=DEFAULT_AGGREGATION_INDEX, new_name=None
):
    """Return the aggregates of one data frame column.

    data_frame: pandas data frame with the data to aggregate
    column: column name to aggregate
    method: method of aggregation
    on: list of columns to group by, defaults to
        - Exposure.Id
        - Well.Column
        - Well.Row
    new_name: the name of the aggregate column;
        if set to None, a prefix will be added to the original name
    """
    if new_name is None:
        # e.g. "Aggregate.Mean.<column>" for method "mean"
        new_name = f"{AGGREGATION_PREFIX}.{method.title()}.{column}"
    result = data_frame.groupby(on).agg({column: method})
    result.columns = [new_name]
    return result
def add_aggregate(
    data_frame, column, method, on=DEFAULT_AGGREGATION_INDEX, new_name=None
):
    """Aggregate one column and add the resulting column to the data frame.

    data_frame: pandas data frame with the data to aggregate
    column: column name to aggregate
    method: method of aggregation
    on: list of columns to group by, defaults to
        - Exposure.Id
        - Well.Column
        - Well.Row
    new_name: the name of the aggregate column;
        if set to None, a prefix will be added to the original name
    """
    aggregates = aggregate(data_frame, column, method, on, new_name)
    # left join keeps every original row; the aggregate value is repeated
    # for each member of its group
    merged = data_frame.merge(
        aggregates,
        how="left",
        left_on=on,
        right_index=True,
    )
    return merged