|
|
|
import numpy
|
|
|
|
|
|
|
|
from .columns import (
|
|
|
|
RAW_DATA_POS_ID,
|
|
|
|
CALC_SPOT_OVERFLOW,
|
|
|
|
META_DATA_WELL_ROW,
|
|
|
|
RAW_DATA_SPOT_MEAN,
|
|
|
|
META_DATA_EXPOSURE_ID,
|
|
|
|
META_DATA_WELL_COLUMN,
|
|
|
|
SETTINGS_EXPOSURE_TIME,
|
|
|
|
META_DATA_PARAMETERS_TIME,
|
|
|
|
SETTINGS_EXPOSURE_CHANNEL,
|
|
|
|
RAW_DATA_NORMALIZATION_MAP,
|
|
|
|
META_DATA_PARAMETERS_CHANNEL,
|
|
|
|
SETTINGS_NORMALIZED_EXPOSURE_TIME,
|
|
|
|
)
|
|
|
|
|
|
|
|
# columns used as the (multi) index when rows of different exposure times
# are matched against each other: well row, well column and position id
PROBE_MULTI_INDEX = [META_DATA_WELL_ROW, META_DATA_WELL_COLUMN, RAW_DATA_POS_ID]
|
|
|
|
|
|
|
|
from .utils import split_data_frame
|
|
|
|
|
|
|
|
def _infer_exposure_from_parameters(data_frame):
    """use the measurement parameters as exposure settings

    The channel and time columns of the measurement parameters are copied
    to the exposure settings columns. The data frame is modified in place
    and returned.

    will raise a ValueError if the parameters contain NaNs
    """
    # (target settings column, source parameters column)
    column_pairs = (
        (SETTINGS_EXPOSURE_CHANNEL, META_DATA_PARAMETERS_CHANNEL),
        (SETTINGS_EXPOSURE_TIME, META_DATA_PARAMETERS_TIME),
    )

    if any(data_frame[source].hasnans for _, source in column_pairs):
        raise ValueError("Exposure Map: measurement parameters incomplete")

    for target, source in column_pairs:
        data_frame[target] = data_frame[source]
    return data_frame
|
|
|
|
|
|
|
|
|
|
|
|
def apply_exposure_map(data_frame, exposure_map=None):
    """applies the parameters of an exposure map to the data frame

    exposure map:
        keys: must be the same as the exposure ids,
        values: objects with at least time and channel attributes

    if the exposure map is None, the values from the optionally parsed
    measurement parameters are used.

    The exposure channel and exposure time columns are written to the given
    data frame in place; the same data frame object is returned.

    will raise a ValueError, if the provided exposure map does not map to the
    exposure ids or, if no exposure map is provided, the measurement
    parameters contain NaNs.
    """

    if exposure_map is None:
        return _infer_exposure_from_parameters(data_frame)

    # the map must describe exactly the exposure ids present in the data
    existing = set(data_frame[META_DATA_EXPOSURE_ID].unique())
    provided = set(exposure_map.keys())
    if existing != provided:
        raise ValueError(
            f"Exposure Map differs from data frame: {provided} != {existing}"
        )

    # The channel column is created with object dtype instead of a plain
    # (float64) NaN column: channel values are presumably strings, and
    # writing a string into a float64 column is an incompatible-dtype
    # assignment that recent pandas versions deprecate.
    data_frame[SETTINGS_EXPOSURE_CHANNEL] = numpy.full(
        len(data_frame), numpy.nan, dtype=object
    )
    data_frame[SETTINGS_EXPOSURE_TIME] = numpy.nan
    for exposure_id, exposure_info in exposure_map.items():
        # select all rows measured with this exposure id
        mask = data_frame[META_DATA_EXPOSURE_ID] == exposure_id
        data_frame.loc[mask, SETTINGS_EXPOSURE_CHANNEL] = exposure_info.channel
        data_frame.loc[mask, SETTINGS_EXPOSURE_TIME] = exposure_info.time
    return data_frame
|
|
|
|
|
|
|
|
|
|
|
|
def _check_overflow_limit(data_frame, column=RAW_DATA_SPOT_MEAN, limit=0.5):
    """flag the rows whose value in column is greater than limit

    The boolean flag is stored in the overflow column of the given data
    frame, which is modified in place and returned.
    """
    exceeds_limit = data_frame[column] > limit
    data_frame[CALC_SPOT_OVERFLOW] = exceeds_limit
    return data_frame
|
|
|
|
|
|
|
|
|
|
|
|
def reduce_overflow(data_frame, column=RAW_DATA_SPOT_MEAN, limit=0.5):
    """eliminate overflowing spots, separately for each exposure channel

    The overflow flag is calculated from column and limit, then the data
    frame is split by exposure channel and every channel is reduced on its
    own. Returns a dict mapping the channel to its reduced data frame.
    """
    flagged_frame = _check_overflow_limit(data_frame, column, limit)
    frames_by_channel = split_data_frame(
        flagged_frame, SETTINGS_EXPOSURE_CHANNEL
    )

    reduced = {}
    for channel_id, channel_frame in frames_by_channel.items():
        reduced[channel_id] = _reduce_overflow_in_channel(channel_frame)
    return reduced
|
|
|
|
|
|
|
|
|
|
|
|
def _reduce_overflow_in_channel(channel_frame):
    """replace overflowing rows with rows from shorter exposure times

    The channel frame is split by exposure time. Starting with the frame of
    the longest exposure time, all rows flagged as overflowing are replaced
    by the matching rows (same probe index) of the next shorter exposure
    time, until every exposure time was visited.

    If the channel contains only one exposure time, the channel frame is
    returned unchanged.
    """
    frames_by_time = split_data_frame(channel_frame, SETTINGS_EXPOSURE_TIME)

    # nothing to merge, if there is only one exposure in the channel
    if len(frames_by_time) == 1:
        return channel_frame

    # visit the exposure times from the longest to the shortest
    ordered_times = list(frames_by_time.keys())
    ordered_times.sort(reverse=True)
    longest_time, *shorter_times = ordered_times

    merged = frames_by_time[longest_time].set_index(PROBE_MULTI_INDEX)

    for exposure_time in shorter_times:
        # rows still overflowing after the previous replacements
        overflowing = merged[CALC_SPOT_OVERFLOW] == True  # noqa: E712
        candidates = frames_by_time[exposure_time].set_index(
            PROBE_MULTI_INDEX
        )
        merged.loc[overflowing] = candidates.loc[overflowing]

    return merged.reset_index()
|
|
|
|
|
|
|
|
|
|
|
|
def _infer_normalization_map(split_data_frames):
    """ map each key of the split data frames to its max exposure time """
    max_time_by_key = {}
    for key, frame in split_data_frames.items():
        max_time_by_key[key] = frame[SETTINGS_EXPOSURE_TIME].max()
    return max_time_by_key
|
|
|
|
|
|
|
|
|
|
|
|
def normalize_exposure_time(split_data_frames):
    """add time normalized values to each of the split data frames

    The max exposure time per channel is used for normalization.
    Returns a new dict with the same keys and the normalized data frames.
    """
    normalization_map = _infer_normalization_map(split_data_frames)

    normalized_frames = {}
    for key, frame in split_data_frames.items():
        target_time = normalization_map[key]
        normalized_frames[key] = normalize_channel(frame, target_time)
    return normalized_frames
|
|
|
|
|
|
|
|
|
|
|
|
def normalize_channel(channel_frame, normalized_time):
    """add time normalized values to a copy of a channel data frame

    The normalized exposure time is stored in its own column. For every
    pair of columns in the raw data normalization map, the original value
    is divided by the exposure time of the row and multiplied by the
    normalized exposure time. The given channel frame is left untouched.
    """
    normalized = channel_frame.copy()
    normalized[SETTINGS_NORMALIZED_EXPOSURE_TIME] = normalized_time

    for source_column, target_column in RAW_DATA_NORMALIZATION_MAP.items():
        # value per unit of exposure time ...
        per_time_unit = (
            normalized[source_column] / normalized[SETTINGS_EXPOSURE_TIME]
        )
        # ... scaled to the normalized exposure time
        normalized[target_column] = (
            per_time_unit * normalized[SETTINGS_NORMALIZED_EXPOSURE_TIME]
        )

    return normalized
|
|
|
|
|
|
|
|
|
|
|
|
def split_channels(
    data_frame,
    exposure_map=None,
    overflow_column=RAW_DATA_SPOT_MEAN,
    overflow_limit=0.5,
):
    """augment, reduce and normalize the measurement exposures

    The exposure map is applied to the data frame, overflowing spots are
    reduced per channel and the values are normalized by exposure time.
    Returns a dict with one data frame per exposure channel.

    exposure map:
        keys: must be the same as the exposure ids,
        values: objects with at least time and channel attributes
    if the exposure map is None, the values from the optionally parsed
    measurement parameters are used.

    The max exposure time per channel is used for normalization.
    """
    with_exposures = apply_exposure_map(data_frame, exposure_map)
    frames_by_channel = reduce_overflow(
        with_exposures, column=overflow_column, limit=overflow_limit
    )
    return normalize_exposure_time(frames_by_channel)
|