from collections.abc import Mapping, Sequence

import pandas

from .columns import (
    META_DATA_WELL_ROW,
    META_DATA_EXPOSURE_ID,
    META_DATA_WELL_COLUMN,
    SETTINGS_EXPOSURE_TIME,
    META_DATA_PARAMETERS_TIME,
    SETTINGS_EXPOSURE_CHANNEL,
    META_DATA_PARAMETERS_CHANNEL,
)

DEFAULT_AGGREGATION_INDEX = [
    META_DATA_EXPOSURE_ID,
    META_DATA_WELL_ROW,
    META_DATA_WELL_COLUMN,
]


def split(data_frame, column):
    """Split a data frame on the unique values of a column.

    Returns a dict that maps every unique value found in ``column`` to the
    rows of ``data_frame`` whose ``column`` entry equals that value.
    """
    series = data_frame[column]
    return {value: data_frame[series == value] for value in series.unique()}


def _is_list_or_tuple(something):
    """Return True if something is a non-string sequence.

    Lists and tuples qualify, as does any other ``Sequence`` that is not a
    ``str``.
    """
    return isinstance(something, Sequence) and not isinstance(something, str)


def _is_numerical(something):
    """Return True if something is an int or a float.

    Note that ``bool`` is a subclass of ``int`` and therefore passes too.
    """
    return isinstance(something, (int, float))


def _check_valid_exposure_map_entry(entry):
    """Validate a single exposure map entry.

    An entry must be a list or tuple of exactly two items, the second of
    which must be numerical.

    Raises a ValueError if the entry is not suitable.
    """
    if not _is_list_or_tuple(entry):
        raise ValueError("Exposure Map: entries must be tuples or lists")
    if len(entry) != 2:
        raise ValueError("Exposure Map: entries must consist of two items")
    if not _is_numerical(entry[1]):
        raise ValueError("Exposure Map: second entry must be numerical")


def _check_exposure_map(data_frame, exposure_map):
    """Check that an exposure map fits the requirements.

    The map must be a mapping whose keys are exactly the exposure ids found
    in the data frame, and every value must be a valid exposure map entry.

    Raises a ValueError if the requirements are not met.
    """
    if not isinstance(exposure_map, Mapping):
        raise ValueError("Exposure Map: map must be a dict")

    exposure_ids_in_df = set(data_frame[META_DATA_EXPOSURE_ID].unique())
    exposure_ids_in_map = set(exposure_map)
    if exposure_ids_in_df != exposure_ids_in_map:
        msg = (
            f"Exposure Ids {exposure_ids_in_df} don't match "
            f"provided map {exposure_ids_in_map}"
        )
        raise ValueError(msg)

    for entry in exposure_map.values():
        _check_valid_exposure_map_entry(entry)


def _set_exposure_data_from_parameters(data_frame):
    """Infer the exposure settings from the measurement parameters.

    Returns a new data frame in which the exposure channel and exposure time
    settings columns are filled from the measurement parameter columns. The
    data frame passed in is left untouched.

    Raises a ValueError if the parameter columns contain NaNs.
    """
    if (
        data_frame[META_DATA_PARAMETERS_CHANNEL].hasnans
        or data_frame[META_DATA_PARAMETERS_TIME].hasnans
    ):
        raise ValueError("Exposure Map: measurement parameters incomplete")

    # Work on a copy: the explicit-map branch of apply_exposure_map() returns
    # a new frame as well, so neither code path modifies the caller's data.
    df = data_frame.copy()
    df[SETTINGS_EXPOSURE_CHANNEL] = df[META_DATA_PARAMETERS_CHANNEL]
    df[SETTINGS_EXPOSURE_TIME] = df[META_DATA_PARAMETERS_TIME]
    return df


def apply_exposure_map(data_frame, exposure_map=None):
    """Apply the parameters of an exposure map to the data frame.

    exposure map:
        keys: must be the same as the exposure ids,
        values: list or tuple of two items, (channel, time); the time
            must be numerical

    If the exposure map is None, the values from the optionally parsed
    measurement parameters are used.

    Returns a data frame with the exposure channel and exposure time
    settings columns added.

    Raises a ValueError if the provided exposure map does not map to the
    exposure ids, if one of its entries is malformed, or - when no map is
    given - if the measurement parameters contain NaNs.
    """
    if exposure_map is None:
        return _set_exposure_data_from_parameters(data_frame)

    _check_exposure_map(data_frame, exposure_map)

    columns = [SETTINGS_EXPOSURE_CHANNEL, SETTINGS_EXPOSURE_TIME]
    # Turn every (channel, time) entry into a {column name: value} dict, the
    # nested layout apply_map() expects.
    column_map = {
        exposure_id: dict(zip(columns, entry))
        for exposure_id, entry in exposure_map.items()
    }
    return apply_map(data_frame, column_map, META_DATA_EXPOSURE_ID)


def apply_map(data_frame, map, index_col):
    """Add a nested dictionary to a data frame on a specific index column.

    map:
        keys: must be the same as the values in the index column,
        values: dictionary with new column names as keys and the values

    The map is converted to a data frame (one row per key) and left-merged
    onto ``data_frame``, matching ``index_col`` against the map keys. A new
    data frame is returned.

    example:

    >>> df = pandas.DataFrame(data={"MyIndex": [10, 10, 20]})
    >>> map = {
    ...     10: {"NewCol": "foo"},
    ...     20: {"NewCol": "bar"},
    ... }
    >>> apply_map(df, map, "MyIndex")
       MyIndex NewCol
    0       10    foo
    1       10    foo
    2       20    bar
    """
    map_df = pandas.DataFrame.from_dict(map, orient="index")
    return data_frame.merge(
        map_df,
        how="left",
        left_on=index_col,
        right_index=True,
    )