You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
140 lines
4.4 KiB
140 lines
4.4 KiB
from collections.abc import Mapping, Sequence |
|
|
|
import pandas |
|
|
|
from .columns import ( |
|
META_DATA_WELL_ROW, |
|
META_DATA_EXPOSURE_ID, |
|
META_DATA_WELL_COLUMN, |
|
SETTINGS_EXPOSURE_TIME, |
|
META_DATA_PARAMETERS_TIME, |
|
SETTINGS_EXPOSURE_CHANNEL, |
|
META_DATA_PARAMETERS_CHANNEL, |
|
) |
|
|
|
# Default index columns for aggregations: exposure id, well row, well column.
# NOTE(review): purpose inferred from the name, the constant is not used in
# the visible part of this module - confirm against the callers.
DEFAULT_AGGREGATION_INDEX = [
    META_DATA_EXPOSURE_ID, META_DATA_WELL_ROW, META_DATA_WELL_COLUMN
]
|
|
|
|
|
def split(data_frame, column):
    """splits a data frame on unique column values

    data_frame: the pandas data frame to split
    column:     name of the column whose unique values define the parts

    returns a dict with the unique values of the column as keys and the
    rows of the data frame holding that value as values; the rows keep
    their original index.

    Rows with a missing value (NaN / None) in the column are collected
    under the missing value key instead of being dropped.
    """
    series = data_frame[column]
    parts = {}
    for value in series.unique():
        # a missing value never compares equal to itself, "series == value"
        # would be False for every row and the part would be empty
        mask = series.isna() if pandas.isna(value) else series == value
        parts[value] = data_frame[mask]
    return parts
|
|
|
|
|
def _is_list_or_tuple(something):
    """checks if something is a list-like sequence

    returns True for any Sequence (list, tuple, ...) except for strings,
    False for everything else
    """
    return isinstance(something, Sequence) and not isinstance(something, str)
|
|
|
|
|
def _is_numerical(something):
    """returns True if something is an instance of int or float

    bool is a subclass of int, so booleans are accepted as well
    """
    return isinstance(something, (int, float))
|
|
|
|
|
def _check_valid_exposure_map_entry(entry):
    """raises a ValueError, if an exposure map entry is not suitable

    entry: one value of an exposure map; it must be a non-string sequence
           (list or tuple) with exactly two items, the second item must be
           an int or float
    """
    if not _is_list_or_tuple(entry):
        raise ValueError("Exposure Map: entries must be tuples or lists")
    if len(entry) != 2:
        raise ValueError("Exposure Map: entries must consist of two items")
    if not _is_numerical(entry[1]):
        raise ValueError("Exposure Map: second entry must be numerical")
|
|
|
|
|
def _check_exposure_map(data_frame, exposure_map):
    """validates an exposure map against a data frame

    The exposure map must be a Mapping, its keys must be exactly the
    exposure ids found in the data frame and each of its values must pass
    the check for a single exposure map entry.

    Will raise a ValueError if one of these requirements is not met
    """
    if not isinstance(exposure_map, Mapping):
        raise ValueError("Exposure Map: map must be a dict")

    exposure_ids_in_df = set(data_frame[META_DATA_EXPOSURE_ID].unique())
    exposure_ids_in_map = set(exposure_map)
    if exposure_ids_in_df != exposure_ids_in_map:
        raise ValueError(
            f"Exposure Ids {exposure_ids_in_df} don't match "
            f"provided map {exposure_ids_in_map}"
        )

    # the keys fit, now every single entry has to be well-formed
    for exposure_id in exposure_map:
        _check_valid_exposure_map_entry(exposure_map[exposure_id])
|
|
|
|
|
def _set_exposure_data_from_parameters(data_frame):
    """infer the exposures from measurement parameters

    Copies the channel and time columns of the measurement parameters to
    the exposure channel and exposure time settings columns.

    returns a copy of the data frame with the settings columns set, the
    data frame passed in is left untouched

    will raise a ValueError if the parameters contain NaNs
    """
    if (
        data_frame[META_DATA_PARAMETERS_CHANNEL].hasnans
        or data_frame[META_DATA_PARAMETERS_TIME].hasnans
    ):
        raise ValueError("Exposure Map: measurement parameters incomplete")

    # work on a copy: the caller's frame must not be modified as a side
    # effect (and no chained-assignment trouble if it is a slice)
    result = data_frame.copy()
    result[SETTINGS_EXPOSURE_CHANNEL] = result[META_DATA_PARAMETERS_CHANNEL]
    result[SETTINGS_EXPOSURE_TIME] = result[META_DATA_PARAMETERS_TIME]
    return result
|
|
|
|
|
def apply_exposure_map(data_frame, exposure_map=None):
    """applies the parameters of an exposure map to the data frame

    exposure map:
        keys: must be the same as the exposure ids,
        values: lists or tuples with exactly two items: (channel, time),
                the time must be numerical

    if the exposure map is None, the values from the optionally parsed
    measurement parameters are used.

    returns a data frame with the exposure channel and exposure time
    settings columns set

    will raise a ValueError, if the provided exposure map is malformed or
    does not map to the exposure ids, or - without an exposure map - if the
    measurement parameters contain NaNs.
    """
    if exposure_map is None:
        return _set_exposure_data_from_parameters(data_frame)

    _check_exposure_map(data_frame, exposure_map)

    # every entry is (channel, time): pair the items with their column names
    columns = [SETTINGS_EXPOSURE_CHANNEL, SETTINGS_EXPOSURE_TIME]
    column_map = {
        exposure_id: dict(zip(columns, entry))
        for exposure_id, entry in exposure_map.items()
    }
    return apply_map(data_frame, column_map, META_DATA_EXPOSURE_ID)
|
|
|
|
|
def apply_map(data_frame, map, index_col):
    """adds a nested dictionary to a data frame on a specific index column

    data_frame: the data frame to extend
    map:
        keys: must be the same as the values in the index column,
        values: dictionary with new column names as keys and the values
    index_col: name of the column used to look up the keys of the map

    returns the result of a left merge: all rows of the data frame are
    kept, rows without a matching key in the map get missing values in
    the new columns

    example:

    >>> df = pandas.DataFrame(data={"MyIndex": [10, 10, 20]})
    >>> map = {
    ...     10: {"NewCol": "foo"},
    ...     20: {"NewCol": "bar"},
    ... }
    >>> apply_map(df, map, "MyIndex")
       MyIndex NewCol
    0       10    foo
    1       10    foo
    2       20    bar

    """
    # one row per map key, one column per new column name
    lookup = pandas.DataFrame.from_dict(map, orient="index")
    merge_options = {
        "how": "left",
        "left_on": index_col,
        "right_index": True,
    }
    return data_frame.merge(lookup, **merge_options)
|
|
|