|
|
|
""" Sensospot Images
|
|
|
|
|
|
|
|
Creating nice spot images from scans
|
|
|
|
"""
|
|
|
|
|
|
|
|
__version__ = "0.0.1"
|
|
|
|
|
|
|
|
import sys
|
|
|
|
from pathlib import Path
|
|
|
|
from datetime import datetime
|
|
|
|
|
|
|
|
import click
|
|
|
|
from sensospot_data import parse_file
|
|
|
|
from sensospot_data.parameters import _search_measurement_params_file
|
|
|
|
|
|
|
|
from .images import (
|
|
|
|
crop,
|
|
|
|
recalculate,
|
|
|
|
get_position,
|
|
|
|
annotate_image,
|
|
|
|
load_array_image,
|
|
|
|
)
|
|
|
|
from .parameters import get_spot_parameters, get_array_parameters
|
|
|
|
|
|
|
|
|
|
|
|
def calulate_pixel_size(data_frame, array_definition):
    """Estimate the physical size of one image pixel.

    The positions of the first and the last row of ``data_frame`` are
    looked up with ``get_position(..., actual=False)`` and their distance
    in pixels is compared with the distance the array definition gives
    for the same span, ``dist * (size - 1)`` per axis.

    Args:
        data_frame: table of spot data supporting ``.iloc``; only the
            first and the last row are used.
        array_definition: object providing ``dist_x``, ``dist_y``,
            ``size_x`` and ``size_y``.

    Returns:
        Definition distance per pixel (presumably micrometers per pixel,
        judging by the variable names - confirm). If the array spans both
        axes the mean of both ratios is returned, otherwise the ratio of
        the one axis that has an extent.

    Note:
        With a single spot (no extent on either axis) no ratio can be
        computed and the division by zero is not caught here.
    """
    first = get_position(data_frame.iloc[0], actual=False)
    last = get_position(data_frame.iloc[-1], actual=False)

    x_dist_pixel = last.x - first.x
    y_dist_pixel = last.y - first.y

    ad = array_definition
    x_dist_um = ad.dist_x * (ad.size_x - 1)
    y_dist_um = ad.dist_y * (ad.size_y - 1)

    if x_dist_um == 0:
        # only one spot in x direction: the x axis carries no information
        # (its ratio would be 0 / 0), so the y axis alone has to be used
        return y_dist_um / y_dist_pixel
    if y_dist_um == 0:
        # only one spot in y direction: use the x axis alone
        return x_dist_um / x_dist_pixel

    # more than one spot in each direction: average both estimates
    x_pixel_size = x_dist_um / x_dist_pixel
    y_pixel_size = y_dist_um / y_dist_pixel

    return (x_pixel_size + y_pixel_size) / 2
|
|
|
|
|
|
|
|
|
|
|
|
def get_example_data_path(input_dir):
    """Return the csv path that belongs to one tif image in ``input_dir``.

    The first ``*.tif`` file yielded by globbing ``input_dir`` is taken and
    its suffix is replaced by ``.csv``. The csv file is not checked for
    existence.

    Args:
        input_dir: directory (string or path) containing the tif images.

    Returns:
        ``pathlib.Path`` of the csv file named like the chosen tif image.

    Raises:
        SystemExit: if ``input_dir`` contains no ``*.tif`` file.
    """
    tif_files = Path(input_dir).glob("*.tif")
    # a default for next() avoids leaking a bare StopIteration to callers
    example_tif = next(tif_files, None)
    if example_tif is None:
        sys.exit(f"Could not find any tif files in {input_dir}")
    return example_tif.with_suffix(".csv")
|
|
|
|
|
|
|
|
|
|
|
|
def get_filename_prefix(input_dir):
    """Return the common name prefix of the files in ``input_dir``.

    The stem of the example data file is split from the right at the last
    two underscores; everything in front of them is the prefix.

    Args:
        input_dir: directory passed on to ``get_example_data_path``.

    Returns:
        The part of the file stem before the last two ``_`` separated
        fields.

    Raises:
        ValueError: if the stem contains fewer than two underscores.
    """
    stem = get_example_data_path(input_dir).stem
    # unpacking into exactly three names enforces the expected name layout
    prefix, _well, _exposure = stem.rsplit("_", 2)
    return prefix
|
|
|
|
|
|
|
|
|
|
|
|
def retrieve_spot_parameters(input_dir, scale):
    """Load the spot parameters for ``input_dir`` and rescale them.

    The measurement parameters file is searched in ``input_dir``, array and
    spot parameters are read from it, and the pixel size is calculated from
    the parsed example data file. The spot parameters are then passed to
    ``recalculate`` with the factor ``scale / pixel_size``.

    Args:
        input_dir: directory with the scan images and data files.
        scale: scale-up factor of the images.

    Returns:
        The result of ``recalculate`` for the spot parameters.

    Raises:
        SystemExit: if no parameters file is found for ``input_dir``.
    """
    params_file = _search_measurement_params_file(input_dir)
    if params_file is None:
        sys.exit(f"Could not find parameter files in {input_dir}")

    array_params = get_array_parameters(params_file)
    spot_params = get_spot_parameters(params_file, array_params)

    example_data = parse_file(get_example_data_path(input_dir))
    pixel_size = calulate_pixel_size(example_data, array_params)

    factor = scale / pixel_size
    return recalculate(spot_params, factor)
|
|
|
|
|
|
|
|
|
|
|
|
def search_image_files(input_dir, wells, exposures):
    """Glob ``input_dir`` for tif images matching wells and exposures.

    Args:
        input_dir: directory with the tif images.
        wells: glob fragment for the well part of the file name.
        exposures: glob fragment for the exposure part of the file name.

    Returns:
        The iterator returned by ``Path.glob`` for the assembled pattern.
    """
    folder = Path(input_dir)
    name_prefix = get_filename_prefix(folder)

    pattern = f"{name_prefix}_*{wells}*_*{exposures}.tif"
    # a "*" argument next to the fixed wildcards produces runs of stars;
    # collapse triple runs first, then double runs, to a single star
    for stars in ("***", "**"):
        pattern = pattern.replace(stars, "*")

    return folder.glob(pattern)
|
|
|
|
|
|
|
|
|
|
|
|
def create_file_map(input_dir, wells, exposures):
    """Group the matching tif images by their csv data file.

    For every image found by ``search_image_files`` the exposure part of
    the stem is replaced by ``1`` to build the csv file name in the same
    directory. Images whose csv file does not exist are left out.

    Args:
        input_dir: directory with the tif images and csv files.
        wells: glob fragment for the well part of the file name.
        exposures: glob fragment for the exposure part of the file name.

    Returns:
        dict mapping each existing csv path to the list of tif paths that
        belong to it.
    """
    images_by_data_file = {}
    for image in search_image_files(input_dir, wells, exposures):
        name_without_exposure, _exposure = image.stem.rsplit("_", 1)
        data_file = image.parent / f"{name_without_exposure}_1.csv"
        if not data_file.is_file():
            # no data for this image, nothing to annotate it with
            continue
        images_by_data_file.setdefault(data_file, []).append(image)
    return images_by_data_file
|
|
|
|
|
|
|
|
|
|
|
|
def process_image(image_file, spot_parameters, spot_data, scale):
    """Load one array image and annotate it.

    Args:
        image_file: path of the image, handed to ``load_array_image``.
        spot_parameters: spot parameters handed to ``annotate_image``.
        spot_data: spot data handed to ``annotate_image``.
        scale: scale-up factor used for loading and annotating.

    Returns:
        The image object returned by ``load_array_image``.
    """
    annotated = load_array_image(image_file, scale=scale)
    # the return value of annotate_image is not used; presumably it draws
    # onto the image in place - confirm in the images module
    annotate_image(annotated, spot_parameters, spot_data, scale)
    return annotated
|
|
|
|
|
|
|
|
|
|
|
|
def create_crops(
    output_path, img, image_path, spot_parameters, array_data, scale
):
    """Save one cropped image per row of ``array_data``.

    Each crop is written to ``output_path`` as
    ``<image stem>_<row index + 1, zero padded to three digits>.tif``.

    Args:
        output_path: directory path the crops are saved to.
        img: image handed to ``crop``.
        image_path: path of the source image; only its stem is used.
        spot_parameters: spot parameters handed to ``crop``.
        array_data: table supporting ``.iterrows()``, one row per spot.
        scale: scale-up factor handed to ``crop``.
    """
    stem = image_path.stem
    for row_label, row in array_data.iterrows():
        spot_image = crop(img, spot_parameters, row, scale)
        # file numbering is one-based while the row labels start at zero
        # (assumes an integer index - confirm against parse_file)
        number = row_label + 1
        spot_image.save(output_path / f"{stem}_{number:03}.tif")
|
|
|
|
|
|
|
|
|
|
|
|
def process(
    input_dir,
    output_dir,
    wells="*",
    exposures="*",
    scale=3,
    add_single_spots=False,
):
    """Annotate all matching images of ``input_dir`` into ``output_dir``.

    Every csv data file found by ``create_file_map`` is parsed and its path
    printed; each image belonging to it is processed with ``process_image``
    and saved under its original file name in ``output_dir``.

    Args:
        input_dir: directory with the scan images and data files.
        output_dir: existing directory for the resulting images.
        wells: glob fragment restricting the wells, ``*`` for all.
        exposures: glob fragment restricting the exposures, ``*`` for all.
        scale: scale-up factor of the images.
        add_single_spots: if true, also save a cropped image per spot.

    Raises:
        SystemExit: if ``output_dir`` is not an existing directory.
    """
    spot_parameters = retrieve_spot_parameters(input_dir, scale)
    file_map = create_file_map(input_dir, wells, exposures)

    target_dir = Path(output_dir)
    if not target_dir.is_dir():
        sys.exit(f"Could not find output directory: {output_dir}")

    for csv_file, tif_files in file_map.items():
        spots = parse_file(csv_file)
        print(csv_file)
        for tif_file in tif_files:
            annotated = process_image(tif_file, spot_parameters, spots, scale)
            annotated.save(target_dir / tif_file.name)
            if not add_single_spots:
                continue
            # the crops are taken from the annotated image
            create_crops(
                target_dir,
                annotated,
                tif_file,
                spot_parameters,
                spots,
                scale,
            )
|
|
|
|
|
|
|
|
|
|
|
|
# Command line entry point: annotates the images of the SOURCE directory.
# Deliberately documented with comments instead of a docstring, since click
# would print a docstring as the help text of the command.
# NOTE(review): the exposure option is spelled "--Exposures" on the command
# line; click is documented to lower-case the derived parameter name, which
# is why the function argument is "exposures" - confirm before renaming.
@click.command()
@click.argument(
    "source",
    type=click.Path(
        exists=True,
        file_okay=False,
        dir_okay=True,
        readable=True,
        writable=True,
    ),
)
@click.option(
    "-o",
    "--output",
    default=None,
    help="Output directory name, defaults to new folder in parent of source",
)
@click.option(
    "-w",
    "--wells",
    default="*",
    show_default=True,
    help="restrict to this wells, * = all",
)
@click.option(
    "-e",
    "--Exposures",
    default="*",
    show_default=True,
    help="restrict to this exposure ids, * = all",
)
@click.option(
    "-s",
    "--scale",
    type=int,
    default=3,
    show_default=True,
    help="scale-up of images",
)
@click.option(
    "--spots",
    default=False,
    is_flag=True,
    show_default=True,
    help="include cropped images of spots",
)
def run(source, output=None, wells="*", exposures="*", scale=3, spots=False):
    source_dir = Path(source)

    output_usable = output is not None and Path(output).is_dir()
    if not output_usable:
        # no output given, or it is not an existing directory: fall back to
        # a time stamped folder next to the source directory
        parent = source_dir.absolute().parent
        timestamp = datetime.now().strftime("%Y-%m-%d %H-%M-%S")
        output = parent / timestamp
        output.mkdir(exist_ok=True)

    process(source_dir, output, wells, exposures, scale, spots)
|