from collections.abc import Mapping, Sequence

import pandas

from .columns import (
    AGGREGATION_PREFIX,
    META_DATA_WELL_ROW,
    META_DATA_EXPOSURE_ID,
    META_DATA_WELL_COLUMN,
    SETTINGS_EXPOSURE_TIME,
    META_DATA_PARAMETERS_TIME,
    SETTINGS_EXPOSURE_CHANNEL,
    META_DATA_PARAMETERS_CHANNEL,
)

# Columns used for grouping when `aggregate` / `add_aggregate` are called
# without an explicit `on` argument.
DEFAULT_AGGREGATION_INDEX = [
    META_DATA_EXPOSURE_ID,
    META_DATA_WELL_ROW,
    META_DATA_WELL_COLUMN,
]


def split(data_frame, column):
    """Split a data frame on the unique values of one column.

    data_frame: pandas data frame to split
    column:     name of the column whose values define the parts

    Returns a dict mapping each unique value of the column to the rows of
    the data frame where the column equals that value.
    """
    series = data_frame[column]
    return {value: data_frame[series == value] for value in series.unique()}


def _is_list_or_tuple(something):
    """Return True if something is a sequence but not a string."""
    return isinstance(something, Sequence) and not isinstance(something, str)


def _is_numerical(something):
    """Return True if something is an int or float."""
    return isinstance(something, (int, float))


def _check_valid_exposure_map_entry(entry):
    """Validate a single exposure map entry.

    An entry must be a list or tuple of exactly two items, the second one
    being numerical.

    Raises a ValueError if the entry is not suitable.
    """
    if not _is_list_or_tuple(entry):
        raise ValueError("Exposure Map: entries must be tuples or lists")
    if len(entry) != 2:
        raise ValueError("Exposure Map: entries must consist of two items")
    if not _is_numerical(entry[1]):
        raise ValueError("Exposure Map: second entry must be numerical")


def _check_exposure_map(data_frame, exposure_map):
    """Check that an exposure map fits the data frame.

    The map must be a mapping whose keys are exactly the exposure ids
    present in the data frame and whose values are valid entries, see
    `_check_valid_exposure_map_entry`.

    Raises a ValueError if the requirements are not met.
    """
    if not isinstance(exposure_map, Mapping):
        raise ValueError("Exposure Map: map must be a dict")

    exposure_ids_in_df = set(data_frame[META_DATA_EXPOSURE_ID].unique())
    exposure_ids_in_map = set(exposure_map.keys())
    if exposure_ids_in_df != exposure_ids_in_map:
        msg = (
            f"Exposure Ids {exposure_ids_in_df} don't match "
            f"provided map {exposure_ids_in_map}"
        )
        raise ValueError(msg)

    for entry in exposure_map.values():
        _check_valid_exposure_map_entry(entry)


def _set_exposure_data_from_parameters(data_frame):
    """Infer the exposure settings from the measurement parameters.

    Returns a new data frame in which the exposure channel and exposure
    time columns are filled from the corresponding measurement parameter
    columns. The data frame passed in is left untouched.

    Raises a ValueError if the parameter columns contain NaNs.
    """
    if (
        data_frame[META_DATA_PARAMETERS_CHANNEL].hasnans
        or data_frame[META_DATA_PARAMETERS_TIME].hasnans
    ):
        raise ValueError("Exposure Map: measurement parameters incomplete")

    # Work on a copy: the explicit-map branch of `apply_exposure_map`
    # returns a new frame (via merge), so this branch must not silently
    # modify the caller's data frame either.
    result = data_frame.copy()
    result[SETTINGS_EXPOSURE_CHANNEL] = result[META_DATA_PARAMETERS_CHANNEL]
    result[SETTINGS_EXPOSURE_TIME] = result[META_DATA_PARAMETERS_TIME]
    return result


def apply_exposure_map(data_frame, exposure_map=None):
    """Apply the parameters of an exposure map to the data frame.

    data_frame:   pandas data frame with an exposure id column
    exposure_map: mapping of exposure id to exposure settings
        keys:   must be exactly the exposure ids found in the data frame
        values: a tuple or list of two items, `(channel, time)`,
                where time must be an int or float

    If the exposure map is None, the values from the optionally parsed
    measurement parameters are used instead.

    Returns a new data frame with the exposure channel and exposure time
    columns added.

    Raises a ValueError if the provided exposure map does not match the
    exposure ids or contains invalid entries, or - when no map is given -
    if the measurement parameters are incomplete.
    """
    if exposure_map is None:
        return _set_exposure_data_from_parameters(data_frame)

    _check_exposure_map(data_frame, exposure_map)

    # One row per exposure id (the index), columns are channel and time.
    exposure_df = pandas.DataFrame.from_dict(
        exposure_map,
        orient="index",
        columns=[SETTINGS_EXPOSURE_CHANNEL, SETTINGS_EXPOSURE_TIME],
    )
    return data_frame.merge(
        exposure_df,
        how="left",
        left_on=META_DATA_EXPOSURE_ID,
        right_index=True,
    )


def _aggregation_label(method):
    """Return the title-cased name of an aggregation method.

    Strings are used directly; for callables the `__name__` attribute is
    used, falling back to the type name for objects without one.
    """
    if isinstance(method, str):
        return method.title()
    return getattr(method, "__name__", type(method).__name__).title()


def aggregate(
    data_frame, column, method, on=DEFAULT_AGGREGATION_INDEX, new_name=None
):
    """Return the aggregates of one data frame column.

    data_frame: pandas data frame with the data to aggregate
    column:     column name to aggregate
    method:     method of aggregation, either a name understood by pandas
                (e.g. "mean") or a callable
    on:         list of columns to group by,
                defaults to DEFAULT_AGGREGATION_INDEX
                (exposure id, well row, well column)
    new_name:   the name of the aggregate column
                if set to None, a prefix built from AGGREGATION_PREFIX and
                the method name will be added to the original name

    Returns a data frame indexed by the group keys with a single column
    holding the aggregated values.
    """
    if new_name is None:
        method_as_name = _aggregation_label(method)
        new_name = f"{AGGREGATION_PREFIX}.{method_as_name}.{column}"

    grouped = data_frame.groupby(on)
    aggregated_data = grouped.agg({column: method})
    aggregated_data.columns = [new_name]
    return aggregated_data


def add_aggregate(
    data_frame, column, method, on=DEFAULT_AGGREGATION_INDEX, new_name=None
):
    """Aggregate one column and add the result to the data frame.

    data_frame: pandas data frame with the data to aggregate
    column:     column name to aggregate
    method:     method of aggregation, either a name understood by pandas
                (e.g. "mean") or a callable
    on:         list of columns to group by,
                defaults to DEFAULT_AGGREGATION_INDEX
                (exposure id, well row, well column)
    new_name:   the name of the aggregate column,
                if set to None, a prefix built from AGGREGATION_PREFIX and
                the method name will be added to the original name

    Returns a new data frame: the original rows, each with the aggregate
    value of its group in an additional column.
    """
    aggregated_data = aggregate(data_frame, column, method, on, new_name)
    return data_frame.merge(
        aggregated_data,
        how="left",
        left_on=on,
        right_index=True,
    )