""" Sensospot Data Parser

Parsing the numerical output from Sensovations Sensospot image analysis.

"""
|
|
|
import re |
|
from pathlib import Path |
|
from collections import namedtuple |
|
|
|
import pandas |
|
|
|
from . import columns |
|
from .parameters import add_measurement_parameters |
|
|
|
REGEX_WELL = re.compile( |
|
r""" |
|
(?P<row>([A-Z]+)) # row name containing one or more letters |
|
(?P<column>(\d+)) # column, one or more decimals |
|
""", |
|
re.VERBOSE | re.IGNORECASE, |
|
) |
|
|
|
FileInfo = namedtuple("FileInfo", ["row", "column", "exposure"]) |
|
|
|
|
|
def _guess_decimal_separator(file_handle): |
|
"""guesses the decimal spearator of a opened data file""" |
|
file_handle.seek(0) |
|
headers = next(file_handle) # noqa: F841 |
|
data = next(file_handle) |
|
separator = "," if data.count(",") > data.count(".") else "." |
|
file_handle.seek(0) |
|
return separator |
|
|
|
|
|
def _parse_csv(data_file): |
|
"""parse a csv sensovation data file""" |
|
data_path = Path(data_file) |
|
with data_path.open("r") as handle: |
|
decimal_sep = _guess_decimal_separator(handle) |
|
return pandas.read_csv(handle, sep="\t", decimal=decimal_sep) |
|
|
|
|
|
def _extract_measurement_info(data_file): |
|
"""extract measurement meta data from a file name""" |
|
data_path = Path(data_file) |
|
*rest, well, exposure = data_path.stem.rsplit("_", 2) # noqa: F841 |
|
matched = REGEX_WELL.match(well) |
|
if matched is None: |
|
raise ValueError(f"not a valid well: '{well}'") |
|
row = matched["row"].upper() |
|
column = int(matched["column"]) |
|
exposure = int(exposure) |
|
return FileInfo(row, column, exposure) |
|
|
|
|
|
def _cleanup_data_columns(data_frame): |
|
"""renames some data columns for consistency and drops unused columns""" |
|
renamed = data_frame.rename(columns=columns.CSV_RENAME_MAP) |
|
surplus_columns = set(renamed.columns) - columns.PARSED_DATA_COLUMN_SET |
|
return renamed.drop(columns=surplus_columns) |
|
|
|
|
|
def parse_file(data_file): |
|
"""parses one data file and adds metadata to result |
|
|
|
will race a ValueError, if metadata could not be extracted |
|
""" |
|
data_path = Path(data_file).resolve() |
|
measurement_info = _extract_measurement_info(data_path) |
|
data_frame = _parse_csv(data_path) |
|
# normalized well name |
|
data_frame[ |
|
columns.WELL_NAME |
|
] = f"{measurement_info.row}{measurement_info.column:02d}" |
|
data_frame[columns.WELL_ROW] = measurement_info.row |
|
data_frame[columns.WELL_COLUMN] = measurement_info.column |
|
data_frame[columns.EXPOSURE_ID] = measurement_info.exposure |
|
data_frame[columns.ANALYSIS_NAME] = data_path.parent.name |
|
return _cleanup_data_columns(data_frame) |
|
|
|
|
|
def _silenced_parse_file(data_file): |
|
"""parses one data file and adds metadata |
|
|
|
returns data frame or None on ValueError |
|
""" |
|
try: |
|
return parse_file(data_file) |
|
except ValueError: |
|
return None |
|
|
|
|
|
def parse_multiple_files(file_list): |
|
"""parses a list of file paths to one combined dataframe""" |
|
if not file_list: |
|
raise ValueError("Empty file list provided") |
|
collection = (_silenced_parse_file(path) for path in file_list) |
|
filtered = (frame for frame in collection if frame is not None) |
|
data_frame = pandas.concat(filtered, ignore_index=True).reset_index() |
|
data_frame[columns.WELL_ROW] = data_frame[columns.WELL_ROW].astype( |
|
"category" |
|
) |
|
return data_frame |
|
|
|
|
|
def list_csv_files(folder): |
|
"""returns all csv files in a folder""" |
|
folder_path = Path(folder) |
|
files = (item for item in folder_path.iterdir() if item.is_file()) |
|
visible = (item for item in files if not item.stem.startswith(".")) |
|
return (item for item in visible if item.suffix.lower() == ".csv") |
|
|
|
|
|
def _sanity_check(data_frame): |
|
"""checks some basic constrains of a combined data frame""" |
|
field_rows = len(data_frame[columns.WELL_ROW].unique()) |
|
field_cols = len(data_frame[columns.WELL_COLUMN].unique()) |
|
exposures = len(data_frame[columns.EXPOSURE_ID].unique()) |
|
spot_positions = len(data_frame[columns.POS_ID].unique()) |
|
expected_rows = field_rows * field_cols * exposures * spot_positions |
|
if expected_rows != len(data_frame): |
|
raise ValueError( |
|
f"Measurements are missing: {expected_rows} != {len(data_frame)}" |
|
) |
|
# set the right data type for measurement columns |
|
for raw_column in columns.NUMERIC_COLUMNS: |
|
data_frame[raw_column] = pandas.to_numeric(data_frame[raw_column]) |
|
return data_frame |
|
|
|
|
|
def parse_folder(folder, quiet=False): |
|
"""parses all csv files in a folder to one large dataframe |
|
|
|
Will raise an ValueError, if no sensospot data could be found in |
|
the folder |
|
|
|
folder: path of folder containing data files |
|
quiet: skip sanity check, defaults to False |
|
returns: pandas dataframe with parsed data |
|
""" |
|
folder_path = Path(folder) |
|
file_list = list_csv_files(folder_path) |
|
try: |
|
data_frame = parse_multiple_files(file_list) |
|
except ValueError: |
|
raise ValueError(f"No sensospot data found in folder '{folder}'") |
|
data_frame = add_measurement_parameters(data_frame, folder_path) |
|
if quiet: |
|
return data_frame |
|
return _sanity_check(data_frame)
|
|
|