You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
167 lines
5.4 KiB
167 lines
5.4 KiB
from collections.abc import Mapping, Sequence |
|
|
|
import pandas |
|
|
|
from .columns import ( |
|
AGGREGATION_PREFIX, |
|
META_DATA_WELL_ROW, |
|
META_DATA_EXPOSURE_ID, |
|
META_DATA_WELL_COLUMN, |
|
SETTINGS_EXPOSURE_TIME, |
|
META_DATA_PARAMETERS_TIME, |
|
SETTINGS_EXPOSURE_CHANNEL, |
|
META_DATA_PARAMETERS_CHANNEL, |
|
) |
|
|
|
# Default grouping keys used by ``aggregate`` and ``add_aggregate``:
# exposure id, well row and well column.
# NOTE: keep this a list - pandas interprets a tuple passed to ``groupby``
# as one single key and not as a collection of keys.
DEFAULT_AGGREGATION_COLUMNS = [
    META_DATA_EXPOSURE_ID,
    META_DATA_WELL_ROW,
    META_DATA_WELL_COLUMN,
]
|
|
|
|
|
def split_data_frame(data_frame, column):
    """splits a data frame on unique column values

    data_frame: pandas data frame to split
    column:     name of the column whose unique values define the parts

    returns a dict, mapping each unique value of the column to the rows of
    the data frame where the column equals this value
    """
    selector = data_frame[column]
    parts = {}
    for value in selector.unique():
        # boolean mask selects all rows belonging to the current value
        parts[value] = data_frame[selector == value]
    return parts
|
|
|
|
|
def _is_list_or_tuple(something):
    """returns true if something is a sequence like a list or tuple

    strings are sequences too, but are explicitly rejected
    """
    return isinstance(something, Sequence) and not isinstance(something, str)
|
|
|
|
|
def _is_numerical(something):
    """returns true if something is an int or float

    booleans are accepted as well, since bool is a subclass of int
    """
    return isinstance(something, (int, float))
|
|
|
|
|
def _check_valid_exposure_map_entry(entry):
    """raises a ValueError, if an exposure map entry is not suitable

    entry: one value of an exposure map

    A suitable entry is a list or tuple (not a string) of exactly two
    items, where the second item is an int or float.
    """
    if not _is_list_or_tuple(entry):
        raise ValueError("Exposure Map: entries must be tuples or lists")
    if len(entry) != 2:
        raise ValueError("Exposure Map: entries must consist of two items")
    if not _is_numerical(entry[1]):
        raise ValueError("Exposure Map: second entry must be numerical")
|
|
|
|
|
def _check_exposure_map(data_frame, exposure_map):
    """checks if an exposure map fits the requirements

    data_frame:   pandas data frame containing the exposure id column
    exposure_map: the exposure map to validate

    Will raise a ValueError if
    - the exposure map is not a mapping
    - the keys of the map differ from the exposure ids in the data frame
    - one of the map values is not a valid exposure map entry
    """
    if not isinstance(exposure_map, Mapping):
        raise ValueError("Exposure Map: map must be a dict")

    ids_in_frame = set(data_frame[META_DATA_EXPOSURE_ID].unique())
    ids_in_map = set(exposure_map)
    if ids_in_frame != ids_in_map:
        raise ValueError(
            f"Exposure Ids {ids_in_frame} don't match "
            f"provided map {ids_in_map}"
        )

    for map_entry in exposure_map.values():
        _check_valid_exposure_map_entry(map_entry)
|
|
|
|
|
def _set_exposure_data_from_parameters(data_frame):
    """infer the exposures from measurement parameters

    data_frame: pandas data frame containing the measurement parameter
                columns for channel and time

    The exposure settings columns are written directly into the provided
    data frame, the same data frame object is returned.

    will raise a ValueError if the parameters contain NaNs
    """
    # (settings column to write, parameters column to read from)
    column_pairs = (
        (SETTINGS_EXPOSURE_CHANNEL, META_DATA_PARAMETERS_CHANNEL),
        (SETTINGS_EXPOSURE_TIME, META_DATA_PARAMETERS_TIME),
    )

    if any(data_frame[source].hasnans for _, source in column_pairs):
        raise ValueError("Exposure Map: measurement parameters incomplete")

    for target, source in column_pairs:
        data_frame[target] = data_frame[source]
    return data_frame
|
|
|
|
|
def apply_exposure_map(data_frame, exposure_map=None):
    """applies the parameters of an exposure map to the data frame

    data_frame:   pandas data frame with an exposure id column
    exposure_map: mapping of exposure ids to exposure settings
        keys:   must be the same as the exposure ids in the data frame
        values: a list or tuple of two items, the channel first and the
                numerical exposure time second

    if the exposure map is None, the values from the optionally parsed
    measurement parameters are used. In this case the columns are added
    to the provided data frame itself.

    returns a data frame with the exposure channel and time columns set

    will raise a ValueError, if the provided exposure map does not map to
    the exposure ids or if the measurement parameters contain NaNs.
    """
    if exposure_map is not None:
        _check_exposure_map(data_frame, exposure_map)

        # one row per exposure id, indexed by the keys of the map
        settings = pandas.DataFrame.from_dict(
            exposure_map,
            orient="index",
            columns=[SETTINGS_EXPOSURE_CHANNEL, SETTINGS_EXPOSURE_TIME],
        )
        merge_options = {
            "how": "left",
            "left_on": META_DATA_EXPOSURE_ID,
            "right_index": True,
        }
        return data_frame.merge(settings, **merge_options)

    return _set_exposure_data_from_parameters(data_frame)
|
|
|
|
|
def aggregate(
    data_frame, column, method, on=DEFAULT_AGGREGATION_COLUMNS, new_name=None
):
    """returns the aggregates of one data frame column

    data_frame: pandas data frame with the data to aggregate
    column:     column name to aggregate
    method:     method of aggregation, either the name of an aggregation
                as a string (e.g. "mean") or a callable
    on:         list of columns to group by,
                defaults to DEFAULT_AGGREGATION_COLUMNS
    new_name:   the name of the aggregate column
                if set to None, a prefix will be added to the original name

    returns a data frame indexed by the group keys with the aggregated
    values in a single column
    """
    if new_name is None:
        # a callable has no "title()" method like a string,
        # its function name is used to build the column name instead
        if isinstance(method, str):
            method_name = method
        else:
            method_name = getattr(method, "__name__", str(method))
        method_as_name = method_name.title()
        new_name = f"{AGGREGATION_PREFIX}.{method_as_name}.{column}"
    grouped = data_frame.groupby(on)
    aggregated_data = grouped.agg({column: method})
    aggregated_data.columns = [new_name]
    return aggregated_data
|
|
|
|
|
def add_aggregate(
    data_frame, column, method, on=DEFAULT_AGGREGATION_COLUMNS, new_name=None
):
    """aggregates one column in a data frame and
    adds the resulting column to the data frame

    data_frame: pandas data frame with the data to aggregate
    column:     column name to aggregate
    method:     method of aggregation
    on:         list of columns to group by,
                defaults to DEFAULT_AGGREGATION_COLUMNS
    new_name:   the name of the aggregate column,
                if set to None, a prefix will be added to the original name

    returns the result of merging the aggregated values into the data frame
    """
    return data_frame.merge(
        aggregate(data_frame, column, method, on, new_name),
        left_on=on,
        right_index=True,
        how="left",
    )
|
|
|