|
|
|
""" Sensospot Data Parser
|
|
|
|
|
|
|
|
Parsing the numerical output from Sensovations Sensospot image analysis.
|
|
|
|
"""
|
|
|
|
|
|
|
|
from pathlib import Path
|
|
|
|
from collections import namedtuple
|
|
|
|
|
|
|
|
import numpy
|
|
|
|
import pandas
|
|
|
|
from defusedxml import ElementTree
|
|
|
|
|
|
|
|
from . import columns
|
|
|
|
|
|
|
|
# Settings of a single exposure: the channel name and the exposure time
# (the time is read from the "ExposureTimeMs" attribute of the settings file).
ExposureInfo = namedtuple("ExposureInfo", ("channel", "time"))
|
|
|
|
|
|
|
|
|
|
|
|
def _search_measurement_params_file(folder):
    """Look for the single exposure settings file below a folder.

    The settings file is expected somewhere inside the "Parameters"
    sub-directory of `folder`; that directory is searched recursively for
    files with the suffix ".svexp".

    folder:  path of the folder to search
    returns: path of the settings file if exactly one was found, else None
    """
    params_folder = Path(folder) / "Parameters"
    if not params_folder.is_dir():
        return None
    candidates = list(params_folder.glob("**/*.svexp"))
    # no hit and an ambiguous result are both reported as "nothing found"
    return candidates[0] if len(candidates) == 1 else None
|
|
|
|
|
|
|
|
|
|
|
|
def _parse_measurement_params(params_file):
    """Parse the channel information from an exposure settings file.

    Every child element of the "Channels" node describes one exposure. The
    exposure id is the number at the end of the element's tag name, the
    channel is the lower-cased last word of its "Description" attribute and
    the time is the value of its "ExposureTimeMs" attribute.

    params_file: path of the settings file (XML)
    returns:     dict mapping exposure id (int) to ExposureInfo(channel, time)
    raises:      ValueError if a tag name does not end in a number or an
                 exposure time is not numeric; KeyError if one of the
                 attributes is missing
    """
    file_path = Path(params_file)
    # binary mode lets the XML parser honour the encoding declared in the
    # document instead of relying on the locale's default text encoding
    with file_path.open("rb") as file_handle:
        tree = ElementTree.parse(file_handle)
    result = {}
    for child in tree.find("Channels"):
        # child.tag == "ChannelConfig1"; use all trailing digits so that ids
        # with more than one digit ("ChannelConfig10") are read correctly
        tag = child.tag
        prefix = tag.rstrip("0123456789")
        exposure = int(tag[len(prefix):])
        channel_description = child.attrib["Description"]
        # channel_description == "[Cy3|Cy5] Green"
        channel = channel_description.rsplit(" ", 1)[-1]
        time = float(child.attrib["ExposureTimeMs"])
        result[exposure] = ExposureInfo(channel.lower(), time)
    return result
|
|
|
|
|
|
|
|
|
|
|
|
def get_measurement_params(folder):
    """Return the measurement parameters found for a folder.

    folder:  folder that is searched for an exposure settings file
    returns: the parsed content of the settings file, or None if no
             settings file could be located
    """
    params_file = _search_measurement_params_file(folder)
    if params_file is None:
        return None
    return _parse_measurement_params(params_file)
|
|
|
|
|
|
|
|
|
|
|
|
def _add_measurement_params(data_frame, params):
    """Add the measurement parameters as new columns to a data frame.

    data_frame: data frame containing the exposure id column
    params:     dict mapping an exposure id to a (channel, time) pair
    returns:    the result of merging the parameters onto the exposure id
                column of the data frame
    """
    column_names = (columns.PARAMETERS_CHANNEL, columns.PARAMETERS_TIME)
    lookup = {}
    for exposure_id, info in params.items():
        # pair up the column names with the values of one exposure
        lookup[exposure_id] = dict(zip(column_names, info))
    return _apply_map(data_frame, lookup, columns.EXPOSURE_ID)
|
|
|
|
|
|
|
|
|
|
|
|
def _apply_map(data_frame, map, index_col):
    """Add the values of a nested dictionary as new columns to a data frame.

    The dictionary is turned into a lookup table that is left-merged onto
    the data frame; rows whose value in the index column has no entry in
    the dictionary get missing values in the new columns.

    data_frame: the data frame to extend
    map:        nested dictionary
                    keys:   the values found in the index column
                    values: dictionaries with the new column names as keys
                            and the values to set as values
    index_col:  name of the column used to look up the entries of `map`
    returns:    a data frame with the additional columns

    example:

    >>> df = pandas.DataFrame(data={"MyIndex": [10, 10, 20]})
    >>> map = {
    ...     10: {"NewCol": "foo"},
    ...     20: {"NewCol": "bar"},
    ... }
    >>> _apply_map(df, map, "MyIndex")
       MyIndex NewCol
    0       10    foo
    1       10    foo
    2       20    bar
    """
    # one row per key of the dictionary, one column per inner key
    lookup_frame = pandas.DataFrame.from_dict(map, orient="index")
    return data_frame.merge(
        right=lookup_frame,
        left_on=index_col,
        right_index=True,
        how="left",
    )
|
|
|
|
|
|
|
|
|
|
|
|
def add_optional_measurement_parameters(data_frame, folder):
    """Add measurement parameters to the data frame, if they could be parsed.

    The parameters are only applied if the exposure ids of the settings
    file are exactly the exposure ids present in the data frame. In every
    other case (no settings file, no parameters, mismatching ids) both
    parameter columns are filled with NaN, so the returned data frame
    always carries the parameter columns.

    data_frame: data frame containing the exposure id column
    folder:     folder that is searched for the exposure settings file
    returns:    data frame with the parameter columns; a merged frame if
                the parameters were applied, otherwise the given frame,
                which is modified in place
    """
    params = get_measurement_params(folder)
    if params:
        available_exposures = set(data_frame[columns.EXPOSURE_ID].unique())
        if available_exposures == set(params):
            return _add_measurement_params(data_frame, params)

    # no usable parameters: add the columns anyway so that the layout of the
    # result does not depend on whether a settings file was found
    data_frame[columns.PARAMETERS_CHANNEL] = numpy.nan
    data_frame[columns.PARAMETERS_TIME] = numpy.nan
    return data_frame
|