Browse Source
The `sensospot_parser.parse_folder()` function now tries to parse the xml file first and will fall back to parsing csv files if an error occursxmlparsing
Holger Frey
2 years ago
6 changed files with 621 additions and 14 deletions
@ -0,0 +1,192 @@
@@ -0,0 +1,192 @@
|
||||
""" Sensospot Data Parser |
||||
|
||||
Parsing the csv result files from Sensovations Sensospot image analysis. |
||||
""" |
||||
|
||||
import pathlib |
||||
from typing import Union, Optional |
||||
from datetime import datetime |
||||
|
||||
import pandas |
||||
from defusedxml import ElementTree |
||||
|
||||
from . import columns, parameters |
||||
|
||||
# Everything that is accepted where a file system path is expected.
PathLike = Union[str, pathlib.Path]


def _parse_bool_text(text):
    """converts the text of a boolean value, only "true" (any case) is True"""
    return text.lower() == "true"


# Maps the "Type" attribute of a "Result" tag to the callable that converts
# the textual "Value" attribute of the same tag.
RESULT_TAG_TYPES = {
    "System.Int32": int,
    "System.UInt32": int,
    "System.Double": float,
    "System.Boolean": _parse_bool_text,
}

# strptime() format for the data of a "Timestamp" tag,
# e.g. "3/7/2022 5:31:47 PM"
DATETIME_XML_FORMAT = "%m/%d/%Y %I:%M:%S %p"
||||
|
||||
|
||||
class ParserTarget:
    """Class to parse the event stream emitted by ElementTree.XMLParser

    The methods "start()", "data()", "end()" and "close()" are defined
    according to the requirements of the ElementTree.XMLParser
    """

    def __init__(self):
        """initialization of the object instance"""
        # one dict per finished "Spot" tag, in the order of the xml file
        self.collected = []
        # values gathered so far; entries are overwritten by later tags
        # but never removed, so a spot inherits the values of the tags
        # seen before it
        self._current = {}
        # parser for the upcoming data section, None to ignore the data
        self._data_func = None

    def start(self, tag: str, attributes: dict[str, str]) -> None:
        """start of an xml tag

        The sensovation software uses sometimes the attributes of a tag to
        store relevant data and sometimes the data part of the xml tree.

        This method extracts the data from the attributes or prepares the
        parsing of the data section. Unknown tags are ignored.

        Args:
            tag:        the name of the tag
            attributes: the attributes of the tag as a dict

        Raises:
            KeyError:   if an expected attribute is missing
            ValueError: if a numeric part could not be converted
            IndexError: if the well name is empty
        """
        if tag == "ScanJobResult":
            self._current[columns.ANALYSIS_NAME] = attributes["ID"]
        elif tag == "AssayResult":
            # well names look like "C03": row letter followed by the column
            well = attributes["ID"]
            self._current[columns.WELL_NAME] = well
            self._current[columns.WELL_ROW] = well[0]
            self._current[columns.WELL_COLUMN] = int(well[1:])
        elif tag.startswith("ChannelConfig"):
            # the exposure id is the suffix of the tag name: "ChannelConfig1"
            exposure_id = tag.removeprefix("ChannelConfig")
            self._current[columns.EXPOSURE_ID] = int(exposure_id)
        elif tag == "Spot":
            self._current[columns.POS_ID] = int(attributes["ID"])
        elif tag == "Result":
            self._result_attributes_parser(attributes)
        elif tag == "Timestamp":
            self._data_func = self._data_timestamp_parser
        elif tag == "ImageFileName":
            self._data_func = self._data_image_name_parser

    def _result_attributes_parser(self, data: dict[str, str]) -> None:
        """parses the attributes of the "Result" tag

        The value is stored under the name given in the "Label" attribute,
        converted according to the "Type" attribute. Values of types not
        listed in RESULT_TAG_TYPES are converted with str().
        """
        label = data["Label"]
        converter = RESULT_TAG_TYPES.get(data["Type"], str)
        self._current[label] = converter(data["Value"])

    def _data_timestamp_parser(self, data: str) -> None:
        """parses the data section of a "Timestamp" tag"""
        timestamp = datetime.strptime(data.strip(), DATETIME_XML_FORMAT)
        self._current[columns.ANALYSIS_DATETIME] = timestamp

    def _data_image_name_parser(self, data: str) -> None:
        """parses the data section of a "ImageFileName" tag"""
        self._current[columns.ANALYSIS_IMAGE] = data.strip()

    def data(self, data: str) -> None:
        """parses the data section of the xml tree

        The data sections in the xml tree of the sensovation software are
        not often used.

        The "start()" method sets a parser for the upcoming data section and
        this parser is removed after it was called.
        """
        # NOTE(review): the parser is removed after its first call; if the
        # xml parser ever delivers one text section in several chunks, only
        # the first chunk is seen - confirm against real result files
        if self._data_func:
            self._data_func(data)
            self._data_func = None

    def end(self, tag: str) -> None:
        """the end of a tag is reached

        If it is the end of a "Spot" tag, a copy of the current data is added
        to the collected data property.
        """
        if tag == "Spot":
            # copy, since self._current is modified by the following tags
            spot_data = dict(self._current)
            self.collected.append(spot_data)

    def close(self) -> list:
        """the end of the xml file is reached

        This is the method name the ElementTree.XMLParser looks for on its
        target; its return value is handed back by "XMLParser.close()".

        Returns:
            the list of collected spot data
        """
        return self.collected

    def closed(self) -> None:
        """the end of the xml file is reached

        Kept for backward compatibility, does nothing. The method called
        by the ElementTree.XMLParser is "close()".
        """
        pass
||||
|
||||
|
||||
def _find_result_xml_file(folder: PathLike) -> Optional[pathlib.Path]: |
||||
"""searches a results folder for the analysis xml file |
||||
|
||||
There may be multiple xml files in the folder, but only one xsl file with |
||||
the same (base) name as the xml file we are looking for. This is why we |
||||
first look for the xsl file and then derive the path from the xml file |
||||
from it. |
||||
|
||||
Args: |
||||
folder: path of folder containing data files |
||||
|
||||
Returns: |
||||
Path to xml assay result file or None if it could not be found |
||||
""" |
||||
source = pathlib.Path(folder) |
||||
files = (i for i in source.iterdir() if i.is_file()) |
||||
not_hidden = (f for f in files if not f.name.startswith(".")) |
||||
xsl_files = [f for f in not_hidden if f.suffix == ".xsl"] |
||||
if len(xsl_files) != 1: |
||||
# multiple xsl files in a folder |
||||
# this does not to be a "normal" results folder |
||||
return None |
||||
xsl_file = xsl_files[0] |
||||
xml_file = xsl_file.with_suffix(".xml") |
||||
return xml_file if xml_file.is_file() else None |
||||
|
||||
|
||||
def parse_xml_file(xml_file: PathLike) -> pandas.DataFrame:
    """parses an assay result xml file into a pandas data frame

    Will raise a ValueError on a non-parsable xml file.

    Args:
        xml_file:  path to the xml file

    Returns:
        A pandas DataFrame with the parsed data

    Raises:
        ValueError if the xml file does not exist, is not well-formed,
        contains malformed data or does not contain any spot data
    """
    xml_file = pathlib.Path(xml_file)
    if not xml_file.is_file():
        raise ValueError("Xml file does not exist")

    target = ParserTarget()
    parser = ElementTree.DefusedXMLParser(target=target)

    try:
        parser.feed(xml_file.read_text())
    except (
        IndexError,
        KeyError,
        ValueError,
        TypeError,
        # raised for xml that is not well-formed; it is a SyntaxError
        # subclass and would otherwise break the documented contract
        # of only raising ValueError
        ElementTree.ParseError,
    ) as e:
        raise ValueError("Malformed data in xml file") from e

    data_frame = pandas.DataFrame(data=target.collected).reset_index()
    if data_frame.empty:
        # no "Spot" tag was completed while parsing
        raise ValueError("Could not parse assay results xml file")

    return columns._cleanup_data_columns(data_frame)
||||
|
||||
|
||||
def parse_xml_folder(folder: PathLike) -> pandas.DataFrame:
    """parses the xml result file in a folder to one large dataframe

    The xml result file is located in the folder and parsed, then the
    measurement parameters are added to the parsed data.

    Args:
        folder:  path of folder containing data files

    Returns:
        a pandas data frame with parsed data

    Raises:
        ValueError if no sensospot data could be found in the folder
    """
    results_dir = pathlib.Path(folder)

    result_file = _find_result_xml_file(results_dir)
    if result_file is None:
        raise ValueError("Could not find assay results xml file")

    parsed = parse_xml_file(result_file)
    with_parameters = parameters.add_measurement_parameters(
        parsed, results_dir
    )
    return columns._cleanup_data_columns(with_parameters)
@ -1,8 +1,50 @@
@@ -1,8 +1,50 @@
|
||||
""" testing the __ini__ file """ |
||||
import pytest |
||||
|
||||
from .conftest import EXAMPLE_DIR_CSV_WO_PARAMS, EXAMPLE_DIR_XML_WO_PARAMS |
||||
|
||||
|
||||
def test_import_api():
    """all names of the public api must be importable from the package"""
    from sensospot_parser import (  # noqa: F401
        main,
        columns,
        parse_folder,
        parse_csv_file,
        parse_csv_folder,
        parse_xml_folder,
    )
||||
|
||||
|
||||
def test_compare_xml_to_csv(example_dir):
    """the csv and the xml parser must agree on the same results folder"""
    import pandas

    from sensospot_parser import parse_csv_folder, parse_xml_folder

    results_dir = example_dir / EXAMPLE_DIR_XML_WO_PARAMS

    from_csv = parse_csv_folder(results_dir)
    from_xml = parse_xml_folder(results_dir)

    assert isinstance(from_csv, pandas.DataFrame)
    assert isinstance(from_xml, pandas.DataFrame)

    assert len(from_csv) == len(from_xml)
    for column in ("Well.Name", "Exposure.Id", "Spot.Diameter"):
        assert set(from_csv[column]) == set(from_xml[column])
||||
|
||||
|
||||
@pytest.mark.parametrize(
    "folder, length, hasnans",
    [
        (EXAMPLE_DIR_XML_WO_PARAMS, 6400, False),
        (EXAMPLE_DIR_CSV_WO_PARAMS, 28800, True),
    ],
)
def test_parse_folder_switches_parser(example_dir, folder, length, hasnans):
    """parse_folder() must return data for xml and for csv only folders"""
    import pandas

    from sensospot_parser import parse_folder

    parsed = parse_folder(example_dir / folder)

    assert isinstance(parsed, pandas.DataFrame)
    assert len(parsed) == length
    # the expected values differ between the xml and the csv example folder
    assert parsed["Analysis.Datetime"].hasnans == hasnans
||||
|
@ -0,0 +1,341 @@
@@ -0,0 +1,341 @@
|
||||
from datetime import datetime |
||||
|
||||
import pytest |
||||
|
||||
from .conftest import EXAMPLE_DIR_XML_WO_PARAMS, EXAMPLE_DIR_XML_WITH_PARAMS |
||||
|
||||
|
||||
class DummyDataFunc:
    """callable stand-in for a data parser function of the ParserTarget

    Remembers the value it was called with and has a configurable
    truthiness.
    """

    def __init__(self, as_bool):
        """stores the truthiness, no call was recorded yet"""
        self.as_bool = as_bool
        self.data = None

    def __bool__(self):
        """the truthiness set on initialization"""
        return self.as_bool

    def __call__(self, data):
        """records the value of the call"""
        self.data = data
||||
|
||||
|
||||
def test_parser_target_init():
    """a new ParserTarget starts without any data or data parser"""
    from sensospot_parser.xml_parser import ParserTarget

    fresh = ParserTarget()

    assert fresh.collected == []
    assert fresh._current == {}
    assert fresh._data_func is None
||||
|
||||
|
||||
@pytest.mark.parametrize(
    "tag, attributes, expected",
    [
        ("UnknownTag", {"ID": "something"}, {}),
        (
            "ScanJobResult",
            {"ID": "scan job 1"},
            {"Analysis.Name": "scan job 1"},
        ),
        (
            "AssayResult",
            {"ID": "C03"},
            {"Well.Name": "C03", "Well.Row": "C", "Well.Column": 3},
        ),
        ("ChannelConfig1", {}, {"Exposure.Id": 1}),
        ("Spot", {"ID": "456"}, {"Pos.Id": 456}),
        (
            "Result",
            {"Label": "a label", "Type": "Unknown", "Value": "a value"},
            {"a label": "a value"},
        ),
    ],
)
@pytest.mark.parametrize("additionals", [{}, {"Ignored": "value"}])
def test_parser_target_start_simple_attributes(
    tag, attributes, additionals, expected
):
    """start() reads the known attributes and ignores additional ones"""
    from sensospot_parser.xml_parser import ParserTarget

    target = ParserTarget()
    # build a new dict: the parametrized dicts are shared between the test
    # runs and must not be modified in place
    all_attributes = {**attributes, **additionals}

    target.start(tag, all_attributes)  # stateful operation

    assert target._current == expected
    assert target._data_func is None
||||
|
||||
|
||||
def test_parser_target_start_timestamp():
    """a "Timestamp" tag selects the timestamp parser for the next data"""
    from sensospot_parser.xml_parser import ParserTarget

    parser_target = ParserTarget()

    parser_target.start("Timestamp", {})  # stateful operation

    assert parser_target._data_func == parser_target._data_timestamp_parser
||||
|
||||
|
||||
def test_parser_target_start_image_file_name():
    """an "ImageFileName" tag selects the image name parser for the data"""
    from sensospot_parser.xml_parser import ParserTarget

    parser_target = ParserTarget()

    parser_target.start("ImageFileName", {})  # stateful operation

    assert parser_target._data_func == parser_target._data_image_name_parser
||||
|
||||
|
||||
@pytest.mark.parametrize(
    "data_type, value, expected",
    [
        ("unknown type", 1, "1"),
        ("System.Int32", "12", 12),
        ("System.UInt32", "23", 23),
        ("System.Double", "4.56", 4.56),
        ("System.Boolean", "true", True),
        ("System.Boolean", "True", True),
        ("System.Boolean", "Xrue", False),
    ],
)
def test_parser_target_result_attributes_parser(data_type, value, expected):
    """the value of a "Result" tag is converted according to its type"""
    from sensospot_parser.xml_parser import ParserTarget

    target = ParserTarget()
    data = {"Label": "some label", "Type": data_type, "Value": value}

    target._result_attributes_parser(data)  # stateful operation

    assert target._current == {"some label": expected}
    # exact type identity: True == 1 would hide a bool / int mix-up
    assert type(target._current["some label"]) is type(expected)
||||
|
||||
|
||||
@pytest.mark.parametrize(
    "value, expected",
    [
        ("3/7/2022 5:31:47 PM", datetime(2022, 3, 7, 17, 31, 47)),
        ("03/7/2022 5:31:47 PM", datetime(2022, 3, 7, 17, 31, 47)),
        ("3/07/2022 5:31:47 PM", datetime(2022, 3, 7, 17, 31, 47)),
        ("03/07/2022 5:31:47 PM", datetime(2022, 3, 7, 17, 31, 47)),
        ("3/7/2022 5:3:47 PM", datetime(2022, 3, 7, 17, 3, 47)),
        ("3/7/2022 5:31:4 PM", datetime(2022, 3, 7, 17, 31, 4)),
        ("3/7/2022 5:31:47 pm", datetime(2022, 3, 7, 17, 31, 47)),
        ("3/7/2022 5:31:47 AM", datetime(2022, 3, 7, 5, 31, 47)),
    ],
)
def test_parser_target_data_timestamp_parser(value, expected):
    """timestamps are parsed with and without zero padded parts"""
    from sensospot_parser.xml_parser import ParserTarget

    parser_target = ParserTarget()

    parser_target._data_timestamp_parser(value)  # stateful operation

    assert parser_target._current == {"Analysis.Datetime": expected}
||||
|
||||
|
||||
def test_parser_target_data_image_name_parser():
    """the image file name is stored without surrounding whitespace"""
    from sensospot_parser.xml_parser import ParserTarget

    parser_target = ParserTarget()

    # stateful operation
    parser_target._data_image_name_parser(" some file path ")

    assert parser_target._current == {"Analysis.Image": "some file path"}
||||
|
||||
|
||||
def test_parser_target_data_does_not_call_function():
    """data() must not call a data parser that evaluates to False"""
    from sensospot_parser.xml_parser import ParserTarget

    parser_target = ParserTarget()
    falsy_func = DummyDataFunc(as_bool=False)
    parser_target._data_func = falsy_func

    parser_target.data("some data")  # the falsy dummy must not be called

    assert falsy_func.data is None
||||
|
||||
|
||||
def test_parser_target_data_does_call_function():
    """data() hands the data over to a data parser that evaluates to True"""
    from sensospot_parser.xml_parser import ParserTarget

    parser_target = ParserTarget()
    truthy_func = DummyDataFunc(as_bool=True)
    parser_target._data_func = truthy_func

    parser_target.data("some data")  # stateful operation

    assert truthy_func.data == "some data"
||||
|
||||
|
||||
def test_parser_target_data_reacts_on_spot():
    """the end of a "Spot" tag adds a copy of the current data"""
    from sensospot_parser.xml_parser import ParserTarget

    parser_target = ParserTarget()
    parser_target._current = {"some current": "data values"}

    parser_target.end("Spot")  # stateful operation

    assert parser_target.collected == [{"some current": "data values"}]
    # a copy must be stored, not the dict that is modified later on
    assert parser_target.collected[0] is not parser_target._current
||||
|
||||
|
||||
def test_parser_target_data_does_only_react_on_spot():
    """the end of any tag other than "Spot" does not collect data"""
    from sensospot_parser.xml_parser import ParserTarget

    parser_target = ParserTarget()
    parser_target._current = {"some current": "data values"}

    parser_target.end("NonSpotTag")  # stateful operation

    assert parser_target.collected == []
||||
|
||||
|
||||
def test_parser_target_closed():
    """closed() must be callable without raising an error"""
    from sensospot_parser.xml_parser import ParserTarget

    parser_target = ParserTarget()

    parser_target.closed()  # stateful operation, must be callable
||||
|
||||
|
||||
def test_find_result_xml_file_ok(tmp_path):
    """the xml file with the base name of the single xsl file is found"""
    from sensospot_parser.xml_parser import _find_result_xml_file

    xsl_file = tmp_path / "result.xsl"
    xsl_file.touch()
    xml_file = tmp_path / "result.xml"
    xml_file.touch()

    result = _find_result_xml_file(tmp_path)

    assert result == xml_file
||||
|
||||
|
||||
def test_find_result_xml_file_no_matching_xml_file(tmp_path):
    """an xml file with another base name than the xsl file is not used"""
    from sensospot_parser.xml_parser import _find_result_xml_file

    (tmp_path / "result.xsl").touch()
    (tmp_path / "other.xml").touch()

    found = _find_result_xml_file(tmp_path)

    assert found is None
||||
|
||||
|
||||
def test_find_result_xml_file_no_xsl_file(tmp_path):
    """without an xsl file the xml file can not be identified"""
    from sensospot_parser.xml_parser import _find_result_xml_file

    (tmp_path / "result.xml").touch()

    found = _find_result_xml_file(tmp_path)

    assert found is None
||||
|
||||
|
||||
def test_find_result_xml_file_multiple_xsl_files(tmp_path):
    """more than one xsl file makes the xml file ambiguous"""
    from sensospot_parser.xml_parser import _find_result_xml_file

    for file_name in ("result.xsl", "surplus.xsl", "result.xml"):
        (tmp_path / file_name).touch()

    found = _find_result_xml_file(tmp_path)

    assert found is None
||||
|
||||
|
||||
def test_find_result_hidden_xsl_file(tmp_path):
    """hidden files (name starts with a dot) are not considered"""
    from sensospot_parser.xml_parser import _find_result_xml_file

    xsl_file = tmp_path / ".result.xsl"
    xsl_file.touch()
    xml_file = tmp_path / ".result.xml"
    xml_file.touch()

    result = _find_result_xml_file(tmp_path)

    assert result is None
||||
|
||||
|
||||
def test_parse_xml_file_ok(example_dir):
    """the example xml file is parsed into a data frame of all spots"""
    import pandas

    from sensospot_parser.xml_parser import (
        parse_xml_file,
        _find_result_xml_file,
    )

    results_dir = example_dir / EXAMPLE_DIR_XML_WO_PARAMS
    result_file = _find_result_xml_file(results_dir)

    parsed = parse_xml_file(result_file)

    assert isinstance(parsed, pandas.DataFrame)
    assert len(parsed) == 4 * 4 * 4 * 100
    assert set(parsed["Well.Row"]) == set("ABCD")
    for column in ("Well.Column", "Exposure.Id"):
        assert set(parsed[column]) == {1, 2, 3, 4}
    assert min(parsed["Spot.Diameter"]) == 22
    assert max(parsed["Spot.Diameter"]) == 34
    # parameters are only added when parsing a folder
    assert "Parameters.Time" not in parsed
||||
|
||||
|
||||
@pytest.mark.parametrize(
    "file_name, message",
    [
        ("not_existing.xml", "Xml file does not exist"),
        ("incomplete.xml", "Could not parse assay results xml file"),
        ("malformed_data.xml", "Malformed data in xml file"),
    ],
)
def test_parse_xml_file_raies_error(file_name, message, example_dir):
    """unusable xml files raise a ValueError with a descriptive message"""
    from sensospot_parser.xml_parser import parse_xml_file

    xml_file = example_dir / file_name

    with pytest.raises(ValueError) as e:
        parse_xml_file(xml_file)

    # checked after the with block, otherwise the assertion is skipped;
    # e is the ExceptionInfo wrapper, the exception itself is e.value
    assert message in str(e.value)
||||
|
||||
|
||||
def test_parse_xml_folder_with_params(example_dir):
    """a folder with parameters yields a value for every row"""
    import pandas

    from sensospot_parser.xml_parser import parse_xml_folder

    results_dir = example_dir / EXAMPLE_DIR_XML_WITH_PARAMS

    parsed = parse_xml_folder(results_dir)

    assert isinstance(parsed, pandas.DataFrame)
    assert len(parsed) == 4 * 4 * 4 * 100
    assert not parsed["Parameters.Time"].hasnans
||||
|
||||
|
||||
def test_parse_xml_folder_without_params(example_dir):
    """a folder without parameters still has the parameter column"""
    import pandas

    from sensospot_parser.xml_parser import parse_xml_folder

    results_dir = example_dir / EXAMPLE_DIR_XML_WO_PARAMS

    parsed = parse_xml_folder(results_dir)

    assert isinstance(parsed, pandas.DataFrame)
    assert len(parsed) == 4 * 4 * 4 * 100
    assert parsed["Parameters.Time"].hasnans
||||
|
||||
|
||||
def test_parse_xml_folder_non_existing_xml_file(tmp_path):
    """a folder without a result xml file raises a ValueError"""
    from sensospot_parser.xml_parser import parse_xml_folder

    with pytest.raises(ValueError) as e:
        parse_xml_folder(tmp_path)

    # checked after the with block, otherwise the assertion is skipped;
    # e is the ExceptionInfo wrapper, the exception itself is e.value
    assert "Could not find assay results xml file" in str(e.value)
Loading…
Reference in new issue