Browse Source

removed params info parsing

xmlparsing
Holger Frey 4 years ago
parent
commit
8b782f75ff
  1. 4
      sensospot_data/__init__.py
  2. 81
      sensospot_data/parser.py
  3. 173
      tests/test_parser.py
  4. 4
      tests/test_sensovation_data_parser.py

4
sensospot_data/__init__.py

@ -3,11 +3,11 @@
Parsing the numerical output from Sensovations Sensospot image analysis. Parsing the numerical output from Sensovations Sensospot image analysis.
""" """
__version__ = "0.0.1" __version__ = "0.1.0"
from .parser import ( # noqa: F401 from .parser import ( # noqa: F401
ExposureInfo, CACHE_FILE_NAME,
parse_file, parse_file,
parse_folder, parse_folder,
process_folder, process_folder,

81
sensospot_data/parser.py

@ -8,7 +8,6 @@ from pathlib import Path
from collections import namedtuple from collections import namedtuple
import pandas import pandas
from defusedxml import ElementTree
REGEX_WELL = re.compile( REGEX_WELL = re.compile(
r""" r"""
@ -28,7 +27,6 @@ COLUMNS_RENAME_MAP = {
CACHE_FILE_NAME = "raw_data.h5" CACHE_FILE_NAME = "raw_data.h5"
FileInfo = namedtuple("FileInfo", ["row", "column", "exposure"]) FileInfo = namedtuple("FileInfo", ["row", "column", "exposure"])
ExposureInfo = namedtuple("ExposureInfo", ["channel", "time"])
def _get_cache_table_name(): def _get_cache_table_name():
@ -79,8 +77,8 @@ def parse_file(data_file):
""" parses one data file and adds metadata to result """ """ parses one data file and adds metadata to result """
measurement_info = _extract_measurement_info(data_file) measurement_info = _extract_measurement_info(data_file)
data_frame = _parse_csv(data_file) data_frame = _parse_csv(data_file)
data_frame["Field.Row"] = measurement_info.row data_frame["Well.Row"] = measurement_info.row
data_frame["Field.Column"] = measurement_info.column data_frame["Well.Column"] = measurement_info.column
data_frame["Exposure.Id"] = measurement_info.exposure data_frame["Exposure.Id"] = measurement_info.exposure
return _cleanup_data_columns(data_frame) return _cleanup_data_columns(data_frame)
@ -106,8 +104,8 @@ def _list_csv_files(folder):
def _sanity_check(data_frame): def _sanity_check(data_frame):
""" checks some basic constrains of a combined data frame """ """ checks some basic constrains of a combined data frame """
field_rows = len(data_frame["Field.Row"].unique()) field_rows = len(data_frame["Well.Row"].unique())
field_cols = len(data_frame["Field.Column"].unique()) field_cols = len(data_frame["Well.Column"].unique())
exposures = len(data_frame["Exposure.Id"].unique()) exposures = len(data_frame["Exposure.Id"].unique())
spot_positions = len(data_frame["Pos.Id"].unique()) spot_positions = len(data_frame["Pos.Id"].unique())
expected_rows = field_rows * field_cols * exposures * spot_positions expected_rows = field_rows * field_cols * exposures * spot_positions
@ -120,73 +118,8 @@ def parse_folder(folder):
""" parses all csv files in a folder to one large dataframe """ """ parses all csv files in a folder to one large dataframe """
file_list = _list_csv_files(folder) file_list = _list_csv_files(folder)
data_frame = parse_multiple_files(file_list) data_frame = parse_multiple_files(file_list)
return data_frame data_frame["Well.Row"] = data_frame["Well.Row"].astype("category")
return _sanity_check(data_frame)
def _search_channel_info_file(folder):
""" searches for a exposure settings file in a folder """
folder_path = Path(folder)
params_folder = folder_path / "Parameters"
if not params_folder.is_dir():
return None
param_files = list(params_folder.glob("**/*.svexp"))
if len(param_files) == 1:
return param_files[0]
else:
return None
def _parse_channel_info(channel_file):
    """Parse the channel information from a settings file.

    Every child of the ``Channels`` element contributes one entry: the
    trailing number of its tag is the exposure id, the last word of its
    ``Description`` attribute (lower-cased) is the channel name and its
    ``ExposureTimeMs`` attribute is the exposure time.

    :param channel_file: path of the xml settings file
    :returns: dict mapping exposure id (``int``) to ``ExposureInfo``;
              empty if the file has no ``Channels`` element
    :raises ValueError: if a channel tag does not end in a number or the
                        exposure time is not an integer
    :raises KeyError: if a channel lacks one of the required attributes
    """
    file_path = Path(channel_file)
    with file_path.open("r") as file_handle:
        tree = ElementTree.parse(file_handle)

    channels = tree.find("Channels")
    if channels is None:
        # no channel section at all: nothing to report
        return {}

    result = {}
    for child in channels:
        # child.tag == "ChannelConfig1"
        # take all trailing digits so that ids above 9 are read completely
        id_match = re.search(r"\d+$", child.tag)
        if id_match is None:
            raise ValueError(
                f"channel tag without trailing number: {child.tag!r}"
            )
        exposure = int(id_match.group())
        channel_description = child.attrib["Description"]
        # channel_description == "Cy3/Cy5 Green"
        channel = channel_description.rsplit(" ", 1)[-1]
        time = int(child.attrib["ExposureTimeMs"])
        result[exposure] = ExposureInfo(channel.lower(), time)
    return result
def _get_valid_exposure_map(folder, data_frame, exposure_map=None):
    """Return exposure information that matches the data frame.

    If no map is provided, the settings file found in ``folder`` (if any)
    is parsed instead. The map is only accepted if its keys are exactly
    the exposure ids present in ``data_frame["Exposure.Id"]``.

    :param folder: measurement folder, searched for a settings file
    :param data_frame: data frame with an ``Exposure.Id`` column
    :param exposure_map: optional dict of exposure id to ``ExposureInfo``
    :returns: the accepted map, otherwise a dict assigning
              ``ExposureInfo(None, None)`` to every exposure id present
    """
    exposure_ids = set(data_frame["Exposure.Id"].unique())

    candidate = exposure_map
    if candidate is None:
        settings_file = _search_channel_info_file(folder)
        if settings_file is not None:
            candidate = _parse_channel_info(settings_file)

    if candidate is not None and exposure_ids == set(candidate.keys()):
        return candidate

    # no usable information: fall back to empty placeholders
    return {
        exposure_id: ExposureInfo(None, None) for exposure_id in exposure_ids
    }
def _augment_exposure_map(data_frame, exposure_map):
data_frame["Exposure.Channel"] = ""
data_frame["Exposure.Time"] = 0
for exposure_id, info in exposure_map.items():
channel, time = info
mask = data_frame["Exposure.Id"] == exposure_id
data_frame.loc[mask, "Exposure.Channel"] = channel
data_frame.loc[mask, "Exposure.Time"] = time
return data_frame
def _process_folder(folder, exposures=None):
    """Parse all csv files in a folder and add exposure information.

    :param folder: path of the measurement folder
    :param exposures: optional dict of exposure id to ``ExposureInfo``
    :returns: data frame with the added ``Exposure.Channel`` and
              ``Exposure.Time`` columns; ``Field.Row`` and
              ``Exposure.Channel`` are converted to categories
    """
    data_frame = parse_folder(folder)
    exposure_map = _get_valid_exposure_map(folder, data_frame, exposures)
    data_frame = _augment_exposure_map(data_frame, exposure_map)
    for column in ("Field.Row", "Exposure.Channel"):
        data_frame[column] = data_frame[column].astype("category")
    return data_frame
def process_folder(folder, exposures=None, use_cache=True): def process_folder(folder, exposures=None, use_cache=True):
@ -198,7 +131,7 @@ def process_folder(folder, exposures=None, use_cache=True):
except (FileNotFoundError, KeyError): except (FileNotFoundError, KeyError):
# either file or table doesn't exist # either file or table doesn't exist
pass pass
data_frame = _process_folder(folder, exposures) data_frame = parse_folder(folder)
if use_cache: if use_cache:
try: try:
data_frame.to_hdf( data_frame.to_hdf(

173
tests/test_parser.py

@ -112,7 +112,7 @@ def test_guess_decimal_separator_rewinds_handle():
from sensospot_data.parser import _guess_decimal_separator from sensospot_data.parser import _guess_decimal_separator
from io import StringIO from io import StringIO
handle = StringIO(f"header\n{input}\n") handle = StringIO("\n".join(["header", "data_line"]))
_guess_decimal_separator(handle) _guess_decimal_separator(handle)
assert next(handle) == "header\n" assert next(handle) == "header\n"
@ -196,14 +196,14 @@ def test_parse_file(example_file):
"Pos.Nom.X", "Pos.Nom.X",
"Pos.Nom.Y", "Pos.Nom.Y",
"Spot.Diameter", "Spot.Diameter",
"Field.Row", "Well.Row",
"Field.Column", "Well.Column",
"Exposure.Id", "Exposure.Id",
} }
assert set(result.columns) == columns assert set(result.columns) == columns
assert result["Field.Row"][0] == "A" assert result["Well.Row"][0] == "A"
assert result["Field.Column"][0] == 1 assert result["Well.Column"][0] == 1
assert result["Exposure.Id"][0] == 1 assert result["Exposure.Id"][0] == 1
@ -264,8 +264,8 @@ def test_parse_folder(example_dir):
data_frame = parse_folder(example_dir / EXAMPLE_DIR_WITH_PARAMS) data_frame = parse_folder(example_dir / EXAMPLE_DIR_WITH_PARAMS)
assert len(data_frame) == 36 * 3 * 100 assert len(data_frame) == 36 * 3 * 100
assert len(data_frame["Field.Row"].unique()) == 3 assert len(data_frame["Well.Row"].unique()) == 3
assert len(data_frame["Field.Column"].unique()) == 12 assert len(data_frame["Well.Column"].unique()) == 12
assert len(data_frame["Exposure.Id"].unique()) == 3 assert len(data_frame["Exposure.Id"].unique()) == 3
assert len(data_frame["Pos.Id"].unique()) == 100 assert len(data_frame["Pos.Id"].unique()) == 100
@ -308,161 +308,6 @@ def test_sanity_check_raises_value_error(example_dir):
_sanity_check(data_frame) _sanity_check(data_frame)
def test_search_channel_info_file_ok(example_dir):
    """The example folder with parameters yields a ``.svexp`` file."""
    from sensospot_data.parser import _search_channel_info_file

    data_dir = example_dir / EXAMPLE_DIR_WITH_PARAMS
    found = _search_channel_info_file(data_dir)

    assert found.suffix == ".svexp"
def test_search_channel_info_file_no_parameters_folder(example_dir):
    """The example folder without parameters yields ``None``."""
    from sensospot_data.parser import _search_channel_info_file

    data_dir = example_dir / EXAMPLE_DIR_WO_PARAMS
    found = _search_channel_info_file(data_dir)

    assert found is None
def test_search_channel_info_file_no_parameters_file(tmpdir):
    """An empty ``Parameters`` folder yields ``None``."""
    from sensospot_data.parser import _search_channel_info_file

    empty_params_dir = tmpdir / "Parameters"
    empty_params_dir.mkdir()

    found = _search_channel_info_file(tmpdir)

    assert found is None
def test_parse_channel_info(example_dir):
    """The example settings file is parsed into three exposure entries."""
    from sensospot_data.parser import (
        _search_channel_info_file,
        _parse_channel_info,
    )

    expected = {1: ("green", 100), 2: ("red", 150), 3: ("red", 15)}

    settings_file = _search_channel_info_file(
        example_dir / EXAMPLE_DIR_WITH_PARAMS
    )
    channel_info = _parse_channel_info(settings_file)

    assert set(channel_info.keys()) == set(expected)
    for exposure_id, info in expected.items():
        assert channel_info[exposure_id] == info
def test_get_valid_exposure_map_provided_ok(exposure_df):
    """A provided map with the ids 1, 2 and 3 is returned as given."""
    from sensospot_data.parser import (
        _get_valid_exposure_map,
        ExposureInfo,
    )

    placeholder = ExposureInfo(None, None)
    provided_map = {exposure_id: placeholder for exposure_id in (1, 2, 3)}

    result = _get_valid_exposure_map(
        "/nonexistent", exposure_df, exposure_map=provided_map
    )

    assert result == provided_map
def test_get_valid_exposure_map_provided_not_ok(exposure_df):
    """A provided map with only the ids 1 and 2 is replaced.

    The result is expected to hold ``(None, None)`` for the ids 1, 2 and 3.
    """
    from sensospot_data.parser import _get_valid_exposure_map

    incomplete_map = {1: None, 2: None}

    result = _get_valid_exposure_map(
        "/nonexistent", exposure_df, exposure_map=incomplete_map
    )

    assert set(result.keys()) == {1, 2, 3}
    for info in result.values():
        assert info == (None, None)
def test_get_valid_exposure_map_info_from_file_ok(example_dir, exposure_df):
    """Without a provided map the values of the settings file are used."""
    from sensospot_data.parser import _get_valid_exposure_map

    expected = {1: ("green", 100), 2: ("red", 150), 3: ("red", 15)}

    result = _get_valid_exposure_map(
        example_dir / EXAMPLE_DIR_WITH_PARAMS, exposure_df, exposure_map=None
    )

    assert set(result.keys()) == set(expected)
    for exposure_id, info in expected.items():
        assert result[exposure_id] == info
def test_get_valid_exposure_map_info_from_file_not_ok(
    example_dir, exposure_df
):
    """Settings file values are discarded if the frame lacks an exposure.

    With the second row of the fixture dropped, the result is expected to
    hold ``(None, None)`` for the ids 1 and 3 only.
    """
    from sensospot_data.parser import _get_valid_exposure_map

    second_row = exposure_df.index[1]
    reduced_frame = exposure_df.drop(second_row)

    result = _get_valid_exposure_map(
        example_dir / EXAMPLE_DIR_WITH_PARAMS,
        reduced_frame,
        exposure_map=None,
    )

    assert set(result.keys()) == {1, 3}
    for info in result.values():
        assert info == (None, None)
def test_augment_exposure_map(exposure_df):
    """Channel and time of each exposure end up in the matching row."""
    from sensospot_data.parser import (
        _augment_exposure_map,
        ExposureInfo,
    )

    # (exposure id, channel, time), in the row order that is checked below
    expected_rows = [(1, "red", 10), (2, "green", 20), (3, "blue", 50)]
    exposure_map = {
        exposure_id: ExposureInfo(channel, time)
        for exposure_id, channel, time in expected_rows
    }

    result = _augment_exposure_map(exposure_df, exposure_map)

    for row, (exposure_id, channel, time) in enumerate(expected_rows):
        assert result["Exposure.Id"][row] == exposure_id
        assert result["Exposure.Channel"][row] == channel
        assert result["Exposure.Time"][row] == time
def test_process_folder_with_exposure_map(example_dir):
    """Processing the folder with parameters fills channel and time."""
    from sensospot_data.parser import _process_folder

    expected_infos = {1: ("green", 100), 2: ("red", 150), 3: ("red", 15)}

    result = _process_folder(example_dir / EXAMPLE_DIR_WITH_PARAMS)

    assert len(result) == 36 * 100 * 3
    for exposure_id, (channel, time) in expected_infos.items():
        rows_of_exposure = result.loc[result["Exposure.Id"] == exposure_id]
        # spot check on the second row of each exposure
        sample = rows_of_exposure.iloc[1]
        assert sample["Exposure.Channel"] == channel
        assert sample["Exposure.Time"] == time
def test_process_folder_without_exposure_map(example_dir):
    """Processing the folder without parameters leaves the info empty.

    Channel and time are expected to be null for each of the exposure
    ids 1, 2 and 3.
    """
    from sensospot_data.parser import _process_folder
    from pandas import isnull

    result = _process_folder(example_dir / EXAMPLE_DIR_WO_PARAMS)

    assert len(result) == 96 * 100 * 3
    for exposure_id in range(1, 4):
        mask = result["Exposure.Id"] == exposure_id
        # spot check on the second row of each exposure
        example_row = result.loc[mask].iloc[1]
        assert isnull(example_row["Exposure.Channel"])
        assert isnull(example_row["Exposure.Time"])
def test_process_folder_creates_cache(dir_for_caching): def test_process_folder_creates_cache(dir_for_caching):
from sensospot_data.parser import ( from sensospot_data.parser import (
process_folder, process_folder,
@ -503,7 +348,7 @@ def test_process_folder_read_cache_fails_silently(
result = process_folder(dir_for_caching) result = process_folder(dir_for_caching)
assert result["Field.Row"][0] == "A" assert result["Well.Row"][0] == "A"
def test_get_cache_table_name(): def test_get_cache_table_name():
@ -528,7 +373,7 @@ def test_process_folder_read_cache_no_cache_arg(dir_for_caching, exposure_df):
result = process_folder(dir_for_caching, use_cache=False) result = process_folder(dir_for_caching, use_cache=False)
assert result["Field.Row"][0] == "A" assert result["Well.Row"][0] == "A"
def test_process_folder_writes_cache(dir_for_caching): def test_process_folder_writes_cache(dir_for_caching):

4
tests/test_sensovation_data_parser.py

@ -2,8 +2,8 @@
def test_import_api(): def test_import_api():
from sensospot_data import ExposureInfo # noqa: F401 from sensospot_data import CACHE_FILE_NAME # noqa: F401
from sensospot_data import parse_file # noqa: F401 from sensospot_data import parse_file # noqa: F401
from sensospot_data import parse_multiple_files # noqa: F401
from sensospot_data import parse_folder # noqa: F401 from sensospot_data import parse_folder # noqa: F401
from sensospot_data import parse_multiple_files # noqa: F401
from sensospot_data import process_folder # noqa: F401 from sensospot_data import process_folder # noqa: F401

Loading…
Cancel
Save