Browse Source

moved main code into .parser submodule

This should lead to a cleaner structure when a CLI module is added.
Also, the public-facing methods are now clearly defined.
xmlparsing
Holger Frey 5 years ago
parent
commit
ef00e80700
  1. 202
      sensovation_data_parser/__init__.py
  2. 211
      sensovation_data_parser/parser.py
  3. 552
      tests/test_parser.py
  4. 534
      tests/test_sensovation_data_parser.py

202
sensovation_data_parser/__init__.py

@ -6,200 +6,10 @@ Parsing the numerical output from Sensovation image analysis. @@ -6,200 +6,10 @@ Parsing the numerical output from Sensovation image analysis.
__version__ = "0.0.1"
import re
from pathlib import Path
from collections import namedtuple
import pandas
from defusedxml import ElementTree
REGEX_WELL = re.compile(
r"""
(?P<row>([A-Z]+)) # row name containing one or more letters
(?P<column>(\d+)) # column, one or more decimals
""",
re.VERBOSE | re.IGNORECASE,
from .parser import ( # noqa: F401
ExposureInfo,
parse_file,
parse_folder,
process_folder,
parse_multiple_files,
)
COLUMNS_TO_DROP = ["Rect.", "Contour"]
COLUMNS_RENAME_MAP = {
" ID ": "Pos.Id",
"Found": "Spot.Found",
"Dia.": "Spot.Diameter",
}
CACHE_FILE_NAME = "cached_data.h5"
CACHE_TABLE_NAME = f"raw_data_v{__version__}"
FileInfo = namedtuple("FileInfo", ["row", "column", "exposure"])
ExposureInfo = namedtuple("ExposureInfo", ["channel", "time"])
def _guess_decimal_separator(file_handle):
""" guesses the decimal spearator of a opened data file """
file_handle.seek(0)
headers = next(file_handle) # noqa: F841
data = next(file_handle)
separator = "," if data.count(",") > data.count(".") else "."
file_handle.seek(0)
return separator
def _parse_csv(data_file):
""" parse a csv sensovation data file """
data_path = Path(data_file)
with data_path.open("r") as handle:
decimal_sep = _guess_decimal_separator(handle)
return pandas.read_csv(handle, sep="\t", decimal=decimal_sep)
def _extract_measurement_info(data_file):
""" extract measurement meta data from a file name """
data_path = Path(data_file)
*rest, well, exposure = data_path.stem.rsplit("_", 2) # noqa: F841
matched = REGEX_WELL.match(well)
if matched is None:
raise ValueError(f"not a valid well: '{well}'")
row = matched["row"].upper()
column = int(matched["column"])
exposure = int(exposure)
return FileInfo(row, column, exposure)
def _cleanup_data_columns(data_frame):
""" renames some data columns for consistency and drops unused columns """
renamed = data_frame.rename(columns=COLUMNS_RENAME_MAP)
return renamed.drop(columns=COLUMNS_TO_DROP)
def parse_file(data_file):
""" parses one data file and adds metadata to result """
measurement_info = _extract_measurement_info(data_file)
data_frame = _parse_csv(data_file)
data_frame["Field.Row"] = measurement_info.row
data_frame["Field.Column"] = measurement_info.column
data_frame["Exposure.Id"] = measurement_info.exposure
return _cleanup_data_columns(data_frame)
def parse_multiple_files(file_list):
    """ parses a list of file paths to one combined dataframe

    :param file_list: iterable of paths to data files; lists as well as
                      generators are accepted
    :returns: one data frame with the rows of all files and a fresh index
    :raises ValueError: if no file path was provided
    """
    # materialize first: a generator is always truthy, so an emptiness check
    # on the raw argument would never fire for an exhausted generator
    paths = list(file_list)
    if not paths:
        raise ValueError("Empty file list provided")
    frames = [parse_file(path) for path in paths]
    # a single concat avoids copying the growing frame once per file
    return pandas.concat(frames, ignore_index=True)
def _list_csv_files(folder):
""" returns all csv files in a folder """
folder_path = Path(folder)
files = (item for item in folder_path.iterdir() if item.is_file())
visible = (item for item in files if not item.stem.startswith("."))
return (item for item in visible if item.suffix.lower() == ".csv")
def _sanity_check(data_frame):
""" checks some basic constrains of a combined data frame """
field_rows = len(data_frame["Field.Row"].unique())
field_cols = len(data_frame["Field.Column"].unique())
exposures = len(data_frame["Exposure.Id"].unique())
spot_positions = len(data_frame["Pos.Id"].unique())
expected_rows = field_rows * field_cols * exposures * spot_positions
if expected_rows != len(data_frame):
raise ValueError("Measurements are missing")
return data_frame
def parse_folder(folder):
""" parses all csv files in a folder to one large dataframe """
file_list = _list_csv_files(folder)
data_frame = parse_multiple_files(file_list)
return data_frame
def _search_channel_info_file(folder):
""" searches for a exposure settings file in a folder """
folder_path = Path(folder)
params_folder = folder_path / "Parameters"
if not params_folder.is_dir():
return None
param_files = list(params_folder.glob("**/*.svexp"))
if len(param_files) == 1:
return param_files[0]
else:
return None
def _parse_channel_info(channel_file):
    """ parses the channel information from a settings file

    Every child element of the "Channels" element describes one exposure:
    the trailing digits of the tag name are the exposure id, the last word
    of the "Description" attribute is the channel name and the
    "ExposureTimeMs" attribute is the exposure time.

    :param channel_file: path to the xml settings file
    :returns: dict mapping exposure id to ExposureInfo(channel, time)
    :raises ValueError: if a channel tag does not end with a number
    """
    with Path(channel_file).open("r") as file_handle:
        tree = ElementTree.parse(file_handle)
    result = {}
    for child in tree.find("Channels"):
        # child.tag == "ChannelConfig1"
        # use all trailing digits, otherwise "ChannelConfig10" would be
        # read as exposure 0
        id_match = re.search(r"\d+$", child.tag)
        if id_match is None:
            raise ValueError(f"no exposure id in channel tag: '{child.tag}'")
        exposure = int(id_match.group())
        # child.attrib["Description"] == "Cy3/Cy5 Green"
        channel = child.attrib["Description"].rsplit(" ", 1)[-1]
        time = int(child.attrib["ExposureTimeMs"])
        result[exposure] = ExposureInfo(channel.lower(), time)
    return result
def _get_valid_exposure_info(folder, data_frame, exposure_info=None):
""" returns valid exposure information """
available_exposures = set(data_frame["Exposure.Id"].unique())
if exposure_info is None:
params_file = _search_channel_info_file(folder)
if params_file is not None:
exposure_info = _parse_channel_info(params_file)
if exposure_info is not None:
if available_exposures == set(exposure_info.keys()):
return exposure_info
return {c: ExposureInfo(None, None) for c in available_exposures}
def _augment_exposure_info(data_frame, exposure_info):
data_frame["Exposure.Channel"] = ""
data_frame["Exposure.Time"] = 0
for exposure_id, info in exposure_info.items():
mask = data_frame["Exposure.Id"] == exposure_id
data_frame.loc[mask, "Exposure.Channel"] = info.channel
data_frame.loc[mask, "Exposure.Time"] = info.time
return data_frame
def _process_folder(folder, exposures=None):
""" parses all csv files in a folder, adds some checks and more data """
data_frame = parse_folder(folder)
exposures = _get_valid_exposure_info(folder, data_frame, exposures)
data_frame = _augment_exposure_info(data_frame, exposures)
data_frame["Field.Row"] = data_frame["Field.Row"].astype("category")
data_frame["Exposure.Channel"] = data_frame["Exposure.Channel"].astype(
"category"
)
return data_frame
def process_folder(folder, exposures=None, use_cache=True):
    """ parses all csv files in a folder and adds exposure information

    If caching is enabled, a previously stored result is read from the hdf5
    cache file inside the folder. If that fails, the folder is parsed and
    the result is written to the cache file.

    :param folder: path of the folder, as string or Path
    :param exposures: optional dict mapping exposure ids to ExposureInfo
    :param use_cache: set to False to neither read nor write the cache file
    :returns: data frame with the combined data of the folder
    """
    # accept plain strings as well, like the other functions of this module
    hdf5_path = Path(folder) / CACHE_FILE_NAME
    if use_cache:
        try:
            return pandas.read_hdf(hdf5_path, key=CACHE_TABLE_NAME)
        except (FileNotFoundError, KeyError):
            # either file or table doesn't exist
            pass
    data_frame = _process_folder(folder, exposures)
    if use_cache:
        try:
            data_frame.to_hdf(hdf5_path, key=CACHE_TABLE_NAME, format="table")
        except OSError:
            # capturing high level OSError
            # read only filesystems don't throw a more specific exception
            pass
    return data_frame

211
sensovation_data_parser/parser.py

@ -0,0 +1,211 @@ @@ -0,0 +1,211 @@
""" Sensovation Data Parser
Parsing the numerical output from Sensovation image analysis.
"""
import re
from pathlib import Path
from collections import namedtuple
import pandas
from defusedxml import ElementTree
# Pattern for a well name like "A1" or "h12": letters for the row directly
# followed by digits for the column; the case of the letters is ignored.
REGEX_WELL = re.compile(
r"""
(?P<row>([A-Z]+)) # row name containing one or more letters
(?P<column>(\d+)) # column, one or more decimals
""",
re.VERBOSE | re.IGNORECASE,
)
# Columns of a parsed data file that are removed in _cleanup_data_columns().
COLUMNS_TO_DROP = ["Rect.", "Contour"]
# Maps column names of a parsed data file to the names used in the result.
COLUMNS_RENAME_MAP = {
" ID ": "Pos.Id",
"Found": "Spot.Found",
"Dia.": "Spot.Diameter",
}
# Name of the hdf5 cache file that process_folder() uses inside a folder.
CACHE_FILE_NAME = "raw_data.h5"
# Metadata extracted from a data file name: well row, well column, exposure id.
FileInfo = namedtuple("FileInfo", ["row", "column", "exposure"])
# Settings of one exposure: channel name and exposure time.
ExposureInfo = namedtuple("ExposureInfo", ["channel", "time"])
def _get_cache_table_name():
    """ returns the hdf5 table name, derived from the package version

    The version is imported inside the function to avoid a circular import.
    """
    from . import __version__ as package_version

    return "v{}".format(package_version)
def _guess_decimal_separator(file_handle):
""" guesses the decimal spearator of a opened data file """
file_handle.seek(0)
headers = next(file_handle) # noqa: F841
data = next(file_handle)
separator = "," if data.count(",") > data.count(".") else "."
file_handle.seek(0)
return separator
def _parse_csv(data_file):
    """ reads a tab separated sensovation data file into a data frame

    The decimal separator is guessed from the file content before parsing.
    """
    with Path(data_file).open("r") as csv_handle:
        separator = _guess_decimal_separator(csv_handle)
        table = pandas.read_csv(csv_handle, sep="\t", decimal=separator)
    return table
def _extract_measurement_info(data_file):
    """ extracts measurement meta data from a file name

    The file name without suffix must end with "<well>_<exposure>", e.g.
    "measurement_1_H12_2" results in row "H", column 12 and exposure 2.

    :param data_file: path or name of the data file
    :returns: FileInfo(row, column, exposure)
    :raises ValueError: if well or exposure can not be extracted
    """
    stem = Path(data_file).stem
    parts = stem.rsplit("_", 2)
    if len(parts) < 2:
        raise ValueError(f"not a valid measurement file name: '{stem}'")
    well, exposure = parts[-2:]
    # the complete well part must match, "A1x" is not a valid well
    matched = REGEX_WELL.fullmatch(well)
    if matched is None:
        raise ValueError(f"not a valid well: '{well}'")
    row = matched["row"].upper()
    column = int(matched["column"])
    return FileInfo(row, column, int(exposure))
def _cleanup_data_columns(data_frame):
    """ returns a data frame with consistent column names

    Columns are renamed according to COLUMNS_RENAME_MAP first, afterwards
    the columns listed in COLUMNS_TO_DROP are removed.
    """
    return data_frame.rename(columns=COLUMNS_RENAME_MAP).drop(
        columns=COLUMNS_TO_DROP
    )
def parse_file(data_file):
    """ parses one data file and adds the meta data of the file name

    The well row, well column and exposure id extracted from the file name
    are added as columns "Field.Row", "Field.Column" and "Exposure.Id".
    """
    row, column, exposure = _extract_measurement_info(data_file)
    table = _parse_csv(data_file)
    table["Field.Row"] = row
    table["Field.Column"] = column
    table["Exposure.Id"] = exposure
    return _cleanup_data_columns(table)
def parse_multiple_files(file_list):
    """ parses a list of file paths to one combined dataframe

    :param file_list: iterable of paths to data files; lists as well as
                      generators are accepted
    :returns: one data frame with the rows of all files and a fresh index
    :raises ValueError: if no file path was provided
    """
    # materialize first: a generator is always truthy, so an emptiness check
    # on the raw argument would never fire for an exhausted generator
    paths = list(file_list)
    if not paths:
        raise ValueError("Empty file list provided")
    frames = [parse_file(path) for path in paths]
    # a single concat avoids copying the growing frame once per file
    return pandas.concat(frames, ignore_index=True)
def _list_csv_files(folder):
""" returns all csv files in a folder """
folder_path = Path(folder)
files = (item for item in folder_path.iterdir() if item.is_file())
visible = (item for item in files if not item.stem.startswith("."))
return (item for item in visible if item.suffix.lower() == ".csv")
def _sanity_check(data_frame):
""" checks some basic constrains of a combined data frame """
field_rows = len(data_frame["Field.Row"].unique())
field_cols = len(data_frame["Field.Column"].unique())
exposures = len(data_frame["Exposure.Id"].unique())
spot_positions = len(data_frame["Pos.Id"].unique())
expected_rows = field_rows * field_cols * exposures * spot_positions
if expected_rows != len(data_frame):
raise ValueError("Measurements are missing")
return data_frame
def parse_folder(folder):
    """ parses all csv files found in a folder into one combined dataframe """
    return parse_multiple_files(_list_csv_files(folder))
def _search_channel_info_file(folder):
""" searches for a exposure settings file in a folder """
folder_path = Path(folder)
params_folder = folder_path / "Parameters"
if not params_folder.is_dir():
return None
param_files = list(params_folder.glob("**/*.svexp"))
if len(param_files) == 1:
return param_files[0]
else:
return None
def _parse_channel_info(channel_file):
    """ parses the channel information from a settings file

    Every child element of the "Channels" element describes one exposure:
    the trailing digits of the tag name are the exposure id, the last word
    of the "Description" attribute is the channel name and the
    "ExposureTimeMs" attribute is the exposure time.

    :param channel_file: path to the xml settings file
    :returns: dict mapping exposure id to ExposureInfo(channel, time)
    :raises ValueError: if a channel tag does not end with a number
    """
    with Path(channel_file).open("r") as file_handle:
        tree = ElementTree.parse(file_handle)
    result = {}
    for child in tree.find("Channels"):
        # child.tag == "ChannelConfig1"
        # use all trailing digits, otherwise "ChannelConfig10" would be
        # read as exposure 0
        id_match = re.search(r"\d+$", child.tag)
        if id_match is None:
            raise ValueError(f"no exposure id in channel tag: '{child.tag}'")
        exposure = int(id_match.group())
        # child.attrib["Description"] == "Cy3/Cy5 Green"
        channel = child.attrib["Description"].rsplit(" ", 1)[-1]
        time = int(child.attrib["ExposureTimeMs"])
        result[exposure] = ExposureInfo(channel.lower(), time)
    return result
def _get_valid_exposure_map(folder, data_frame, exposure_map=None):
""" returns valid exposure information """
available_exposures = set(data_frame["Exposure.Id"].unique())
if exposure_map is None:
params_file = _search_channel_info_file(folder)
if params_file is not None:
exposure_map = _parse_channel_info(params_file)
if exposure_map is not None:
if available_exposures == set(exposure_map.keys()):
return exposure_map
return {c: ExposureInfo(None, None) for c in available_exposures}
def _augment_exposure_map(data_frame, exposure_map):
data_frame["Exposure.Channel"] = ""
data_frame["Exposure.Time"] = 0
for exposure_id, info in exposure_map.items():
channel, time = info
mask = data_frame["Exposure.Id"] == exposure_id
data_frame.loc[mask, "Exposure.Channel"] = channel
data_frame.loc[mask, "Exposure.Time"] = time
return data_frame
def _process_folder(folder, exposures=None):
    """ parses all csv files in a folder and adds exposure information

    After parsing, the exposure channel and time are added to the data and
    the columns "Field.Row" and "Exposure.Channel" are converted to
    categories.
    """
    combined = parse_folder(folder)
    exposure_map = _get_valid_exposure_map(folder, combined, exposures)
    combined = _augment_exposure_map(combined, exposure_map)
    for column in ("Field.Row", "Exposure.Channel"):
        combined[column] = combined[column].astype("category")
    return combined
def process_folder(folder, exposures=None, use_cache=True):
    """ parses all csv files in a folder and adds exposure information

    If caching is enabled, a previously stored result is read from the hdf5
    cache file inside the folder. If that fails, the folder is parsed and
    the result is written to the cache file.

    :param folder: path of the folder, as string or Path
    :param exposures: optional dict mapping exposure ids to ExposureInfo
    :param use_cache: set to False to neither read nor write the cache file
    :returns: data frame with the combined data of the folder
    """
    # accept plain strings as well, like the other functions of this module
    hdf5_path = Path(folder) / CACHE_FILE_NAME
    if use_cache:
        try:
            return pandas.read_hdf(hdf5_path, key=_get_cache_table_name())
        except (FileNotFoundError, KeyError):
            # either file or table doesn't exist
            pass
    data_frame = _process_folder(folder, exposures)
    if use_cache:
        try:
            data_frame.to_hdf(
                hdf5_path, key=_get_cache_table_name(), format="table"
            )
        except OSError:
            # capturing high level OSError
            # read only filesystems don't throw a more specific exception
            pass
    return data_frame

552
tests/test_parser.py

@ -0,0 +1,552 @@ @@ -0,0 +1,552 @@
""" Stub file for testing the project """
from pathlib import Path
import numpy
import pytest
EXAMPLE_DIR_WO_PARAMS = "mtp_wo_parameters"
EXAMPLE_DIR_WITH_PARAMS = "mtp_with_parameters"
@pytest.fixture
def example_dir(request):
root_dir = Path(request.config.rootdir)
yield root_dir / "example_data"
@pytest.fixture
def example_file(example_dir):
data_dir = example_dir / EXAMPLE_DIR_WO_PARAMS
yield data_dir / "160218_SG2-013-001_Regen1_Cy3-100_1_A1_1.csv"
@pytest.fixture
def exposure_df():
from pandas import DataFrame
yield DataFrame(data={"Exposure.Id": [1, 2, 3]})
@pytest.fixture
def dir_for_caching(tmpdir, example_file):
import shutil
temp_path = Path(tmpdir)
dest = temp_path / example_file.name
shutil.copy(example_file, dest)
yield temp_path
@pytest.mark.parametrize(
"sub_dir, file_name",
[
(
EXAMPLE_DIR_WO_PARAMS,
"160218_SG2-013-001_Regen1_Cy3-100_1_A1_1.csv",
),
(
EXAMPLE_DIR_WITH_PARAMS,
"160210_SG2-010-001_Regen_cy3100_1_A1_1.csv",
),
],
)
def test_parse_csv(example_dir, sub_dir, file_name):
from sensovation_data_parser.parser import _parse_csv
result = _parse_csv(example_dir / sub_dir / file_name)
columns = {
" ID ": numpy.int64,
"Pos.X": numpy.int64,
"Pos.Y": numpy.int64,
"Bkg.Mean": float,
"Spot.Mean": float,
"Bkg.Median": float,
"Spot.Median": float,
"Bkg.StdDev": float,
"Spot.StdDev": float,
"Bkg.Sum": numpy.int64,
"Spot.Sum": numpy.int64,
"Bkg.Area": numpy.int64,
"Spot.Area": numpy.int64,
"Spot.Sat. (%)": numpy.int64,
"Found": numpy.bool_,
"Pos.Nom.X": numpy.int64,
"Pos.Nom.Y": numpy.int64,
"Dia.": numpy.int64,
"Rect.": str,
"Contour": object, # ignore the type of contour
}
assert set(result.columns) == set(columns.keys())
assert len(result[" ID "].unique()) == 100
assert len(result) == 100
for column, value_type in columns.items():
assert isinstance(result[column][0], value_type)
def test_parse_csv_no_array(example_dir):
from sensovation_data_parser.parser import _parse_csv
result = _parse_csv(example_dir / "no_array_A1_1.csv")
assert len(result) == 1
assert result[" ID "][0] == 0
@pytest.mark.parametrize(
"input, expected", [("", "."), ("..,", "."), (".,,", ","), ("..,,", "."),]
)
def test_guess_decimal_separator_returns_correct_separator(input, expected):
from sensovation_data_parser.parser import _guess_decimal_separator
from io import StringIO
handle = StringIO(f"header\n{input}\n")
result = _guess_decimal_separator(handle)
assert result == expected
def test_guess_decimal_separator_rewinds_handle():
    """ the handle must point to the start of the file after guessing """
    from sensovation_data_parser.parser import _guess_decimal_separator
    from io import StringIO

    # a literal data line: this test has no "input" parameter, formatting
    # that name would insert the repr of the builtin function
    handle = StringIO("header\n1,5\t2,5\n")
    _guess_decimal_separator(handle)
    assert next(handle) == "header\n"
def test_well_regex_ok():
from sensovation_data_parser.parser import REGEX_WELL
result = REGEX_WELL.match("AbC123")
assert result["row"] == "AbC"
assert result["column"] == "123"
@pytest.mark.parametrize("input", ["", "A", "1", "1A", "-1", "A-"])
def test_well_regex_no_match(input):
from sensovation_data_parser.parser import REGEX_WELL
result = REGEX_WELL.match(input)
assert result is None
@pytest.mark.parametrize(
"filename, expected",
[("A1_1.csv", ("A", 1, 1)), ("test/measurement_1_H12_2", ("H", 12, 2)),],
)
def test_extract_measurement_info_ok(filename, expected):
from sensovation_data_parser.parser import _extract_measurement_info
result = _extract_measurement_info(filename)
assert result == expected
@pytest.mark.parametrize("filename", ["wrong_exposure_A1_B", "no_well_XX_1"])
def test_extract_measurement_info_raises_error(filename):
from sensovation_data_parser.parser import _extract_measurement_info
with pytest.raises(ValueError):
_extract_measurement_info(filename)
def test_cleanup_data_columns():
from sensovation_data_parser.parser import _cleanup_data_columns
from pandas import DataFrame
columns = ["Rect.", "Contour", " ID ", "Found", "Dia."]
data = {col: [i] for i, col in enumerate(columns)}
data_frame = DataFrame(data=data)
result = _cleanup_data_columns(data_frame)
assert set(result.columns) == {"Pos.Id", "Spot.Found", "Spot.Diameter"}
assert result["Pos.Id"][0] == 2
assert result["Spot.Found"][0] == 3
assert result["Spot.Diameter"][0] == 4
def test_parse_file(example_file):
from sensovation_data_parser.parser import parse_file
result = parse_file(example_file)
columns = {
"Pos.Id",
"Pos.X",
"Pos.Y",
"Bkg.Mean",
"Spot.Mean",
"Bkg.Median",
"Spot.Median",
"Bkg.StdDev",
"Spot.StdDev",
"Bkg.Sum",
"Spot.Sum",
"Bkg.Area",
"Spot.Area",
"Spot.Sat. (%)",
"Spot.Found",
"Pos.Nom.X",
"Pos.Nom.Y",
"Spot.Diameter",
"Field.Row",
"Field.Column",
"Exposure.Id",
}
assert set(result.columns) == columns
assert result["Field.Row"][0] == "A"
assert result["Field.Column"][0] == 1
assert result["Exposure.Id"][0] == 1
@pytest.mark.parametrize(
"file_list",
[
[
"160218_SG2-013-001_Regen1_Cy3-100_1_A1_1.csv",
"160218_SG2-013-001_Regen1_Cy3-100_1_A1_2.csv",
],
["160218_SG2-013-001_Regen1_Cy3-100_1_A1_1.csv"],
],
)
def testparse_multiple_files_ok(example_dir, file_list):
from sensovation_data_parser.parser import parse_multiple_files
sub_dir = example_dir / EXAMPLE_DIR_WO_PARAMS
files = [sub_dir / file for file in file_list]
data_frame = parse_multiple_files(files)
print(data_frame["Exposure.Id"].unique())
assert len(data_frame) == 100 * len(files)
assert len(data_frame["Exposure.Id"].unique()) == len(files)
def testparse_multiple_files_empty_file_list():
from sensovation_data_parser.parser import parse_multiple_files
with pytest.raises(ValueError):
parse_multiple_files([])
def testparse_multiple_files_empty_array(example_dir):
from sensovation_data_parser.parser import parse_multiple_files
files = [example_dir / "no_array_A1_1.csv"]
data_frame = parse_multiple_files(files)
print(data_frame["Exposure.Id"].unique())
assert len(data_frame) == 1
def test_list_csv_files(example_dir):
from sensovation_data_parser.parser import _list_csv_files
result = list(_list_csv_files(example_dir / EXAMPLE_DIR_WITH_PARAMS))
assert len(result) == 36 * 3
assert all(str(item).endswith(".csv") for item in result)
assert all(not item.stem.startswith(".") for item in result)
def test_parse_folder(example_dir):
from sensovation_data_parser.parser import parse_folder
data_frame = parse_folder(example_dir / EXAMPLE_DIR_WITH_PARAMS)
assert len(data_frame) == 36 * 3 * 100
assert len(data_frame["Field.Row"].unique()) == 3
assert len(data_frame["Field.Column"].unique()) == 12
assert len(data_frame["Exposure.Id"].unique()) == 3
assert len(data_frame["Pos.Id"].unique()) == 100
def test_sanity_check_ok(example_dir):
from sensovation_data_parser.parser import (
_sanity_check,
parse_multiple_files,
)
sub_dir = example_dir / EXAMPLE_DIR_WO_PARAMS
file_list = [
"160218_SG2-013-001_Regen1_Cy3-100_1_A1_1.csv",
"160218_SG2-013-001_Regen1_Cy3-100_1_A1_2.csv",
]
files = [sub_dir / file for file in file_list]
data_frame = parse_multiple_files(files)
result = _sanity_check(data_frame)
assert len(result) == len(data_frame)
def test_sanity_check_raises_value_error(example_dir):
from sensovation_data_parser.parser import (
_sanity_check,
parse_multiple_files,
)
sub_dir = example_dir / EXAMPLE_DIR_WO_PARAMS
file_list = [
"160218_SG2-013-001_Regen1_Cy3-100_1_A1_1.csv",
"160218_SG2-013-001_Regen1_Cy3-100_1_A1_2.csv",
]
files = [sub_dir / file for file in file_list]
data_frame = parse_multiple_files(files)
data_frame = data_frame.drop(data_frame.index[1])
with pytest.raises(ValueError):
_sanity_check(data_frame)
def test_search_channel_info_file_ok(example_dir):
from sensovation_data_parser.parser import _search_channel_info_file
result = _search_channel_info_file(example_dir / EXAMPLE_DIR_WITH_PARAMS)
assert result.suffix == ".svexp"
def test_search_channel_info_file_no_parameters_folder(example_dir):
from sensovation_data_parser.parser import _search_channel_info_file
result = _search_channel_info_file(example_dir / EXAMPLE_DIR_WO_PARAMS)
assert result is None
def test_search_channel_info_file_no_parameters_file(tmpdir):
from sensovation_data_parser.parser import _search_channel_info_file
params_dir = tmpdir / "Parameters"
params_dir.mkdir()
result = _search_channel_info_file(tmpdir)
assert result is None
def test_parse_channel_info(example_dir):
from sensovation_data_parser.parser import (
_search_channel_info_file,
_parse_channel_info,
)
params = _search_channel_info_file(example_dir / EXAMPLE_DIR_WITH_PARAMS)
result = _parse_channel_info(params)
assert set(result.keys()) == {1, 2, 3}
assert result[1] == ("green", 100)
assert result[2] == ("red", 150)
assert result[3] == ("red", 15)
def test_get_valid_exposure_map_provided_ok(exposure_df):
from sensovation_data_parser.parser import (
_get_valid_exposure_map,
ExposureInfo,
)
dummy_value = ExposureInfo(None, None)
exposure_map = {1: dummy_value, 2: dummy_value, 3: dummy_value}
result = _get_valid_exposure_map(
"/nonexistent", exposure_df, exposure_map=exposure_map
)
assert result == exposure_map
def test_get_valid_exposure_map_provided_not_ok(exposure_df):
from sensovation_data_parser.parser import _get_valid_exposure_map
exposure_map = {1: None, 2: None}
result = _get_valid_exposure_map(
"/nonexistent", exposure_df, exposure_map=exposure_map
)
assert set(result.keys()) == {1, 2, 3}
assert all(v == (None, None) for v in result.values())
def test_get_valid_exposure_map_info_from_file_ok(example_dir, exposure_df):
from sensovation_data_parser.parser import _get_valid_exposure_map
result = _get_valid_exposure_map(
example_dir / EXAMPLE_DIR_WITH_PARAMS, exposure_df, exposure_map=None
)
assert set(result.keys()) == {1, 2, 3}
assert result[1] == ("green", 100)
assert result[2] == ("red", 150)
assert result[3] == ("red", 15)
def test_get_valid_exposure_map_info_from_file_not_ok(
example_dir, exposure_df
):
from sensovation_data_parser.parser import _get_valid_exposure_map
data_frame = exposure_df.drop(exposure_df.index[1])
result = _get_valid_exposure_map(
example_dir / EXAMPLE_DIR_WITH_PARAMS, data_frame, exposure_map=None
)
assert set(result.keys()) == {1, 3}
assert all(v == (None, None) for v in result.values())
def test_augment_exposure_map(exposure_df):
from sensovation_data_parser.parser import (
_augment_exposure_map,
ExposureInfo,
)
exposure_map = {
1: ExposureInfo("red", 10),
2: ExposureInfo("green", 20),
3: ExposureInfo("blue", 50),
}
result = _augment_exposure_map(exposure_df, exposure_map)
assert result["Exposure.Id"][0] == 1
assert result["Exposure.Channel"][0] == "red"
assert result["Exposure.Time"][0] == 10
assert result["Exposure.Id"][1] == 2
assert result["Exposure.Channel"][1] == "green"
assert result["Exposure.Time"][1] == 20
assert result["Exposure.Id"][2] == 3
assert result["Exposure.Channel"][2] == "blue"
assert result["Exposure.Time"][2] == 50
def test_process_folder_with_exposure_map(example_dir):
from sensovation_data_parser.parser import _process_folder
result = _process_folder(example_dir / EXAMPLE_DIR_WITH_PARAMS)
assert len(result) == 36 * 100 * 3
expected = [(1, "green", 100), (2, "red", 150), (3, "red", 15)]
for exposure_id, channel, time in expected:
mask = result["Exposure.Id"] == exposure_id
example_row = result.loc[mask].iloc[1]
assert example_row["Exposure.Channel"] == channel
assert example_row["Exposure.Time"] == time
def test_process_folder_without_exposure_map(example_dir):
from sensovation_data_parser.parser import _process_folder
from pandas import isnull
result = _process_folder(example_dir / EXAMPLE_DIR_WO_PARAMS)
assert len(result) == 96 * 100 * 3
for exposure_id in range(1, 4):
mask = result["Exposure.Id"] == exposure_id
example_row = result.loc[mask].iloc[1]
print(type(example_row["Exposure.Channel"]))
assert isnull(example_row["Exposure.Channel"])
assert isnull(example_row["Exposure.Time"])
def test_process_folder_creates_cache(dir_for_caching):
from sensovation_data_parser.parser import (
process_folder,
CACHE_FILE_NAME,
)
cache_path = dir_for_caching / CACHE_FILE_NAME
assert not cache_path.is_file()
result = process_folder(dir_for_caching)
assert len(result) == 100
assert cache_path.is_file()
def test_process_folder_reads_from_cache(dir_for_caching, example_file):
from sensovation_data_parser.parser import process_folder
process_folder(dir_for_caching)
csv_file = dir_for_caching / example_file.name
csv_file.unlink()
result = process_folder(dir_for_caching)
assert len(result) == 100
def test_process_folder_read_cache_fails_silently(
dir_for_caching, exposure_df
):
from sensovation_data_parser.parser import (
process_folder,
CACHE_FILE_NAME,
)
cache_path = dir_for_caching / CACHE_FILE_NAME
exposure_df.to_hdf(cache_path, "unknown table")
result = process_folder(dir_for_caching)
assert result["Field.Row"][0] == "A"
def test_get_cache_table_name():
from sensovation_data_parser.parser import _get_cache_table_name
from sensovation_data_parser import __version__
result = _get_cache_table_name()
assert result.startswith("v")
assert result[1:] == __version__
def test_process_folder_read_cache_no_cache_arg(dir_for_caching, exposure_df):
from sensovation_data_parser.parser import (
process_folder,
_get_cache_table_name,
CACHE_FILE_NAME,
)
cache_path = dir_for_caching / CACHE_FILE_NAME
exposure_df.to_hdf(cache_path, _get_cache_table_name())
result = process_folder(dir_for_caching, use_cache=False)
assert result["Field.Row"][0] == "A"
def test_process_folder_writes_cache(dir_for_caching):
from sensovation_data_parser.parser import (
process_folder,
CACHE_FILE_NAME,
)
process_folder(dir_for_caching, use_cache=True)
cache_path = dir_for_caching / CACHE_FILE_NAME
assert cache_path.is_file()
def test_process_folder_writes_cache_no_cache_arg(dir_for_caching):
from sensovation_data_parser.parser import process_folder, CACHE_FILE_NAME
process_folder(dir_for_caching, use_cache=False)
cache_path = dir_for_caching / CACHE_FILE_NAME
assert not cache_path.is_file()

534
tests/test_sensovation_data_parser.py

@ -1,529 +1,9 @@ @@ -1,529 +1,9 @@
""" Stub file for testing the project """
""" testing the __ini__ file """
from pathlib import Path
import numpy
import pytest
EXAMPLE_DIR_WO_PARAMS = "mtp_wo_parameters"
EXAMPLE_DIR_WITH_PARAMS = "mtp_with_parameters"
@pytest.fixture
def example_dir(request):
root_dir = Path(request.config.rootdir)
yield root_dir / "example_data"
@pytest.fixture
def example_file(example_dir):
data_dir = example_dir / EXAMPLE_DIR_WO_PARAMS
yield data_dir / "160218_SG2-013-001_Regen1_Cy3-100_1_A1_1.csv"
@pytest.fixture
def exposure_df():
from pandas import DataFrame
yield DataFrame(data={"Exposure.Id": [1, 2, 3]})
@pytest.fixture
def dir_for_caching(tmpdir, example_file):
import shutil
temp_path = Path(tmpdir)
dest = temp_path / example_file.name
shutil.copy(example_file, dest)
yield temp_path
@pytest.mark.parametrize(
"sub_dir, file_name",
[
(
EXAMPLE_DIR_WO_PARAMS,
"160218_SG2-013-001_Regen1_Cy3-100_1_A1_1.csv",
),
(
EXAMPLE_DIR_WITH_PARAMS,
"160210_SG2-010-001_Regen_cy3100_1_A1_1.csv",
),
],
)
def test_parse_csv(example_dir, sub_dir, file_name):
from sensovation_data_parser import _parse_csv
result = _parse_csv(example_dir / sub_dir / file_name)
columns = {
" ID ": numpy.int64,
"Pos.X": numpy.int64,
"Pos.Y": numpy.int64,
"Bkg.Mean": float,
"Spot.Mean": float,
"Bkg.Median": float,
"Spot.Median": float,
"Bkg.StdDev": float,
"Spot.StdDev": float,
"Bkg.Sum": numpy.int64,
"Spot.Sum": numpy.int64,
"Bkg.Area": numpy.int64,
"Spot.Area": numpy.int64,
"Spot.Sat. (%)": numpy.int64,
"Found": numpy.bool_,
"Pos.Nom.X": numpy.int64,
"Pos.Nom.Y": numpy.int64,
"Dia.": numpy.int64,
"Rect.": str,
"Contour": object, # ignore the type of contour
}
assert set(result.columns) == set(columns.keys())
assert len(result[" ID "].unique()) == 100
assert len(result) == 100
for column, value_type in columns.items():
assert isinstance(result[column][0], value_type)
def test_parse_csv_no_array(example_dir):
from sensovation_data_parser import _parse_csv
result = _parse_csv(example_dir / "no_array_A1_1.csv")
assert len(result) == 1
assert result[" ID "][0] == 0
@pytest.mark.parametrize(
"input, expected", [("", "."), ("..,", "."), (".,,", ","), ("..,,", "."),]
)
def test_guess_decimal_separator_returns_correct_separator(input, expected):
from sensovation_data_parser import _guess_decimal_separator
from io import StringIO
handle = StringIO(f"header\n{input}\n")
result = _guess_decimal_separator(handle)
assert result == expected
def test_guess_decimal_separator_rewinds_handle():
    """ the handle must point to the start of the file after guessing """
    from sensovation_data_parser import _guess_decimal_separator
    from io import StringIO

    # a literal data line: this test has no "input" parameter, formatting
    # that name would insert the repr of the builtin function
    handle = StringIO("header\n1,5\t2,5\n")
    _guess_decimal_separator(handle)
    assert next(handle) == "header\n"
def test_well_regex_ok():
from sensovation_data_parser import REGEX_WELL
result = REGEX_WELL.match("AbC123")
assert result["row"] == "AbC"
assert result["column"] == "123"
@pytest.mark.parametrize("input", ["", "A", "1", "1A", "-1", "A-"])
def test_well_regex_no_match(input):
from sensovation_data_parser import REGEX_WELL
result = REGEX_WELL.match(input)
assert result is None
@pytest.mark.parametrize(
"filename, expected",
[("A1_1.csv", ("A", 1, 1)), ("test/measurement_1_H12_2", ("H", 12, 2)),],
)
def test_extract_measurement_info_ok(filename, expected):
from sensovation_data_parser import _extract_measurement_info
result = _extract_measurement_info(filename)
assert result == expected
@pytest.mark.parametrize("filename", ["wrong_exposure_A1_B", "no_well_XX_1"])
def test_extract_measurement_info_raises_error(filename):
from sensovation_data_parser import _extract_measurement_info
with pytest.raises(ValueError):
_extract_measurement_info(filename)
def test_cleanup_data_columns():
from sensovation_data_parser import _cleanup_data_columns
from pandas import DataFrame
columns = ["Rect.", "Contour", " ID ", "Found", "Dia."]
data = {col: [i] for i, col in enumerate(columns)}
data_frame = DataFrame(data=data)
result = _cleanup_data_columns(data_frame)
assert set(result.columns) == {"Pos.Id", "Spot.Found", "Spot.Diameter"}
assert result["Pos.Id"][0] == 2
assert result["Spot.Found"][0] == 3
assert result["Spot.Diameter"][0] == 4
def test_parse_file(example_file):
from sensovation_data_parser import parse_file
result = parse_file(example_file)
columns = {
"Pos.Id",
"Pos.X",
"Pos.Y",
"Bkg.Mean",
"Spot.Mean",
"Bkg.Median",
"Spot.Median",
"Bkg.StdDev",
"Spot.StdDev",
"Bkg.Sum",
"Spot.Sum",
"Bkg.Area",
"Spot.Area",
"Spot.Sat. (%)",
"Spot.Found",
"Pos.Nom.X",
"Pos.Nom.Y",
"Spot.Diameter",
"Field.Row",
"Field.Column",
"Exposure.Id",
}
assert set(result.columns) == columns
assert result["Field.Row"][0] == "A"
assert result["Field.Column"][0] == 1
assert result["Exposure.Id"][0] == 1
@pytest.mark.parametrize(
    "file_list",
    [
        [
            "160218_SG2-013-001_Regen1_Cy3-100_1_A1_1.csv",
            "160218_SG2-013-001_Regen1_Cy3-100_1_A1_2.csv",
        ],
        ["160218_SG2-013-001_Regen1_Cy3-100_1_A1_1.csv"],
    ],
)
def testparse_multiple_files_ok(example_dir, file_list):
    """Parsing one or two files yields 100 rows and one exposure id per file."""
    from sensovation_data_parser import parse_multiple_files

    sub_dir = example_dir / EXAMPLE_DIR_WO_PARAMS
    files = [sub_dir / file_name for file_name in file_list]

    data_frame = parse_multiple_files(files)

    assert len(data_frame) == 100 * len(files)
    assert len(data_frame["Exposure.Id"].unique()) == len(files)
def testparse_multiple_files_empty_file_list():
    """An empty list of files is rejected with a ValueError."""
    from sensovation_data_parser import parse_multiple_files as parse_files

    no_files = []
    with pytest.raises(ValueError):
        parse_files(no_files)
def testparse_multiple_files_empty_array(example_dir):
    """The "no_array" example file is parsed into a single-row data frame."""
    from sensovation_data_parser import parse_multiple_files

    files = [example_dir / "no_array_A1_1.csv"]

    data_frame = parse_multiple_files(files)

    # explicit check instead of printing the column's values for debugging
    assert "Exposure.Id" in data_frame.columns
    assert len(data_frame) == 1
def test_list_csv_files(example_dir):
    """The example folder lists 36 * 3 entries, all non-hidden .csv files."""
    from sensovation_data_parser import _list_csv_files

    found = list(_list_csv_files(example_dir / EXAMPLE_DIR_WITH_PARAMS))

    assert len(found) == 36 * 3
    for path in found:
        assert str(path).endswith(".csv")
        # files whose name starts with a dot must not be listed
        assert not path.stem.startswith(".")
def test_parse_folder(example_dir):
    """parse_folder combines all files of the example folder in one frame."""
    from sensovation_data_parser import parse_folder

    frame = parse_folder(example_dir / EXAMPLE_DIR_WITH_PARAMS)

    assert len(frame) == 36 * 3 * 100
    # number of distinct values expected per column
    distinct_values = {
        "Field.Row": 3,
        "Field.Column": 12,
        "Exposure.Id": 3,
        "Pos.Id": 100,
    }
    for column, count in distinct_values.items():
        assert len(frame[column].unique()) == count
def test_sanity_check_ok(example_dir):
    """_sanity_check returns a result as long as the complete input frame."""
    from sensovation_data_parser import _sanity_check, parse_multiple_files

    folder = example_dir / EXAMPLE_DIR_WO_PARAMS
    names = (
        "160218_SG2-013-001_Regen1_Cy3-100_1_A1_1.csv",
        "160218_SG2-013-001_Regen1_Cy3-100_1_A1_2.csv",
    )
    parsed = parse_multiple_files([folder / name for name in names])

    checked = _sanity_check(parsed)

    assert len(checked) == len(parsed)
def test_sanity_check_raises_value_error(example_dir):
    """Removing one row from the parsed data makes _sanity_check fail."""
    from sensovation_data_parser import _sanity_check, parse_multiple_files

    folder = example_dir / EXAMPLE_DIR_WO_PARAMS
    names = (
        "160218_SG2-013-001_Regen1_Cy3-100_1_A1_1.csv",
        "160218_SG2-013-001_Regen1_Cy3-100_1_A1_2.csv",
    )
    complete = parse_multiple_files([folder / name for name in names])
    # drop the second row so the data is no longer complete
    incomplete = complete.drop(complete.index[1])

    with pytest.raises(ValueError):
        _sanity_check(incomplete)
def test_search_channel_info_file_ok(example_dir):
    """A ".svexp" file is found in the example folder with parameters."""
    from sensovation_data_parser import _search_channel_info_file

    folder = example_dir / EXAMPLE_DIR_WITH_PARAMS
    found = _search_channel_info_file(folder)

    assert found.suffix == ".svexp"
def test_search_channel_info_file_no_parameters_folder(example_dir):
    """None is returned for the example folder without parameters."""
    from sensovation_data_parser import _search_channel_info_file

    folder = example_dir / EXAMPLE_DIR_WO_PARAMS

    assert _search_channel_info_file(folder) is None
def test_search_channel_info_file_no_parameters_file(tmpdir):
    """None is returned when the "Parameters" folder exists but is empty."""
    from sensovation_data_parser import _search_channel_info_file

    # create the folder only, without any file inside
    (tmpdir / "Parameters").mkdir()

    assert _search_channel_info_file(tmpdir) is None
def test_parse_channel_info(example_dir):
    """The example parameters file yields channel and time per exposure id."""
    from sensovation_data_parser import (
        _parse_channel_info,
        _search_channel_info_file,
    )

    params_file = _search_channel_info_file(
        example_dir / EXAMPLE_DIR_WITH_PARAMS
    )
    channel_info = _parse_channel_info(params_file)

    expected = {1: ("green", 100), 2: ("red", 150), 3: ("red", 15)}
    assert set(channel_info.keys()) == set(expected)
    for exposure_id, info in expected.items():
        assert channel_info[exposure_id] == info
def test_get_valid_exposure_info_provided_ok(exposure_df):
    """Exposure info with the keys 1, 2 and 3 is returned unchanged."""
    from sensovation_data_parser import _get_valid_exposure_info

    provided = dict.fromkeys((1, 2, 3))

    returned = _get_valid_exposure_info(
        "/nonexistent", exposure_df, exposure_info=provided
    )

    assert returned == provided
def test_get_valid_exposure_info_provided_not_ok(exposure_df):
    """Exposure info lacking id 3 is replaced by empty info for ids 1 to 3."""
    from sensovation_data_parser import _get_valid_exposure_info

    incomplete = dict.fromkeys((1, 2))

    returned = _get_valid_exposure_info(
        "/nonexistent", exposure_df, exposure_info=incomplete
    )

    assert set(returned.keys()) == {1, 2, 3}
    for info in returned.values():
        assert info == (None, None)
def test_get_valid_exposure_info_info_from_file_ok(example_dir, exposure_df):
    """Without provided info, it is read from the example parameters folder."""
    from sensovation_data_parser import _get_valid_exposure_info

    folder = example_dir / EXAMPLE_DIR_WITH_PARAMS

    returned = _get_valid_exposure_info(
        folder, exposure_df, exposure_info=None
    )

    expected = {1: ("green", 100), 2: ("red", 150), 3: ("red", 15)}
    assert set(returned.keys()) == set(expected)
    for exposure_id, info in expected.items():
        assert returned[exposure_id] == info
def test_get_valid_exposure_info_info_from_file_not_ok(
    example_dir, exposure_df
):
    """With the second data row dropped, only empty info for 1 and 3 remains."""
    from sensovation_data_parser import _get_valid_exposure_info

    folder = example_dir / EXAMPLE_DIR_WITH_PARAMS
    reduced_frame = exposure_df.drop(exposure_df.index[1])

    returned = _get_valid_exposure_info(
        folder, reduced_frame, exposure_info=None
    )

    assert set(returned.keys()) == {1, 3}
    for info in returned.values():
        assert info == (None, None)
def test_augment_exposure_info(exposure_df):
    """Channel and time columns are filled in according to the exposure id."""
    from sensovation_data_parser import ExposureInfo, _augment_exposure_info

    # (exposure id, channel, time) expected at frame index 0, 1 and 2
    expected_rows = [(1, "red", 10), (2, "green", 20), (3, "blue", 50)]
    exposure_info = {
        exposure_id: ExposureInfo(channel, time)
        for exposure_id, channel, time in expected_rows
    }

    augmented = _augment_exposure_info(exposure_df, exposure_info)

    for index, (exposure_id, channel, time) in enumerate(expected_rows):
        assert augmented["Exposure.Id"][index] == exposure_id
        assert augmented["Exposure.Channel"][index] == channel
        assert augmented["Exposure.Time"][index] == time
def test_process_folder_with_exposure_info(example_dir):
    """Rows of the folder with parameters carry channel and exposure time."""
    from sensovation_data_parser import _process_folder

    processed = _process_folder(example_dir / EXAMPLE_DIR_WITH_PARAMS)

    assert len(processed) == 36 * 100 * 3
    expected = {1: ("green", 100), 2: ("red", 150), 3: ("red", 15)}
    for exposure_id, (channel, time) in expected.items():
        rows = processed.loc[processed["Exposure.Id"] == exposure_id]
        # the second row of each exposure serves as the sample
        sample = rows.iloc[1]
        assert sample["Exposure.Channel"] == channel
        assert sample["Exposure.Time"] == time
def test_process_folder_without_exposure_info(example_dir):
    """Rows of the folder without parameters have null channel and time."""
    from sensovation_data_parser import _process_folder
    from pandas import isnull

    result = _process_folder(example_dir / EXAMPLE_DIR_WO_PARAMS)

    assert len(result) == 96 * 100 * 3
    for exposure_id in range(1, 4):
        mask = result["Exposure.Id"] == exposure_id
        # the second row of each exposure serves as the sample
        example_row = result.loc[mask].iloc[1]
        assert isnull(example_row["Exposure.Channel"])
        assert isnull(example_row["Exposure.Time"])
def test_process_folder_creates_cache(dir_for_caching):
    """The cache file does not exist before process_folder runs, but after."""
    from sensovation_data_parser import CACHE_FILE_NAME, process_folder

    cache_file = dir_for_caching / CACHE_FILE_NAME
    assert not cache_file.is_file()

    processed = process_folder(dir_for_caching)

    assert len(processed) == 100
    assert cache_file.is_file()
def test_process_folder_reads_from_cache(dir_for_caching, example_file):
    """A second run still returns 100 rows after the csv file was deleted."""
    from sensovation_data_parser import process_folder

    # first run, its result is not needed here
    process_folder(dir_for_caching)
    (dir_for_caching / example_file.name).unlink()

    second_run = process_folder(dir_for_caching)

    assert len(second_run) == 100
def test_process_folder_read_cache_fails_silently(
    dir_for_caching, exposure_df
):
    """A cache file holding only an unknown table does not break processing.

    NOTE(review): the final assertion presumably proves that the data was
    parsed from the csv file rather than taken from the cache; this depends
    on the contents of the exposure_df fixture, which is defined elsewhere.
    """
    from sensovation_data_parser import (
        process_folder,
        CACHE_FILE_NAME,
    )

    cache_path = dir_for_caching / CACHE_FILE_NAME
    # "key" is passed by keyword: positional use is deprecated in pandas
    exposure_df.to_hdf(cache_path, key="unknown table")

    result = process_folder(dir_for_caching)

    assert result["Field.Row"][0] == "A"
def test_process_folder_read_cache_no_cache_arg(dir_for_caching, exposure_df):
    """With use_cache=False a prepared cache table is present but unused.

    NOTE(review): the final assertion presumably distinguishes parsed data
    from the cached exposure_df; this depends on the fixture's contents,
    which are defined elsewhere.
    """
    from sensovation_data_parser import (
        process_folder,
        CACHE_FILE_NAME,
        CACHE_TABLE_NAME,
    )

    cache_path = dir_for_caching / CACHE_FILE_NAME
    # "key" is passed by keyword: positional use is deprecated in pandas
    exposure_df.to_hdf(cache_path, key=CACHE_TABLE_NAME)

    result = process_folder(dir_for_caching, use_cache=False)

    assert result["Field.Row"][0] == "A"
def test_process_folder_writes_cache(dir_for_caching):
    """With use_cache=True the cache file exists after processing."""
    from sensovation_data_parser import CACHE_FILE_NAME, process_folder

    process_folder(dir_for_caching, use_cache=True)

    assert (dir_for_caching / CACHE_FILE_NAME).is_file()
def test_process_folder_writes_cache_no_cache_arg(dir_for_caching):
    """With use_cache=False no cache file exists after processing."""
    from sensovation_data_parser import CACHE_FILE_NAME, process_folder

    process_folder(dir_for_caching, use_cache=False)

    cache_file = dir_for_caching / CACHE_FILE_NAME
    cache_was_written = cache_file.is_file()
    assert not cache_was_written
def test_import_api():
    """The public API names can be imported from the package root."""
    from sensovation_data_parser import (  # noqa: F401
        ExposureInfo,
        parse_file,
        parse_multiple_files,
        parse_folder,
        process_folder,
    )

Loading…
Cancel
Save