Browse Source

tests passing after cleanup

xmlparsing
Holger Frey 4 years ago
parent
commit
e641c341f8
  1. 5
      sensospot_data/__init__.py
  2. 2
      sensospot_data/columns.py
  3. 22
      sensospot_data/parameters.py
  4. 38
      sensospot_data/parser.py
  5. 8
      tests/test_parameters.py
  6. 85
      tests/test_parser.py
  7. 7
      tests/test_sensovation_data_parser.py

5
sensospot_data/__init__.py

@ -10,10 +10,7 @@ from pathlib import Path
import click import click
from .parser import ( # noqa: F401 from .parser import parse_file, parse_folder # noqa: F401
parse_file,
parse_folder,
)
from .parameters import ExposureInfo, get_measurement_params # noqa: F401 from .parameters import ExposureInfo, get_measurement_params # noqa: F401

2
sensospot_data/columns.py

@ -20,7 +20,7 @@ RAW_DATA_POS_NOM_Y = "Pos.Nom.Y"
RAW_DATA_POS_ID = "Pos.Id" RAW_DATA_POS_ID = "Pos.Id"
RAW_DATA_SPOT_FOUND = "Spot.Found" RAW_DATA_SPOT_FOUND = "Spot.Found"
RAW_DATA_SPOT_DIAMETER = "Spot.Diameter" RAW_DATA_SPOT_DIAMETER = "Spot.Diameter"
RAW_DATA_SPOT_SAT = "Spot.Sat[%]" RAW_DATA_SPOT_SAT = "Spot.Saturation"
RAW_DATA_COLUMNS_RENAME_MAP = { RAW_DATA_COLUMNS_RENAME_MAP = {

22
sensospot_data/parameters.py

@ -10,9 +10,9 @@ import numpy
from defusedxml import ElementTree from defusedxml import ElementTree
from .columns import ( from .columns import (
COL_NAME_EXPOSURE_ID, META_DATA_EXPOSURE_ID,
COL_NAME_PARAMETERS_TIME, META_DATA_PARAMETERS_TIME,
COL_NAME_PARAMETERS_CHANNEL, META_DATA_PARAMETERS_CHANNEL,
) )
ExposureInfo = namedtuple("ExposureInfo", ["channel", "time"]) ExposureInfo = namedtuple("ExposureInfo", ["channel", "time"])
@ -59,22 +59,22 @@ def get_measurement_params(folder):
def _add_measurement_params(data_frame, params): def _add_measurement_params(data_frame, params):
""" adds measurement parameters to a data frame """ """ adds measurement parameters to a data frame """
for exposure_id, info in params.items(): for exposure_id, info in params.items():
mask = data_frame[COL_NAME_EXPOSURE_ID] == exposure_id mask = data_frame[META_DATA_EXPOSURE_ID] == exposure_id
data_frame.loc[mask, COL_NAME_PARAMETERS_CHANNEL] = info.channel data_frame.loc[mask, META_DATA_PARAMETERS_CHANNEL] = info.channel
data_frame.loc[mask, COL_NAME_PARAMETERS_TIME] = info.time data_frame.loc[mask, META_DATA_PARAMETERS_TIME] = info.time
data_frame[COL_NAME_PARAMETERS_CHANNEL] = data_frame[ data_frame[META_DATA_PARAMETERS_CHANNEL] = data_frame[
COL_NAME_PARAMETERS_CHANNEL META_DATA_PARAMETERS_CHANNEL
].astype("category") ].astype("category")
return data_frame return data_frame
def add_optional_measurement_parameters(data_frame, folder): def add_optional_measurement_parameters(data_frame, folder):
""" adds measurement params to the data frame, if they could be parsed """ """ adds measurement params to the data frame, if they could be parsed """
data_frame[COL_NAME_PARAMETERS_CHANNEL] = numpy.nan data_frame[META_DATA_PARAMETERS_CHANNEL] = numpy.nan
data_frame[COL_NAME_PARAMETERS_TIME] = numpy.nan data_frame[META_DATA_PARAMETERS_TIME] = numpy.nan
params = get_measurement_params(folder) params = get_measurement_params(folder)
if params: if params:
available_exposures = set(data_frame[COL_NAME_EXPOSURE_ID].unique()) available_exposures = set(data_frame[META_DATA_EXPOSURE_ID].unique())
if available_exposures == set(params.keys()): if available_exposures == set(params.keys()):
return _add_measurement_params(data_frame, params) return _add_measurement_params(data_frame, params)
return data_frame return data_frame

38
sensospot_data/parser.py

@ -10,14 +10,12 @@ from collections import namedtuple
import pandas import pandas
from .columns import ( from .columns import (
COL_NAME_POS_ID, RAW_DATA_POS_ID,
COL_NAME_WELL_ROW, META_DATA_WELL_ROW,
COL_NAME_SPOT_FOUND,
RAW_DATA_COLUMN_SET, RAW_DATA_COLUMN_SET,
COL_NAME_EXPOSURE_ID, META_DATA_EXPOSURE_ID,
COL_NAME_WELL_COLUMN, META_DATA_WELL_COLUMN,
COL_NAME_SPOT_DIAMETER, RAW_DATA_COLUMNS_RENAME_MAP,
COLUMNS_RENAME_MAP
) )
from .parameters import add_optional_measurement_parameters from .parameters import add_optional_measurement_parameters
@ -29,12 +27,6 @@ REGEX_WELL = re.compile(
re.VERBOSE | re.IGNORECASE, re.VERBOSE | re.IGNORECASE,
) )
COLUMNS_RENAME_MAP = {
" ID ": COL_NAME_POS_ID,
"Found": COL_NAME_SPOT_FOUND,
"Dia.": COL_NAME_SPOT_DIAMETER,
}
FileInfo = namedtuple("FileInfo", ["row", "column", "exposure"]) FileInfo = namedtuple("FileInfo", ["row", "column", "exposure"])
@ -71,7 +63,7 @@ def _extract_measurement_info(data_file):
def _cleanup_data_columns(data_frame): def _cleanup_data_columns(data_frame):
""" renames some data columns for consistency and drops unused columns """ """ renames some data columns for consistency and drops unused columns """
renamed = data_frame.rename(columns=COLUMNS_RENAME_MAP) renamed = data_frame.rename(columns=RAW_DATA_COLUMNS_RENAME_MAP)
surplus_columns = set(renamed.columns) - RAW_DATA_COLUMN_SET surplus_columns = set(renamed.columns) - RAW_DATA_COLUMN_SET
return renamed.drop(columns=surplus_columns) return renamed.drop(columns=surplus_columns)
@ -86,9 +78,9 @@ def parse_file(data_file, silent=False):
else: else:
raise e raise e
data_frame = _parse_csv(data_file) data_frame = _parse_csv(data_file)
data_frame[COL_NAME_WELL_ROW] = measurement_info.row data_frame[META_DATA_WELL_ROW] = measurement_info.row
data_frame[COL_NAME_WELL_COLUMN] = measurement_info.column data_frame[META_DATA_WELL_COLUMN] = measurement_info.column
data_frame[COL_NAME_EXPOSURE_ID] = measurement_info.exposure data_frame[META_DATA_EXPOSURE_ID] = measurement_info.exposure
return _cleanup_data_columns(data_frame) return _cleanup_data_columns(data_frame)
@ -101,7 +93,7 @@ def parse_multiple_files(file_list):
data_frame = next(filtered) data_frame = next(filtered)
for next_frame in filtered: for next_frame in filtered:
data_frame = data_frame.append(next_frame, ignore_index=True) data_frame = data_frame.append(next_frame, ignore_index=True)
data_frame[COL_NAME_WELL_ROW] = data_frame[COL_NAME_WELL_ROW].astype( data_frame[META_DATA_WELL_ROW] = data_frame[META_DATA_WELL_ROW].astype(
"category" "category"
) )
return data_frame return data_frame
@ -117,10 +109,10 @@ def list_csv_files(folder):
def _sanity_check(data_frame): def _sanity_check(data_frame):
""" checks some basic constrains of a combined data frame """ """ checks some basic constrains of a combined data frame """
field_rows = len(data_frame[COL_NAME_WELL_ROW].unique()) field_rows = len(data_frame[META_DATA_WELL_ROW].unique())
field_cols = len(data_frame[COL_NAME_WELL_COLUMN].unique()) field_cols = len(data_frame[META_DATA_WELL_COLUMN].unique())
exposures = len(data_frame[COL_NAME_EXPOSURE_ID].unique()) exposures = len(data_frame[META_DATA_EXPOSURE_ID].unique())
spot_positions = len(data_frame[COL_NAME_POS_ID].unique()) spot_positions = len(data_frame[RAW_DATA_POS_ID].unique())
expected_rows = field_rows * field_cols * exposures * spot_positions expected_rows = field_rows * field_cols * exposures * spot_positions
if expected_rows != len(data_frame): if expected_rows != len(data_frame):
raise ValueError("Measurements are missing") raise ValueError("Measurements are missing")
@ -129,7 +121,7 @@ def _sanity_check(data_frame):
def parse_folder(folder): def parse_folder(folder):
""" parses all csv files in a folder to one large dataframe """ """ parses all csv files in a folder to one large dataframe """
file = list_csv_files(Path(folder)) file_list = list_csv_files(Path(folder))
data_frame = parse_multiple_files(file_list) data_frame = parse_multiple_files(file_list)
data_frame = add_optional_measurement_parameters(data_frame, folder) data_frame = add_optional_measurement_parameters(data_frame, folder)
return _sanity_check(data_frame) return _sanity_check(data_frame)

8
tests/test_parameters.py

@ -50,9 +50,9 @@ def test_parse_channel_info(example_dir):
def test_get_measurement_params_file_found(example_dir): def test_get_measurement_params_file_found(example_dir):
from sensospot_data.parameters import _get_measurement_params from sensospot_data.parameters import get_measurement_params
result = _get_measurement_params(example_dir / EXAMPLE_DIR_WITH_PARAMS) result = get_measurement_params(example_dir / EXAMPLE_DIR_WITH_PARAMS)
assert set(result.keys()) == {1, 2, 3} assert set(result.keys()) == {1, 2, 3}
assert result[1] == ("green", 100) assert result[1] == ("green", 100)
@ -61,9 +61,9 @@ def test_get_measurement_params_file_found(example_dir):
def test_get_measurement_params_file_not_found(example_dir): def test_get_measurement_params_file_not_found(example_dir):
from sensospot_data.parameters import _get_measurement_params from sensospot_data.parameters import get_measurement_params
result = _get_measurement_params(example_dir / EXAMPLE_DIR_WO_PARAMS) result = get_measurement_params(example_dir / EXAMPLE_DIR_WO_PARAMS)
assert result is None assert result is None

85
tests/test_parser.py

@ -163,7 +163,7 @@ def test_parse_file(example_file):
"Spot.Sum", "Spot.Sum",
"Bkg.Area", "Bkg.Area",
"Spot.Area", "Spot.Area",
"Spot.Sat. (%)", "Spot.Saturation",
"Spot.Found", "Spot.Found",
"Pos.Nom.X", "Pos.Nom.X",
"Pos.Nom.Y", "Pos.Nom.Y",
@ -221,9 +221,9 @@ def testparse_multiple_files_empty_array(example_dir):
def test_list_csv_files(example_dir): def test_list_csv_files(example_dir):
from sensospot_data.parser import _list_csv_files from sensospot_data.parser import list_csv_files
result = list(_list_csv_files(example_dir / EXAMPLE_DIR_WITH_PARAMS)) result = list(list_csv_files(example_dir / EXAMPLE_DIR_WITH_PARAMS))
assert len(result) == 36 * 3 assert len(result) == 36 * 3
assert all(str(item).endswith(".csv") for item in result) assert all(str(item).endswith(".csv") for item in result)
@ -274,82 +274,3 @@ def test_sanity_check_raises_value_error(example_dir):
with pytest.raises(ValueError): with pytest.raises(ValueError):
_sanity_check(data_frame) _sanity_check(data_frame)
def test_get_cache_table_name():
from sensospot_data import VERSION_TABLE_NAME
from sensospot_data.parser import _get_cache_table_name
result = _get_cache_table_name()
assert result == VERSION_TABLE_NAME
def test_process_folder_creates_cache(dir_for_caching):
from sensospot_data.parser import CACHE_FILE_NAME, process_folder
cache_path = dir_for_caching / CACHE_FILE_NAME
assert not cache_path.is_file()
result = process_folder(dir_for_caching)
assert len(result) == 100
assert cache_path.is_file()
def test_process_folder_reads_from_cache(dir_for_caching, example_file):
from sensospot_data.parser import process_folder
process_folder(dir_for_caching)
csv_file = dir_for_caching / example_file.name
csv_file.unlink()
result = process_folder(dir_for_caching)
assert len(result) == 100
def test_process_folder_read_cache_fails_silently(
dir_for_caching, exposure_df
):
from sensospot_data.parser import CACHE_FILE_NAME, process_folder
cache_path = dir_for_caching / CACHE_FILE_NAME
exposure_df.to_hdf(cache_path, "unknown table")
result = process_folder(dir_for_caching)
assert result["Well.Row"][0] == "A"
def test_process_folder_read_cache_no_cache_arg(dir_for_caching, exposure_df):
from sensospot_data.parser import (
CACHE_FILE_NAME,
process_folder,
_get_cache_table_name,
)
cache_path = dir_for_caching / CACHE_FILE_NAME
exposure_df.to_hdf(cache_path, _get_cache_table_name())
result = process_folder(dir_for_caching, use_cache=False)
assert result["Well.Row"][0] == "A"
def test_process_folder_writes_cache(dir_for_caching):
from sensospot_data.parser import CACHE_FILE_NAME, process_folder
process_folder(dir_for_caching, use_cache=True)
cache_path = dir_for_caching / CACHE_FILE_NAME
assert cache_path.is_file()
def test_process_folder_writes_cache_no_cache_arg(dir_for_caching):
from sensospot_data.parser import CACHE_FILE_NAME, process_folder
process_folder(dir_for_caching, use_cache=False)
cache_path = dir_for_caching / CACHE_FILE_NAME
assert not cache_path.is_file()

7
tests/test_sensovation_data_parser.py

@ -2,11 +2,8 @@
def test_import_api(): def test_import_api():
from sensospot_data import CACHE_FILE_NAME # noqa: F401
from sensospot_data import ExposureInfo # noqa: F401 from sensospot_data import ExposureInfo # noqa: F401
from sensospot_data import run # noqa: F401
from sensospot_data import parse_file # noqa: F401 from sensospot_data import parse_file # noqa: F401
from sensospot_data import parse_folder # noqa: F401 from sensospot_data import parse_folder # noqa: F401
from sensospot_data import process_folder # noqa: F401 from sensospot_data import get_measurement_params # noqa: F401
from sensospot_data import split_channels # noqa: F401
from sensospot_data import normalize_channel # noqa: F401
from sensospot_data import parse_multiple_files # noqa: F401

Loading…
Cancel
Save