Browse Source

simplified adding parsed parameters

xmlparsing
Holger Frey 3 years ago
parent
commit
64cee96485
  1. 3
      sensospot_data/__init__.py
  2. 81
      sensospot_data/parameters.py
  3. 3
      sensospot_data/parser.py
  4. 102
      tests/test_parameters.py
  5. 3
      tests/test_sensospot_data.py

3
sensospot_data/__init__.py

@ -12,9 +12,8 @@ from pathlib import Path
import click import click
import pandas import pandas
from . import columns from . import columns # noqa: F401
from .parser import parse_file, parse_folder # noqa: F401 from .parser import parse_file, parse_folder # noqa: F401
from .parameters import ExposureInfo # noqa: F401
DEFAULT_OUTPUT_FILENAME = "collected_data.csv" DEFAULT_OUTPUT_FILENAME = "collected_data.csv"

81
sensospot_data/parameters.py

@ -12,7 +12,6 @@ from defusedxml import ElementTree
from . import columns from . import columns
ExposureInfo = namedtuple("ExposureInfo", ["channel", "time"])
def _search_measurement_params_file(folder): def _search_measurement_params_file(folder):
@ -28,21 +27,28 @@ def _search_measurement_params_file(folder):
return None return None
def _get_channel_data(channel_node):
# child.tag == "ChannelConfig1"
exposure_id = int(channel_node.tag[-1])
# channel_description == "[Cy3|Cy5] Green"
description = channel_node.attrib["Description"]
exposure_channel = description.rsplit(" ", 1)[-1]
# floats can be used for exposure times, not only ints
exposure_time = float(channel_node.attrib["ExposureTimeMs"])
return {
columns.EXPOSURE_ID: exposure_id,
columns.PARAMETERS_CHANNEL: exposure_channel.lower(),
columns.PARAMETERS_TIME: exposure_time,
}
def _parse_measurement_params(params_file): def _parse_measurement_params(params_file):
"""parses the cannel informations from a settings file""" """parses the cannel informations from a settings file"""
file_path = Path(params_file) file_path = Path(params_file)
with file_path.open("r") as file_handle: with file_path.open("r") as file_handle:
tree = ElementTree.parse(file_handle) tree = ElementTree.parse(file_handle)
result = {} data = [_get_channel_data(child) for child in tree.find("Channels")]
for child in tree.find("Channels"): return pandas.DataFrame(data)
# child.tag == "ChannelConfig1"
exposure = int(child.tag[-1])
channel_description = child.attrib["Description"]
# channel_description == "[Cy3|Cy5] Green"
channel = channel_description.rsplit(" ", 1)[-1]
time = float(child.attrib["ExposureTimeMs"])
result[exposure] = ExposureInfo(channel.lower(), time)
return result
def get_measurement_params(folder): def get_measurement_params(folder):
@ -53,51 +59,16 @@ def get_measurement_params(folder):
return None return None
def _add_measurement_params(data_frame, params):
"""adds measurement parameters to a data frame"""
keys = [columns.PARAMETERS_CHANNEL, columns.PARAMETERS_TIME]
map = {k: dict(zip(keys, v)) for k, v in params.items()}
return _apply_map(data_frame, map, columns.EXPOSURE_ID)
def _apply_map(data_frame, map, index_col):
"""adds a nested dictionary to a data frame on a specific index column
map:
keys: must be the same as the values in the index column,
values: dictionary with new column names as keys and the values
example:
>>> df = DataFrame(data={"MyIndex": [10, 10, 20]})
>>> map = {
... 10: {"NewCol": "foo"},
... 20: {"NewCol": "Bar"},
... }
>>> apply_map(df, map, "MyIndex")
MyIndex NewCol
0 10 foo
1 10 foo
2 20 bar
"""
map_df = pandas.DataFrame.from_dict(map, orient="index")
return data_frame.merge(
map_df,
how="left",
left_on=index_col,
right_index=True,
)
def add_optional_measurement_parameters(data_frame, folder): def add_optional_measurement_parameters(data_frame, folder):
"""adds measurement params to the data frame, if they could be parsed""" """adds measurement params to the data frame, if they could be parsed"""
params = get_measurement_params(folder) params = get_measurement_params(folder)
if params: if params is not None:
available_exposures = set(data_frame[columns.EXPOSURE_ID].unique()) params_exposures = params[columns.EXPOSURE_ID].unique()
if available_exposures == set(params.keys()): data_exposures = data_frame[columns.EXPOSURE_ID].unique()
return _add_measurement_params(data_frame, params) if set(data_exposures) == set(params_exposures):
else: return data_frame.merge(params, how="left", on=columns.EXPOSURE_ID)
data_frame[columns.PARAMETERS_CHANNEL] = numpy.nan
data_frame[columns.PARAMETERS_TIME] = numpy.nan # only executing if the parameters were not merged to the data frame
data_frame[columns.PARAMETERS_CHANNEL] = numpy.nan
data_frame[columns.PARAMETERS_TIME] = numpy.nan
return data_frame return data_frame

3
sensospot_data/parser.py

@ -9,9 +9,10 @@ from collections import namedtuple
import pandas import pandas
from . import columns from . import columns
from .parameters import add_optional_measurement_parameters from .parameters import add_optional_measurement_parameters
REGEX_WELL = re.compile( REGEX_WELL = re.compile(
r""" r"""
(?P<row>([A-Z]+)) # row name containing one or more letters (?P<row>([A-Z]+)) # row name containing one or more letters

102
tests/test_parameters.py

@ -1,3 +1,4 @@
import pandas
from .conftest import EXAMPLE_DIR_WO_PARAMS, EXAMPLE_DIR_WITH_PARAMS from .conftest import EXAMPLE_DIR_WO_PARAMS, EXAMPLE_DIR_WITH_PARAMS
@ -43,10 +44,13 @@ def test_parse_channel_info(example_dir):
) )
result = _parse_measurement_params(params) result = _parse_measurement_params(params)
assert set(result.keys()) == {1, 2, 3} expected = pandas.DataFrame({
assert result[1] == ("green", 100) "Exposure.Id": [1,2,3],
assert result[2] == ("red", 150) "Parameters.Channel": ["green", "red", "red"],
assert result[3] == ("red", 15) "Parameters.Time" : [100.0, 150.0, 15.0]
})
assert result.equals(expected)
def test_get_measurement_params_file_found(example_dir): def test_get_measurement_params_file_found(example_dir):
@ -54,10 +58,13 @@ def test_get_measurement_params_file_found(example_dir):
result = get_measurement_params(example_dir / EXAMPLE_DIR_WITH_PARAMS) result = get_measurement_params(example_dir / EXAMPLE_DIR_WITH_PARAMS)
assert set(result.keys()) == {1, 2, 3} expected = pandas.DataFrame({
assert result[1] == ("green", 100) "Exposure.Id": [1,2,3],
assert result[2] == ("red", 150) "Parameters.Channel": ["green", "red", "red"],
assert result[3] == ("red", 15) "Parameters.Time" : [100.0, 150.0, 15.0]
})
assert result.equals(expected)
def test_get_measurement_params_file_not_found(example_dir): def test_get_measurement_params_file_not_found(example_dir):
@ -68,28 +75,6 @@ def test_get_measurement_params_file_not_found(example_dir):
assert result is None assert result is None
def test_add_measurement_params(exposure_df):
from sensospot_data.parameters import ExposureInfo, _add_measurement_params
params = {
1: ExposureInfo("red", 10),
2: ExposureInfo("green", 20),
3: ExposureInfo("blue", 50),
}
result = _add_measurement_params(exposure_df, params)
assert result["Exposure.Id"][0] == 1
assert result["Parameters.Channel"][0] == "red"
assert result["Parameters.Time"][0] == 10
assert result["Exposure.Id"][1] == 2
assert result["Parameters.Channel"][1] == "green"
assert result["Parameters.Time"][1] == 20
assert result["Exposure.Id"][2] == 3
assert result["Parameters.Channel"][2] == "blue"
assert result["Parameters.Time"][2] == 50
def test_add_optional_measurement_parameters_with_params_file( def test_add_optional_measurement_parameters_with_params_file(
exposure_df, example_dir exposure_df, example_dir
): ):
@ -121,60 +106,3 @@ def test_add_optional_measurement_parameters_without_params_file(
assert one_exposure_data_frame["Parameters.Time"].hasnans assert one_exposure_data_frame["Parameters.Time"].hasnans
def test_apply_map(exposure_df):
from sensospot_data.parameters import _apply_map
map = {
1: {"SomeColumn": "A", "OtherColumn": 9},
2: {"SomeColumn": "B", "OtherColumn": 8},
3: {"SomeColumn": "C", "OtherColumn": 7},
}
result = _apply_map(exposure_df, map, "Exposure.Id")
for key, value in map.items():
mask = result["Exposure.Id"] == key
partial = result.loc[mask]
assert set(partial["SomeColumn"].unique()) == {value["SomeColumn"]}
assert set(partial["OtherColumn"].unique()) == {value["OtherColumn"]}
def test_apply_map_keys_not_in_df(exposure_df):
from sensospot_data.parameters import _apply_map
map = {
1: {"some_col": "A", "other_col": 9},
2: {"some_col": "B", "other_col": 8},
3: {"some_col": "C", "other_col": 7},
4: {"some_col": "D", "other_col": 6},
}
result = _apply_map(exposure_df, map, "Exposure.Id")
for key in (1, 2, 3):
value = map[key]
mask = result["Exposure.Id"] == key
partial = result.loc[mask]
assert set(partial["some_col"].unique()) == {value["some_col"]}
assert set(partial["other_col"].unique()) == {value["other_col"]}
assert "D" not in set(result["some_col"].unique())
assert "6" not in set(result["other_col"].unique())
def test_apply_map_not_all_keys_map_to_df(exposure_df):
from sensospot_data.parameters import _apply_map
map = {
1: {"some_col": "A", "other_col": 9},
3: {"some_col": "C", "other_col": 7},
}
result = _apply_map(exposure_df, map, "Exposure.Id")
assert not result.iloc[0].hasnans
assert result.iloc[1].hasnans
assert not result.iloc[2].hasnans
assert result["some_col"].hasnans
assert result["other_col"].hasnans

3
tests/test_sensospot_data.py

@ -2,8 +2,7 @@
def test_import_api(): def test_import_api():
from sensospot_data import ExposureInfo # noqa: F401
from sensospot_data import main # noqa: F401 from sensospot_data import main # noqa: F401
from sensospot_data import columns # noqa: F401
from sensospot_data import parse_file # noqa: F401 from sensospot_data import parse_file # noqa: F401
from sensospot_data import parse_folder # noqa: F401 from sensospot_data import parse_folder # noqa: F401
from sensospot_data import columns # noqa: F401

Loading…
Cancel
Save