You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
171 lines
5.6 KiB
171 lines
5.6 KiB
import numpy |
|
|
|
from .columns import ( |
|
COL_NAME_POS_ID, |
|
COL_NAME_WELL_ROW, |
|
COL_NAME_SPOT_MEAN, |
|
COL_NAME_EXPOSURE_ID, |
|
COL_NAME_WELL_COLUMN, |
|
COLUMN_NORMALIZATION, |
|
COL_NAME_EXPOSURE_TIME, |
|
COL_NAME_SPOT_OVERFLOW, |
|
COL_NAME_PARAMETERS_TIME, |
|
COL_NAME_EXPOSURE_CHANNEL, |
|
COL_NAME_PARAMETERS_CHANNEL, |
|
COL_NAME_NORMALIZED_EXPOSURE_TIME, |
|
) |
|
|
|
|
|
def _split_data_frame(data_frame, column):
    """Split a data frame into sub frames, one per unique column value.

    Returns a dict that maps each unique value found in ``column`` to the
    rows of ``data_frame`` where the column equals that value.
    """
    selector = data_frame[column]
    parts = {}
    for value in selector.unique():
        # boolean row selection, one pass per unique value
        parts[value] = data_frame[selector == value]
    return parts
|
|
|
|
|
def _infer_exposure_from_parameters(data_frame):
    """Infer the exposures from the measurement parameters.

    Copies the parameter channel and parameter time columns into the
    exposure channel and exposure time columns of ``data_frame`` and
    returns the same data frame.

    Will raise a ValueError if the parameters contain NaNs.
    """
    parameter_columns = (COL_NAME_PARAMETERS_CHANNEL, COL_NAME_PARAMETERS_TIME)
    if any(data_frame[name].hasnans for name in parameter_columns):
        raise ValueError("Exposure Map: measurement parameters incomplete")

    channel_values = data_frame[COL_NAME_PARAMETERS_CHANNEL]
    data_frame[COL_NAME_EXPOSURE_CHANNEL] = channel_values
    time_values = data_frame[COL_NAME_PARAMETERS_TIME]
    data_frame[COL_NAME_EXPOSURE_TIME] = time_values
    return data_frame
|
|
|
|
|
def apply_exposure_map(data_frame, exposure_map=None):
    """Apply the parameters of an exposure map to the data frame.

    The exposure channel and exposure time columns are written directly
    into ``data_frame``; the same data frame object is returned.

    exposure map:
        keys: must be the same as the exposure ids,
        values: objects with at least time and channel attributes

    If the exposure map is None, the values from the optionally parsed
    measurement parameters are used.

    Will raise a ValueError if the provided exposure map does not map to
    the exposure ids.
    """
    if exposure_map is None:
        return _infer_exposure_from_parameters(data_frame)

    exposure_ids = data_frame[COL_NAME_EXPOSURE_ID]
    existing = set(exposure_ids.unique())
    provided = set(exposure_map.keys())
    if existing != provided:
        raise ValueError(
            f"Exposure Map differs from data frame: {provided} != {existing}"
        )

    # Attribute access happens only after validation succeeded.
    channels = {key: info.channel for key, info in exposure_map.items()}
    times = {key: info.time for key, info in exposure_map.items()}

    # Build each column in one step instead of pre-filling it with float
    # NaNs and overwriting row subsets: writing e.g. string channel names
    # into a float64 column is a silent upcast that pandas has deprecated.
    # The check above guarantees that every exposure id has an entry.
    data_frame[COL_NAME_EXPOSURE_CHANNEL] = exposure_ids.map(channels)
    # The time column was float based before (NaN pre-fill), keep it so.
    data_frame[COL_NAME_EXPOSURE_TIME] = exposure_ids.map(times).astype(float)
    return data_frame
|
|
|
|
|
def _check_overflow_limit(data_frame, column=COL_NAME_SPOT_MEAN, limit=0.5):
    """Add an overflow flag column, based on ``column`` and ``limit``.

    A row is flagged when its value in ``column`` is strictly greater than
    ``limit``. The flag column is written into ``data_frame``, which is
    returned afterwards.
    """
    exceeds_limit = data_frame[column] > limit
    data_frame[COL_NAME_SPOT_OVERFLOW] = exceeds_limit
    return data_frame
|
|
|
|
|
def reduce_overflow(data_frame, column=COL_NAME_SPOT_MEAN, limit=0.5):
    """Reduce the data set per channel, eliminating overflowing spots.

    The overflow flag is derived from ``column`` and ``limit``, then the
    data is split on the exposure channel and every channel is reduced on
    its own.

    Returns a dict that maps each exposure channel to its reduced frame.
    """
    flagged = _check_overflow_limit(data_frame, column, limit)
    per_channel = _split_data_frame(flagged, COL_NAME_EXPOSURE_CHANNEL)

    reduced = {}
    for channel_id, channel_frame in per_channel.items():
        reduced[channel_id] = _reduce_overflow_in_channel(channel_frame)
    return reduced
|
|
|
|
|
def _reduce_overflow_in_channel(channel_frame):
    """Do the heavy lifting for reduce_overflow on a single channel.

    Starts with the rows of the longest exposure time and, walking through
    the remaining exposure times in descending order, replaces rows that
    are flagged as overflowing with the rows of the next exposure time.
    Rows are matched on well row, well column and position id.
    """
    frames_by_time = _split_data_frame(channel_frame, COL_NAME_EXPOSURE_TIME)

    # nothing to merge if the channel holds just one exposure time
    if len(frames_by_time) == 1:
        return channel_frame

    longest_time, *shorter_times = sorted(frames_by_time.keys(), reverse=True)

    index_columns = [COL_NAME_WELL_ROW, COL_NAME_WELL_COLUMN, COL_NAME_POS_ID]
    merged = frames_by_time[longest_time].set_index(index_columns)

    for exposure_time in shorter_times:
        # the flags are re-read on every pass, since replaced rows bring
        # along the overflow flag of the exposure they were taken from
        overflowing = merged[COL_NAME_SPOT_OVERFLOW] == True  # noqa: E712
        replacement = frames_by_time[exposure_time].set_index(index_columns)
        merged.loc[overflowing] = replacement.loc[overflowing]

    return merged.reset_index()
|
|
|
|
|
def _infer_normalization_map(split_data_frames):
    """Extract a time normalization map from split data frames.

    Returns a dict with the keys of ``split_data_frames``; each value is
    the maximum of the exposure time column of the corresponding frame.
    """
    normalization_map = {}
    for key, frame in split_data_frames.items():
        exposure_times = frame[COL_NAME_EXPOSURE_TIME]
        normalization_map[key] = exposure_times.max()
    return normalization_map
|
|
|
|
|
def normalize_exposure_time(split_data_frames):
    """Add time normalized values to the split data frames.

    The max exposure time per channel is used for normalization.

    Returns a dict with the same keys as ``split_data_frames`` that holds
    the normalized frames.
    """
    target_times = _infer_normalization_map(split_data_frames)

    normalized = {}
    for key, frame in split_data_frames.items():
        normalized[key] = normalize_channel(frame, target_times[key])
    return normalized
|
|
|
|
|
def normalize_channel(channel_frame, normalized_time):
    """Add time normalized values to a channel data frame.

    Stores ``normalized_time`` in the normalized exposure time column and
    fills every target column listed in COLUMN_NORMALIZATION with the
    source column divided by the exposure time and multiplied by the
    normalized exposure time. The columns are written into
    ``channel_frame``, which is returned afterwards.
    """
    channel_frame[COL_NAME_NORMALIZED_EXPOSURE_TIME] = normalized_time

    for source_name, target_name in COLUMN_NORMALIZATION.items():
        # value per unit of exposure time ...
        per_time_unit = (
            channel_frame[source_name] / channel_frame[COL_NAME_EXPOSURE_TIME]
        )
        # ... scaled up to the normalized exposure time
        channel_frame[target_name] = (
            per_time_unit * channel_frame[COL_NAME_NORMALIZED_EXPOSURE_TIME]
        )

    return channel_frame
|
|
|
|
|
def normalize_measurement(
    data_frame,
    exposure_map=None,
    overflow_column=COL_NAME_SPOT_MEAN,
    overflow_limit=0.5,
):
    """Augment the measurement with exposure data and normalize it.

    Runs three steps in sequence: apply the exposure map, reduce the
    overflowing spots per channel and normalize the exposure times.

    exposure map:
        keys: must be the same as the exposure ids,
        values: objects with at least time and channel attributes
        if the exposure map is None, the values from the optionally parsed
        measurement parameters are used.

    The overflow column and overflow limit are handed over to the
    overflow reduction.

    The max exposure time per channel is used for normalization.
    """
    with_exposures = apply_exposure_map(data_frame, exposure_map)
    per_channel = reduce_overflow(
        with_exposures, overflow_column, overflow_limit
    )
    normalized = normalize_exposure_time(per_channel)
    return normalized
|
|
|