from pandas.api.types import is_numeric_dtype

from .utils import split
from .columns import (
    RAW_DATA_POS_ID,
    RAW_DATA_SPOT_SAT,
    CALC_SPOT_OVERFLOW,
    META_DATA_WELL_ROW,
    META_DATA_WELL_COLUMN,
    SETTINGS_EXPOSURE_TIME,
    SETTINGS_EXPOSURE_CHANNEL,
    RAW_DATA_NORMALIZATION_MAP,
    SETTINGS_NORMALIZED_EXPOSURE_TIME,
)

# columns used as a multi index to match rows between exposure frames
PROBE_MULTI_INDEX = [
    META_DATA_WELL_ROW,
    META_DATA_WELL_COLUMN,
    RAW_DATA_POS_ID,
]


def _check_if_xdr_ready(data_frame):
    """check if a data frame meets the constraints for xdr

    The exposure channel and exposure time columns must be present, the
    exposure channel column must hold exactly one unique value and the
    exposure time column must be numeric without NaNs.

    Raises:
        ValueError: if any of the constraints above is violated.
    """
    required_columns = {SETTINGS_EXPOSURE_CHANNEL, SETTINGS_EXPOSURE_TIME}
    if not required_columns.issubset(data_frame.columns):
        raise ValueError("XDR: Apply an exposure map first")
    if len(data_frame[SETTINGS_EXPOSURE_CHANNEL].unique()) != 1:
        raise ValueError("XDR: Mixed Exposure Channels")
    if not is_numeric_dtype(data_frame[SETTINGS_EXPOSURE_TIME]):
        raise ValueError("XDR: Exposure time is not numerical")
    if data_frame[SETTINGS_EXPOSURE_TIME].hasnans:
        raise ValueError("XDR: Exposure time contains NaNs")


def _calc_overflow_info(data_frame, column=RAW_DATA_SPOT_SAT, limit=2):
    """add overflow info, based on column and limit

    A row is flagged in the CALC_SPOT_OVERFLOW column if its value in
    `column` is strictly greater than `limit`.

    The column is written into the passed data frame (in place); the same
    data frame is returned.
    """
    data_frame.loc[:, CALC_SPOT_OVERFLOW] = data_frame[column] > limit
    return data_frame


def _reduce_overflow(data_frame):
    """the heavy lifting for creating an extended dynamic range

    Starts with the rows of the longest exposure time and replaces rows
    flagged as overflowing with the matching rows (same PROBE_MULTI_INDEX)
    of the next shorter exposure time, repeated for all exposure times.

    Returns:
        a data frame with one row per probe and a reset index.

    Raises:
        ValueError: if a shorter exposure does not contain exactly the
            same probes as the result frame.
    """
    # `split` is used here as a mapping of exposure time -> data frame
    split_frames = split(data_frame, SETTINGS_EXPOSURE_TIME)

    # get the exposure times, longest first
    exposure_times = sorted(split_frames.keys(), reverse=True)
    max_time, *rest_times = exposure_times

    result_frame = split_frames[max_time].set_index(PROBE_MULTI_INDEX)

    for next_time in rest_times:
        # The mask is recalculated on every pass: replaced rows carry the
        # overflow flag of the exposure they were taken from, so rows that
        # still overflow are replaced again by the next shorter exposure.
        # `== True` is an element-wise comparison on the column.
        mask = result_frame[CALC_SPOT_OVERFLOW] == True  # noqa: E712
        next_frame = split_frames[next_time].set_index(PROBE_MULTI_INDEX)

        # probes that are present in only one of the two frames
        diff = set(result_frame.index) ^ set(next_frame.index)
        if diff:
            num = len(diff)
            raise ValueError(
                f"XDR: Scan Data is incomplete, differs on {num} probes"
            )

        result_frame.loc[mask] = next_frame.loc[mask]

    return result_frame.reset_index()


def blend(data_frame, column=RAW_DATA_SPOT_SAT, limit=2):
    """creates an extended dynamic range, eliminating overflowing spots

    If the data frame has no CALC_SPOT_OVERFLOW column yet, it is
    calculated from `column` and `limit` and added to the passed data
    frame before blending.

    Raises:
        ValueError: if the data frame does not meet the constraints for
            xdr or if the scan data of the exposures is incomplete.
    """
    _check_if_xdr_ready(data_frame)
    if CALC_SPOT_OVERFLOW not in data_frame.columns:
        data_frame = _calc_overflow_info(data_frame, column, limit)
    return _reduce_overflow(data_frame)


def normalize_values(data_frame, normalized_time=None):
    """add exposure time normalized values to a data frame

    will use the maximum exposure time, if none is provided and the column
    SETTINGS_NORMALIZED_EXPOSURE_TIME was not set before.

    For every pair in RAW_DATA_NORMALIZATION_MAP the normalized column is
    calculated as `original / exposure time * normalized exposure time`.

    The columns are written into the passed data frame (in place); the
    same data frame is returned.
    """
    # explicit check for None: a provided value must never be silently
    # ignored just because it evaluates to False
    if normalized_time is not None:
        data_frame[SETTINGS_NORMALIZED_EXPOSURE_TIME] = normalized_time
    elif SETTINGS_NORMALIZED_EXPOSURE_TIME not in data_frame.columns:
        normalized_time = data_frame[SETTINGS_EXPOSURE_TIME].max()
        data_frame[SETTINGS_NORMALIZED_EXPOSURE_TIME] = normalized_time

    for original_col, normalized_col in RAW_DATA_NORMALIZATION_MAP.items():
        data_frame[normalized_col] = (
            data_frame[original_col] / data_frame[SETTINGS_EXPOSURE_TIME]
        ) * data_frame[SETTINGS_NORMALIZED_EXPOSURE_TIME]

    return data_frame


def create_xdr(
    data_frame,
    normalized_time=None,
    column=RAW_DATA_SPOT_SAT,
    limit=2,
):
    """blend the exposures of a data frame and normalize the values

    The data frame is first blended (see `blend`) using `column` and
    `limit` for overflow detection, the result is then normalized (see
    `normalize_values`).

    normalized_time:
        if it is None, the max exposure time is used for normalization.
    """
    data_frame = blend(data_frame, column, limit)
    return normalize_values(data_frame, normalized_time)