"""
OMIO – Open Microscopy Image I/O and OME-TIFF conversion utilities.
OMIO, acronym for Open Microscopy Image I/O, is a lightweight, research-oriented
Python module that provides a unified interface for reading, normalizing, merging,
visualizing, and writing multi-dimensional microscopy image data in OME-compliant
formats. It is designed as a practical glue layer between heterogeneous microscopy
file formats and downstream analysis or visualization tools, with a strong emphasis
on reproducible axis semantics, metadata integrity, and memory-aware workflows.
Scope and design goals
----------------------
OMIO addresses common pain points in microscopy data handling:
* Reading heterogeneous microscopy formats (TIFF, OME-TIFF, LSM, CZI, Thorlabs RAW)
through a single entry point.
* Enforcing a strict, explicit OME axis convention (TZCYX) internally, without
silently repairing incompatible data.
* Normalizing and validating metadata so that physical pixel sizes, time
increments, and axis sizes remain consistent and explicit.
* Providing controlled merge operations along selected axes (T, Z, or C), with
well-defined policies for strict compatibility checks versus zero-padding.
* Supporting both NumPy-based in-memory workflows and Zarr-based, chunked,
memory-efficient workflows for large datasets.
* Enabling direct visualization in napari, including scale-aware display and
channel handling.
* Writing standards-compliant OME-TIFF output suitable for ImageJ, Fiji, napari,
and downstream quantitative pipelines.
OMIO deliberately does not aim to replace format-specific libraries. Instead, it
orchestrates them under a consistent policy layer that makes assumptions explicit
and reproducible.
Core functionality overview
---------------------------
The module is structured around a small set of high-level entry points, supported
by internal helper utilities:
* imread
Universal reader that accepts files, folders, or folder stacks and returns
NumPy or Zarr arrays together with validated OME-style metadata. Supports
optional merging across files or folder stacks along a user-defined axis.
* imwrite
OME-TIFF writer that enforces axis order, handles BigTIFF decisions, embeds
physical scale metadata, and preserves provenance via OME MapAnnotations.
* imconvert
End-to-end converter that combines imread and imwrite to transform
arbitrary supported input data into OME-TIFF with minimal boilerplate.
* bids_batch_convert
Batch-level converter operating on a BIDS-like directory hierarchy, supporting
subject and experiment discovery, optional tagfolder logic, and controlled
merging policies.
* open_in_napari
Convenience interface for opening OMIO-handled data directly in napari,
supporting NumPy, Zarr, and Zarr+Dask backends with correct spatial scaling.
Axis and metadata policy
------------------------
Internally, OMIO assumes a strict five-dimensional axis order:
T Z C Y X
All merge, validation, and write operations rely on this convention. Axes are not
implicitly inferred or repaired beyond explicit user requests. Metadata fields
such as PhysicalSizeX/Y/Z and TimeIncrement are treated as first-class quantities
and are validated and propagated consistently across merges and conversions.
Merging semantics are intentionally conservative: incompatible inputs trigger
warnings and abort the merge unless zero-padding is explicitly enabled.
Intended audience and use cases
-------------------------------
OMIO is intended for researchers working with multi-dimensional microscopy data
who need a transparent and scriptable way to:
* Convert legacy or vendor-specific formats into OME-TIFF.
* Assemble time series, z-stacks, or channel stacks from multiple acquisitions.
* Prepare large datasets for downstream analysis without exceeding memory limits.
* Maintain explicit provenance and metadata across preprocessing steps.
The module favors clarity and explicit policy over aggressive automation, and is
therefore best suited for controlled analysis pipelines rather than black-box
end-user tools.
Author and provenance
---------------------
Author: Fabrizio Musacchio
First version: December 21, 2025
This module is part of the OMIO project and is developed in the context of
scientific microscopy data processing workflows.
"""
# %% IMPORTS
import os, re
import hashlib
from importlib.metadata import version, PackageNotFoundError, packages_distributions
import glob
from tabnanny import verbose
import warnings
from typing import Any, Dict, List, Tuple, Union
import shutil
import xml.etree.ElementTree as ET
import numpy as np
import napari
import tifffile
import czifile as czi
import datetime
import zlib
import xml.etree.ElementTree as ET
import zarr
from tqdm import tqdm
import dask.array as da
import yaml
# %% MODULE-SCOPE GLOBALS
def _resolve_omio_version() -> str:
# primary: known PyPI distribution name
try:
return version("omio-microscopy")
except PackageNotFoundError:
pass
# fallback: map import package -> installed distribution(s)
try:
dist_names = packages_distributions().get("omio", [])
for dist in dist_names:
try:
return version(dist)
except PackageNotFoundError:
continue
except Exception:
pass
return "0.0.0+unknown"
_OMIO_VERSION = _resolve_omio_version()
_OME_AXES = "TZCYX" # this is the canonical OME axes order. DO NOT CHANGE!
_AXIS_TO_INDEX = {"T": 0, "Z": 1, "C": 2, "Y": 3, "X": 4} # DO NOT CHANGE!
_ALLOWED_MERGE_AXES = {"T", "Z", "C"}
# make current _OMIO_VERSION available as 'version' attribute outside the module:
version = _OMIO_VERSION
# %% HELPER FUNCTIONS FOR READERS
# a simple hello world function (sanity check for external imports):
[docs]
def hello_world():
"""
Print a simple sanity-check message including the current OMIO version.
This function is intended as a minimal diagnostic utility to verify that
the OMIO package can be imported correctly, that external dependencies are
resolved, and that the module-level version variable is accessible at
runtime. It has no return value and produces output only via standard
output.
Side effects
------------
Prints a message of the form:
"Hello from omio.py! OMIO version: <version>"
"""
print("Hello from omio.py! OMIO version:", _OMIO_VERSION)
# function for correcting the axes order to OME-conform:
def _reorder_numpy(arr, axes_string, OME_axes, OME_axes_order):
"""
Reorder a NumPy array into OME-compliant axis order (TZCYX).
This helper performs the minimal, strictly in-RAM axis-normalization step used
in the NumPy branch of `_correct_for_OME_axes_order`. It takes an input array
together with its declared axis string and returns a new NumPy array where:
* all OME axes (T, Z, C, Y, X) are present,
* any missing axes are appended as singleton dimensions in the order defined
by the global OME axes sequence,
* the array is then permuted into the canonical OME axis order TZCYX.
The function assumes that `OME_axes` and `OME_axes_order` are defined in the
surrounding module scope. It does not alter metadata; it operates purely on
the numerical array representation.
Parameters
----------
arr : np.ndarray
The image array whose axes are described by `axes_string`.
axes_string : str
Axis declaration for `arr`, using characters from {T, Z, C, Y, X}.
Its length must match `arr.ndim`. Axes missing from this declaration
will be created as singleton dimensions and appended at the end.
Returns
-------
np.ndarray
A NumPy array with all OME axes present and ordered as TZCYX.
Notes
-----
* This function is intended only for cases where the full array fits in RAM.
For Zarr-backed arrays or large images, use the streaming variant inside
`_correct_for_OME_axes_order` instead.
* The returned array is a fully materialized NumPy array, even if the input
originated from a lazy source.
"""
curr_image = np.asarray(arr)
curr_axes_full = axes_string
for ax in OME_axes:
if ax not in curr_axes_full:
curr_image = np.expand_dims(curr_image, axis=-1)
curr_axes_full += ax
permute_from = np.arange(len(curr_axes_full), dtype=int)
permute_to = [OME_axes_order[ax] for ax in curr_axes_full]
curr_image = np.moveaxis(curr_image, permute_from, permute_to)
return curr_image
def _correct_for_OME_axes_order(image: Union[np.ndarray, zarr.core.array.Array],
metadata: Dict[str, Any],
memap_large_file: bool =False,
verbose: bool =True) -> Tuple[Union[np.ndarray, zarr.core.array.Array], tuple, str]:
"""
Normalize an image array to canonical OME axis order (TZCYX).
This internal helper ensures that image data and its associated axis metadata
are brought into the canonical OME axis convention TZCYX. It supports both
in-memory NumPy arrays and Zarr-backed arrays and selects the appropriate
strategy depending on input type and memory constraints.
Three execution paths are distinguished:
* NumPy input:
The array is fully reordered in RAM and returned as a NumPy array.
* Zarr input with memap_large_file=False:
The full Zarr array is read once into RAM, reordered as a NumPy array, and
then written back to a newly created Zarr store at the original location.
* Zarr input with memap_large_file=True:
The data are copied slice-wise into a temporary Zarr store on disk, iterating
over all non-spatial axes while streaming full (Y, X) planes. This mode avoids
loading the entire dataset into memory and is intended for large files.
Missing OME axes are inserted as singleton dimensions, and existing axes are
permuted into the canonical order. The function operates purely on array data
and axis ordering; it does not modify or regenerate higher-level metadata.
Parameters
----------
image : np.ndarray or zarr.core.array.Array
Input image data. Either a fully materialized NumPy array or a Zarr array.
metadata : dict
Metadata dictionary containing at least the key ``"axes"``, which declares
the current axis order of the input image using characters from
{T, Z, C, Y, X}. Optional entries such as ``"SizeY"`` and ``"SizeX"`` are
used to determine optimal chunk sizes when creating Zarr outputs.
memap_large_file : bool, optional
If True and the input is a Zarr array, reorder the data via slice-wise,
on-disk copying to avoid loading the full dataset into RAM. If False,
the Zarr array is fully read into memory before reordering. Default is False.
Returns
-------
image_out : np.ndarray or zarr.core.array.Array
The reordered image data in canonical OME axis order TZCYX. The return type
matches the chosen execution path.
shape_out : tuple
Shape of the reordered image array.
axes_out : str
The canonical OME axis string, equal to ``_OME_AXES`` (typically "TZCYX").
Raises
------
ValueError
If the length of ``metadata["axes"]`` does not match ``image.ndim``.
Notes
-----
* The canonical axis mapping and axis sequence are taken from the module-level
constants ``_AXIS_TO_INDEX`` and ``_OME_AXES``.
* For Zarr inputs, the original store is replaced on disk by the reordered
version. Temporary stores are removed once the operation completes.
* When no persistent Zarr store path is available, the function falls back to
returning a fully materialized NumPy array.
"""
if verbose:
print(" Correcting for OME axes order...")
# canonical OME axes: TZCYX
#OME_axes_order = {"T": 0, "Z": 1, "C": 2, "Y": 3, "X": 4}
#OME_axes = "TZCYX"
OME_axes_order = _AXIS_TO_INDEX
OME_axes = _OME_AXES
curr_axes = metadata["axes"]
curr_shape = image.shape
if len(curr_axes) != len(curr_shape):
raise ValueError(
f"Metadata axes '{curr_axes}' (len={len(curr_axes)}) does not match "
f"image.ndim={len(curr_shape)}")
# branch 1: pure NumPy arrays:
if not isinstance(image, zarr.core.array.Array):
if verbose:
print(" Got NumPy array as input. Will return reordered NumPy array.")
curr_image = _reorder_numpy(image, curr_axes, OME_axes, OME_axes_order)
return curr_image, curr_image.shape, OME_axes
# branch 2: Zarr array w/o streaming (full read in RAM):
if verbose:
print(" Got Zarr array as input...")
src = image
if not memap_large_file:
# in this case, in this case the Zarr source is fully read into RAM once:
if verbose:
print(" memap_large_file=False: Reading full Zarr into RAM for reordering...")
curr_image = _reorder_numpy(src[...], curr_axes, OME_axes, OME_axes_order)
try:
src_path = str(src.store_path).replace("file://", "")
except AttributeError:
# no path available, return NumPy array directly
if verbose:
print(" While memap_large_file=False, no store_path available, returning NumPy array.")
return curr_image, curr_image.shape, OME_axes
if os.path.exists(src_path):
shutil.rmtree(src_path)
size_y = metadata.get("SizeY", curr_image.shape[OME_axes_order["Y"]])
size_x = metadata.get("SizeX", curr_image.shape[OME_axes_order["X"]])
# 5D chunks: (T, Z, C, Y, X)
target_chunks = (1, 1, 1, size_y, size_x)
dst = zarr.open(
src_path,
mode="w",
shape=curr_image.shape,
dtype=curr_image.dtype,
chunks=target_chunks)
if verbose:
print(" Writing reordered data back to Zarr store...")
dst[...] = curr_image
image_out = zarr.open(src_path, mode="r+")
return image_out, image_out.shape, OME_axes
# branch 3: memory-mapped large file, streaming copy in (Y, X):
if verbose:
print(" memap_large_file=True: Copying data slice-wise into Zarr array on disk (will take some time)...")
# target shape in TZCYX; fill missing axes with singleton dimensions:
full_shape = [1] * len(OME_axes) # T, Z, C, Y, X
for i, ax in enumerate(curr_axes):
full_shape[OME_axes_order[ax]] = curr_shape[i]
full_shape = tuple(full_shape)
iy = OME_axes_order["Y"]
ix = OME_axes_order["X"]
outer_axes_idx = [k for k in range(len(OME_axes)) if k not in (iy, ix)]
outer_shape = tuple(full_shape[k] for k in outer_axes_idx)
total_outer = int(np.prod(outer_shape)) if outer_shape else 1
# "total_outer" is 1 if only Y and X are present; it actually counts the number of
# iterations needed over all non-spatial axes.
try:
src_path = str(src.store_path).replace("file://", "")
except AttributeError:
# fallback: when no path exists, read once into RAM:
if verbose:
print(" While memap_large_file=True, no store_path available, returning NumPy array.")
curr_image = _reorder_numpy(src[...], curr_axes, OME_axes, OME_axes_order)
return curr_image, curr_image.shape, OME_axes
tmp_path = src_path + "_ome_tmp"
if os.path.exists(tmp_path):
shutil.rmtree(tmp_path)
size_y = metadata.get("SizeY", full_shape[OME_axes_order["Y"]])
size_x = metadata.get("SizeX", full_shape[OME_axes_order["X"]])
# 5D chunks: (T, Z, C, Y, X)
target_chunks = (1, 1, 1, size_y, size_x)
dst = zarr.open(
tmp_path,
mode="w",
shape=full_shape,
dtype=src.dtype,
chunks=target_chunks)
if total_outer == 1:
if verbose:
print(" Only Y and X axes present, copying full data at once...")
# dst is of shape (1,1,1,Y,X) and we need to copy src with shape (Y,X):
dst[0,0,0,...] = src[...]
#dst[...] = src[...]
else:
iterator = tqdm(
np.ndindex(*outer_shape),
total=total_outer,
desc=" Reordering axes to TZCYX and copying to temporary Zarr store"
)
for outer_idx in iterator:
dest_index = [None] * len(OME_axes)
o_pos = 0
for k in range(len(OME_axes)):
if k in (iy, ix):
dest_index[k] = slice(None)
else:
dest_index[k] = outer_idx[o_pos]
o_pos += 1
src_index = []
for i, ax in enumerate(curr_axes):
if ax in ("Y", "X"):
src_index.append(slice(None))
else:
j = OME_axes_order[ax]
src_index.append(dest_index[j])
dst[tuple(dest_index)] = src[tuple(src_index)]
if os.path.exists(src_path):
shutil.rmtree(src_path)
os.rename(tmp_path, src_path)
image_out = zarr.open(src_path, mode="r+")
return image_out, image_out.shape, OME_axes
def _batch_correct_for_OME_axes_order(images: List[Union[np.ndarray, zarr.core.array.Array]],
metadatas: List[Dict[str, Any]],
memap_large_file: bool =False,
verbose: bool =True
) -> Tuple[List[Union[np.ndarray, zarr.core.array.Array]], List[Dict[str, Any]]]:
"""
Apply OME axis normalization to a batch of images.
This function is a thin batch wrapper around `_correct_for_OME_axes_order`. It
iterates over a list of images and their corresponding metadata dictionaries
and normalizes each image to the canonical OME axis order TZCYX.
Each image is processed independently using the same logic as in the single-image
function, including the choice between in-RAM reordering and slice-wise,
on-disk copying for Zarr arrays depending on `memap_large_file`.
The input lists are modified in place: both the image objects and the associated
metadata entries (``"shape"`` and ``"axes"``) are updated for each element.
Parameters
----------
images : list of np.ndarray or zarr.core.array.Array
List of input images to be reordered.
metadatas : list of dict
List of metadata dictionaries corresponding to `images`. Each dictionary
must contain the key ``"axes"`` describing the current axis order of the
associated image.
memap_large_file : bool, optional
Forwarded to `_correct_for_OME_axes_order`. If True, Zarr inputs are
reordered via slice-wise on-disk copying to limit memory usage. Default is
False.
Returns
-------
images_out : list of np.ndarray or zarr.core.array.Array
List of reordered images in canonical OME axis order TZCYX. Elements may be
NumPy arrays or Zarr arrays, depending on input type and processing mode.
metadatas_out : list of dict
The updated metadata dictionaries. For each entry, ``"shape"`` and
``"axes"`` reflect the reordered image.
Notes
-----
* Processing is performed sequentially; no parallelism is introduced.
* This function mutates its inputs in place.
"""
# ensure that both lists have the same length:
if len(images) != len(metadatas):
if verbose:
print("Error: In _batch_correct_for_OME_axes_order, images and metadatas have different lengths!")
print(f" len(images) = {len(images)}, len(metadatas) = {len(metadatas)}. Returning unmodified inputs.")
return images, metadatas
for image_i in range(len(images)):
images[image_i], metadatas[image_i]["shape"], metadatas[image_i]["axes"] = \
_correct_for_OME_axes_order(images[image_i], metadatas[image_i], memap_large_file=memap_large_file,
verbose=verbose)
return images, metadatas
# filter-function for removing non-OME-conform axes from CZI files:
def _filter_image_data_for_ome_tif(imagedata, axes):
"""
Filter image data to retain only OME-relevant axes.
This helper removes non-OME axes from an image array by selecting the first
index along any axis that is not part of the canonical OME axis set. The
resulting array contains only axes from the OME convention, while preserving
their original relative order.
The operation is purely index-based: non-OME axes are collapsed via integer
indexing, and no resampling or data modification beyond slicing is performed.
Parameters
----------
imagedata : np.ndarray or array-like
Input image data array.
axes : str
Axis declaration for `imagedata`. Its length must match
``imagedata.ndim``. Axes not present in the canonical OME axis set are
removed by slicing.
Returns
-------
filtered_data : np.ndarray
The image data restricted to OME-relevant axes.
filtered_axes : str
Axis string corresponding to `filtered_data`, containing only axes from
the canonical OME axis set and in the same relative order as in `axes`.
Notes
-----
* The canonical OME axis set is taken from the module-level constant
``_OME_AXES``.
* Non-OME axes are reduced by taking index 0 along that dimension, which
implicitly assumes that these axes are either singleton or that only the
first element is of interest.
* This function performs no validation of axis semantics beyond string
membership.
"""
# imagedata = CZI_image # for testing
# axes = metadata["axes"] # for testing
# define desired axes:
#desired_axes = 'TZCYX'
desired_axes = _OME_AXES
# determine the slices for the desired axes:
slices = [slice(None) if axes[i] in desired_axes else 0 for i in range(imagedata.ndim)]
# apply the slices to filter the data:
filtered_data = imagedata[tuple(slices)]
# filter the axis string:
filtered_axes = ''.join([axis for axis in axes if axis in desired_axes])
return filtered_data, filtered_axes
# extract the SizeX, SizeY, SizeZ, SizeC, SizeT, SizeS from the metadata:
def _get_ome_image_sizes(imageshape, metadata):
"""
Populate OME size fields from an image shape and axis declaration.
This helper derives the standard OME size entries (``SizeT``, ``SizeZ``,
``SizeC``, ``SizeY``, ``SizeX``) from the provided image shape and axis string.
All OME size fields are first initialized to 1 and then updated for axes that
are present in the image.
The function operates on a shallow copy of the input metadata dictionary and
does not modify the original object.
Parameters
----------
imageshape : tuple
Shape of the image array. Its length must match the length of
``metadata["axes"]``.
metadata : dict
Metadata dictionary containing an ``"axes"`` entry that declares the axis
order of the image using characters from {T, Z, C, Y, X}.
Returns
-------
metadata_update : dict
A copy of the input metadata with OME-compliant size entries added or
updated. For each axis in the canonical OME axis set, a corresponding
``Size<axis>`` key is present.
Notes
-----
* The canonical OME axis set is taken from the module-level constant
``_OME_AXES``.
* Axes not present in ``metadata["axes"]`` remain with size 1, consistent with
OME conventions for singleton dimensions.
* No validation is performed beyond positional correspondence between
`imageshape` and ``metadata["axes"]``.
"""
metadata_update = metadata.copy()
#default_OME_axes = 'TZCYX'
default_OME_axes = _OME_AXES
# initialize size metadata:
for axis in default_OME_axes:
metadata_update[f"Size{axis}"] = 1
# update size metadata:
for axis_i, axis in enumerate(metadata_update["axes"]):
metadata_update[f"Size{axis}"] = imageshape[axis_i]
return metadata_update
# function to dynamically extract namespace:
def _get_namespace(xml_root):
"""
Extract the XML namespace from an ElementTree root element.
This helper inspects the tag of an XML root element and extracts the namespace
URI if the tag is namespace-qualified. ElementTree represents such tags in the
form ``"{namespace}tagname"``. If no namespace is present, an empty string is
returned.
Parameters
----------
xml_root : xml.etree.ElementTree.Element
Root element of an XML document.
Returns
-------
namespace : str
The namespace URI extracted from ``xml_root.tag``, or an empty string if
the element is not namespace-qualified.
Notes
-----
* The function relies on a simple regular expression match and does not
validate the namespace URI.
* This helper is typically used when parsing OME-XML or similar
namespace-qualified XML formats.
"""
match = re.match(r'\{(.*)\}', xml_root.tag)
return match.group(1) if match else ''
# function to parse OME-XML metadata into human readable format:
def _parse_ome_metadata(ome_xml):
"""
Parse OME-XML metadata and extract commonly used fields into a plain dictionary.
This helper parses an OME-XML string and extracts a subset of pixel and
acquisition metadata into a Python dictionary with simple scalar values.
It is designed to be tolerant to missing attributes and to handle OME-XML
documents that use arbitrary XML namespaces.
The function focuses at the moment on two groups of information (and can be
extended in the future):
* The ``Pixels`` element:
Extracts image dimensions (``SizeX``, ``SizeY``, ``SizeZ``, ``SizeC``,
``SizeT``), physical voxel sizes (``PhysicalSizeX``, ``PhysicalSizeY``,
``PhysicalSizeZ``) including their units, and the temporal sampling
(``TimeIncrement`` and its unit). Additionally, it counts the number of
``Channel`` elements found under ``Pixels``.
* ``MapAnnotation`` elements:
Extracts key value pairs from ``MapAnnotation/Value/M`` entries and stores
them under ``metadata["Annotations"]``. The ``Namespace`` attribute of the
MapAnnotation is recorded if present.
Missing or malformed numeric attributes are left at default values, and unit
fields fall back to standard defaults.
Parameters
----------
ome_xml : str
OME-XML metadata as a string.
Returns
-------
metadata : dict
Dictionary containing extracted metadata fields. Keys include:
* ``SizeX``, ``SizeY``, ``SizeZ``, ``SizeC``, ``SizeT`` (int)
* ``PhysicalSizeX``, ``PhysicalSizeY``, ``PhysicalSizeZ`` (float)
* ``PhysicalSizeXUnit``, ``PhysicalSizeYUnit``, ``PhysicalSizeZUnit`` (str)
* ``TimeIncrement`` (float), ``TimeIncrementUnit`` (str)
* ``Channel_Count`` (int)
* ``Annotations`` (dict), present even if empty
Notes
-----
* XML parsing is performed via ``xml.etree.ElementTree``.
* Namespace handling is based on `_get_namespace`, and tags are queried through
a namespace mapping under the prefix ``"ome"``.
* The function is intentionally permissive: it does not raise on missing fields
and does not validate consistency across reported sizes and actual image data.
* The returned annotation dictionary is a flat mapping of keys to strings.
If multiple MapAnnotations contain identical keys, later entries will
overwrite earlier ones.
"""
# parse the XML content:
root = ET.fromstring(ome_xml)
namespace = _get_namespace(root)
ns = {'ome': namespace} # Namespace dictionary
# initialize metadata dictionary with default values:
metadata = {
'SizeX': 0,
'SizeY': 0,
'SizeZ': 0,
'SizeC': 0,
'SizeT': 0,
'PhysicalSizeX': 1.0,
'PhysicalSizeY': 1.0,
'PhysicalSizeZ': 1.0,
'PhysicalSizeXUnit': 'micron',
'PhysicalSizeYUnit': 'micron',
'PhysicalSizeZUnit': 'micron',
'TimeIncrement': 0.0,
'TimeIncrementUnit': 'seconds',
'Channel_Count': 0}
try:
# find the 'Pixels' element:
pixels = root.find('.//ome:Pixels', ns)
if pixels is not None:
# extract metadata with try-except for each attribute:
# SizeX:
try:
metadata['SizeX'] = int(pixels.attrib['SizeX'])
except (KeyError, ValueError):
pass
# SizeY:
try:
metadata['SizeY'] = int(pixels.attrib['SizeY'])
except (KeyError, ValueError):
pass
# SizeZ:
try:
metadata['SizeZ'] = int(pixels.attrib['SizeZ'])
except (KeyError, ValueError):
pass
# SizeC:
try:
metadata['SizeC'] = int(pixels.attrib['SizeC'])
except (KeyError, ValueError):
pass
# SizeT:
try:
metadata['SizeT'] = int(pixels.attrib['SizeT'])
except (KeyError, ValueError):
pass
# PhysicalSizeX:
try:
metadata['PhysicalSizeX'] = float(pixels.attrib['PhysicalSizeX'])
except (KeyError, ValueError):
pass
# PhysicalSizeY:
try:
metadata['PhysicalSizeY'] = float(pixels.attrib['PhysicalSizeY'])
except (KeyError, ValueError):
pass
# PhysicalSizeZ:
try:
metadata['PhysicalSizeZ'] = float(pixels.attrib['PhysicalSizeZ'])
except (KeyError, ValueError):
pass
metadata['PhysicalSizeXUnit'] = pixels.attrib.get('PhysicalSizeXUnit', 'micron')
metadata['PhysicalSizeYUnit'] = pixels.attrib.get('PhysicalSizeYUnit', 'micron')
metadata['PhysicalSizeZUnit'] = pixels.attrib.get('PhysicalSizeZUnit', 'micron')
try:
metadata['TimeIncrement'] = float(pixels.attrib['TimeIncrement'])
except (KeyError, ValueError):
pass
metadata['TimeIncrementUnit'] = pixels.attrib.get('TimeIncrementUnit', 'seconds')
# count channels:
channels = pixels.findall('.//ome:Channel', ns)
metadata['Channel_Count'] = len(channels)
except ET.ParseError:
print("Error: Invalid XML content. Could not extract Pixels metadata from OME-XML.")
# find 'MapAnnotation's:
try:
# collect all Map Annotations in a separate sub-dictionary:
metadata['Annotations'] = {}
# there COULD be multiple MapAnnotations, so we loop over them:
for ma in root.findall('.//ome:MapAnnotation', ns):
# ma = root.findall('.//ome:MapAnnotation', ns)[0] # for testing
# extract Namespace attribute:
try:
ns_attr = ma.get('Namespace', '')
except:
ns_attr = 'unknown'
metadata['Annotations']['Namespace'] = ns_attr
# check whether there is a <Value> element, otherwise skip:
value_elem = ma.find('ome:Value', ns)
if value_elem is None:
continue
# read all <M K="...">value</M> elements:
for m in value_elem.findall('ome:M', ns):
key = m.get('K')
if not key:
continue
val = (m.text or '').strip()
metadata['Annotations'][key] = val
except ET.ParseError:
print("Could not extract MapAnnotation from OME-XML.")
return metadata
# function to standardize read imagej_metadata:
def _rational_to_float(r):
"""
Convert a TIFF rational value to a float.
Parameters
----------
r : tuple, list, or float
The rational value, typically as (numerator, denominator) or a float.
Returns
-------
float or None
The converted float value, or None if conversion fails.
Notes
-----
* TIFF rationals are often stored as (num, den) tuples. If the denominator is zero,
None is returned to avoid division errors.
* If `r` is already a float or can be directly converted, that value is returned.
* If `r` is None or cannot be converted, the function returns None.
"""
# TIFF rationals often come as (num, den):
if r is None:
return None
if isinstance(r, (tuple, list)) and len(r) == 2:
num, den = r
num = float(num)
den = float(den)
if den == 0:
return None
return num / den
try:
return float(r)
except Exception:
return None
def _unit_to_um_factor_from_resolutionunit(v):
"""
Convert a TIFF ResolutionUnit value to a micron scaling factor.
Parameters
----------
v : int or str
The TIFF ResolutionUnit value, either as an integer code or a descriptive string.
Returns
-------
float or None
The scaling factor to convert from the specified unit to microns, or None if
the unit is unrecognized.
Notes
-----
* Standard TIFF ResolutionUnit codes are:
- 1: None (interpreted here as microns)
- 2: Inches (1 inch = 25400 microns)
- 3: Centimeter (1 cm = 10000 microns)
* Descriptive strings such as "inch", "centimeter", "millimeter", "micron", and "meter"
are also recognized in a case-insensitive manner.
* If `v` is None or does not match any known unit, the function returns None.
"""
# TIFF ResolutionUnit: usually int codes or strings.
# Standard: 2=inches, 3=centimeter.
# Set by default in OMIO: 1=None (actually; in OMIO, we interprete this as microns)
if v is None:
return None
if isinstance(v, int):
if v == 2:
return 25400.0
if v == 3:
return 10000.0
if v == 1:
return 1.0
return None
s = str(v).strip().lower()
if "inch" in s:
return 25400.0
if "centimeter" in s or s == "cm":
return 10000.0
if "millimeter" in s or s == "mm":
return 1000.0
if "micron" in s or s == "µm" or s == "um":
return 1.0
if "meter" in s or s == "m":
return 1e6
return None
def _standardize_imagej_metadata(imagej_metadata: Dict[str, Any],
tags: Union[list, None] = None,
verbose: bool = False
) -> Dict[str, Any]:
"""
Standardize ImageJ metadata keys and recover physical pixel sizes when possible.
This helper normalizes the key casing of ImageJ metadata to a consistent
OME-like naming scheme (for example ``sizex`` to ``SizeX`` and
``physicalsizex`` to ``PhysicalSizeX``) while leaving unknown keys unchanged.
It additionally attempts to recover missing physical pixel size fields from
common ImageJ encodings.
If ``PhysicalSizeX`` is absent but an ``Info`` field is present, the function
parses the ``Info`` string line-by-line and looks for entries of the form
``Scaling|Distance|...``. When found, it converts the stored scaling values into
micron-based physical sizes and populates ``PhysicalSizeX``, ``PhysicalSizeY``,
and ``PhysicalSizeZ`` accordingly. If ``PhysicalSizeZ`` is still missing after
this step, the function falls back to ImageJ's ``spacing`` field if available.
Parameters
----------
imagej_metadata : dict
ImageJ metadata dictionary. The mapping table assumes keys are already
lowercased, but any keys are accepted. Values are preserved as-is.
Returns
-------
standardized_metadata : dict
New dictionary containing standardized keys. Non-standard keys are carried
over unchanged. Physical size entries may be added if they can be inferred
from ``Info`` or ``spacing``.
Notes
-----
* Key standardization is performed via a fixed mapping table and is therefore
conservative: only known keys are renamed.
* The ``Info`` parsing logic is heuristic and depends on ImageJ writing a
flattened scaling structure using keys such as ``Scaling|Distance|Id #1``,
``Scaling|Distance|Value #1``, and ``Scaling|Distance|DefaultUnitFormat #1``.
* Physical size reconstruction from ``Info`` is best-effort. Failures are caught
and reported via printing, and missing values are left unset.
* If both reconstructed ``PhysicalSizeZ`` and ``spacing`` are present, the
reconstructed value takes precedence.
"""
# key mapping: lowercase keys to their standardized letter case:
key_mapping = {
'axes': 'axes',
'shape': 'shape',
'sizex': 'SizeX',
'sizey': 'SizeY',
'sizec': 'SizeC',
'sizet': 'SizeT',
'sizes': 'SizeZ',
'physicalsizex': 'PhysicalSizeX',
'physicalsizey': 'PhysicalSizeY',
'physicalsizez': 'PhysicalSizeZ',
'unit': 'unit',
'physicalsizexunit': 'PhysicalSizeXUnit',
'physicalsizeyunit': 'PhysicalSizeYUnit',
'timeincrement': 'TimeIncrement',
'timeincrementunit': 'TimeIncrementUnit',
'frame_rate': 'frame_rate',
'structuredannotations': 'StructuredAnnotations'}
# initialize new dictionary to hold standardized metadata:
standardized_metadata = {}
# process each key in the input dictionary:
for key, value in imagej_metadata.items():
# if the key is in the mapping, use the standardized key:
standardized_key = key_mapping.get(key, key)
standardized_metadata[standardized_key] = value
"""
In some imagej metadata, PhysicalSizeX and PhysicalSizeY are written into a collapsed
XML/JSON structure under "Info", where relevant infos are stored under:
Scaling|Distance|DefaultUnitFormat #1 = µm
Scaling|Distance|DefaultUnitFormat #2 = µm
Scaling|Distance|DefaultUnitFormat #3 = µm
Scaling|Distance|Id #1 = X
Scaling|Distance|Id #2 = Y
Scaling|Distance|Id #3 = Z
Scaling|Distance|Value #1 = 1.135E-07
Scaling|Distance|Value #2 = 1.135E-07
Scaling|Distance|Value #3 = 5E-07
Since DefaultUnitFormat is, e.g., here 'µm', 'Scaling|Distance|Value' is the actual dispersion
which needs to be converted into micron units:
PhysicalSizeX = Scaling|Distance|Value #1 * factor to convert DefaultUnitFormat to micron
...
"""
unit_map_info = {'µm': 1e6, 'nm': 1e4, 'mm': 1e3, 'cm': 1e-3, 'm': 1.0}
#unit_map_info = {'µm': 1.0,'um': 1.0,'nm': 1e-3,'mm': 1e3,'cm': 1e4,'m': 1e6,}
unit_map_tags = {'inch': 25400.0, 'centimeter': 10000.0, 'millimeter': 1000.0, 'micron': 1.0, 'meter': 1e6}
if "PhysicalSizeX" not in standardized_metadata or "PhysicalSizeY" not in standardized_metadata:
# we do not also check for PhysicalSizeY/Z here, since they often come/miss together.
# check whether standardized_metadata contains 'Info' key:
if "Info" in standardized_metadata:
info_str = standardized_metadata["Info"]
# info_str is a string of form "' BitsPerPixel = 14\n DimensionOrder = XYCZT\n IsInterleaved = false\n IsRGB = false\n ...",
# thus we need to parse it line by line:
info_lines = info_str.split('\n')
scaling_distance = {}
for line in info_lines:
line = line.strip()
if line.startswith("Scaling|Distance|"):
parts = line.split(' = ')
if len(parts) == 2:
key_part = parts[0].replace("Scaling|Distance|", "")
value_part = parts[1]
scaling_distance[key_part] = value_part
# now extract PhysicalSizeX, PhysicalSizeY, PhysicalSizeZ:
try:
for i in range(1, 4):
id_key = f"Id #{i}"
value_key = f"Value #{i}"
unit_key = f"DefaultUnitFormat #{i}"
if id_key in scaling_distance and value_key in scaling_distance and unit_key in scaling_distance:
axis_id = scaling_distance[id_key]
axis_value = float(scaling_distance[value_key])
axis_unit = scaling_distance[unit_key]
if axis_unit in unit_map_info:
physical_size = axis_value * unit_map_info[axis_unit]
if axis_id == 'X':
standardized_metadata["PhysicalSizeX"] = physical_size
elif axis_id == 'Y':
standardized_metadata["PhysicalSizeY"] = physical_size
elif axis_id == 'Z':
standardized_metadata["PhysicalSizeZ"] = physical_size
except Exception as e:
print(f" Error while extracting PhysicalSize from Info: {e}")
print(f" Leaving PhysicalSize entries empty.")
# PhysicalSizeX/Y could now still be missing; try to extract from tags:
if "PhysicalSizeX" not in standardized_metadata or "PhysicalSizeY" not in standardized_metadata:
if tags is not None:
# sometimes, the tags list contains 'XResolution' and 'YResolution' entries:
try:
# at the moment, we only consider tags[0], but there could be multiple tags
# (otherwise run the following loop additionally for all tags in tags, for tag in tags:):
tag0 = tags[0] if isinstance(tags, list) and len(tags) > 0 else tags
XRes = None
YRes = None
ResUnit = None
for _, t in tag0.items():
name = getattr(t, "name", None)
if name == "XResolution":
XRes = getattr(t, "value", None)
if verbose:
print(f" Found XResolution tag with value: {XRes}")
elif name == "YResolution":
YRes = getattr(t, "value", None)
if verbose:
print(f" Found YResolution tag with value: {YRes}")
elif name == "ResolutionUnit":
ResUnit = getattr(t, "value", None)
if verbose:
print(f" Found ResolutionUnit tag with value: {ResUnit}")
x_pixels_per_unit = _rational_to_float(XRes)
y_pixels_per_unit = _rational_to_float(YRes)
factor_um = _unit_to_um_factor_from_resolutionunit(ResUnit)
# pixels_per_unit must be > 0 to avoid division by zero:
if (x_pixels_per_unit is not None and x_pixels_per_unit > 0 and
y_pixels_per_unit is not None and y_pixels_per_unit > 0 and
factor_um is not None):
standardized_metadata["PhysicalSizeX"] = factor_um / x_pixels_per_unit
standardized_metadata["PhysicalSizeY"] = factor_um / y_pixels_per_unit
standardized_metadata.setdefault("PhysicalSizeXUnit", "micron")
standardized_metadata.setdefault("PhysicalSizeYUnit", "micron")
if verbose:
print(f" Calculated PhysicalSizeX = {standardized_metadata['PhysicalSizeX']} micron")
print(f" Calculated PhysicalSizeY = {standardized_metadata['PhysicalSizeY']} micron")
else:
if verbose:
print(" Could not extract PhysicalSizeX/Y from tags due to missing or invalid values.")
except Exception as e:
print(f" Error while extracting PhysicalSize from tags: {e}")
print(f" Leaving PhysicalSizeX/Y entries empty.")
# handle missing PhysicalSizeZ by checking 'spacing' key:
if "PhysicalSizeZ" not in standardized_metadata:
if "spacing" in imagej_metadata:
standardized_metadata["PhysicalSizeZ"] = imagej_metadata["spacing"]
if verbose:
print(f" Extracted PhysicalSizeZ from 'spacing': {standardized_metadata['PhysicalSizeZ']}")
if 'unit' in standardized_metadata:
standardized_metadata["PhysicalSizeZUnit"] = standardized_metadata['unit']
# convert to PhysicalSizeZ in micron:
unit = standardized_metadata['unit'].lower()
if unit in unit_map_tags:
factor = unit_map_tags[unit]
standardized_metadata["PhysicalSizeZ"] = standardized_metadata["PhysicalSizeZ"] * factor
standardized_metadata["PhysicalSizeZUnit"] = "micron"
if verbose:
print(f" Converted PhysicalSizeZ to micron: {standardized_metadata['PhysicalSizeZ']} micron")
return standardized_metadata
# function to standardize read lsm_metadata:
def _standardize_lsm_metadata(lsm_metadata):
"""
Standardize Zeiss LSM metadata to an OME and ImageJ-compatible key scheme.
This helper converts selected keys from Zeiss LSM metadata into a standardized
naming convention aligned with the keys used for ImageJ and OME metadata. Only
fields with a clear semantic correspondence are mapped; all other entries are
copied verbatim.
The function operates on a new dictionary and does not modify the input
metadata object.
Parameters
----------
lsm_metadata : dict
Metadata dictionary as returned by ``tifffile.lsm_metadata``.
Returns
-------
standardized_metadata : dict
Metadata dictionary with standardized keys. Dimension and voxel size fields
are renamed to OME-style ``Size*`` and ``PhysicalSize*`` entries, and
temporal sampling is mapped to ``TimeIncrement``.
Notes
-----
* Zeiss LSM uses the non-standard spelling ``TimeIntervall``; this key is
explicitly mapped to ``TimeIncrement``.
* No unit conversion is performed. Values are transferred as-is and are
assumed to be expressed in the units provided by the original LSM metadata.
* Keys without an explicit mapping are preserved unchanged.
"""
# mapping LSM → standardized ImageJ-like terminology:
key_mapping = {
'DimensionX': 'SizeX',
'DimensionY': 'SizeY',
'DimensionZ': 'SizeZ',
'DimensionChannels': 'SizeC',
'DimensionTime': 'SizeT',
'VoxelSizeX': 'PhysicalSizeX',
'VoxelSizeY': 'PhysicalSizeY',
'VoxelSizeZ': 'PhysicalSizeZ',
# Zeiss uses "TimeIntervall" (typo in original format)
'TimeIntervall': 'TimeIncrement'
}
standardized_metadata = {}
for key, value in lsm_metadata.items():
# apply mapping if available, otherwise preserve key
standardized_key = key_mapping.get(key, key)
standardized_metadata[standardized_key] = value
return standardized_metadata
# function to add file properties to metadata:
def _add_file_properties_to_metadata(metadata, fname, original_metadata_type="N/A"):
"""
Augment a metadata dictionary with file-level provenance information.
This helper ensures that a set of standard file-related metadata fields is
present in the provided metadata dictionary. Missing entries are populated
from the file system using the supplied file path. Existing keys are preserved
and not overwritten.
The added fields capture basic provenance information such as the original
file name, file type, parent directory, metadata source, and a timestamp
derived from the file system.
Parameters
----------
metadata : dict or None
Metadata dictionary to be updated. If None, a new dictionary is created.
fname : str
Full path to the source file.
original_metadata_type : str, optional
Identifier describing the origin or format of the original metadata
(for example ``"OME_XML"``, ``"ImageJ"``, or ``"LSM"``). Default is ``"N/A"``.
Returns
-------
metadata : dict
The updated metadata dictionary containing file provenance fields.
Notes
-----
* File properties are added only if the corresponding keys are not already
present in the dictionary.
* The file type is derived from the filename extension without the leading
dot.
* The timestamp is obtained via ``os.path.getctime`` and expressed in UTC using
an ISO-like string format. On some platforms, this value may represent the
last metadata change time rather than true file creation time.
* If file system access fails, the creation or change date is set to ``"N/A"``.
"""
# ensure metadata dictionary exists:
if metadata is None:
metadata = {}
# file path and name properties:
folder_path = os.path.dirname(fname)
fname_base, fname_extension = os.path.splitext(os.path.basename(fname))
# add missing keys with derived values:
metadata.setdefault("original_filetype", fname_extension[1:]) # remove leading '.'
metadata.setdefault("original_filename", fname_base + fname_extension)
metadata.setdefault("original_parentfolder", folder_path)
metadata.setdefault("original_metadata_type", original_metadata_type)
# add creation or change date:
try:
creation_date = datetime.datetime.fromtimestamp(
os.path.getctime(fname), datetime.timezone.utc).strftime('%Y-%m-%dT%H:%M:%S')
metadata.setdefault("original_creation_or_change_date", creation_date)
except Exception:
metadata.setdefault("original_creation_or_change_date", "N/A")
return metadata
# function to check and update metadata units:
def _metadata_units_check(metadata, pixelunit="micron"):
"""
Normalize unit fields in a metadata dictionary.
This helper ensures that physical size unit entries are present and expressed
using a consistent textual representation. Missing unit fields are populated
with a default unit, and the commonly used symbol ``"µm"`` is normalized to the
string ``"micron"``.
The function operates in place on the provided metadata dictionary.
Parameters
----------
metadata : dict
Metadata dictionary to be checked and updated.
pixelunit : str, optional
Default unit string to assign when a unit field is missing. Default is
``"micron"``.
Returns
-------
metadata : dict
The updated metadata dictionary with normalized unit entries.
Notes
-----
* The following keys are checked: ``PhysicalSizeXUnit``, ``PhysicalSizeYUnit``,
``PhysicalSizeZUnit``, and ``unit``.
* Only a simple string substitution is performed; no numerical unit conversion
of the corresponding physical size values is applied.
* The function mutates the input dictionary and also returns it for convenience.
"""
# define the keys to check and their default value:
unit_keys = [
'PhysicalSizeXUnit',
'PhysicalSizeYUnit',
'PhysicalSizeZUnit',
'unit']
# loop over each key and check/update:
for key in unit_keys:
# add key with default value if missing:
if key not in metadata:
metadata[key] = pixelunit
# convert 'µm' to 'micron' if present:
elif metadata[key] == 'µm':
metadata[key] = 'micron'
# "unit"
return metadata
def _normalize_tiff_axes_string(reference_axes: str) -> str:
"""
Normalize TIFF axis labels to OMIO's expected conventions.
"""
if not isinstance(reference_axes, str):
return reference_axes
# in some weird tifs, an "I" is put instead of "T", so we correct for that:
reference_axes = reference_axes.replace('I', 'T')
# if reference_axes=="YXS", we assume we got a RGB image and thus we convert S to C:
if reference_axes == "YXS":
reference_axes = "YXC"
# if there is a "Q" in reference_axes, we convert it to "C", "T" or "Z" (depending
# on what is missing and in this order):
if 'Q' in reference_axes:
if 'C' not in reference_axes:
reference_axes = reference_axes.replace('Q', 'C')
elif 'T' not in reference_axes:
reference_axes = reference_axes.replace('Q', 'T')
elif 'Z' not in reference_axes:
reference_axes = reference_axes.replace('Q', 'Z')
elif 'P' not in reference_axes:
reference_axes = reference_axes.replace('Q', 'P')
else:
raise ValueError(
"Error: Unable to map axis 'Q' to C, T, Z or P, as all are already present in reference axes."
)
return reference_axes
def _get_axes_from_shaped_metadata(shaped_metadata):
"""
Extract an axis string from tifffile's shaped metadata if available.
"""
if isinstance(shaped_metadata, dict):
candidates = [shaped_metadata]
elif isinstance(shaped_metadata, (list, tuple)):
candidates = [item for item in shaped_metadata if isinstance(item, dict)]
else:
candidates = []
for item in candidates:
axes = item.get("axes")
if isinstance(axes, str) and axes:
return axes
return None
# function to check and update metadata axes and its correct order from reading:
def _ensure_axes_in_metadata(metadata, tif):
"""
Ensure that axis metadata matches the axis order reported by a TIFF file.
This helper verifies that the ``"axes"`` entry in a metadata dictionary is
present and consistent with the axis declaration provided by
``tif.series[0].axes``. If the key is missing or inconsistent, it is updated
to match the TIFF reference.
A known non-standard convention in some TIFF files, where the time axis is
encoded as ``"I"`` instead of ``"T"``, is explicitly corrected.
Parameters
----------
metadata : dict
Metadata dictionary to be updated.
tif : tifffile.TiffFile
Opened TIFF file object from which the reference axis order is obtained.
Returns
-------
metadata : dict
The updated metadata dictionary with a validated ``"axes"`` entry.
Notes
-----
* The function attempts to read ``tif.series[0].axes`` and falls back to the
string ``"unknown"`` if this fails.
* If an ``"axes"`` entry already exists and differs from the TIFF reference,
it is overwritten and a diagnostic message is printed.
* The input dictionary is modified in place and also returned for convenience.
"""
try:
# reference axes from tif.series[0]:
reference_axes = tif.series[0].axes
except (IndexError, AttributeError):
print("Error: Unable to extract axes from tif.series[0]. Setting to 'unknown'.")
reference_axes = 'unknown'
reference_axes = _normalize_tiff_axes_string(reference_axes)
shaped_axes = _normalize_tiff_axes_string(
_get_axes_from_shaped_metadata(getattr(tif, "shaped_metadata", None))
)
target_ndim = len(metadata.get("shape", ())) if metadata.get("shape") is not None else 0
if shaped_axes and target_ndim and len(reference_axes) != target_ndim and len(shaped_axes) == target_ndim:
reference_axes = shaped_axes
existing_axes = metadata.get("axes")
if (
isinstance(existing_axes, str)
and target_ndim
and len(existing_axes) == target_ndim
and len(reference_axes) != target_ndim
):
reference_axes = _normalize_tiff_axes_string(existing_axes)
if 'axes' in metadata:
# overwrite if the existing axes do not match:
if metadata['axes'] != reference_axes:
print(f"Mismatch found: existing axes '{metadata['axes']}' does not match reference axes '{reference_axes}'. Overwriting.")
metadata['axes'] = reference_axes
else:
# add the 'axes' key if it is missing:
metadata['axes'] = reference_axes
return metadata
# function to ensure shape in metadata:
def _ensure_shape_in_metadata(metadata, image_shape):
"""
Ensure that shape metadata matches the actual image array shape.
This helper verifies that the ``"shape"`` entry in a metadata dictionary is
present and consistent with the provided image shape. If the key is missing or
contains a different value, it is updated to reflect the true shape of the
image array.
Differences between the stored metadata shape and the actual array shape can
occur when readers collapse singleton dimensions. Such mismatches are corrected
and reported via diagnostic messages.
Parameters
----------
metadata : dict
Metadata dictionary to be updated.
image_shape : tuple
Actual shape of the image array.
Returns
-------
metadata : dict
The updated metadata dictionary with a validated ``"shape"`` entry.
Notes
-----
* If a mismatch is detected, the metadata value is overwritten and a diagnostic
message is printed.
* The input dictionary is modified in place and also returned for convenience.
"""
if 'shape' in metadata:
# overwrite if the existing shape does not match:
if metadata['shape'] != image_shape:
print(f" Info: Mismatch found between actual image shape {image_shape} and shape {metadata['shape']}")
print(f" read from its metadata. Correcting metadata entry. This is nothing to worry about, as")
print(f" the tifffile reader either squashed singleton dimensions in the shape or OMIO folded S into C.")
metadata['shape'] = image_shape
else:
# add the 'shape' key if it is missing:
metadata['shape'] = image_shape
return metadata
# function to fold sample axis 'S' into channel axis 'C':
def _fold_samples_axis_into_channel(image,
axes: str,
zarr_store: str | None = None,
cache_folder: str | None = None,
base_name: str = "omio",
verbose: bool = True):
"""
Fold tifffile sample axis 'S' (e.g. RGB samples per pixel) into channel axis 'C'.
Behavior
* If 'S' not in axes: return unchanged.
* If no 'C' exists: rename S -> C (no folding, just renaming).
* If both 'C' and 'S' exist: fold into a single channel axis: C_new = C_old * S.
For Zarr inputs, this creates a new Zarr array and copies slice-wise.
Parameters
----------
image : np.ndarray or zarr.core.array.Array
axes : str
zarr_store : {None, "memory", "disk"}
If image is Zarr and zarr_store is not None, keep result as Zarr.
If None, Zarr input will be materialized to NumPy.
cache_folder : str or None
Required for zarr_store="disk". Folder where a new .zarr store is created.
base_name : str
Used to name disk stores.
"""
if "S" not in axes:
return image, axes
s_idx = axes.index("S")
# case A: no channel axis exists, typical RGB: YXS -> YXC:
if "C" not in axes:
if verbose:
print(" Info: Found sample axis 'S' without channel axis. Renaming S->C.")
return image, axes.replace("S", "C")
c_idx = axes.index("C")
# For simplicity and predictability, enforce that C is before S.
# If not, we will treat it logically anyway.
axes_out = axes.replace("S", "")
# NumPy path:
if not isinstance(image, zarr.core.array.Array):
if verbose:
print(" Info: Found sample axis 'S' and channel axis 'C'. Folding S into C (NumPy).")
arr = np.asarray(image)
# move S next to C (right after C) if needed:
if s_idx != c_idx + 1:
arr = np.moveaxis(arr, s_idx, c_idx + 1)
axes_list = list(axes)
s_char = axes_list.pop(s_idx)
axes_list.insert(c_idx + 1, s_char)
axes = "".join(axes_list)
s_idx = c_idx + 1
c_size = arr.shape[c_idx]
s_size = arr.shape[s_idx]
new_c = int(c_size) * int(s_size)
new_shape = list(arr.shape)
new_shape[c_idx] = new_c
new_shape.pop(s_idx)
arr = arr.reshape(tuple(new_shape))
return arr, axes_out
# zarr path:
if zarr_store not in (None, "memory", "disk"):
raise ValueError(f"_fold_samples_axis_into_channel: invalid zarr_store={zarr_store!r}")
if zarr_store is None:
# policy: if caller did not request Zarr persistence, we materialize
if verbose:
print(" Info: Zarr input but zarr_store=None. Materializing to NumPy for S->C folding.")
arr = np.asarray(image[...])
return _fold_samples_axis_into_channel(arr, axes, zarr_store=None, verbose=verbose)
if verbose:
print(" Info: Found sample axis 'S' and channel axis 'C'. Folding S into C (Zarr, slice-wise).")
# build output shape by replacing C with C*S and dropping S:
src = image
src_shape = src.shape
c_size = int(src_shape[c_idx])
s_size = int(src_shape[s_idx])
new_c = c_size * s_size
out_shape = list(src_shape)
out_shape[c_idx] = new_c
out_shape.pop(s_idx)
out_shape = tuple(out_shape)
# determine output chunks based on axes_out and out_shape:
out_chunks = compute_default_chunks(out_shape, axes_out)
# create output Zarr array:
if zarr_store == "memory":
store = zarr.storage.MemoryStore()
dst = zarr.open(store=store, mode="w", shape=out_shape, dtype=src.dtype, chunks=out_chunks)
else:
if cache_folder is None:
raise ValueError("_fold_samples_axis_into_channel: cache_folder must be provided for zarr_store='disk'")
os.makedirs(cache_folder, exist_ok=True)
out_path = os.path.join(cache_folder, f"{base_name}_Sfold.zarr")
if os.path.exists(out_path):
shutil.rmtree(out_path)
dst = zarr.open(out_path, mode="w", shape=out_shape, dtype=src.dtype, chunks=out_chunks)
# copy slice-wise:
# we copy per outer index over all dims except (C, S, Y, X), and for each (c, s)
# write one (Y, X) plane into the correct folded channel.
iy = axes.index("Y")
ix = axes.index("X")
outer_axes = [k for k in range(len(axes)) if k not in (c_idx, s_idx, iy, ix)]
outer_shape = tuple(src_shape[k] for k in outer_axes)
total_outer = int(np.prod(outer_shape)) if outer_shape else 1
iterator = tqdm(np.ndindex(*outer_shape), total=total_outer, desc=" Folding S into C")
for outer_idx in iterator:
# build a template index for src of length src.ndim:
src_index = [slice(None)] * len(axes)
pos = 0
for k in outer_axes:
src_index[k] = outer_idx[pos]
pos += 1
# now loop channels and samples and copy planes:
for c in range(c_size):
for s in range(s_size):
src_index[c_idx] = c
src_index[s_idx] = s
# dest index is like src but without S, and C is folded:
dst_index = []
for k in range(len(axes)):
if k == s_idx:
continue
if k == c_idx:
dst_index.append(c * s_size + s)
else:
dst_index.append(src_index[k])
dst[tuple(dst_index)] = src[tuple(src_index)]
return dst, axes_out
# function to pick first array from zarr group according OMIO multi-series policy:
def _zarr_pick_first_array(z, prefer_keys=("0",), verbose=True):
"""
Return a Zarr array from a Zarr object that might be a Group.
Policy: prefer common full-resolution keys ("0"), otherwise take the first array-like entry.
"""
# already an array-like object:
if hasattr(z, "shape") and hasattr(z, "dtype"):
return z
# group-like: try to find arrays:
keys = []
try:
# zarr Group has keys() in both zarr2 and zarr3:
keys = list(z.keys())
except Exception:
keys = []
# 1) prefer known keys:
for k in prefer_keys:
if k in keys:
cand = z[k]
if hasattr(cand, "shape") and hasattr(cand, "dtype"):
if verbose:
print(f" Info: Zarr Group detected. Using array key '{k}' with shape {cand.shape}.")
return cand
# 2) otherwise take the first array-like entry in sorted key order:
for k in sorted(keys):
cand = z[k]
if hasattr(cand, "shape") and hasattr(cand, "dtype"):
if verbose:
print(f" Info: Zarr Group detected. Using first array-like key '{k}' with shape {cand.shape}.")
return cand
raise TypeError(
"read_tif: aszarr=True returned a Zarr Group, but no array-like entries were found.")
# helper-function to copy large arrays in (Y,X) slices memory-friendly into Zarr:
def _copy_to_zarr_in_xy_slices(src, dst, desc="slice-wise copying to Zarr"):
"""
Copy an array to a Zarr destination by streaming (Y, X) slices.
This helper performs a memory-friendly copy from `src` to `dst` by iterating
over all outer dimensions and copying one full spatial plane at a time. It is
intended for large arrays where copying the entire dataset into RAM would be
undesirable.
The function assumes that the last two axes of `src` and `dst` correspond to
the spatial dimensions (Y, X). For arrays with two or fewer dimensions, the
copy is performed in a single assignment.
Parameters
----------
src : array-like
Source array supporting NumPy-style slicing. Typically a Zarr array or a
NumPy array.
dst : zarr.core.array.Array or array-like
Destination array supporting NumPy-style slicing and assignment. Typically
a Zarr array that has the same shape as `src`.
desc : str, optional
Description passed to the progress bar. Default is
``"slice-wise copying to Zarr"``.
Returns
-------
None
Notes
-----
* The copy is performed slice-wise over all indices of ``src.shape[:-2]`` and
transfers full ``(:, :)`` planes for the last two dimensions.
* The function does not perform shape or dtype validation; callers are expected
to ensure compatibility between `src` and `dst`.
* Progress reporting is provided via ``tqdm``.
"""
src_shape = src.shape
# trivial case: 0D, 1D or 2D -> copy in one go:
if len(src_shape) <= 2:
dst[...] = src[...]
return
outer_shape = src_shape[:-2]
# determine number of slices to process for tqdm:
total = int(np.prod(outer_shape))
for outer_idx in tqdm(np.ndindex(*outer_shape), total=total, desc=desc):
# build full index: (i0, i1, ..., i_{n-3}, :, :)
idx = outer_idx + (slice(None), slice(None))
dst[idx] = src[idx]
# function to compute default chunking for Zarr arrays out of image shape and axes:
def compute_default_chunks(shape, axes, max_xy_chunk=1024):
"""
Compute a default chunk pattern for Zarr arrays given a shape and axis string.
Policy:
- All non-spatial axes (e.g. T, Z, C) are chunked with size 1.
- Spatial axes Y and X get chunk sizes up to `max_xy_chunk`,
limited by the actual dimension size.
- The order of chunk sizes follows `shape` and `axes` one-to-one.
Parameters
----------
shape : tuple of int
Full array shape, e.g. (T, Z, C, Y, X).
axes : str
Axis string describing the layout, e.g. "TZCYX".
max_xy_chunk : int, optional
Maximum chunk size along Y and X. Defaults to 1024.
Returns
-------
tuple of int
Chunk sizes for each axis, same length as `shape`.
"""
if len(shape) != len(axes):
raise ValueError(
f"Shape {shape} and axes '{axes}' have different lengths "
f"({len(shape)} vs {len(axes)}).")
chunks = [1] * len(shape)
axis_to_index = {ax: i for i, ax in enumerate(axes)}
# Y chunk:
if "Y" in axis_to_index:
iy = axis_to_index["Y"]
chunks[iy] = min(shape[iy], max_xy_chunk)
# X chunk:
if "X" in axis_to_index:
ix = axis_to_index["X"]
chunks[ix] = min(shape[ix], max_xy_chunk)
return tuple(chunks)
# function to find a single yaml file in a folder (used for Thorlabs RAW metadata):
def _find_single_yaml(folder):
"""
Locate a single YAML metadata file in a directory.
This helper scans a directory for files with ``.yaml`` or ``.yml`` extensions
and returns the path to a YAML file if present. It is primarily used to locate
Thorlabs RAW metadata stored alongside image data.
If no YAML files are found, the function returns ``None``. If multiple YAML
files are present, a warning is issued and the first file encountered is
returned.
Parameters
----------
folder : str
Path to the directory to be searched.
Returns
-------
yaml_path : str or None
Full path to the YAML file if at least one is found, otherwise ``None``.
Notes
-----
* When multiple YAML files are detected, the function does not attempt to
disambiguate them beyond issuing a warning.
* The order in which files are inspected follows ``os.listdir`` and is
therefore platform-dependent.
"""
yamls = [f for f in os.listdir(folder) if f.lower().endswith((".yaml", ".yml"))]
if len(yamls) == 0:
return None
if len(yamls) > 1:
warnings.warn(
f"Multiple YAML metadata files found\n in {folder}: \n {yamls}\n"
" Please keep exactly one .yaml/.yml file for Thorlabs RAW metadata.\n"
" Will now take the first one found.")
return os.path.join(folder, yamls[0])
def _get_primary_czi_scene(czi_file):
"""
Return the first available scene object from a ``czifile.CziFile``.
Recent ``czifile`` revisions moved axis-related metadata from the file object
to scene-level ``CziImage`` objects. This helper provides a version-tolerant
way to obtain that primary scene.
"""
scenes = getattr(czi_file, "scenes", None)
if scenes is None:
return None
try:
return scenes[0]
except Exception:
pass
if hasattr(scenes, "values"):
try:
return next(iter(scenes.values()))
except Exception:
pass
try:
first = next(iter(scenes))
except Exception:
return None
if hasattr(first, "axes") or hasattr(first, "dims"):
return first
try:
return scenes[first]
except Exception:
return None
def _get_czi_axes(czi_file):
"""
Resolve axis metadata across old and new ``czifile`` APIs.
"""
axes = getattr(czi_file, "axes", None)
if isinstance(axes, str) and axes:
return axes
scene = _get_primary_czi_scene(czi_file)
if scene is not None:
scene_axes = getattr(scene, "axes", None)
if isinstance(scene_axes, str) and scene_axes:
return scene_axes
dims = getattr(scene, "dims", None)
if dims:
return "".join(str(axis) for axis in dims)
raise AttributeError(
"Could not determine CZI axes from czifile metadata. "
"Neither CziFile.axes nor scene axes/dims were available."
)
def _get_czi_metadata_dict(czi_file):
"""
Return structured CZI metadata across old and new ``czifile`` APIs.
"""
metadata_func = getattr(czi_file, "metadata", None)
if not callable(metadata_func):
raise AttributeError("czifile CziFile object does not provide metadata().")
try:
metadata = metadata_func(asdict=True)
except TypeError:
metadata = None
if isinstance(metadata, dict):
return metadata
try:
metadata = metadata_func(raw=False)
except TypeError as exc:
raise TypeError(
"Could not retrieve structured CZI metadata from czifile. "
"Expected either metadata(asdict=True) or metadata(raw=False)."
) from exc
if isinstance(metadata, dict):
return metadata
raise TypeError(
"czifile metadata() did not return a dictionary for CZI metadata extraction."
)
# function to load yaml metadata (used for Thorlabs RAW metadata):
def _load_yaml_metadata(yaml_path):
"""
Load YAML metadata from a file into a dictionary.
This helper reads a YAML file from disk and parses its contents into a Python
dictionary. It is intended for loading auxiliary metadata, such as Thorlabs RAW
metadata stored alongside image data.
The function requires PyYAML to be installed and uses ``yaml.safe_load`` for
parsing. Empty YAML files are treated as empty dictionaries.
Parameters
----------
yaml_path : str
Path to the YAML metadata file.
Returns
-------
data : dict
Dictionary containing the parsed YAML metadata. If the file is empty, an
empty dictionary is returned.
Raises
------
ImportError
If PyYAML is not installed.
ValueError
If the top-level YAML object is not a mapping/dictionary.
Notes
-----
* Parsing is performed using ``yaml.safe_load`` to avoid execution of arbitrary
code.
* The function assumes UTF-8 encoding when reading the file.
"""
if yaml is None:
raise ImportError(
"PyYAML is not installed, but a YAML metadata file was found. "
"Install with: pip install pyyaml")
with open(yaml_path, "r", encoding="utf-8") as f:
data = yaml.safe_load(f)
if data is None:
data = {}
if not isinstance(data, dict):
raise ValueError(f"YAML file {yaml_path} must contain a mapping/dict at top-level.")
return data
# function that creates a dummy YAML files at fname's folder with the required keys:
def create_thorlabs_raw_yaml(fname: str,
T: int = 1, Z: int = 1, C: int = 1, Y: int = 1024, X: int=1024, bits: int = 16,
physicalsize_xyz: Union[tuple, list, None] = None,
pixelunit: str = "micron",
time_increment: Union[float, None] = None, time_increment_unit: Union[str, None] = None,
annotations: Union[dict, None] = None, verbose: bool = True):
"""
Create a dummy YAML file with the required keys for Thorlabs RAW metadata.
This utility generates a YAML file in the same folder as the specified RAW file
(`fname`) containing the necessary keys for reading the RAW file with
`read_thorlabs_raw`. The generated YAML file serves as a metadata source when
no XML metadata is available.
Parameters
----------
fname : str
Path to the Thorlabs RAW file. The YAML file will be created in the same
folder.
T : int
Number of time points. Default is 1.
Z : int
Number of Z slices. Default is 1.
C : int
Number of channels. Default is 1.
Y : int
Image height in pixels. Default is 1024.
X : int
Image width in pixels. Default is 1024.
bits : int
Bit depth per pixel (e.g., 8, 16, 32). Default is 16.
physicalsize_xyz : tuple of float or None, optional
Voxel sizes in the order ``(PhysicalSizeX, PhysicalSizeY, PhysicalSizeZ)``.
Default is None.
pixelunit : str, optional
Unit string for pixel sizes. Default is ``"micron"``.
time_increment : float or None, optional
Time increment between frames. Default is None.
time_increment_unit : str or None, optional
Unit for the time increment. Default is None.
annotations : dict or None, optional
Additional key-value pairs to include in the YAML file. Default is None.
verbose : bool, optional
If True, print diagnostic messages. Default is True.
Returns
-------
None
Raises
------
IOError
If the YAML file cannot be written.
Notes
-----
* The generated YAML file includes the required keys for Thorlabs RAW reading.
* Additional annotations can be included via the `annotations` parameter.
"""
folder = os.path.dirname(fname)
fname_base, _ = os.path.splitext(os.path.basename(fname))
yaml_path = os.path.join(folder, fname_base + "_metadata.yaml")
ymd = {
"T": T,
"Z": Z,
"C": C,
"Y": Y,
"X": X,
"bits": bits,
}
if physicalsize_xyz is not None:
ymd["PhysicalSizeX"] = physicalsize_xyz[0]
ymd["PhysicalSizeY"] = physicalsize_xyz[1]
ymd["PhysicalSizeZ"] = physicalsize_xyz[2]
if pixelunit is not None:
ymd["PixelUnit"] = pixelunit
if time_increment is not None:
ymd["TimeIncrement"] = time_increment
if time_increment_unit is not None:
ymd["TimeIncrementUnit"] = time_increment_unit
if annotations is not None:
ymd.update(annotations)
with open(yaml_path, "w") as f:
yaml.dump(ymd, f)
if verbose:
print(f"Created dummy YAML metadata file at {yaml_path}")
# function to require integer from dictionary (for housekeeping):
def _require_int(d, key):
"""
Retrieve and cast a dictionary value to an integer.
This helper enforces the presence of a specific key in a dictionary and returns
its value cast to an integer. It is intended for simple validation and
housekeeping tasks where integer-valued entries are required.
Parameters
----------
d : dict
Dictionary from which the value is retrieved.
key : hashable
Key that must be present in the dictionary.
Returns
-------
value : int
Integer value associated with `key`.
Raises
------
KeyError
If `key` is not present in the dictionary.
ValueError
If the value associated with `key` cannot be converted to an integer.
"""
if key not in d:
raise KeyError(key)
return int(d[key])
# function to check for not yet covered metadata in tifffile:
def _check_for_not_covered_metadata(tif, yet_covered_metadata, ignore_metadata=None):
"""
Report metadata entries provided by tifffile that are not yet handled.
This helper inspects a ``tifffile.TiffFile`` object for available ``*_metadata``
attributes beyond those that are already covered by the current implementation.
For each uncovered metadata entry that is present and non-null, a diagnostic
message is printed to inform the user that additional metadata types exist but
are not yet supported.
The function is intended as a developer and user-facing diagnostic to highlight
potentially relevant metadata formats and to encourage reporting of unsupported
cases.
Parameters
----------
tif : tifffile.TiffFile
Opened TIFF file object to be inspected for available metadata attributes.
yet_covered_metadata : iterable of str
Collection of metadata attribute names that are already handled and should
be ignored during inspection.
ignore_metadata : iterable of str or None, optional
Additional metadata attribute names to be ignored during inspection.
Returns
-------
None
Notes
-----
* The function looks for attributes whose names end with ``"_metadata"``.
* Metadata attributes listed in ``yet_covered_metadata`` are explicitly skipped.
* Only metadata attributes that exist and return a non-``None`` value are
reported.
* The function produces output via printing and does not return structured
information.
"""
available_methods = dir(tif)
available_metadata = []
for method_name in available_methods:
# we do not add imagej_metadata, ome_metadata or lsm_metadata again:
if method_name in yet_covered_metadata:
continue
if method_name.endswith("_metadata"):
try:
#metadata_value = getattr(tif, method_name)
available_metadata.append(method_name)
except Exception as e:
print(f" Could not read metadata '{method_name}': {e}")
#print("Available metadata entries in tifffile:", available_metadata.keys())
# loop through available_metadata and check, which tif.available_metadata[i] is not None:
not_readables = []
for metadata_name in available_metadata:
try:
metadata_value = getattr(tif, metadata_name)
if metadata_value is not None and (ignore_metadata is None or metadata_name not in ignore_metadata):
print(f" Found available metadata '{metadata_name}' which is not yet implemented. Please contact")
print(f" the developers at https://github.com/FabrizioMusacchio/omio/issues and provide")
print(f" details and an example file. Please refer to the documentation for more information.")
except Exception as e:
not_readables.append(metadata_name)
# print(f" _check_for_not_covered_metadata: Could not read metadata '{metadata_name}': {e}")
""" if len(not_readables) > 0:
print(f"\n _check_for_not_covered_metadata couldn't check all available metadata due to errors:\n {not_readables}") """
# function for post-hoc shifting non-reserved OME-metadata into Annotations:
# %% READER FUNCTIONS
# tif or lsm file reader (including series and paginated files):
[docs]
def read_tif(fname, physicalsize_xyz=None, pixelunit="micron",
zarr_store=None, return_list=False, verbose=True):
"""
Read TIFF family files into OMIO's canonical representation.
This function reads TIFF, OME-TIFF, multi file OME-TIFF series, and
Zeiss LSM files using `tifffile`, extracts available metadata (OME-XML, ImageJ
metadata, and LSM metadata), standardizes metadata keys, and normalizes axis
handling to canonical OME order TZCYX. Depending on configuration, the returned
image is either a NumPy array in RAM or a Zarr array backed by an in-memory or
on-disk store.
If the input is a paginated TIFF or LSM (axis "P"), OMIO splits the dataset into
individual pages and returns a list of images together with a list of matching
metadata dictionaries. In that case, lists are returned regardless of
`return_list`, because a single object return would be semantically ambiguous.
Parameters
----------
fname : str
Path to the input file. Note: read_tif is the core function
for TIF and LSM file reading; omio.read() dispatches to this function when
encountering a .tif or .lsm file. read_tif can only handle TIF and LSM files
but no folder paths (for this, please use read_thorlabs_raw_folder).
physicalsize_xyz : tuple of float or None, optional
Manual override for voxel sizes in the order
``(PhysicalSizeX, PhysicalSizeY, PhysicalSizeZ)``. If provided, these values
override metadata-derived sizes. If None, missing sizes fall back to 1.0.
Default is None.
pixelunit : str, optional
Unit string used for pixel size fields and unit normalization. Default is
``"micron"``.
zarr_store : {None, "memory", "disk"}, optional
Controls the representation of the returned image data.
* None: load fully into RAM and return a NumPy array
* "memory": return a Zarr array backed by an in-memory store
* "disk": return a Zarr array stored in the cache folder
``{parent}/.omio_cache/<basename>.zarr``
Default is None.
return_list : bool, optional
If True, force backward-compatible list return for non-paginated inputs by
returning ``[image]`` and ``[metadata]``. Default is False.
verbose : bool, optional
If True, print diagnostic progress messages. Default is True.
Returns
-------
image : np.ndarray or zarr.core.array.Array or list
Image data in canonical OME axis order TZCYX. For paginated inputs, a list
of per-page arrays is returned.
metadata : dict or list
Metadata dictionary aligned with the returned image. For paginated inputs,
a list of per-page metadata dictionaries is returned.
Raises
------
ValueError
If `zarr_store` is not one of {None, "memory", "disk"}.
Notes
-----
* Metadata sources are merged in the order they are read. Missing essentials
are filled from the image shape and default values.
* Unit normalization updates unit fields only. Numerical unit conversion is not
performed except for specific paginated LSM cases where Zeiss voxel sizes are
converted from meters to micrometers.
* If `zarr_store` is not None, tifffile's ``aszarr=True`` path is used and then
materialized into a concrete Zarr store to ensure predictable downstream
behavior. Data transfer uses slice-wise copying over the last two spatial
dimensions to limit peak memory use.
* Axis normalization to TZCYX may insert singleton dimensions for missing OME
axes and may reorder existing axes. The updated axis string is stored in the
returned metadata.
* When `zarr_store="disk"`, the function may create and overwrite paths under
``.omio_cache``.
* Multi-file OME-TIFF series are supported. In this layout, individual OME-TIFF
files each store subsets of the full dataset (e.g. single time points,
channels, or z-slices). OMIO/tifffile reconstructs the complete logical image by
following the OME-XML metadata references across files. It is therefore
sufficient to pass the path of a single file belonging to the series; all
referenced files are discovered and read implicitly. The resulting image is
returned as a contiguous and complete stack in canonical OME axis order.
General note on series and pages
--------------------------------
TIFF family containers can store data in two different structural layers that are
easy to confuse:
* Series are top level image datasets within a container. Each series can have its
own dimensionality, axis semantics, pixel type, and metadata context. In tifffile,
these are exposed via `tif.series`.
* Pages are the lower level IFD entries that physically store image planes or tiles.
Depending on the file layout, pages can represent planes along Z, C, or T, pyramid
levels, tiles, or other internal subdivisions. In tifffile, these are exposed via
`tif.pages`.
In many microscopy TIFF variants, tifffile reconstructs a logical N dimensional array
for a series by reading and stacking its pages. The exact mapping depends on the file
and on tifffile’s internal interpretation of the container structure. OMIO therefore
treats `tif.series` as the authoritative high level grouping and applies explicit,
deterministic policies where the container structure could otherwise lead to ambiguous
outcomes.
OMIO behavior for paginated files
----------------------------------
Some TIFF and LSM files are stored as paginated stacks and expose an explicit pagination
axis `P` in the inferred axis string. OMIO treats pagination as a semantic split into
independent image stacks:
* If the input is detected as paginated (axis `P` present), OMIO splits the dataset into
per page images and returns `images` and `metadatas` as lists with matching length.
* Lists are returned regardless of `return_list`, because a single object return would be
semantically ambiguous once pagination is present.
* Each returned metadata dictionary corresponds to exactly one page and reflects the page
specific axis string with the pagination axis removed.
* If `zarr_store` is set, each page is materialized into its own Zarr array according to
the selected backend (memory or disk).
* After splitting, OMIO applies axis normalization to each page so that each page is
returned in canonical OME axis order.
OMIO restrictions for multi-series TIFF/LSM files
-------------------------------------------------
TIFF and LSM containers may store multiple datasets ("series") in a single file.
While tifffile exposes these as `tif.series`, OMIO enforces a strict and predictable
policy to avoid ambiguous interpretations:
* If a file contains exactly one series (`len(tif.series) == 1`), OMIO guarantees
correct reading and normalization to canonical OME axis order (TZCYX).
* If a file contains multiple series (`len(tif.series) > 1`), OMIO will process
**only the first series (series 0)** and ignore all others.
* A warning is emitted in this case, and the policy decision is recorded in the
returned metadata.
* OMIO does not attempt to infer relationships between multiple series, does not
concatenate them, and does not inspect their shapes, axes, or photometric
interpretation beyond series 0.
This policy is intentional and favors reproducibility and explicit behavior over
heuristic reconstruction of complex TIFF layouts.
"""
# validate zarr_store parameter:
if zarr_store not in (None, "memory", "disk"):
raise ValueError(
"read_tif: zarr_store must be one of None, 'memory', or 'disk'. "
f"Got: {zarr_store!r}")
# check, whether the user wants to set the pixel size manually:
if not physicalsize_xyz:
physicalsize_xyz_ext = (1.0,1.0,1.0)
set_input_pixelsize = False
else:
physicalsize_xyz_ext = tuple(float(v) for v in physicalsize_xyz)
set_input_pixelsize = True
# read the tif file:
with tifffile.TiffFile(fname) as tif:
# find out, how many series/pages exist:
nseries = len(tif.series)
npages = len(tif.pages)
# OMIO multi-series policy:
if nseries > 1:
if verbose:
print(
f"WARNING: OMIO detected a multi-series TIFF/LSM file with {nseries} series.\n"
f" OMIO currently processes only the first series (series 0).\n"
f" All additional series are ignored.")
# record policy decision in metadata later:
series_shapes = []
series_axes = []
series_photometric = []
for i in range(nseries):
try:
series_shapes.append(list(tif.series[i].shape))
except Exception:
series_shapes.append(None)
try:
series_axes.append(str(tif.series[i].axes))
except Exception:
series_axes.append(None)
try:
series_photometric.append(str(tif.series[i].pages[0].photometric.name))
except Exception:
series_photometric.append(None)
multi_series_info = {"OMIO_MultiSeriesDetected": True,
"OMIO_TotalSeries": nseries,
"OMIO_ProcessedSeries": 0,
"OMIO_MultiSeriesPolicy": "only_series_0",
"OMIO_MultiSeriesShapes": series_shapes,
"OMIO_MultiSeriesAxes": series_axes,
"OMIO_MultiSeriesPhotometric": series_photometric}
else:
multi_series_info = {"OMIO_MultiSeriesDetected": False}
"""
The difference between series and pages:
A TIFF file can contain multiple SERIES, each representing a distinct
image dataset with its own dimensions and metadata. Each series can be
composed of multiple PAGES, where each page corresponds to a single image
plane or slice within that series. Thus, series are higher-level groupings
of related image data, while pages are the individual components that make
up those datasets.
However, "pages" in tifffile can also refer to channels or slices within
a single series, depending on the context. This dual usage can lead to confusion.
Furthermore, tifffile sometimes reads paginated tiffs as an array of image series
in paginated images, but sometimes it only reads the first series and skips the
rest. Thus, we would need to check whether a single image is read, but nseries > 1
exist. This is complicated at the moment, as I do not know how tifffile decides
along which axes it concatenates SERIES into a single array and when it does not.
I.e., I can not simply check whether len(image) == nseries, and if not, try to
loop over tif.series to read all series separately. Thus, for now, we need to
restrict OMIO's tif reader to only allow cases where either a single series exists
(single stack case) or where the tif is paginated (for me, this seems only to be
the case for paginated LSM files so far).
In lsm files, what I figured out so far, is, that the series are sets of different
image scales of the same data (e.g., downsampled versions) + some photographed image
description sheets. Thus, if tifffile fetches in multiple series only the first,
multi-layered image series, that seems to be okay.
Update: I think, I figured it out that tifffile reads a multi-series tiff/lsm
into a single array only if all series have the same shape and axes. And this is
what we accept for now in OMIO, i.e., we do not guarantee to read other mixed
multi-series shapes.
"""
# read image data either fully into RAM or as Zarr;
# first, NumPy array in RAM:
if zarr_store is None:
if verbose:
print("Reading TIFF fully into RAM...")
image = tif.asarray()
""" print(f"len(tif.series): {len(tif.series)}, nseries: {nseries}, len(image): {len(image)}")
print(f"image.shape: {image.shape}")
for series in range(len(tif.series)):
print(f"tif.series[series].axes: {tif.series[series].axes}, tif.series[series].shape: {tif.series[series].shape}")
tags = []
for tag in range(len(tif.pages)):
tags.append(tif.pages[tag].tags)
for tag in tags:
for key in tag.keys():
print(key, tag[key])
print("-----") """
""" DRAFT for multi-series handling; see comments above and herein:
print(f"len(tif.series): {len(tif.series)}, nseries: {nseries}, len(image): {len(image)}")
for series in range(len(tif.series)):
print(tif.series[series].axes, tif.series[series].shape)
#print(tif.series[series].pages.shape)
if len(tif.series) > 1 and len(tif.series[0].shape) == 3:
# len(tif.series[0].shape) == 3 ensures that we get a true RGB YXS image
# try to read all series separately into a list:
image_list = []
image_list.append(tifffile.imread(fname, series=0)) # read first series
image0_shape = tif.series[0].shape
image0_axes = tif.series[0].axes
for series in range(1, len(tif.series)):
if tif.series[series].axes == image0_axes and tif.series[series].shape == image0_shape:
image_list.append(tifffile.imread(fname, series=series))
# after all series are read, concatenate all arrays in the list...but along which axis?
# For now, I can't resolve this, so this if-block is disabled and the restrict OMIO to only
# guarantee single-series or lsm paginated tiffs with non-complex axis/series/pages layouts.
# UPDATE: We do it like FIJI: We concatenate in T so that we get a TZCYX array in the end.
# create an empty array with the final shape:
T_N = len(image_list)
final_shape = (T_N,) + image0_shape
image = np.zeros(final_shape, dtype=image_list[0].dtype)
for t in range(T_N):
image[t, ...] = image_list[t]
else:
image = tif.asarray()
"""
else:
if verbose:
print("Reading TIFF as Zarr...")
src_store = tifffile.imread(fname, aszarr=True)
src = zarr.open(src_store, mode="r")
# IMPORTANT: OME-TIFF and pyramidal TIFFs may open as a Zarr Group, not an Array.
# OMIO policy: only use one dataset, deterministically.
src_array = _zarr_pick_first_array(src, prefer_keys=("0",), verbose=verbose)
image = src_array # from here on, we require array semantics (shape, dtype, slicing)
# create target Zarr (memory or disk):
fname_base, _ = os.path.splitext(os.path.basename(fname))
chunks = getattr(src_array, "chunks", None)
# If chunks are not known, compute them from shape/axes later after metadata exists.
# For now, keep a placeholder and compute after _ensure_axes_in_metadata().
target = None
#image = src # temporary; may be replaced by target after we know axes/chunks
""" DRAFT warning for multi-series handling; see comments above and herein:
# I cannot do the following check here, as an RGB is read like YXS and thus
# len(image) equal the size of Y, which is not what we want to check here.
# warn user if we have multi-series tif but only a single image read:
if len(tif.series)>1 and len(image) == 1:
print(f"WARNING: read_tif: Encountered multi-series TIFF with {len(tif.series)} series,")
print(f" but only a single image array was read with shape {image.shape}.")
print(f" OMIO currently only guarantees correct reading of single-series")
print(f" TIFF files or paginated LSM files. Please report this issue to")
print(f" the developers at https://github.com/FabrizioMusacchio/omio/issues.")
"""
image_shape = image.shape
# try to extract metadata from tag pages (if any):
try:
tags = []
for tag in range(len(tif.pages)):
tags.append(tif.pages[tag].tags)
except Exception:
tags = None
"""
for tag in tags:
for key in tag.keys():
print(key, tag[key])
print("-----")
tags = tif.pages[0].tags
for key in tags.keys():
print(key, tags[key])
"""
imagej_metadata = tif.imagej_metadata
ome_metadata = tif.ome_metadata
lsm_metadata = tif.lsm_metadata
#shaped_metadata = tif.shaped_metadata
# check for not yet covered metadata and give feedback to user (if any):
yet_covered_metadata = ["imagej_metadata", "ome_metadata", "lsm_metadata"]
ignore_metadata = ["shaped_metadata"] # empirically, shaped_metadata this always contains
# just the image shape, so we ignore it for now
_check_for_not_covered_metadata(tif, yet_covered_metadata, ignore_metadata)
metadata = {}
if ome_metadata is not None:
md_ome = _parse_ome_metadata(ome_metadata)
metadata.update(md_ome)
#metadata = _parse_ome_metadata(ome_metadata) # extract relevant fields from OME-XML
metadata = _add_file_properties_to_metadata(metadata, fname, original_metadata_type="OME_XML")
#metadata["axes"], metadata["shape"] = _extract_axes_from_ome(ome_metadata) # this is actually obsolete, as we overwrite it later
if imagej_metadata is not None:
md_ij = _standardize_imagej_metadata(imagej_metadata, tags=tags, verbose=verbose)
metadata.update(md_ij)
metadata = _add_file_properties_to_metadata(metadata, fname, original_metadata_type="imagej_metadata")
if lsm_metadata is not None:
md_lsm = _standardize_lsm_metadata(lsm_metadata)
metadata.update(md_lsm)
#metadata = _standardize_lsm_metadata(lsm_metadata) # correct lsm keys
metadata = _add_file_properties_to_metadata(metadata, fname, original_metadata_type="lsm_metadata")
# let's check whether metadata is empty; if so, we create a minimal default
# description based only on image shape and a unit-less pixel grid:
if not metadata:
# populate metadata with the default keys from _standardize_imagej_metadata; put as
# PhysicalSizeX/Y/Z -> 1.0 and SizeX/Y/Z -> image.shape accordingly:
# First, we need to check whether we read an RGB tif; in this case, the axes order
# differs from {T/C/Z}YX to YX{T/C/Z/S}; this we can find out via a key in tags[0]',
# that looks like: 262 TiffTag 262 PhotometricInterpretation @58 SHORT @66 = RGB.
# We only take into account tags[0], i.e., the first page's tags, as we assume that
# all pages have the same PhotometricInterpretation. OMIO cannot handle multi-page
# tif with mixed photometric interpretations at the moment. Therefore:
# NOTE: Under OMIO policy, only series 0 is considered. RGB detection via the first
# page is therefore sufficient and INTENTIONALLY limited in scope.
try:
photometric = tags[0].get("PhotometricInterpretation", None).value
except Exception:
photometric = None
if photometric is not None and photometric == photometric.RGB:
# RGB tif; we need to address the axes differently;
# photometric == photometric.MINISBLACK would be grayscale tif and we would thus
# have default axes {T/C/Z}YX handling.
if len(image_shape) == 3:
# with our current knowledge of RGB tif file structures, we can assume that the
# shape is (SizeY, SizeX, SizeC), and, thus, we can only have 3 axes:
# extract SizeX, SizeY, SizeC from shape correctly:
sizey = image_shape[-3]
sizex = image_shape[-2]
sizec = image_shape[-1]
sizez = 1
metadata = {
"SizeX": sizex,
"SizeY": sizey,
"SizeZ": sizez,
"SizeC": sizec,
"PhysicalSizeX": 1.0,
"PhysicalSizeY": 1.0,
"PhysicalSizeZ": 1.0,
"unit": pixelunit,
"PhysicalSizeXUnit": pixelunit,
"PhysicalSizeYUnit": pixelunit,
"PhysicalSizeZUnit": pixelunit,
'original_metadata_type': 'multipage RGB TIFF'}
else:
# unexpected shape for RGB tif:
raise ValueError(
f"read_tif: Encountered RGB TIFF with unexpected shape {image_shape}. "
"Expected shape (SizeY, SizeX, SizeC). Please report this issue "
"to the developers at https://github.com/FabrizioMusacchio/omio/issues.")
else:
metadata = {
"SizeX": image.shape[-1] if len(image.shape)>=1 else 1,
"SizeY": image.shape[-2] if len(image.shape)>=2 else 1,
"SizeZ": image.shape[-3] if len(image.shape)>=3 else 1,
"PhysicalSizeX": 1.0,
"PhysicalSizeY": 1.0,
"PhysicalSizeZ": 1.0,
"unit": pixelunit,
"PhysicalSizeXUnit": pixelunit,
"PhysicalSizeYUnit": pixelunit,
"PhysicalSizeZUnit": pixelunit}
metadata = _add_file_properties_to_metadata(metadata, fname, original_metadata_type="N/A")
# fallback if SizeX/Y/Z are missing:
if "SizeX" not in metadata:
metadata["SizeX"] = image.shape[-1] if len(image.shape)>=1 else 1
if "SizeY" not in metadata:
metadata["SizeY"] = image.shape[-2] if len(image.shape)>=2 else 1
if "SizeZ" not in metadata:
metadata["SizeZ"] = image.shape[-3] if len(image.shape)>=3 else 1
# tiffwriter has problems with the µ-symbol, thus we replace it by "micron":
# UPDATE: this is OBSOLETE as we use OME-XML for writing metadata now!
metadata = _metadata_units_check(metadata, pixelunit=pixelunit)
# fallback/ensure basic physical sizes exist:
if "PhysicalSizeX" not in metadata:
print(f"WARNING: PhysicalSizeX missing in metadata; setting to default or user-provided value: {physicalsize_xyz_ext[0]}")
metadata["PhysicalSizeX"] = physicalsize_xyz_ext[0]
if "PhysicalSizeY" not in metadata:
print(f"WARNING: PhysicalSizeY missing in metadata; setting to default or user-provided value: {physicalsize_xyz_ext[1]}")
metadata["PhysicalSizeY"] = physicalsize_xyz_ext[1]
if "PhysicalSizeZ" not in metadata:
print(f"WARNING: PhysicalSizeZ missing in metadata; setting to default or user-provided value: {physicalsize_xyz_ext[2]}")
metadata["PhysicalSizeZ"] = physicalsize_xyz_ext[2]
# annotate OMIO multi-series policy in metadata
if "multi_series_info" in locals():
metadata.update(multi_series_info)
# ensure shape correctness in metadata:
metadata = _ensure_shape_in_metadata(metadata, image_shape)
# ensure axes correctness in metadata:
metadata = _ensure_axes_in_metadata(metadata, tif)
# conversion factor from meter to micrometer:
conv_um = 10 ** 6
# sanity check for read Zarr array existence:
if zarr_store is not None and not isinstance(image, zarr.core.array.Array):
# This branch should not happen: image is either np.ndarray (None) or zarr.Array (aszarr path)
pass
# materialize from tifffile's aszarr-backed array into a real Zarr store (if Zarr):
if zarr_store is not None:
if verbose:
print(f" zarr_store requested: {zarr_store}")
print(f" Preparing target Zarr array on/in {zarr_store}...")
# get fname base for cache path:
fname_base, _ = os.path.splitext(os.path.basename(fname))
# compute robust chunks using our helper (preferred over tifffile's internal chunking):
chunks = compute_default_chunks(image.shape, metadata["axes"])
if verbose:
print(f" Using chunks: {chunks} (image shape is {image.shape}, axes are '{metadata['axes']}')")
if zarr_store == "memory":
store = zarr.storage.MemoryStore()
zarr_array = zarr.open(
store=store,
mode="w",
shape=image.shape,
dtype=image.dtype,
chunks=chunks)
else:
zarr_cache_folder = os.path.join(os.path.dirname(fname), ".omio_cache")
os.makedirs(zarr_cache_folder, exist_ok=True)
zarr_cache_path = os.path.join(zarr_cache_folder, fname_base + ".zarr")
if os.path.exists(zarr_cache_path):
shutil.rmtree(zarr_cache_path)
zarr_array = zarr.open(
zarr_cache_path,
mode="w",
shape=image.shape,
dtype=image.dtype,
chunks=chunks)
# Copy strategy: for TIFF, the source is already lazy and chunked; slice-wise XY copy is still safe.
if verbose:
print(" Copying TIFF data into Zarr...")
_copy_to_zarr_in_xy_slices(image, zarr_array, desc=" Slice-wise copying TIFF to Zarr")
image = zarr_array # from now on, downstream uses Zarr
# fold sample axis 'S' into channel axis 'C' while keeping Zarr (if requested)
if "S" in metadata["axes"]:
fname_base, _ = os.path.splitext(os.path.basename(fname))
if zarr_store == "disk":
cache_folder = os.path.join(os.path.dirname(fname), ".omio_cache")
else:
cache_folder = None
image, metadata["axes"] = _fold_samples_axis_into_channel(
image,
metadata["axes"],
zarr_store=zarr_store,
cache_folder=cache_folder,
base_name=fname_base,
verbose=verbose)
image_shape = image.shape
metadata = _ensure_shape_in_metadata(metadata, image_shape)
# handle paginated TIFFs (axis 'P'):
if "P" in metadata["axes"]:
axis_to_use = "P"
""" if "P" in metadata["axes"]:
axis_to_use = "P"
elif "S" in metadata["axes"]:
axis_to_use = "S"
# rename S to P in axes string for consistency:
#metadata["axes"] = metadata["axes"].replace("S", "P") """
if verbose:
print(f" Detected paginated TIFF/LSM (axis '{axis_to_use}'); splitting into individual pages.")
p_index = metadata["axes"].index(axis_to_use)
metadata["original_metadata_type"] = "paginated_tif/lsm"
try:
multi_page_metadata = tif.pages[0].tags["CZ_LSMINFO"].value
metadata["PhysicalSizeX"] = multi_page_metadata["VoxelSizeX"] * conv_um
metadata["PhysicalSizeY"] = multi_page_metadata["VoxelSizeY"] * conv_um
metadata["PhysicalSizeZ"] = multi_page_metadata["VoxelSizeZ"] * conv_um
metadata["original_metadata_type"] = "CZ_LSMINFO"
except Exception:
metadata["PhysicalSizeX"] = physicalsize_xyz_ext[0]
metadata["PhysicalSizeY"] = physicalsize_xyz_ext[1]
metadata["PhysicalSizeZ"] = physicalsize_xyz_ext[2]
metadata["original_metadata_type"] = "N/A"
if set_input_pixelsize:
metadata["PhysicalSizeX"] = physicalsize_xyz_ext[0]
metadata["PhysicalSizeY"] = physicalsize_xyz_ext[1]
metadata["PhysicalSizeZ"] = physicalsize_xyz_ext[2]
metadata["spacing"] = metadata["PhysicalSizeZ"]
metadata["PhysicalSizeXUnit"] = metadata["unit"]
metadata["PhysicalSizeYUnit"] = metadata["unit"]
metadata["OMIO_VERSION"] = _OMIO_VERSION
nP = image.shape[p_index]
#axes_wo_P = metadata["axes"].replace(axis_to_use, "")
p_index = metadata["axes"].index(axis_to_use)
axes_wo_P = metadata["axes"][:p_index] + metadata["axes"][p_index+1:]
images = []
metadatas = []
# For paginated stacks:
# if zarr_store is None: return NumPy pages
# if zarr_store is "disk": write per-page Zarr to disk (memory-friendly)
# if zarr_store is "memory": create per-page MemoryStore Zarr arrays
for p in range(nP):
# p = 0 # for testing only
slicer = [slice(None)] * image.ndim
slicer[p_index] = p
page_data = image[tuple(slicer)]
# Remove singleton P/S-axis if it still exists
# (Depending on backend, indexing may keep dims)
if page_data.ndim == image.ndim:
page_data = np.squeeze(page_data, axis=p_index)
page_md = metadata.copy()
page_md["axes"] = axes_wo_P
page_md["shape"] = page_data.shape
if zarr_store is None:
images.append(np.asarray(page_data))
else:
fname_base, _ = os.path.splitext(os.path.basename(fname))
page_shape = page_data.shape
chunks = compute_default_chunks(page_shape, axes_wo_P)
if verbose:
print(f" Page {p}: using chunks {chunks}")
if zarr_store == "memory":
store = zarr.storage.MemoryStore()
page_zarr = zarr.open(
store=store,
mode="w",
shape=page_shape,
dtype=page_data.dtype,
chunks=chunks)
else:
zarr_cache_folder = os.path.join(os.path.dirname(fname), ".omio_cache")
os.makedirs(zarr_cache_folder, exist_ok=True)
page_name = f"{fname_base}_P{p}.zarr"
page_path = os.path.join(zarr_cache_folder, page_name)
if os.path.exists(page_path):
shutil.rmtree(page_path)
page_zarr = zarr.open(
page_path,
mode="w",
shape=page_shape,
dtype=page_data.dtype,
chunks=chunks)
# Copy page data
_copy_to_zarr_in_xy_slices(page_data, page_zarr, desc=f" Copying page {p} to Zarr")
images.append(page_zarr)
# Post-hoc OME metadata checkup
page_md = OME_metadata_checkup(page_md, verbose=verbose)
metadatas.append(page_md)
# reorder to OME axes (batch):
memap_large_file = False
if zarr_store=="disk":
memap_large_file = True
images, metadatas = _batch_correct_for_OME_axes_order(images, metadatas, memap_large_file, verbose=verbose)
# for paginated inputs, always return lists, because splitting is the point (OMIO policy):
if verbose:
print(f" Finished splitting paginated TIFF into {nP} pages.")
print("Reading paginated TIFF completed.")
return images, metadatas
# normal single-stack TIFF handling:
metadata = _get_ome_image_sizes(image.shape, metadata)
# external pixel size override:
if set_input_pixelsize:
metadata["PhysicalSizeX"] = physicalsize_xyz_ext[0]
metadata["PhysicalSizeY"] = physicalsize_xyz_ext[1]
metadata["PhysicalSizeZ"] = physicalsize_xyz_ext[2]
# sanity fallback if physically unreasonable:
if metadata["PhysicalSizeX"] <= 0:
metadata["PhysicalSizeX"] = 1
if metadata["PhysicalSizeY"] <= 0:
metadata["PhysicalSizeY"] = 1
if metadata["PhysicalSizeZ"] <= 0:
metadata["PhysicalSizeZ"] = 1
metadata["spacing"] = metadata["PhysicalSizeZ"]
if metadata["PhysicalSizeXUnit"] is None:
metadata["PhysicalSizeXUnit"] = metadata["unit"]
if metadata["PhysicalSizeYUnit"] is None:
metadata["PhysicalSizeYUnit"] = metadata["unit"]
if metadata["PhysicalSizeZUnit"] is None:
metadata["PhysicalSizeZUnit"] = metadata["unit"]
if metadata["PhysicalSizeXUnit"] =="inch" or metadata["PhysicalSizeYUnit"] =="inch" or metadata["PhysicalSizeZUnit"] =="inch":
# print a warning, as inch is not a typical unit for microscopy images:
print("WARNING: read_tif detected pixel unit 'inch', which is unusual for microscopy images.")
print(" This can happen when ImageJ metadata is missing, could not be read correctly, or")
print(" old metadata conventions were used. Please verify the returned physical pixel")
print(" sizes in the original metadata.")
metadata["OMIO_VERSION"] = _OMIO_VERSION
# correct for OME axes order:
memap_large_file = False
if zarr_store=="disk":
memap_large_file = True
image, _, metadata["axes"] = _correct_for_OME_axes_order(image, metadata, memap_large_file, verbose=verbose)
# shape may have changed after axes reordering:
metadata["shape"] = image.shape
# post-hoc OME metadata checkup and correction;
metadata = OME_metadata_checkup(metadata, verbose=verbose)
if verbose:
print("Finished reading TIFF.")
if return_list:
return [image], [metadata]
else:
return image, metadata
# CZI file reader:
[docs]
def read_czi(fname, physicalsize_xyz=None, pixelunit="micron", zarr_store=None,
return_list=False, verbose=True):
"""
Read Zeiss CZI files into OMIO's canonical representation.
This function reads a Zeiss CZI file using `czifile`, extracts basic acquisition
metadata, filters and normalizes axes to the canonical OME axis convention
TZCYX, and optionally materializes the result as a Zarr array backed by an
in-memory store or an on-disk cache.
CZI pixel data are always read fully into RAM first, because lazy, memory-mapped
reading is not supported in this code path. Optional Zarr export therefore
represents an explicit post-read materialization step for downstream workflows
that benefit from chunked access or reduced peak RAM usage in later stages.
Parameters
----------
fname : str
Path to the CZI file. Note: read_czi is the core function
for Zeiss CZI file reading; omio.read() dispatches to this function when
encountering a .czi file. read_czi can only handle RAW files but no
folder paths (for this, please use read_thorlabs_raw_folder).
physicalsize_xyz : tuple of float or None, optional
Manual override for voxel sizes in the order
``(PhysicalSizeX, PhysicalSizeY, PhysicalSizeZ)``. If provided, these values
override metadata-derived sizes. If None, missing or invalid sizes fall back
to 1.0. Default is None.
pixelunit : str, optional
Unit string used for pixel size fields and unit normalization. Default is
``"micron"``.
zarr_store : {None, "memory", "disk"}, optional
Controls the representation of the returned image data.
* None: return a NumPy array in RAM
* "memory": return a Zarr array backed by an in-memory store
* "disk": return a Zarr array stored in the cache folder
``{parent}/.omio_cache/<basename>.zarr``
Existing on-disk stores at that location are replaced. Default is None.
return_list : bool, optional
If True, return ``[image]`` and ``[metadata]`` for backward compatibility.
Default is False.
verbose : bool, optional
If True, print diagnostic progress messages. Default is True.
Returns
-------
image : np.ndarray or zarr.core.array.Array
Image data in canonical OME axis order TZCYX. If `zarr_store` is not None,
the returned object is a Zarr array.
metadata : dict
Metadata dictionary aligned with the returned image, including axis and size
information and an ``Annotations`` block for non-core fields.
Raises
------
ValueError
If `zarr_store` is not one of {None, "memory", "disk"}.
Notes
-----
* Non-OME axes present in CZI files (for example B, V, or trailing singleton
axes) are collapsed by indexing at 0 so that only OME-relevant axes remain.
The resulting axis string is updated accordingly.
* Physical voxel sizes are extracted from the CZI scaling metadata and converted
to micrometer units using a fixed conversion factor. If values are missing or
non-positive, they fall back to 1.0.
* Axis reordering to TZCYX may insert singleton dimensions for missing OME axes
and may permute existing axes. The updated axis declaration is stored in the
returned metadata.
* When `zarr_store="disk"`, the function may create and overwrite paths under
``.omio_cache``.
"""
# validate zarr_store parameter
if zarr_store not in (None, "memory", "disk"):
raise ValueError(
"read_czi: zarr_store must be one of None, 'memory', or 'disk'. "
f"Got: {zarr_store!r}")
# determine whether pixel sizes were set manually
if not physicalsize_xyz:
physicalsize_xyz_ext = (1.0, 1.0, 1.0)
set_input_pixelsize = False
else:
physicalsize_xyz_ext = tuple(float(v) for v in physicalsize_xyz)
set_input_pixelsize = True
# read CZI into memory (no memory mapping possible)
if verbose:
print("Reading CZI fully into RAM...")
CZI_image = czi.imread(fname)
# initialize metadata:
metadata = {}
fname_base, fname_extension = os.path.splitext(os.path.basename(fname))
metadata["original_filetype"] = fname_extension[1:]
metadata["original_filename"] = fname_base + fname_extension
metadata["original_parentfolder"] = os.path.dirname(fname)
metadata["original_metadata_type"] = "czi_metadata"
try:
metadata["original_creation_or_change_date"] = datetime.datetime.fromtimestamp(
os.path.getctime(fname), datetime.UTC).strftime('%Y-%m-%dT%H:%M:%S')
except Exception:
metadata["original_creation_or_change_date"] = "N/A"
with czi.CziFile(fname) as CZI_metadata_obj:
# extract CZI axes (e.g. BVCTZYX0)
metadata["axes"] = _get_czi_axes(CZI_metadata_obj)
# extract scaling metadata:
czi_metadata_dict = _get_czi_metadata_dict(CZI_metadata_obj)
# filter unwanted non-OME axes (keep only TZCYX):
CZI_image, metadata["axes"] = _filter_image_data_for_ome_tif(CZI_image, metadata["axes"])
CZImetadata_xyz = (
czi_metadata_dict['ImageDocument']['Metadata']['Scaling']['Items']['Distance'])
conv_um = 10 ** 6
if isinstance(CZImetadata_xyz, dict):
CZImetadata_xyz = [CZImetadata_xyz]
for item in CZImetadata_xyz:
if item['Id'] == 'X':
metadata["PhysicalSizeX"] = item['Value'] * conv_um
elif item['Id'] == 'Y':
metadata["PhysicalSizeY"] = item['Value'] * conv_um
elif item['Id'] == 'Z':
metadata["PhysicalSizeZ"] = item['Value'] * conv_um
metadata["shape"] = CZI_image.shape
metadata["unit"] = pixelunit
# overwrite pixel sizes if provided externally
if set_input_pixelsize:
metadata["PhysicalSizeX"] = physicalsize_xyz_ext[0]
metadata["PhysicalSizeY"] = physicalsize_xyz_ext[1]
metadata["PhysicalSizeZ"] = physicalsize_xyz_ext[2]
# fallback if metadata not usable:
if metadata.get("PhysicalSizeX", 0) <= 0:
metadata["PhysicalSizeX"] = 1
if metadata.get("PhysicalSizeY", 0) <= 0:
metadata["PhysicalSizeY"] = 1
if metadata.get("PhysicalSizeZ", 0) <= 0:
metadata["PhysicalSizeZ"] = 1
# imagej compatibility (no µ symbol) ⟵ Actually, now obsolete as we write ome-tif only!
if metadata["unit"] == "µm":
metadata["unit"] = "micron"
metadata["spacing"] = metadata["PhysicalSizeZ"]
metadata["PhysicalSizeXUnit"] = metadata["unit"]
metadata["PhysicalSizeYUnit"] = metadata["unit"]
metadata["OMIO_VERSION"] = _OMIO_VERSION
# ensure SizeT, SizeZ, SizeC, SizeY, SizeX are consistent with current CZI_image
metadata = _get_ome_image_sizes(CZI_image.shape, metadata)
# OME axis reordering: NumPy path or streaming-Zarr path; as the stack still sits fully
# in RAM, we use _correct_for_OME_axes_order w/o memap_large_file logic:
CZI_image, metadata["shape"], metadata["axes"] = _correct_for_OME_axes_order(
CZI_image, metadata, memap_large_file=False, verbose=verbose)
# Optional Zarr-export: write the CZI array into .omio_cache ("disk") or into RAM ("memory")
if zarr_store is not None:
# compute suitable chunk sizes:
chunks = compute_default_chunks(CZI_image.shape, metadata["axes"], max_xy_chunk=1024)
if verbose:
print(f" writing CZI array with shape {CZI_image.shape} into Zarr store on/in {zarr_store} with chunks {chunks}...")
if zarr_store == "memory":
# write into in-memory Zarr store:
store = zarr.storage.MemoryStore()
z = zarr.open(
store=store,
mode="w",
shape=CZI_image.shape,
dtype=CZI_image.dtype,
chunks=chunks)
z[:] = CZI_image[:]
del CZI_image
CZI_image = z
elif zarr_store == "disk":
# write into on-disk Zarr store in .omio_cache folder:
zarr_cache_folder = os.path.join(metadata["original_parentfolder"], ".omio_cache")
os.makedirs(zarr_cache_folder, exist_ok=True)
zarr_path = os.path.join(zarr_cache_folder, fname_base + ".zarr")
if os.path.exists(zarr_path):
shutil.rmtree(zarr_path)
z = zarr.open(
zarr_path,
mode="w",
shape=CZI_image.shape,
dtype=CZI_image.dtype,
chunks=chunks,
)
# direct copy (array is fully in RAM)
z[:] = CZI_image[:]
del CZI_image # free RAM
CZI_image = z # continue working with Zarr array
# post-hoc OME metadata checkup and correction:
metadata = OME_metadata_checkup(metadata, verbose=verbose)
if verbose:
print("Finished reading CZI.")
if return_list:
return [CZI_image], [metadata]
else:
return CZI_image, metadata
# Thorlabs RAW file reader:
[docs]
def read_thorlabs_raw(fname, physicalsize_xyz=None, pixelunit="micron",
zarr_store=None, return_list=False, verbose=True):
"""
Read Thorlabs RAW files into OMIO's canonical representation.
This function reads a Thorlabs RAW file and constructs an image array together
with an OMIO metadata dictionary that follows the canonical OME axis convention
TZCYX. Dimensions and acquisition metadata are obtained from an accompanying XML
file in the same folder. If no XML is present, the function falls back to a
single YAML metadata file located in the same folder.
The RAW payload is interpreted as a contiguous raster of pixel values that must
be reshaped into a 5D stack ``(T, Z, C, Y, X)``. If requested, the data are
materialized into a Zarr array either in memory or on disk. For Zarr output,
copying is performed slice-wise over the last two spatial dimensions to limit
peak RAM usage.
YAML fallback in case of missing XML
--------------------------------------
In case no XML metadata file is found, the function looks for a YAML file
in the same folder. If found, it extracts the necessary dimensions and pixel
size information from the YAML keys ``T``, ``Z``, ``C``, ``Y``, ``X``, ``bits``,
``PhysicalSizeX``, ``PhysicalSizeY``, ``PhysicalSizeZ``, and ``pixelunit``.
The YAML file is not generated automatically by OMIO; it must be created
manually if no XML is available.
An example YAML file might look like this:
.. code-block:: yaml
T: 1
Z: 10
C: 3
Y: 512
X: 512
bits: 16
PhysicalSizeX: 0.65
PhysicalSizeY: 0.65
PhysicalSizeZ: 2.0
pixelunit: micron
Saved as e.g. ``image_metadata.yaml`` in the same folder as the RAW file,
this file allows read_thorlabs_raw to successfully interpret the RAW pixel.
OMIO offers a utility function to help create such YAML files:
``omio.utilities.create_thorlabs_raw_yaml()``, which prompts the user for
the necessary parameters and writes the YAML file (or takes defaults).
Note: The values entered in the YAML file must match the actual RAW data size.
I.e., the user must know the correct dimensions and bit depth in advance.
If neither XML nor YAML metadata is available, the function does not raise an
exception. Instead, it emits a warning and returns ``(None, None)`` or
``([None], [None])`` depending on `return_list`.
Parameters
----------
fname : str
Path to the RAW file. Note: the function expects an XML or YAML metadata file
to be present in the same folder. Also: read_thorlabs_raw is the core function
for Thorlabs RAW reading; omio.read() dispatches to this function when
encountering a .raw file. read_thorlabs_raw can only handle RAW files but no
folder paths (for this, please use read_thorlabs_raw_folder).
physicalsize_xyz : tuple of float or None, optional
Manual override for voxel sizes in the order
``(PhysicalSizeX, PhysicalSizeY, PhysicalSizeZ)``. If provided, these values
override XML or YAML values. Default is None.
pixelunit : str, optional
Default unit string used when neither XML nor YAML provides a unit.
Default is ``"micron"``.
zarr_store : {None, "memory", "disk"}, optional
Controls the representation of the returned image data.
* None: read and return a NumPy array in RAM
* "memory": return a Zarr array backed by an in-memory store
* "disk": return a Zarr array stored in the cache folder
``{parent}/.omio_cache/<basename>.zarr``
Existing on-disk stores at that location are replaced. Default is None.
return_list : bool, optional
If True, return ``[image]`` and ``[metadata]`` for backward compatibility.
Default is False.
verbose : bool, optional
If True, print diagnostic progress messages. Default is True.
Returns
-------
image : np.ndarray or zarr.core.array.Array or None
Image data in canonical OME axis order TZCYX, or None if dimensions cannot
be inferred from XML or YAML.
metadata : dict or None
Metadata dictionary aligned with the returned image, or None if metadata is
unavailable. The dictionary includes an ``Annotations`` block for auxiliary
fields when reading succeeds.
Raises
------
ValueError
If `zarr_store` is not one of {None, "memory", "disk"}, or if an XML file is
present but incomplete or inconsistent.
FileNotFoundError
If `fname` does not exist.
ImportError
If `zarr_store` is "memory" or "disk" but Zarr support is unavailable.
Notes
-----
* RAW reading requires the dimensions T, Z, C, Y, X and a bit depth to infer the
dtype and reshape the pixel stream. XML metadata is preferred. YAML is used
only if XML is absent.
* YAML fallback expects at minimum the keys ``T``, ``Z``, ``C``, ``Y``, ``X``,
and ``bits``. Additional keys such as ``pixelunit``, ``PhysicalSizeX/Y/Z``,
and ``TimeIncrement`` are optional.
* For `zarr_store` not None, the function uses ``numpy.memmap`` and slice-wise
copying to avoid loading the full RAW into RAM before writing.
* Axis normalization to TZCYX is applied at the end and may insert singleton
dimensions or reorder axes. The updated axis string and shape are stored in
the returned metadata.
* When `zarr_store="disk"`, the function may create and overwrite paths under
``.omio_cache``.
"""
if zarr_store not in (None, "memory", "disk"):
raise ValueError("read_thorlabs_raw: zarr_store must be one of None, 'memory', or 'disk'. "
f"Got: {zarr_store!r}")
if verbose:
print(f"Reading Thorlabs RAW file: {fname}")
if not os.path.exists(fname):
raise FileNotFoundError(f"The Thorlabs RAW file {fname} does not exist.")
if zarr_store in ("memory", "disk") and zarr is None:
raise ImportError("zarr is required for zarr_store='memory' or 'disk'.")
folder = os.path.dirname(fname)
fname_base, fname_extension = os.path.splitext(os.path.basename(fname))
# initialize metadata with provenance and placeholders:
metadata = {}
metadata["OMIO_VERSION"] = _OMIO_VERSION
metadata["original_filetype"] = fname_extension[1:]
metadata["original_filename"] = fname_base + fname_extension
metadata["original_parentfolder"] = folder
metadata["original_metadata_type"] = "thorlabs_metadata"
try:
metadata["original_creation_or_change_date"] = datetime.datetime.fromtimestamp(
os.path.getctime(fname), datetime.UTC).strftime("%Y-%m-%dT%H:%M:%S")
except Exception:
metadata["original_creation_or_change_date"] = "N/A"
metadata["axes"] = "TZCYX"
metadata["shape"] = 0
# these must be resolved from XML or YAML, otherwise we cannot read the RAW:
dims = None # dict with keys T,Z,C,Y,X,bits
unit_from_meta = None
# preferred: XML metadata in same folder
xml_files = [f for f in os.listdir(folder) if f.lower().endswith(".xml")]
xml_path = None
if xml_files:
xml_path = os.path.join(folder, xml_files[0])
if verbose:
print(f" Found XML file: {xml_files[0]}. Will use it for metadata extraction...")
tree = ET.parse(xml_path)
root = tree.getroot()
try:
lsm_node = root.find(".//LSM")
if lsm_node is None:
raise ValueError(f"The XML file {xml_path} is missing the LSM node.")
# dimensions X, Y:
X = int(lsm_node.get("pixelX"))
Y = int(lsm_node.get("pixelY"))
# channels C:
C = 1
wavelengths_node = root.find(".//Wavelengths")
if wavelengths_node is not None:
wavelengths_n = wavelengths_node.findall(".//Wavelength")
if wavelengths_n:
C = len(wavelengths_n)
else:
C = int(lsm_node.get("channel"))
else:
C = int(lsm_node.get("channel", C))
# time T:
T_node = root.find(".//Timelapse")
if T_node is not None:
T = int(T_node.get("timepoints"))
T_step_size = float(T_node.get("intervalSec"))
else:
T = 1
T_step_size = 1.0
# Bits and dtype
bits = 16
cam_node = root.find(".//Camera")
if cam_node is not None:
bits = int(cam_node.get("bitsPerPixel", bits))
if bits == 32:
dtype = np.float32
elif bits > 8:
dtype = np.uint16
else:
dtype = np.uint8
bytes_per_pixel = np.dtype(dtype).itemsize
# Z estimate and step size:
Z_node = root.find(".//ZStage")
Z_streaming = root.find(".//Streaming")
if Z_node is not None and Z_streaming is not None and bool(int(Z_streaming.get("zFastEnable", "0"))):
Z = int(Z_node.get("steps"))
Z_stepSize = float(Z_node.get("stepSizeUM"))
else:
Z = 1
Z_stepSize = 1.0
# correct Z from file size (flyback frames etc.):
file_size = os.path.getsize(fname)
denom = X * Y * C * T * bytes_per_pixel
if denom <= 0:
raise ValueError("Invalid dimension product for file size check.")
if file_size % denom != 0:
if verbose:
print(f" WARNING: RAW file size {file_size} is not an integer multiple of\n"
f" X*Y*C*T*bytes_per_pixel={denom}. Z_from_file_size will be truncated.")
Z_from_file_size = file_size // denom
if Z != Z_from_file_size:
if verbose:
print(f" Info: Z from XML ({Z}) does not match file size calculation ({Z_from_file_size}).\n"
" Using file size derived Z.")
Z = Z_from_file_size
dims = {"T": T, "Z": Z, "C": C, "Y": Y, "X": X, "bits": bits}
# OME like metadata:
metadata["SizeX"] = X
metadata["SizeY"] = Y
metadata["SizeC"] = C
metadata["SizeT"] = T
metadata["SizeZ"] = Z
px_um = float(lsm_node.get("pixelSizeUM"))
metadata["PhysicalSizeX"] = px_um
metadata["PhysicalSizeY"] = px_um
metadata["PhysicalSizeZ"] = Z_stepSize
unit_from_meta = "micron"
metadata["unit"] = unit_from_meta
metadata["PhysicalSizeXUnit"] = unit_from_meta
metadata["PhysicalSizeYUnit"] = unit_from_meta
metadata["PhysicalSizeZUnit"] = unit_from_meta
metadata["TimeIncrement"] = float(T_step_size)
metadata["TimeIncrementUnit"] = "seconds"
metadata["bits_per_pixel"] = bits
try:
metadata["frame_rate"] = float(lsm_node.get("frameRate", 0.0))
except Exception:
metadata["frame_rate"] = 0.0
# Optional: date from XML
date_node = root.find(".//Date")
if date_node is not None:
date_str = date_node.get("date")
local_time = None
try:
local_time = datetime.datetime.strptime(date_str, "%m/%d/%Y %H:%M:%S")
except Exception:
local_time = None
if local_time is not None:
creation_date_utc = local_time.replace(tzinfo=datetime.UTC)
metadata["original_creation_or_change_date"] = creation_date_utc.strftime("%Y-%m-%dT%H:%M:%S")
except Exception as e:
raise ValueError(f"The XML file {xml_path} is incomplete or inconsistent: {e}")
# fallback: YAML metadata in same folder if XML missing:
if dims is None:
yaml_path = _find_single_yaml(folder)
if yaml_path is not None:
if verbose:
print(f" No XML file found. Found YAML metadata file: {os.path.basename(yaml_path)}.")
ymd = _load_yaml_metadata(yaml_path)
# required keys to read RAW:
try:
T = _require_int(ymd, "T")
Z = _require_int(ymd, "Z")
C = _require_int(ymd, "C")
Y = _require_int(ymd, "Y")
X = _require_int(ymd, "X")
bits = _require_int(ymd, "bits")
except KeyError as e:
warnings.warn(
f"YAML metadata file {yaml_path} is missing required key {e}. "
"Cannot read RAW file. Please add the missing keys.")
if return_list:
return [None], [None]
return None, None
dims = {"T": T, "Z": Z, "C": C, "Y": Y, "X": X, "bits": bits}
metadata["SizeX"] = X
metadata["SizeY"] = Y
metadata["SizeC"] = C
metadata["SizeT"] = T
metadata["SizeZ"] = Z
# Unit and physical sizes are optional in YAML
unit_from_meta = ymd.get("pixelunit", None)
if unit_from_meta is not None:
metadata["unit"] = str(unit_from_meta)
for k in ("PhysicalSizeX", "PhysicalSizeY", "PhysicalSizeZ"):
if k in ymd:
try:
metadata[k] = float(ymd[k])
except Exception:
pass
if "TimeIncrement" in ymd:
try:
metadata["TimeIncrement"] = float(ymd["TimeIncrement"])
except Exception:
pass
if "TimeIncrementUnit" in ymd:
metadata["TimeIncrementUnit"] = str(ymd["TimeIncrementUnit"])
metadata["original_metadata_type"] = "thorlabs_yaml_metadata"
""" else:
print(" No XML or YAML metadata file found or multiple YAML files in the folder. Will return None.")
if return_list:
return [None], [None]
return None, None """
# if neither XML nor YAML provided dimensions, do not abort. Warn and return None:
if dims is None:
print("WARNING: No Thorlabs XML metadata and no YAML fallback found.\n"
" Cannot infer RAW dimensions (T, Z, C, Y, X, bits). Create a YAML file in the same folder as the RAW\n"
" file with keys: T, Z, C, Y, X, bits (and optionally pixelunit, PhysicalSizeX/Y/Z, TimeIncrement,\n"
" TimeIncrementUnit). Please refer to the documentation for details.\n"
" You may also use the utility function create_thorlabs_raw_yaml(fname) to create an empty YAML file\n"
" template that you can fill in manually. It will be created in the same folder as the RAW file.\n")
print(" Example YAML content (save as, e.g., Experiment.yaml into the same folder as the RAW file):\n\n T: 1\n Z: 1\n C: 1\n Y: 512\n X: 512\n bits: 16\n pixelunit: micron\n PhysicalSizeX: 0.5\n PhysicalSizeY: 0.5\n PhysicalSizeZ: 1.0\n TimeIncrement: 1.0\n TimeIncrementUnit: seconds\n")
print(" You may also use omio.create_thorlabs_raw_yaml(fname) to generate such a file interactively.\n")
if return_list:
return [None], [None]
return None, None
# final unit handling and external overrides:
# apply unit fallback if not set by XML or YAML:
if "unit" not in metadata or metadata["unit"] is None:
metadata["unit"] = pixelunit
# apply external physical size override if provided:
if physicalsize_xyz is not None:
psx, psy, psz = (float(physicalsize_xyz[0]), float(physicalsize_xyz[1]), float(physicalsize_xyz[2]))
metadata["PhysicalSizeX"] = psx
metadata["PhysicalSizeY"] = psy
metadata["PhysicalSizeZ"] = psz
# ensure physical sizes exist as fallbacks (do not invent units beyond pixel grid):
if "PhysicalSizeX" not in metadata or metadata["PhysicalSizeX"] is None:
metadata["PhysicalSizeX"] = 1.0
if "PhysicalSizeY" not in metadata or metadata["PhysicalSizeY"] is None:
metadata["PhysicalSizeY"] = 1.0
if "PhysicalSizeZ" not in metadata or metadata["PhysicalSizeZ"] is None:
metadata["PhysicalSizeZ"] = 1.0
metadata["PhysicalSizeXUnit"] = metadata.get("PhysicalSizeXUnit", metadata["unit"])
metadata["PhysicalSizeYUnit"] = metadata.get("PhysicalSizeYUnit", metadata["unit"])
metadata["PhysicalSizeZUnit"] = metadata.get("PhysicalSizeZUnit", metadata["unit"])
# read RAW data and optionally materialize into Zarr:
T = dims["T"]
Z = dims["Z"]
C = dims["C"]
Y = dims["Y"]
X = dims["X"]
bits = dims["bits"]
if bits == 32:
dtype = np.float32
elif bits > 8:
dtype = np.uint16
else:
dtype = np.uint8
expected_elements = T * Z * C * Y * X
if zarr_store is None:
if verbose:
print(" Reading entire Thorlabs RAW file into RAM...")
with open(fname, "rb") as f:
raw_data = np.frombuffer(f.read(), dtype=dtype)
if raw_data.size != expected_elements:
warnings.warn(
f"RAW data size mismatch: expected {expected_elements} elements, got {raw_data.size}. "
"Check XML/YAML metadata.")
if return_list:
return [None], [None]
return None, None
image = raw_data.reshape((T, Z, C, Y, X))
metadata["shape"] = image.shape
else:
if verbose:
print(" Preparing Zarr representation (via memmap + slice-wise copy)...")
raw_data = np.memmap(fname, dtype=dtype, mode="r")
if raw_data.size != expected_elements:
warnings.warn(
f"RAW data size mismatch: expected {expected_elements} elements, got {raw_data.size}. "
"Check XML/YAML metadata.")
if return_list:
return [None], [None]
return None, None
image_np = raw_data.reshape((T, Z, C, Y, X))
metadata["shape"] = image_np.shape
chunks = compute_default_chunks(image_np.shape, metadata["axes"])
if verbose:
print(f" Computed Zarr chunks: {chunks} (shape: {image_np.shape})")
if zarr_store == "memory":
if verbose:
print(" Writing into in-memory Zarr store...")
store = zarr.storage.MemoryStore()
zarr_array = zarr.open(
store=store,
mode="w",
shape=image_np.shape,
dtype=image_np.dtype,
chunks=chunks)
else:
if verbose:
print(" Writing into on-disk Zarr store for memory mapping...")
zarr_cache_folder = os.path.join(folder, ".omio_cache")
os.makedirs(zarr_cache_folder, exist_ok=True)
zarr_cache_path = os.path.join(zarr_cache_folder, fname_base + ".zarr")
if os.path.exists(zarr_cache_path):
shutil.rmtree(zarr_cache_path)
zarr_array = zarr.open(
zarr_cache_path,
mode="w",
shape=image_np.shape,
dtype=image_np.dtype,
chunks=chunks)
_copy_to_zarr_in_xy_slices(image_np, zarr_array,
desc=" slice-wise copying Thorlabs RAW to Zarr")
image = zarr_array
# final normalization steps:
memap_large_file_flag = (zarr_store == "disk")
image, metadata["shape"], metadata["axes"] = _correct_for_OME_axes_order(
image, metadata, memap_large_file=memap_large_file_flag, verbose=verbose)
metadata = OME_metadata_checkup(metadata, verbose=verbose)
if verbose:
print("Finished reading Thorlabs RAW file.")
if return_list:
return [image], [metadata]
return image, metadata
# %% OMIO_CACHE CLEANUP FUNCTION
[docs]
def cleanup_omio_cache(fname, full_cleanup=False, verbose=True):
"""
Remove OMIO-generated on-disk cache data under the `.omio_cache` folder.
This utility deletes Zarr stores created by OMIO when reading files with
``zarr_store="disk"``. The cache is expected to live in a hidden subfolder
``.omio_cache`` within a dataset's parent directory.
Two modes are supported:
* Targeted cleanup:
If ``fname`` is a file path and ``full_cleanup`` is False, only the corresponding
cache store ``.omio_cache/<basename>.zarr`` is removed.
* Full cleanup:
If ``full_cleanup`` is True, or if ``fname`` points to a directory, the entire
``.omio_cache`` folder under that directory is removed.
Parameters
----------
fname : str
Path to a file whose cache should be removed, or a directory containing an
``.omio_cache`` folder to be cleaned.
full_cleanup : bool, optional
If True, delete the entire ``.omio_cache`` folder. If False and ``fname`` is a
file, delete only the cache store corresponding to that file's basename.
Default is False.
verbose : bool, optional
If True, print diagnostic messages. Default is True.
Returns
-------
None
Raises
------
ValueError
If `fname` is neither an existing file nor an existing directory.
Notes
-----
* Cache deletion is performed via recursive directory removal and is not
reversible.
* If no ``.omio_cache`` folder exists at the expected location, the function
returns without error.
"""
if os.path.isfile(fname):
parent_folder = os.path.dirname(fname)
base_name = os.path.splitext(os.path.basename(fname))[0]
elif os.path.isdir(fname):
parent_folder = fname
base_name = None
else:
raise ValueError(f"cleanup_omio_cache: {fname} is neither a file nor a folder.")
omio_cache_folder = os.path.join(parent_folder, ".omio_cache")
if not os.path.exists(omio_cache_folder):
if verbose:
print(f"No .omio_cache folder found in {parent_folder}. Nothing to clean up.")
return
if full_cleanup or base_name is None:
print(f"Performing full cleanup of .omio_cache folder: {omio_cache_folder}")
shutil.rmtree(omio_cache_folder)
print("Cleanup complete.")
else:
zarr_path = os.path.join(omio_cache_folder, base_name + ".zarr")
if os.path.exists(zarr_path):
print(f"Deleting Zarr store for {base_name}: {zarr_path}")
shutil.rmtree(zarr_path)
print("Deletion complete.")
else:
print(f"No Zarr store found for {base_name} in .omio_cache. Nothing to delete.")
# %% EMPTY IMAGE AND METADATA CREATORS
# function to create empty OME metadata dict with default values:
# function to create empty OME ordered image with axes TZCYX:
[docs]
def create_empty_image(shape: tuple[int, int, int, int, int] = (1, 1, 1, 1, 1),
dtype=np.uint16,
fill_value=0,
zarr_store: Union[None, str] = None,
zarr_store_path: Union[None, str] = None,
zarr_store_name: Union[None, str] = None,
return_metadata: bool = False,
input_metadata: Union[None, dict] = None,
verbose: bool = True
) -> Union[None,
np.ndarray,
"zarr.core.array.Array",
tuple[np.ndarray, dict],
tuple["zarr.core.array.Array", dict]]:
"""
Create an empty 5D image in canonical OME axis order TZCYX.
This factory creates a new image container with shape ``(T, Z, C, Y, X)`` and a
specified dtype, either as a NumPy array in RAM or as a Zarr array backed by an
in-memory store or an on-disk cache. Optionally, it also returns a matching OMIO
metadata dictionary consistent with the created image.
For Zarr output, chunking is determined via `compute_default_chunks` using the
canonical OME axes. When writing to disk, the array is created under a hidden
cache folder ``.omio_cache`` located in the specified parent directory. Any
existing store at the target path is replaced.
Parameters
----------
shape : tuple of int, optional
Desired image shape as a 5-tuple ``(T, Z, C, Y, X)``. Default is
``(1, 1, 1, 1, 1)``. If `shape` is None or does not have length 5, a warning
is issued and the function returns None (or ``(None, None)`` if
`return_metadata` is True).
dtype : numpy dtype, optional
Data type of the created array. Default is ``np.uint16``.
fill_value : scalar or None, optional
Value used to initialize the array. If 0 and `zarr_store` is None, a
zero-initialized NumPy array is created via `np.zeros`. If `fill_value` is
None for Zarr output, the array is left uninitialized. Default is 0.
zarr_store : {None, "memory", "disk"}, optional
Storage backend for the created image.
* None: return a NumPy array in RAM
* "memory": return a Zarr array backed by a `zarr.storage.MemoryStore`
* "disk": return a Zarr array stored under ``.omio_cache`` on disk
Default is None.
zarr_store_path : str or None, optional
Path used to determine the parent directory for on-disk storage when
`zarr_store="disk"`. If this is a directory, it is used directly. If it is
a file path, its parent directory is used. Required for `zarr_store="disk"`.
zarr_store_name : str or None, optional
Basename used for the on-disk Zarr store when `zarr_store="disk"`. The final
store path is ``<parent>/.omio_cache/<zarr_store_name>.zarr``. Required for
`zarr_store="disk"`.
return_metadata : bool, optional
If True, return a tuple ``(image, metadata)`` where `metadata` is created by
`create_empty_metadata` and is consistent with `shape`. Default is False.
input_metadata : dict or None, optional
Optional metadata dictionary merged into the generated metadata when
`return_metadata` is True. Default is None.
verbose : bool, optional
If True, print diagnostic messages for some path handling cases. Default is
True.
Returns
-------
image : np.ndarray or zarr.core.array.Array or None
The created image container. Returns None if validation fails.
metadata : dict, optional
Only returned when `return_metadata` is True. The metadata dictionary is
consistent with the created image shape and canonical axes TZCYX.
Notes
-----
* The function assumes canonical OME axes TZCYX as defined by the module-level
constant ``_OME_AXES``.
* For `zarr_store="disk"`, any existing store at the target location is removed
before creating a new one.
* Chunking is delegated to `compute_default_chunks`. For very small arrays,
chunk sizes may match the full dimensions.
"""
if shape is None or len(shape) != 5:
print("WARNING create_empty_image: shape must be a 5-tuple (T, Z, C, Y, X).\n"
f" Got: {shape!r}. Will return None.")
if return_metadata:
return None, None
else:
return None
if zarr_store is None:
# numpy array in RAM:
if fill_value == 0:
if return_metadata:
return np.zeros(shape, dtype=dtype), create_empty_metadata(shape=shape,
input_metadata=input_metadata,
verbose=verbose)
else:
return np.zeros(shape, dtype=dtype)
else:
arr = np.empty(shape, dtype=dtype)
arr[...] = fill_value
if return_metadata:
return arr, create_empty_metadata(shape=shape, input_metadata=input_metadata,
verbose=verbose)
else:
return arr
else:
# zarr_store is not None:
# sanity check whether fname is not None, otherwise print warning and return None:
if zarr_store not in ("memory", "disk"):
warnings.warn("create_empty_image: zarr_store must be 'memory', or 'disk'. "
f"Got: {zarr_store!r}")
if return_metadata:
return None, None
else:
return None
# calculate chunks from shape:
try:
chunks = compute_default_chunks(shape, _OME_AXES, max_xy_chunk=1024)
except TypeError:
chunks = compute_default_chunks(shape, _OME_AXES)
if zarr_store == "memory":
store = zarr.storage.MemoryStore()
z_out = zarr.open(store=store, mode="w", shape=shape, dtype=dtype, chunks=chunks)
else:
# disk:
if zarr_store_path is None:
warnings.warn("create_empty_image: for zarr_store='disk', a valid zarr_store_path must be provided.\n"
f" Got: {zarr_store_path!r}")
if return_metadata:
return None, None
else:
return None
if zarr_store_name is None:
warnings.warn("create_empty_image: for zarr_store='disk', a valid zarr_store_name must be provided.\n"
f" Got: {zarr_store_name!r}")
if return_metadata:
return None, None
else:
return None
if os.path.isdir(zarr_store_path):
parent_folder = zarr_store_path
else:
parent_folder = os.path.dirname(zarr_store_path) or "."
if verbose:
print(f" zarr_store_path is a file; taking its parent folder:")
print(f" {parent_folder}")
cache_folder = os.path.join(parent_folder, ".omio_cache")
os.makedirs(cache_folder, exist_ok=True)
zarr_path = os.path.join(cache_folder, zarr_store_name + ".zarr")
if os.path.exists(zarr_path):
shutil.rmtree(zarr_path)
z_out = zarr.open(zarr_path, mode="w", shape=shape, dtype=dtype, chunks=chunks)
# initialize with fill_value (optionally, leave as uninitialized if fill_value is None)
if fill_value is not None:
if fill_value == 0:
z_out[:] = 0
else:
z_out[:] = np.asarray(fill_value, dtype=dtype)
if return_metadata:
return z_out, create_empty_metadata(shape=shape, input_metadata=input_metadata,
verbose=verbose)
else:
return z_out
# function to update metadata from image shape and axes:
# %% OME-TIF WRITER
# function to estimate compressed size of an image:
def _estimate_compressed_size(image, sample_fraction=0.001, compression_level=3):
"""
Estimate the compressed size of an image array using sampling and zlib.
This helper provides a rough estimate of the compressed size of an image by
compressing a small representative sample and extrapolating the resulting
compression ratio to the full dataset. It supports both NumPy arrays and
Zarr arrays.
For NumPy inputs, a linear prefix of the flattened array is sampled according
to `sample_fraction`. For Zarr inputs, a single spatial (Y, X) plane is
extracted to avoid materializing large portions of the dataset.
Parameters
----------
image : np.ndarray or zarr.core.array.Array
Image data whose compressed size is to be estimated.
sample_fraction : float, optional
Fraction of the total number of elements to sample for NumPy arrays.
The minimum sample size is one element. Default is 0.001.
compression_level : int, optional
Compression level passed to ``zlib.compress``, between 0 (no compression)
and 9 (maximum compression). Default is 3.
Returns
-------
estimated_compressed_size : float
Estimated compressed size of the full image in bytes.
Notes
-----
* The estimate assumes that the sampled region is representative of the entire
image. Strong spatial or temporal heterogeneity can lead to inaccurate
estimates.
* For Zarr inputs, only a single spatial slice is sampled, which may bias the
estimate if compression characteristics vary across non-spatial axes.
* The function does not account for container or metadata overhead associated
with specific storage formats.
"""
# Get a contiguous chunk of the image as a sample:
is_zarr = isinstance(image, zarr.core.array.Array)
if is_zarr:
# if Zarr, first just get a small chunk, e.g., first time slice, z-slice etc.:
slicer = [0] * (image.ndim - 2) + [slice(None), slice(None)]
sample_block = np.asarray(image[tuple(slicer)])
sample = sample_block.ravel()
else:
sample_size = max(1, int(np.prod(image.shape) * sample_fraction))
sample = image.ravel()[:sample_size]
# Compress the sample using specified compression level
compressed_sample = zlib.compress(sample.tobytes(), level=compression_level)
# Estimate compression ratio
compression_ratio = len(compressed_sample) / sample.nbytes
# Estimate compressed size of the entire image
estimated_compressed_size = image.nbytes * compression_ratio
return estimated_compressed_size
# function to check whether to use BigTIFF:
def _check_bigtiff(image, compression_level=3):
"""
Determine whether BigTIFF should be used for writing an image.
This helper decides whether an image exceeds the practical size limits of
standard TIFF files and therefore requires the BigTIFF format. The decision is
based first on the uncompressed in-memory size and, if that exceeds the limit,
optionally refined using an estimate of the compressed size.
The threshold used corresponds to the maximum addressable size of classic TIFF
files, reduced by a safety margin.
Parameters
----------
image : np.ndarray or zarr.core.array.Array
Image data to be evaluated.
compression_level : int, optional
Compression level passed to the internal compressed-size estimator.
This value is forwarded to `_estimate_compressed_size` and should be in the
range supported by zlib (0 to 9). Default is 3.
Returns
-------
use_bigtiff : bool
True if the image should be written as BigTIFF, False if standard TIFF is
sufficient.
Notes
-----
* The initial decision is based on the raw in-memory size ``image.nbytes``.
* If the raw size exceeds the TIFF limit, a compressed-size estimate is used as
a secondary check. If the estimated compressed size falls below the limit,
BigTIFF is not required.
* The compressed-size estimate is heuristic and may misclassify borderline
cases depending on image content and compression behavior.
"""
# (2**32 - 2**25)/1024**3 # in GB
# estimated_size/1024**3 # in GB
# check, whether image size is larger than 4GB:
if image.nbytes > 2**32 - 2**25:
use_bigtiff = True
else:
use_bigtiff = False
# check, whether the estimated size after compression is smaller than the maximum
# size of a normal tif file (if so, reset use_bigtiff to False):
if use_bigtiff:
estimated_size = _estimate_compressed_size(image, sample_fraction=0.001,compression_level=compression_level)
if estimated_size < 2**32 - 2**25:
use_bigtiff = False
return use_bigtiff
# function to check and modify output filename if it already exists:
def _check_fname_out(fname_out, overwrite):
"""
Resolve output filename collisions by appending a numeric suffix.
This helper checks whether an output filename already exists on disk. If it
does and overwriting is not permitted, a numeric suffix is appended to the base
filename before the ``.ome.tif`` extension. The suffix is incremented until a
non-existing filename is found.
Parameters
----------
fname_out : str
Proposed output filename, expected to end with ``.ome.tif``.
overwrite : bool
If True, allow overwriting an existing file and return `fname_out`
unchanged. If False, generate a modified filename if needed.
Returns
-------
fname_out_rev : str
A filename that does not exist on disk, either the original `fname_out` or
a suffixed variant.
Notes
-----
* The suffix is inserted as a space followed by an integer, for example
``"image 1.ome.tif"``.
* The function assumes the ``.ome.tif`` extension is present and does not
attempt to generalize to other extensions.
"""
""" fname_out_rev = fname_out
if os.path.exists(fname_out) and not overwrite:
i = 0
while os.path.exists(fname_out_rev):
i += 1
fname_out_rev = fname_out.replace(".ome.tif", f" {i}.ome.tif")
return fname_out_rev """
if not fname_out.endswith(".ome.tif"):
raise ValueError(
"_check_fname_out: fname_out must end with '.ome.tif'. "
f"Got: {fname_out!r}"
)
if overwrite or not os.path.exists(fname_out):
return fname_out
base = fname_out[:-len(".ome.tif")]
i = 1
while True:
candidate = f"{base} {i}.ome.tif"
if not os.path.exists(candidate):
return candidate
i += 1
# function to normalize axes and squeeze singleton S axis:
def _normalize_axes_for_ometiff(image, axes):
"""
Normalize axes for OME-TIFF writing by removing trivial singleton dimensions.
This helper prepares image data and its axis declaration for OME-TIFF output.
It currently handles the special case of a singleton ``"S"`` axis by removing
it when its corresponding dimension has size 1. The image array is squeezed
accordingly, and the axis string is updated to remain consistent.
After normalization, the function verifies that the axis string length matches
the array dimensionality.
Parameters
----------
image : array-like
Image data to be normalized. The input is converted to a NumPy array via
``np.asarray``.
axes : str
Axis declaration corresponding to `image`.
Returns
-------
arr : np.ndarray
Normalized NumPy array with trivial singleton axes removed.
axes : str
Updated axis string consistent with the returned array.
Raises
------
ValueError
If the resulting axis string length does not match ``arr.ndim``.
Notes
-----
* Only the ``"S"`` axis is handled explicitly. Other singleton dimensions are
not modified.
* The function is intended as a small preprocessing step before writing
OME-TIFF files.
"""
arr = np.asarray(image)
if "S" in axes:
s_idx = axes.index("S")
if arr.shape[s_idx] == 1:
arr = np.squeeze(arr, axis=s_idx)
axes = axes.replace("S", "")
if len(axes) != arr.ndim:
raise ValueError(
f"_normalize_axes_for_ometiff: axes '{axes}' (len={len(axes)}) "
f"does not fit to arr.ndim={arr.ndim}"
)
return arr, axes
# function to extract original filename from metadata:
def _get_original_filename_from_metadata(metadata: dict) -> Union[None, str]:
"""
Extract the original filename from an OMIO metadata dictionary.
This helper attempts to recover the original filename stored inside the
``Annotations`` entry of a metadata dictionary. It supports both supported
representations of annotations used within OMIO:
* a single annotations dictionary
* a list of annotation dictionaries
Only the basename of the file is returned. Any directory components are
stripped. If no valid filename can be found, the function returns ``None``.
Parameters
----------
metadata : dict
Metadata dictionary that may contain an ``Annotations`` entry.
Returns
-------
str or None
The original filename (basename only) if present and non-empty, otherwise
``None``.
Notes
-----
* The function looks specifically for the key ``"original_filename"`` inside
``metadata["Annotations"]``.
* If ``Annotations`` is a list, the first valid occurrence is returned.
* Invalid metadata structures or empty values are silently ignored.
"""
if not isinstance(metadata, dict):
return None
anns = metadata.get("Annotations", None)
# dict case
if isinstance(anns, dict):
fn = anns.get("original_filename", None)
if isinstance(fn, str) and fn.strip():
return os.path.basename(fn.strip())
# list of dicts case
if isinstance(anns, list):
for a in anns:
if not isinstance(a, dict):
continue
fn = a.get("original_filename", None)
if isinstance(fn, str) and fn.strip():
return os.path.basename(fn.strip())
return None
# main OME-TIFF writer function:
[docs]
def imwrite(fname: str,
images: Union[np.ndarray, "zarr.core.array.Array", list[Union[np.ndarray, "zarr.core.array.Array"]]],
metadatas: Union[dict, list[dict]],
compression_level: int = 3,
relative_path: Union[None, str] = None,
overwrite: bool = False,
return_fnames: bool = False,
indicate_merged_files: bool = False,
verbose: bool = True) -> Union[None, list[str]]:
"""
Write image stacks as OME-TIFF with OMIO-normalized metadata.
This function is OMIO's main OME-TIFF writer. It accepts either a single image
and metadata dictionary or lists of images and metadatas. For each stack, it
constructs an OME-XML metadata payload compatible with `tifffile.imwrite`,
normalizes axes for OME-TIFF writing, decides whether BigTIFF is required, and
writes a compressed OME-TIFF using zlib.
Output naming follows a provenance-first policy:
* If the metadata contain an original filename inside ``Annotations``, that
basename is used as the output basename.
* Otherwise, the basename is derived from `fname` (file stem) or from the
directory name if `fname` is a directory.
* Filename collisions are resolved by `_check_fname_out` unless `overwrite` is
True.
* If multiple stacks are written and no per-stack provenance name is available,
a numeric suffix ``_NNN`` is appended to keep outputs distinct.
If `relative_path` is provided, outputs are written into a subfolder relative to
the chosen output parent directory.
Parameters
----------
fname : str
Output anchor path. If `fname` is a directory, outputs are written into that
directory (or into `relative_path` below it). If `fname` is a file path,
outputs are written next to that file (or into `relative_path` below that
parent directory).
images : np.ndarray or zarr.core.array.Array or list of such arrays
Image data to write. A single image is accepted and treated as a one-element
list. Arrays are expected to represent OME-like dimensions; the function
normalizes axes and permutes to the writer's target order internally.
metadatas : dict or list of dict
Metadata dictionary or list of dictionaries aligned with `images`. Each
metadata dictionary should include at least ``axes`` and physical pixel sizes
(``PhysicalSizeX``, ``PhysicalSizeY``) for correct resolution tagging.
compression_level : int, optional
zlib compression level passed to `tifffile.imwrite` via
``compressionargs={"level": ...}``. Typical values are 0 to 9. Default is 3.
relative_path : str or None, optional
If not None, outputs are written into ``<out_parent>/<relative_path>`` and
the directory is created if needed. Default is None.
overwrite : bool, optional
If True, allow overwriting existing output files. If False, resolve name
collisions by appending a numeric suffix. Default is False.
return_fnames : bool, optional
If True, return a list of written filenames. If False, return None. Default
is False.
indicate_merged_files : bool, optional
If True, append ``"_merged"`` to the output basename for each written stack.
This is intended to mark stacks that originate from prior merging steps.
Default is False.
verbose : bool, optional
If True, print diagnostic messages about output naming and BigTIFF decisions.
Default is True.
Returns
-------
list of str or None
If `return_fnames` is True, returns a list of full paths to the written
OME-TIFF files in the order processed. Otherwise returns None.
Raises
------
ValueError
If `images` and `metadatas` have different lengths.
Notes
-----
* BigTIFF selection is determined by `_check_bigtiff`, using the uncompressed
array size and, if needed, an estimated compressed size.
* Axes are normalized by `_normalize_axes_for_ometiff` (currently removing a
singleton ``"S"`` axis) and then permuted into the writer's target axis order
before writing.
* Physical pixel sizes are written both as OME physical size fields and as TIFF
resolution tags using ``resolution=(1/PhysicalSizeY, 1/PhysicalSizeX)``.
* Map annotations are written from ``metadata["Annotations"]``. If annotations
are a dictionary, a single MapAnnotation is written. If annotations are a
list of dictionaries, multiple MapAnnotations are written. A namespace entry
is ensured if missing.
* The function writes with ``photometric="minisblack"`` and disables ImageJ
metadata blocks (``imagej=False``), relying on OME metadata for
interoperability.
"""
# check whether images and metadatas are lists:
#images_was_list = isinstance(images, list) and len(images) > 1
if not isinstance(images, list):
images = [images]
if not isinstance(metadatas, list):
metadatas = [metadatas]
if len(images) != len(metadatas):
raise ValueError("imwrite: images and metadatas must have the same length.")
# decide output parent directory:
# * if fname is a directory: output next to that directory (or inside relative_path if set)
# * if fname is a file: output next to the file (or inside relative_path if set)
if os.path.isdir(fname):
out_parent = fname
fallback_base = os.path.basename(os.path.normpath(fname))
""" # if name was a directory and images was not a list, writer received
# an image stack merged from multiple files; in this case, we append
# to the new filename "merged" to indicate this:
if images_was_list==False:
merged_files_appendix = "_merged" """
else:
out_parent = os.path.dirname(fname)
fallback_base = os.path.splitext(os.path.basename(fname))[0]
fallback_base = fallback_base.split(".")[0] # strip dot-separated extra extensions
# append "_merged" if requested:
merged_files_appendix = ""
if indicate_merged_files==True:
merged_files_appendix = "_merged"
# default output template uses fallback_base, but per-stack we may override via metadata provenance soon:
fname_out = os.path.join(out_parent, fallback_base + ".ome.tif")
#relative_path = "omio_outputs" # this will become a switch with None, "subfolder" or any relative path like or "../" "../subfolder"
if relative_path is not None:
out_parent = os.path.join(out_parent, relative_path)
os.makedirs(out_parent, exist_ok=True)
# refresh fname_out template (fallback)
fname_out = os.path.join(out_parent, fallback_base + ".ome.tif")
# we loop over images and metadatas:
stack_n = len(images)
stack_count = 0
fnames_written = []
for image, metadata in zip(images, metadatas):
# image = images[0]
# metadata = metadatas[0].copy()
# check, whether bigtiff is necessary:
use_bigtiff = _check_bigtiff(image, compression_level=compression_level)
# build output filename base for this stack:
orig_fn = _get_original_filename_from_metadata(metadata)
if orig_fn is not None:
base_i = os.path.splitext(orig_fn)[0]
base_i = base_i.split(".")[0]
else:
base_i = fallback_base
fname_out_i = os.path.join(out_parent, base_i + merged_files_appendix + ".ome.tif")
# if multiple outputs, append index only if needed (collision-safe); We do NOT blindly
# append index, because original filenames are already unique in most cases:
fname_out_stack = _check_fname_out(fname_out_i, overwrite)
# if overwrite is False and _check_fname_out returns the same name but file exists,
# _check_fname_out should already modify it.
# if stack_n>1 and no provenance name exists, solve via adding numbering:
if stack_n > 1 and orig_fn is None:
stack_count += 1
fname_out_i = os.path.join(out_parent, f"{base_i}_{stack_count:03d}.ome.tif")
fname_out_stack = _check_fname_out(fname_out_i, overwrite)
if verbose:
print(f"Writing OME-TIFF to: {fname_out_stack} (bigtiff={use_bigtiff})")
# reorder axes to OME standard TZCYX:
axes_in = metadata.get("axes", "TZCYX")
image_ome, axes_in = _normalize_axes_for_ometiff(image, axes_in)
desired_axes = "TCZYX"
if axes_in != desired_axes:
idx = {ax: i for i, ax in enumerate(axes_in)}
perm = [idx[ax] for ax in desired_axes]
image_ome = np.moveaxis(image_ome, perm, range(len(perm)))
axes_out = desired_axes
else:
axes_out = axes_in
len_unit = metadata.get("unit", "µm")
if len_unit in ("micron", "micrometer", "um"):
len_unit = "µm"
# check whether
ome_meta = {
"axes": axes_out,
"SizeX": metadata.get("SizeX", None),
"SizeY": metadata.get("SizeY", None),
"SizeZ": metadata.get("SizeZ", None),
"SizeT": metadata.get("SizeT", None),
"SizeC": metadata.get("SizeC", None),
"PhysicalSizeX": metadata.get("PhysicalSizeX", None),
"PhysicalSizeY": metadata.get("PhysicalSizeY", None),
"PhysicalSizeZ": metadata.get("PhysicalSizeZ", None),
"PhysicalSizeXUnit": len_unit,
"PhysicalSizeYUnit": len_unit,
"PhysicalSizeZUnit": len_unit,
#'Description': 'A multi-dimensional, multi-resolution image',
#'Channel': {'Name': ['Channel 1 fab', 'Channel 2 fab']},
# 'MapAnnotation': {
# 'Namespace': 'omio:metadata',
# '_OMIO_VERSION': '0.1.0',
# 'Experiment': 'MSD',
# 'Experimenter': 'Fabrizio'},
}
# get the time increment if present:
time_incr = metadata.get("TimeIncrement", None)
if time_incr is not None:
ome_meta["TimeIncrement"] = float(time_incr)
tunit = metadata.get("TimeIncrementUnit", "s")
if tunit in ("sec", "seconds"):
tunit = "s"
ome_meta["TimeIncrementUnit"] = tunit
# get any MapAnnotations if present:
annotations = metadata.get("Annotations", None)
if isinstance(annotations, dict):
ma = dict(annotations)
if "Namespace" not in ma:
ma["Namespace"] = "omio:metadata"
ome_meta["MapAnnotation"] = ma
elif isinstance(annotations, list):
ma_list = []
for ann in annotations:
if not isinstance(ann, dict):
continue
ma = dict(ann)
if "Namespace" not in ma:
ma["Namespace"] = "omio:metadata"
ma_list.append(ma)
if ma_list:
ome_meta["MapAnnotation"] = ma_list
tifffile.imwrite(
fname_out_stack,
image_ome,
ome=True,
compression="zlib",
compressionargs={"level": compression_level},
resolution=(1/metadata["PhysicalSizeY"], 1/metadata["PhysicalSizeX"]),
metadata=ome_meta,
photometric="minisblack",
imagej=False,
bigtiff=use_bigtiff)
fnames_written.append(fname_out_stack)
if return_fnames:
return fnames_written
# %% NAPARI-VIEWER CONVENIENCE FUNCTIONS
# function for squeezing a Zarr array for Napari visualization:
def _squeeze_zarr_to_napari_cache(src, fname, axes="TZCYXS", cache_folder_name=".omio_cache"):
if not isinstance(src, zarr.core.array.Array):
raise TypeError("_squeeze_zarr_to_napari_cache expects a zarr.core.Array as `src`.")
src_shape = src.shape
axes_list = list(axes)
if len(axes_list) != len(src_shape):
raise ValueError(f"axes length {len(axes_list)} does not match src.ndim {len(src_shape)}")
# keep all non singleton axes, but never drop Y or X even if singleton
keep_indices = [i for i, dim in enumerate(src_shape)
if (dim > 1) or (axes_list[i] in ("Y", "X"))]
squeezed_axes = "".join(axes_list[i] for i in keep_indices)
squeezed_shape = tuple(src_shape[i] for i in keep_indices)
napari_zarr_path = fname
if os.path.exists(napari_zarr_path):
shutil.rmtree(napari_zarr_path)
if src.chunks is not None:
squeezed_chunks = tuple(src.chunks[i] for i in keep_indices)
else:
squeezed_chunks = None
dst = zarr.open(
napari_zarr_path,
mode="w",
shape=squeezed_shape,
dtype=src.dtype,
chunks=squeezed_chunks)
# copy shortcut for 2D or less
if len(squeezed_shape) <= 2:
src_idx = []
for i, dim in enumerate(src_shape):
if i in keep_indices:
src_idx.append(slice(None))
else:
src_idx.append(0)
dst[...] = src[tuple(src_idx)]
return dst, squeezed_axes
# determine positions of spatial axes inside the squeezed representation
y_pos = squeezed_axes.find("Y")
x_pos = squeezed_axes.find("X")
if y_pos < 0 or x_pos < 0:
raise ValueError("Squeezed axes must contain Y and X.")
# outer axes are all except Y and X
outer_axes_positions = [i for i in range(len(squeezed_axes)) if i not in (y_pos, x_pos)]
outer_shape = tuple(squeezed_shape[i] for i in outer_axes_positions)
total_outer = int(np.prod(outer_shape)) if outer_shape else 1
# build mapping from squeezed positions to original indices
squeezed_to_orig = {sq_i: orig_i for sq_i, orig_i in enumerate(keep_indices)}
for outer_idx in tqdm(
np.ndindex(*outer_shape) if outer_shape else [()],
total=total_outer,
desc="creating Napari view Zarr (squeezed)"
):
# build dst index in squeezed space
dst_idx = [0] * len(squeezed_shape)
# fill outer axes indices
for pos, val in zip(outer_axes_positions, outer_idx):
dst_idx[pos] = val
# set Y and X to full slices
dst_idx[y_pos] = slice(None)
dst_idx[x_pos] = slice(None)
# now build src index in original space
src_idx = [0] * len(src_shape)
for sq_pos in range(len(squeezed_axes)):
orig_pos = squeezed_to_orig[sq_pos]
ax = squeezed_axes[sq_pos]
if ax in ("Y", "X"):
src_idx[orig_pos] = slice(None)
else:
src_idx[orig_pos] = dst_idx[sq_pos]
dst[tuple(dst_idx)] = src[tuple(src_idx)]
return dst, squeezed_axes
# function to get channel axis from axes and shape:
def _get_channel_axis_from_axes_and_shape(axes, shape, target_axis="C"):
"""
Return the index of a specific axis in a squeezed array.
This helper determines the positional index of a given axis label within an
axis string and its corresponding array shape. It is typically used after
singleton dimensions have been removed, where the remaining axes define the
layout of a reduced array.
Parameters
----------
axes : str
Axis string describing the order of dimensions in the array, for example
``"ZCYX"``.
shape : tuple
Shape of the array corresponding to `axes`.
target_axis : str, optional
Axis label to locate. The default is ``"C"`` for the channel axis.
Returns
-------
int or None
Zero-based index of the requested axis in the array if present, otherwise
``None``.
Raises
------
ValueError
If the length of `axes` does not match the length of `shape`.
Notes
-----
* The function performs a simple linear scan over the axis string.
* No validation of axis semantics is performed beyond matching the label.
"""
if len(axes) != len(shape):
raise ValueError("axes and shape must have the same length")
for i, ax in enumerate(axes):
if ax == target_axis:
return i
return None
# function to get scales from axes and metadata:
def _get_scales_from_axes_and_metadata(axes, metadata):
"""
Construct Napari scale values from an axis string and OMIO metadata.
This helper derives a tuple of scale factors suitable for passing to Napari’s
``scale`` argument. Spatial axes are mapped to their corresponding physical
voxel sizes stored in the metadata, while non-spatial axes receive a unit scale
of 1.0. The channel axis ``"C"`` is explicitly excluded, because when Napari’s
``channel_axis`` parameter is used, Napari expects the scale tuple to have
length ``ndim - 1`` and to cover only non-channel axes.
Axis handling
-------------
* ``Z`` → ``metadata["PhysicalSizeZ"]``
* ``Y`` → ``metadata["PhysicalSizeY"]``
* ``X`` → ``metadata["PhysicalSizeX"]``
* ``C`` → skipped (no scale entry)
* All other axes (for example ``T`` or ``S``) → scale ``1.0``
Parameters
----------
axes : str
Axis string corresponding to the array passed to Napari, for example
``"TCYX"`` or ``"TZCYX"``.
metadata : dict
Metadata dictionary providing physical voxel sizes under the keys
``PhysicalSizeX``, ``PhysicalSizeY``, and ``PhysicalSizeZ``.
Returns
-------
tuple of float
Scale values for all non-channel axes, in the order in which those axes
appear in `axes`.
Notes
-----
* No unit conversion is performed. The returned values are assumed to already
be in the units expected by Napari.
* Missing physical size entries in `metadata` will raise a ``KeyError``.
"""
scales = []
for ax in axes:
# Channel axis is handled via `channel_axis` in napari and
# must not receive a separate scale entry.
if ax == "C":
continue
if ax == "Z":
scales.append(metadata["PhysicalSizeZ"])
elif ax == "Y":
scales.append(metadata["PhysicalSizeY"])
elif ax == "X":
scales.append(metadata["PhysicalSizeX"])
else:
# T, S, and all other non-spatial axes:
scales.append(1.0)
return tuple(scales)
# function for squeezing a Zarr array for Napari visualization using Dask:
def _squeeze_numpy_keep_yx(image_np: np.ndarray, axes_full: str) -> tuple[np.ndarray, str]:
"""
Squeeze a NumPy array by removing singleton axes except for Y and X.
This helper removes all singleton dimensions from a NumPy array while preserving
the Y and X axes, even if they are singleton. The function also constructs an
updated axis string that reflects the new shape of the array.
Parameters
----------
image_np : np.ndarray
Input NumPy array to be squeezed.
axes_full : str
Full axis string corresponding to `image_np.shape`. This is typically an OME-like
axis declaration such as ``"TZCYXS"``.
Returns
-------
image_sq : np.ndarray
Squeezed NumPy array with singleton axes removed (except Y and X).
axes_sq : str
Updated axis string corresponding to `image_sq`.
"""
if len(image_np.shape) != len(axes_full):
raise ValueError("NumPy image does not match expected OME axis length")
squeeze_axes = [
i for i, (ax, dim) in enumerate(zip(axes_full, image_np.shape))
if (dim == 1) and (ax not in ("Y", "X"))
]
if squeeze_axes:
image_sq = np.squeeze(image_np, axis=tuple(squeeze_axes))
else:
image_sq = image_np
axes_sq = "".join(
ax for ax, dim in zip(axes_full, image_np.shape)
if (dim > 1) or (ax in ("Y", "X"))
)
return image_sq, axes_sq
def _squeeze_zarr_to_napari_cache_dask(src, fname, axes, cache_folder_name=".omio_cache"):
"""
Create a squeezed on-disk Zarr view for Napari using Dask.
This helper constructs a derived Zarr store in which all singleton dimensions of
a source Zarr array are removed. The computation is performed with Dask so that
the source array is not materialized fully in RAM. Instead, Dask streams chunks
from the input Zarr, applies ``squeeze`` lazily, and writes the result into a
new Zarr store under an OMIO cache folder.
The function also returns the corresponding squeezed axis string, obtained by
dropping axis labels whose dimensions were of length 1.
Parameters
----------
src : zarr.core.array.Array
Source Zarr array. The array is expected to be OME-like ordered according to
`axes` (often ``"TZCYXS"``).
fname : str
Path used to derive the cache location. The squeezed Zarr store is written
into ``<dirname(fname)>/<cache_folder_name>/`` and named
``<basename(fname)>_napari_squeezed.zarr``.
axes : str
Axis string corresponding to ``src.shape``.
cache_folder_name : str, optional
Name of the cache folder created alongside `fname`. Default is
``".omio_cache"``.
Returns
-------
squeezed_zarr : zarr.core.array.Array
Newly created Zarr array stored on disk with all singleton axes removed.
squeezed_axes : str
Axis string corresponding to `squeezed_zarr`.
Notes
-----
* Any existing Zarr store at the target path is deleted and replaced.
* The write is performed via Dask’s Zarr writer to allow chunk-wise computation
and writing. This avoids reading the full source array into memory.
* The computed list of singleton axis indices is used only to derive the
returned axis string; the actual squeeze operation is performed by
``da.squeeze``.
* This function creates a derived representation for visualization and does not
modify the source Zarr store.
"""
base_dir = os.path.dirname(fname)
cache_dir = os.path.join(base_dir, cache_folder_name)
os.makedirs(cache_dir, exist_ok=True)
target_path = os.path.join(cache_dir, os.path.basename(fname) + "_napari_squeezed.zarr")
if os.path.exists(target_path):
shutil.rmtree(target_path)
darr = da.from_zarr(src)
# squeeze only singleton axes that are not Y or X:
squeeze_axes = [i for i, (ax, dim) in enumerate(zip(axes, src.shape))
if (dim == 1) and (ax not in ("Y", "X"))]
squeezed_axes = "".join(ax for ax, dim in zip(axes, src.shape)
if (dim > 1) or (ax in ("Y", "X")))
if squeeze_axes:
darr = da.squeeze(darr, axis=tuple(squeeze_axes))
da.to_zarr(darr, target_path, mode="w")
squeezed_zarr = zarr.open(target_path, mode="r")
return squeezed_zarr, squeezed_axes
# main single-image handle for Napari visualization of image(s) as NumPy, Zarr, or Zarr + Dask:
def _single_image_open_in_napari(
image: Union[np.ndarray, "zarr.core.array.Array"],
metadata: dict,
fname: str,
zarr_mode: str = "numpy",
cache_folder_name: str = ".omio_cache",
axes_full: str = "TZCYX",
viewer=None,
viewer_name: Union[None, str] = None,
verbose: bool = True
) -> tuple["napari.Viewer", "napari.layers.Image", Union[np.ndarray, "zarr.core.array.Array"], str]:
"""
Open or extend a Napari viewer with a single OMIO image.
This helper prepares an image in OMIO’s canonical OME axis convention and then
adds it as a Napari image layer. It supports NumPy arrays and Zarr arrays, and
for Zarr inputs it provides three strategies controlled by `zarr_mode`:
* ``"numpy"``: fully materialize the Zarr array into RAM as a NumPy array,
apply ``squeeze()``, and pass the result to Napari. This is fastest if the
dataset fits comfortably in memory.
* ``"zarr_nodask"``: create a new squeezed on-disk Zarr store under a cache
folder by copying plane-wise. Napari reads from this derived store.
* ``"zarr_dask"``: create the squeezed on-disk Zarr store using Dask for
chunk-wise IO and parallelized writing, avoiding full materialization in RAM.
For NumPy inputs, the array is squeezed in RAM and the axis string is reduced
accordingly.
The function attempts to reuse an existing viewer when possible: if `viewer` is
provided it is used, otherwise ``napari.current_viewer()`` is tried, and if that
fails a new viewer is created.
Parameters
----------
image : np.ndarray or zarr.core.array.Array or list or tuple
Image data. If a list or tuple is provided, only the first element is used.
The input is expected to be OME-normalized already (for example via
``_correct_for_OME_axes_order``) so that it matches `axes_full`.
metadata : dict or list or tuple
Metadata corresponding to `image`. If a list or tuple is provided, only the
first element is used. The metadata should provide physical voxel sizes
(``PhysicalSizeX``, ``PhysicalSizeY``, ``PhysicalSizeZ``) and optionally a
length unit under ``unit``.
fname : str
Source filename used to derive the default layer name and cache locations.
zarr_mode : {"numpy", "zarr_nodask", "zarr_dask"}, optional
Strategy for handling Zarr inputs. Default is ``"numpy"``.
cache_folder_name : str, optional
Name of the hidden cache folder used to store derived Zarr stores created by
the squeezing modes. Default is ``".omio_cache"``.
axes_full : str, optional
Axis string describing the full expected axis order of the input before
squeezing. Default is ``"TZCYX"``. The implementation assumes that `image`
is consistent with this declaration.
viewer : napari.Viewer or None, optional
Existing Napari viewer to reuse. If None, a current viewer is reused if
available, otherwise a new viewer is created.
viewer_name : str or None, optional
Explicit layer name to use. If None, the basename of `fname` is used.
verbose : bool, optional
If True, print diagnostic progress messages. Default is True.
Returns
-------
viewer : napari.Viewer
The Napari viewer that was used or created.
layer : napari.layers.Image
The newly added image layer.
napari_data : np.ndarray or dask.array.Array
The data object passed to Napari. Zarr outputs are converted to a Dask array
via ``da.from_zarr`` for better Napari behavior.
napari_axes : str
Axis string corresponding to `napari_data` after squeezing.
Raises
------
ValueError
If `image` or `metadata` is an empty list or tuple.
ValueError
If the input array dimensionality does not match `axes_full`.
ValueError
If `zarr_mode` is not one of the supported values.
Notes
-----
* The channel axis is inferred from the squeezed axis string via
``_get_channel_axis_from_axes_and_shape`` and passed to Napari as
``channel_axis`` when present.
* Scale factors are computed from metadata via
``_get_scales_from_axes_and_metadata``. The channel axis is excluded from the
scale tuple by design.
* When `zarr_mode` produces a Zarr store, the store is written under the cache
folder and may overwrite an existing derived store with the same name.
"""
# fallback normalization: extract first element from lists/tuples
if isinstance(image, (list, tuple)):
if len(image) == 0:
raise ValueError(" _single_image_open_in_napari: 'image' list is empty.")
image = image[0]
if isinstance(metadata, (list, tuple)):
if len(metadata) == 0:
raise ValueError(" _single_image_open_in_napari: 'metadata' list is empty.")
metadata = metadata[0]
# case 1: Zarr-array
if isinstance(image, zarr.core.array.Array):
if verbose:
print(" Input is Zarr array.")
print(f" Preparing image for napari (zarr_mode='{zarr_mode}')...")
if zarr_mode == "zarr_dask":
# Zarr → squeezed Zarr w/ Dask:
if verbose:
print(" Using Dask for memory-efficient squeezing...")
store_path = str(image.store).replace("file://", "")
base_no_ext = store_path.replace(".zarr", "")
squeezed_zarr, squeezed_axes = _squeeze_zarr_to_napari_cache_dask(src=image,
fname=base_no_ext, axes=axes_full,
cache_folder_name=cache_folder_name)
napari_data = squeezed_zarr
napari_axes = squeezed_axes
elif zarr_mode == "zarr_nodask":
# Zarr → squeezed Zarr w/o Dask:
if verbose:
print(" Memory-efficient squeezing Zarr without Dask...")
store_path = str(image.store).replace("file://", "")
base_no_ext = store_path.replace(".zarr", "")
squeezed_zarr, squeezed_axes = _squeeze_zarr_to_napari_cache(src=image,
fname=base_no_ext, axes=axes_full,
cache_folder_name=cache_folder_name)
napari_data = squeezed_zarr
napari_axes = squeezed_axes
elif zarr_mode == "numpy":
# Zarr → NumPy into RAM, then squeeze:
if verbose:
print(" Loading full Zarr into RAM as NumPy array...")
image_np = np.asarray(image)
if len(image_np.shape) != len(axes_full):
raise ValueError("NumPy image does not match expected OME axis length")
#napari_data = image_np.squeeze()
napari_data, napari_axes = _squeeze_numpy_keep_yx(image_np, axes_full)
#napari_axes = "".join(ax for ax, dim in zip(axes_full, image_np.shape) if dim > 1)
else:
raise ValueError(
f" _single_image_open_in_napari: unknown zarr_mode='{zarr_mode}'. "
f" Use one of 'numpy', 'zarr_nodask', 'zarr_dask'.")
# case 2: NumPy-array
else:
if verbose:
print(" Input is NumPy array. Full loading into RAM (zarr_mode has no effect)...")
image_np = np.asarray(image)
if len(image_np.shape) != len(axes_full):
raise ValueError(" NumPy image does not match expected OME axis length")
#napari_data = image_np.squeeze()
napari_data, napari_axes = _squeeze_numpy_keep_yx(image_np, axes_full)
#napari_axes = "".join(ax for ax, dim in zip(axes_full, image_np.shape) if dim > 1)
# determine channel axis:
if len(napari_axes) != napari_data.ndim:
raise ValueError(
f"Internal error: napari_axes='{napari_axes}' (len={len(napari_axes)}) "
f"does not match napari_data.shape={napari_data.shape} (ndim={napari_data.ndim}).")
channel_axis = _get_channel_axis_from_axes_and_shape(axes=napari_axes,
shape=napari_data.shape,
target_axis="C")
# get scales (C-axis is not scaled in _get_scales_from_axes_and_metadata):
scales_array = _get_scales_from_axes_and_metadata(axes=napari_axes,metadata=metadata)
# check whether a viewer is already given, create a new one otherwise:
if viewer is None:
try:
viewer = napari.current_viewer()
except Exception:
viewer = None
if viewer is None:
viewer = napari.Viewer()
# build layer name:
if viewer_name is not None:
layer_name = viewer_name
else:
layer_name = os.path.basename(fname)
# convert napari_data into a dask-array if it's a Zarr (napari handles zarr dask arrays better):
if isinstance(napari_data, zarr.core.array.Array):
napari_data = da.from_zarr(napari_data)
# add the new image layer:
layer = viewer.add_image(napari_data, channel_axis=channel_axis,
scale=scales_array, name=layer_name)
viewer.scale_bar.visible = True
viewer.scale_bar.unit = metadata.get("unit", "micron")
return viewer, layer, napari_data, napari_axes
# main multi-image handler for Napari visualization of image(s) as NumPy, Zarr, or Zarr + Dask:
[docs]
def open_in_napari(images: Union[np.ndarray, "zarr.core.array.Array", list[Union[np.ndarray, "zarr.core.array.Array"]]],
metadatas: Union[dict, list[dict]],
fname: str,
zarr_mode: str = "numpy",
cache_folder_name: str = ".omio_cache",
axes_full: str = "TZCYX",
viewer: napari.Viewer = None,
returns: bool=False,
verbose: bool=True):
"""
Open or extend a Napari viewer with one or multiple OMIO images.
This is the main Napari convenience wrapper exposed to users. It accepts a
single image or a sequence of images together with matching metadata objects,
and adds each dataset as a Napari image layer by delegating per-image handling
to ``_single_image_open_in_napari``.
Input images may be NumPy arrays or Zarr arrays. For Zarr inputs, the behavior
is controlled by `zarr_mode` and follows the same strategies implemented in
``_single_image_open_in_napari`` (full materialization to NumPy, creation of a
squeezed cache Zarr without Dask, or creation of a squeezed cache Zarr with
Dask). A single viewer instance is reused across all layers.
Parameters
----------
images : np.ndarray or zarr.core.array.Array or list of (np.ndarray or zarr.core.array.Array)
Image data to visualize. If a single array is provided, it is treated as a
one-element list. Each image is expected to be consistent with `axes_full`
before squeezing (for example already normalized by
``_correct_for_OME_axes_order``).
metadatas : dict or list of dict
Metadata dictionaries corresponding to `images`. If a single dict is
provided, it is treated as a one-element list. Each metadata dict should
provide the physical voxel sizes used for Napari scaling (typically
``PhysicalSizeX``, ``PhysicalSizeY``, ``PhysicalSizeZ``) and optionally a
unit string under ``unit``.
fname : str
Base name used for Napari layer naming and cache path construction. If
multiple images are provided, an ``_idx{n}`` suffix is appended.
zarr_mode : {"numpy", "zarr_nodask", "zarr_dask"}, optional
Strategy for handling Zarr inputs, forwarded to
``_single_image_open_in_napari``. Default is ``"numpy"``.
cache_folder_name : str, optional
Name of the cache folder used for derived Zarr stores. Default is
``".omio_cache"``.
axes_full : str, optional
Full axis string describing the expected axis order of the input images
before squeezing. Default is ``"TZCYX"``.
viewer : napari.Viewer or None, optional
Existing Napari viewer to reuse. If None, a current viewer is reused if
available, otherwise a new viewer is created (via the single-image helper).
returns : bool, optional
If True, return detailed objects (viewer, layers, napari_datas, napari_axess).
If False, the function returns None. Default is False.
verbose : bool, optional
If True, print diagnostic progress messages. Default is True.
Returns
-------
viewer : napari.Viewer
The Napari viewer that was used or created. Only returned if `returns=True`.
layers : list of napari.layers.Image
The image layers added to the viewer, one per input image. Only returned if
`returns=True`.
napari_datas : list of (np.ndarray or dask.array.Array)
The data objects passed to Napari for each layer (Zarr inputs are typically
converted to Dask arrays in the single-image helper). Only returned if
`returns=True`.
napari_axess : list of str
Axis strings corresponding to each entry in `napari_datas` after squeezing.
Only returned if `returns=True`.
Raises
------
ValueError
If the number of images does not match the number of metadata dictionaries.
Notes
-----
* This function does not perform axis normalization itself. It assumes that
inputs already follow OMIO’s canonical axis convention as declared by
``axes_full``, and delegates squeezing, channel-axis inference, and scaling to
``_single_image_open_in_napari``.
* When multiple images are opened, the layer name is derived from ``fname`` with a
simple index suffix; if more informative naming is desired, pass a distinct
``fname`` per call or use the ``viewer_name`` mechanism in the single-image helper.
"""
# check, whether images and metadatas are lists:
if not isinstance(images, (list, tuple)):
images = [images]
if not isinstance(metadatas, (list, tuple)):
metadatas = [metadatas]
if len(images) != len(metadatas):
raise ValueError("open_in_napari: images and metadatas must have the same length.")
if verbose:
print(f"Got {len(images)} image(s) to open in napari.")
layers = []
napari_datas = []
napari_axess = []
for idx, (img, md) in enumerate(zip(images, metadatas)):
if verbose:
print(f"Opening image {idx+1}/{len(images)} in napari...")
# build layer name:
layer_fname = fname if len(images) == 1 else f"{fname}_idx{idx}"
# open in napari:
v, layer, napari_data, napari_axes = _single_image_open_in_napari(
image=img,
metadata=md,
fname=layer_fname,
zarr_mode=zarr_mode,
cache_folder_name=cache_folder_name,
axes_full=axes_full,
viewer=viewer,
verbose=verbose)
viewer = v
layers.append(layer)
napari_datas.append(napari_data)
napari_axess.append(napari_axes)
if verbose:
print(f"Opened {len(images)} image(s) with scales:")
if type(layers[0]) is list:
layer_to_iterate = layers[0]
else:
layer_to_iterate = layers
for i, layer in enumerate(layer_to_iterate):
# i = 0
# layer = layers[0][i]
print(f" Layer {i}: name='{layer.name}', scale={layer.scale}, shape={layer.data.shape}")
#print("All images opened in napari.")
if returns:
return viewer, layers, napari_datas, napari_axess
# %% CONVENIENCE READER AND CONVERTER
# helper functions:
# function to normalize input filename(s) to a list of strings:
def _normalize_to_list(fname: Union[str, os.PathLike, List[Union[str, os.PathLike]]]) -> List[str]:
"""
Normalize input filenames to a list of strings.
This helper ensures that a filename argument is always represented as a list of
string paths. It accepts a single path-like object or a sequence of such objects
and converts all entries to their string representation.
Parameters
----------
fname : str or os.PathLike or list of (str or os.PathLike)
Input filename or filenames to normalize.
Returns
-------
list of str
List of filename strings. A single input is wrapped into a one-element list.
Notes
-----
* Path-like objects are converted using ``str(...)``.
* Tuples are treated the same as lists and returned as a new list.
"""
if isinstance(fname, (list, tuple)):
return [str(f) for f in fname]
return [str(fname)]
# function to check whether path is a directory:
def _is_dir(p: str) -> bool:
"""
Check whether a path refers to an existing directory.
This helper wraps ``os.path.isdir`` to provide a small, explicit predicate that
tests whether the given path exists and is a directory.
Parameters
----------
p : str
Path to test.
Returns
-------
bool
True if `p` exists and is a directory, False otherwise.
"""
return os.path.isdir(p)
# function to check whether path is a file:
def _is_file(p: str) -> bool:
"""
Check whether a path refers to an existing file.
This helper wraps ``os.path.isfile`` to provide a small, explicit predicate that
tests whether the given path exists and is a file.
Parameters
----------
p : str
Path to test.
Returns
-------
bool
True if `p` exists and is a file, False otherwise.
"""
return os.path.isfile(p)
# function to get lowercased file extension:
def _lower_ext(p: str) -> str:
"""
Return the lowercased file extension of a path.
This helper extracts the file extension from a path and normalizes it to
lowercase. The returned string includes the leading dot. If the path has no
extension, an empty string is returned.
Parameters
----------
p : str
Path from which to extract the file extension.
Returns
-------
str
Lowercased file extension, including the leading dot, or an empty string if
no extension is present.
"""
return os.path.splitext(p)[1].lower()
# function to check whether path looks like an OME-TIFF:
def _looks_like_ome_tif(p: str) -> bool:
"""
Check whether a path looks like an OME-TIFF filename.
This helper performs a simple filename-based check to determine whether a path
appears to refer to an OME-TIFF file by testing for the standard OME-TIFF
extensions.
Parameters
----------
p : str
Path or filename to check.
Returns
-------
bool
True if the path ends with ``.ome.tif`` or ``.ome.tiff`` (case-insensitive),
False otherwise.
Notes
-----
* This is a heuristic based solely on the filename extension and does not
inspect file contents.
"""
lp = p.lower()
return lp.endswith(".ome.tif") or lp.endswith(".ome.tiff")
# function to list image files in a folder:
def _list_image_files_in_folder(folder: str,
allowed_ext: Union[None, set] = None,
recursive: bool = False) -> List[str]:
"""
List image files in a folder matching supported extensions.
This helper scans a directory for image files whose extensions match a set of
allowed formats commonly handled by OMIO. It can operate either non-recursively
on a single directory level or recursively across all subdirectories.
OME-TIFF files are detected explicitly via their ``.ome.tif`` or ``.ome.tiff``
suffixes and are always included when present.
Parameters
----------
folder : str
Path to the directory to scan for image files.
allowed_ext : set of str or None, optional
Set of allowed lowercase file extensions (including the leading dot).
If None, a default set is used:
``{".tif", ".tiff", ".lsm", ".czi", ".raw", ".ome.tif", ".ome.tiff"}``.
recursive : bool, optional
If True, search recursively through all subdirectories of `folder`.
If False, only files directly inside `folder` are considered. Default is
False.
Returns
-------
list of str
Sorted list of file paths matching the allowed extensions.
Notes
-----
* Only regular files are included; directories are ignored.
* Extension checks are case-insensitive.
* The function does not validate file contents and relies solely on filename
extensions.
"""
if allowed_ext is None:
allowed_ext = {".tif", ".tiff", ".lsm", ".czi", ".raw", ".ome.tif", ".ome.tiff"}
patterns = []
if recursive:
patterns.append(os.path.join(folder, "**", "*"))
else:
patterns.append(os.path.join(folder, "*"))
files = []
for pat in patterns:
for p in glob.glob(pat, recursive=recursive):
if not os.path.isfile(p):
continue
lp = p.lower()
if _looks_like_ome_tif(lp):
files.append(p)
continue
ext = _lower_ext(lp)
if ext in allowed_ext:
files.append(p)
files = sorted(files)
return files
# function to get the first image file in a folder:
def _first_image_file_in_folder(folder: str,
allowed_ext: Union[None, set] = None) -> Union[None, str]:
"""
Return the first image file found in a folder.
This helper scans a directory for image files matching a set of allowed
extensions and returns the first match according to the sorted order defined
by ``_list_image_files_in_folder``. If no matching files are found, ``None`` is
returned.
Parameters
----------
folder : str
Path to the directory to scan for image files.
allowed_ext : set of str or None, optional
Set of allowed lowercase file extensions (including the leading dot). If
None, the default extension set used by
``_list_image_files_in_folder`` is applied.
Returns
-------
str or None
Path to the first matching image file, or ``None`` if no image files are
found.
Notes
-----
* The search is non-recursive.
* File ordering is determined by lexicographic sorting of the matched paths.
* No validation of file contents is performed.
"""
files = _list_image_files_in_folder(folder, allowed_ext=allowed_ext, recursive=False)
if not files:
return None
return files[0]
# function to merge metadata sources:
def _merge_metadata_sources(sources: List[Dict[str, Any]],
namespace: str = "omio:merge",
keep_original_forever: bool = True) -> Dict[str, Any]:
"""
Merge multiple metadata dictionaries originating from different image stacks
into a single metadata dictionary with explicit provenance tracking.
The merge policy is conservative and provenance focused:
* Metadata from the first source (index 0) is taken as authoritative for
physical scaling and timing fields.
* PhysicalSizeX, PhysicalSizeY, PhysicalSizeZ, and TimeIncrement are compared
across all sources. If inconsistencies are detected, a warning is issued and
the value from source 0 is retained.
* Image size related keys (SizeT, SizeZ, SizeC, SizeY, SizeX) are not recomputed
here and are expected to be updated later from the merged image data.
* Provenance information for each input source is collected and stored inside
the Annotations block under a dedicated namespace.
Parameters
----------
sources : list of dict
List of metadata dictionaries to be merged. Each entry is assumed to
correspond to one image stack.
namespace : str, optional
Namespace prefix used for keys written into the Annotations block that
describe the merge operation. Default is "omio:merge".
keep_original_forever : bool, optional
If True, existing original_* keys inside Annotations are preserved and not
overwritten. Default is True.
Returns
-------
dict
A merged metadata dictionary based on the first source, extended with
provenance and merge information stored in the Annotations field.
Notes
-----
* This function does not modify the input dictionaries in place.
* Provenance information includes original filename, parent folder, file type,
metadata type, shape, and axes for each source stack.
"""
if not sources:
return {}
md0 = dict(sources[0])
def _get(md: Dict[str, Any], k: str, default=None):
return md.get(k, default)
# Compare physical sizes and time increment across sources and warn if inconsistent.
keys_to_compare = ["PhysicalSizeX", "PhysicalSizeY", "PhysicalSizeZ", "TimeIncrement"]
for k in keys_to_compare:
v0 = _get(md0, k, None)
for i, mdi in enumerate(sources[1:], start=1):
vi = _get(mdi, k, None)
if v0 is None or vi is None:
continue
try:
if float(v0) != float(vi):
warnings.warn(
f"Metadata mismatch in '{k}' between stack 0 ({v0}) and stack {i} ({vi}). "
f"Using stack 0 value."
)
break
except Exception:
if v0 != vi:
warnings.warn(
f"Metadata mismatch in '{k}' between stack 0 ({v0}) and stack {i} ({vi}). "
f"Using stack 0 value."
)
break
# Build provenance block.
provenance = []
for i, mdi in enumerate(sources):
provenance.append({
"index": i,
"original_filename": mdi.get("original_filename", "N/A"),
"original_parentfolder": mdi.get("original_parentfolder", "N/A"),
"original_filetype": mdi.get("original_filetype", "N/A"),
"original_metadata_type": mdi.get("original_metadata_type", "N/A"),
"shape": mdi.get("shape", None),
"axes": mdi.get("axes", None),
})
# Place provenance into Annotations under a single namespace.
annotations = md0.get("Annotations", {})
if not isinstance(annotations, dict):
annotations = {}
annotations = dict(annotations)
# Preserve existing original_* keys inside annotations if requested.
if keep_original_forever:
pass
# tifffile MapAnnotation is single namespace in your current policy, so keep it flat.
# We store the merge info as JSON-like string to keep it simple and robust.
# If you prefer, you can store it as multiple keys, but keep in mind Fiji display readability.
annotations["Namespace"] = md0.get("Annotations", {}).get("Namespace", "omio:metadata")
annotations[f"{namespace}:created_utc"] = datetime.datetime.now(datetime.UTC).strftime("%Y-%m-%dT%H:%M:%S")
annotations[f"{namespace}:n_sources"] = str(len(sources))
annotations[f"{namespace}:sources"] = str(provenance)
md0["Annotations"] = annotations
return md0
# function to compute merge target shapes:
def _compute_merge_target_shapes(images, merge_along_axis: str, context: str = "merge"):
"""
Compute target shapes required for merging multiple 5D images along a given axis.
This helper determines three shape descriptors used during merge operations:
* max_shape:
The maximum extent across all input images for every axis except the merge
axis. This defines the required padding or broadcasting size for non-merged
dimensions.
* merged_shape:
The final output shape after merging, where the merge axis length is the sum
of the corresponding axis lengths across all inputs, and all other axes take
their maximum extent.
* shapes:
The original shapes of all input images, preserved in input order.
Parameters
----------
images : list of array-like
Sequence of input images. Each image must be 5-dimensional and follow the
OMIO/OME axis convention.
merge_along_axis : str
Axis label along which the images will be concatenated (e.g. "T", "Z", "C").
Must be a valid key in the global axis-to-index mapping.
context : str, optional
Short context string used to prefix warning messages. Default is "merge".
Returns
-------
max_shape : tuple[int, int, int, int, int] or None
Maximum shape across all non-merge axes. None if validation fails.
merged_shape : tuple[int, int, int, int, int] or None
Shape of the merged output image. None if validation fails.
shapes : list of tuple[int, ...] or None
List of original input shapes in the same order as `images`.
None if validation fails.
Notes
-----
* All input images are expected to be 5D. If any input violates this
assumption, a warning is issued and the function returns (None, None, None).
* This function performs no data allocation and no axis reordering. It only
computes shape bookkeeping required for downstream merge logic.
"""
axis_idx = _AXIS_TO_INDEX[merge_along_axis]
shapes = []
for i, img in enumerate(images):
try:
s = tuple(img.shape)
except Exception:
s = tuple(np.asarray(img).shape)
if len(s) != 5:
warnings.warn(f"{context}: expected 5D arrays. Got shape {s} at index {i}.")
return None, None, None
shapes.append(s)
# max over non merge axes
max_shape = list(shapes[0])
for j in range(5):
if j == axis_idx:
continue
max_shape[j] = max(s[j] for s in shapes)
# merged shape: merge axis is sum, others max
merged_shape = list(max_shape)
merged_shape[axis_idx] = int(sum(s[axis_idx] for s in shapes))
return tuple(max_shape), tuple(merged_shape), shapes
# function to validate merge inputs:
def _validate_merge_inputs_with_optional_padding(images, metadatas, merge_along_axis: str,
zeropadding: bool,
context: str = "merge"):
"""
Validate inputs for a multi-stack merge operation, with optional zero-padding support.
This function enforces OMIO's merge preconditions for a set of input images and
their corresponding metadata entries. The validation is intentionally strict
about axis semantics and dimensionality and provides two modes regarding shape
compatibility for non-merge axes.
Validation policy
-----------------
* The merge axis must be one of the allowed merge axes.
* `images` and `metadatas` must be non-empty and have identical lengths.
* Each metadata entry must declare canonical OME axes exactly as "TZCYX".
No attempt is made to repair or normalize axes during validation.
* Each image must be 5D and compatible with the canonical axis convention.
Shape compatibility modes
-------------------------
* If `zeropadding` is False (strict mode):
All non-merge axes must match exactly across all stacks. Only the merge axis
is allowed to differ. Any mismatch aborts the merge.
* If `zeropadding` is True (padding-permitted mode):
Exact agreement on non-merge axes is not required. Only the 5D requirement is
enforced, enabling later padding or broadcasting logic to harmonize shapes.
Parameters
----------
images : list of array-like
Sequence of image arrays to be merged. Each image must be 5-dimensional and
follow the OME axis convention implied by metadata axes "TZCYX".
metadatas : list of dict
Sequence of metadata dictionaries aligned with `images`. Each must contain
an "axes" entry that equals "TZCYX".
merge_along_axis : str
Axis label along which the images are intended to be merged (e.g. "T", "Z", "C").
Must be a member of `_ALLOWED_MERGE_AXES`.
zeropadding : bool
If True, allow shape mismatches on non-merge axes (while still requiring 5D).
If False, require exact matching across all non-merge axes.
context : str, optional
Short context string used to prefix warning messages. Default is "merge".
Returns
-------
bool
True if validation passes under the selected policy and mode, otherwise False.
Notes
-----
* The function emits warnings (rather than raising exceptions) to support
higher-level workflows that may choose alternative merge strategies.
* In strict mode, the first image (index 0) defines the reference shape for all
non-merge axes.
* This function performs no padding, concatenation, or data copying. It only
checks preconditions for downstream merge logic.
"""
if merge_along_axis not in _ALLOWED_MERGE_AXES:
print(f"{context}: invalid merge_along_axis={merge_along_axis!r}.\n"
f" Allowed: {sorted(_ALLOWED_MERGE_AXES)}.")
return False
if not images or not metadatas or len(images) != len(metadatas):
print(f"{context}: empty inputs or mismatched images/metadatas list lengths.")
return False
for i, md in enumerate(metadatas):
ax = md.get("axes", None)
if ax != _OME_AXES:
print(f"{context}: axes mismatch at index {i}. Expected '{_OME_AXES}' but got {ax!r}.\n"
" Merge aborted.")
return False
# shape checks:
axis_idx = _AXIS_TO_INDEX[merge_along_axis]
try:
shape0 = tuple(images[0].shape)
except Exception:
shape0 = tuple(np.asarray(images[0]).shape)
if len(shape0) != 5:
warnings.warn(f"{context}: expected 5D arrays (TZCYX). Got shape {shape0}. \n"
" Merge aborted.")
return False
if zeropadding:
# only need to ensure every input is 5D
for i, img in enumerate(images):
try:
s = tuple(img.shape)
except Exception:
s = tuple(np.asarray(img).shape)
if len(s) != 5:
warnings.warn(
f"{context}: expected 5D arrays (TZCYX). Got shape {s} at index {i}. \n"
" Merge aborted.")
return False
return True
# strict mode: non merge axes must match
must_match_axes = [a for a in _OME_AXES if a != merge_along_axis]
for i, img in enumerate(images):
try:
shapei = tuple(img.shape)
except Exception:
shapei = tuple(np.asarray(img).shape)
if len(shapei) != 5:
warnings.warn(
f"{context}: expected 5D arrays (TZCYX). Got shape {shapei} at index {i}. \n"
" Merge aborted.")
return False
for a in must_match_axes:
j = _AXIS_TO_INDEX[a]
if shapei[j] != shape0[j]:
print(f"{context}: incompatible shapes for merge along '{merge_along_axis}'.\n"
f" Mismatch in axis '{a}' between stack 0 ({shape0}) and stack {i} ({shapei}).\n"
" Merge aborted.")
return False
return True
# function to open Zarr for merge output:
def _zarr_open_for_merge_output(zarr_store: str, folder: str, basename: str, shape, dtype, chunks):
"""
Create and open a Zarr array to be used as the output target of a merge operation.
This helper encapsulates OMIO’s policy for allocating the destination Zarr store
used when merging multiple image stacks. The storage backend is selected via
`zarr_store` and the resulting Zarr array is always opened in write mode,
replacing any existing on-disk store if necessary.
Storage modes
-------------
* zarr_store == "memory":
Create a Zarr array backed by an in-memory `MemoryStore`. The data live only
for the lifetime of the Python process.
* zarr_store == "disk":
Create a persistent Zarr array on disk at
`{folder}/.omio_cache/<basename>.zarr`. If a Zarr store with the same name
already exists, it is removed and recreated.
Parameters
----------
zarr_store : str
Storage backend selector. Must be either "memory" or "disk".
folder : str
Parent folder used when creating an on-disk Zarr store.
basename : str
Base name (without extension) for the output Zarr directory.
shape : tuple
Shape of the output array.
dtype : numpy.dtype
Data type of the output array.
chunks : tuple
Chunk shape to use for the Zarr array.
Returns
-------
zarr.core.array.Array
An opened Zarr array ready to receive merged image data.
Raises
------
ValueError
If `zarr_store` is not one of the supported values.
Notes
-----
* This function performs no validation of `shape`, `dtype`, or `chunks`; it
assumes these have already been computed and validated by the merge logic.
* The `.omio_cache` folder is created automatically if it does not exist.
"""
if zarr_store == "memory":
store = zarr.storage.MemoryStore()
return zarr.open(store=store, mode="w", shape=shape, dtype=dtype, chunks=chunks)
if zarr_store == "disk":
zarr_cache_folder = os.path.join(folder, ".omio_cache")
os.makedirs(zarr_cache_folder, exist_ok=True)
zarr_path = os.path.join(zarr_cache_folder, basename + ".zarr")
if os.path.exists(zarr_path):
shutil.rmtree(zarr_path)
return zarr.open(zarr_path, mode="w", shape=shape, dtype=dtype, chunks=chunks)
raise ValueError(f"_zarr_open_for_merge_output: invalid zarr_store={zarr_store!r}.")
# function to copy into zarr chunk-aligned:
def _copy_into_zarr_chunk_aligned(z_out, img, out_start: int, axis_idx: int):
"""
Copy `img` into an output Zarr array `z_out`, writing blocks aligned to the
output chunk grid along a specified merge axis.
The copy is performed only along `axis_idx`, starting at the output offset
`out_start`. All other axes are copied fully. To minimize overhead and to keep
the copy compatible with interactive environments, the function iterates in
contiguous blocks whose length matches `z_out.chunks[axis_idx]` whenever chunk
information is available. If chunking is unknown or invalid, the function falls
back to copying the full extent of `img` along the merge axis in a single block.
A key implementation detail is that each block is materialized as a NumPy array
via `np.asarray(img[...])` before assignment. This avoids assignment issues that
can occur when attempting direct Zarr to Zarr writes in certain interactive
(Jupyter or REPL) contexts, at the cost of temporarily holding the current block
in RAM.
Parameters
----------
z_out : zarr.core.array.Array
Destination Zarr array. Must be 5D and writable. Chunking is used to define
block boundaries along `axis_idx` when available.
img : array-like
Source image data to copy. Can be a NumPy array or a Zarr array. Must be 5D
and compatible with `z_out` on all non-merge axes.
out_start : int
Start index along `axis_idx` in `z_out` where the first element of `img`
will be written.
axis_idx : int
Integer index of the axis along which the copy is offset and blockwise
partitioned.
Returns
-------
None
The function writes into `z_out` in place.
Notes
-----
* The function assumes both `z_out` and `img` are 5D (consistent with OMIO’s
canonical TZCYX convention) and does not validate dimensionality beyond what
is implicitly required by indexing.
* Block boundaries are chosen to align with the destination chunk size along
`axis_idx`, which is typically beneficial for write performance and reduces
the chance of repeatedly touching the same chunks during sequential merges.
* Memory usage is bounded by the size of a single block (full extents of the
non-merge axes and `block` along the merge axis).
"""
n = int(img.shape[axis_idx])
# chunk length along merge axis in output
chunk_len = int(z_out.chunks[axis_idx]) if getattr(z_out, "chunks", None) is not None else None
if chunk_len is None or chunk_len <= 0:
chunk_len = n # fallback: one block
src_pos = 0
while src_pos < n:
block = min(chunk_len, n - src_pos)
out_slice = [slice(None)] * 5
src_slice = [slice(None)] * 5
out_slice[axis_idx] = slice(out_start + src_pos, out_start + src_pos + block)
src_slice[axis_idx] = slice(src_pos, src_pos + block)
# materialize only the block, not the whole img
""" Note on memory efficiency (Dec 2025):
When executed in Jupyter notebooks or Interactive Python environments,
we would get an asynchronous assignment error if we would use
z_out[tuple(slicer)] = img directly (img is Zarr!)
Therefore, we convert to NumPy first, which puts the image slice-wise (!)
into RAM temporarily. This is the pill we have to swallow for now, i.e.,
no further memory-efficient optimization is possible with current Zarr version
(as of 2025-12). """
z_out[tuple(out_slice)] = np.asarray(img[tuple(src_slice)])
src_pos += block
# function to copy into zarr with zero padding:
def _copy_into_zarr_with_padding(z_out, img, out_start: int, axis_idx: int,
target_nonmerge_shape: tuple):
"""
Copy a 5D source image `img` into a 5D output Zarr array `z_out` at a specified
offset along a merge axis, while implicitly applying zero padding on all
non-merge axes.
The output array `z_out` is assumed to be pre-initialized with zeros and sized
to the merge target shape. During copying, only the region that exists in the
source is written: for every non-merge axis `j`, the function writes the slice
`0:src_shape[j]` into `z_out`. Any remaining extent up to the non-merge target
shape stays zero, thereby realizing padding without explicitly writing zeros.
Copying is performed in contiguous blocks aligned to the destination chunk grid
along the merge axis. If chunk information is unavailable or invalid, the
function falls back to copying the full extent of `img` along the merge axis in
a single block.
Each written block is materialized as a NumPy array via `np.asarray(...)` before
assignment. This avoids issues that can arise with direct Zarr to Zarr writes in
interactive environments (for example Jupyter), at the cost of temporarily
holding the current block in RAM.
Parameters
----------
z_out : zarr.core.array.Array
Destination Zarr array. Must be writable and 5D. It should already be
initialized with zeros so that unwritten regions represent padded zeros.
img : array-like
Source image data to copy. Can be a NumPy array or a Zarr array. Must be 5D.
out_start : int
Start index along the merge axis in `z_out` where the first element of `img`
will be written.
axis_idx : int
Integer index of the merge axis (the axis along which stacking/concatenation
occurs).
target_nonmerge_shape : tuple
A 5D shape defining the intended maximal extents on the non-merge axes for
the merge operation. The merge axis length in this tuple is not used by this
function; it is included for interface consistency with merge planning code.
Returns
-------
None
The function writes into `z_out` in place.
Notes
-----
* The function assumes both `z_out` and `img` follow the 5D convention used in
the merge pipeline (typically TZCYX) and does not perform full compatibility
checks beyond what indexing requires.
* Padding is implicit: only `0:src_shape[j]` is written for non-merge axes, and
the remainder stays zero due to `z_out` initialization.
* Memory usage is bounded by the size of one block: full extents of the source
on non-merge axes and `block` elements along the merge axis.
"""
src_shape = tuple(img.shape)
n = int(src_shape[axis_idx])
chunk_len = int(z_out.chunks[axis_idx]) if getattr(z_out, "chunks", None) is not None else None
if chunk_len is None or chunk_len <= 0:
chunk_len = n
src_pos = 0
while src_pos < n:
block = min(chunk_len, n - src_pos)
out_slice = [slice(None)] * 5
src_slice = [slice(None)] * 5
# merge axis placement:
out_slice[axis_idx] = slice(out_start + src_pos, out_start + src_pos + block)
src_slice[axis_idx] = slice(src_pos, src_pos + block)
# non merge axes: only write the valid src region [0:src_shape[j]]:
for j in range(5):
if j == axis_idx:
continue
out_slice[j] = slice(0, src_shape[j])
src_slice[j] = slice(0, src_shape[j])
""" Note on memory efficiency (Dec 2025):
When executed in Jupyter notebooks or Interactive Python environments,
we would get an asynchronous assignment error if we would use
z_out[tuple(slicer)] = img directly (img is Zarr!)
Therefore, we convert to NumPy first, which puts the image slice-wise (!)
into RAM temporarily. This is the pill we have to swallow for now, i.e.,
no further memory-efficient optimization is possible with current Zarr version
(as of 2025-12). """
z_out[tuple(out_slice)] = np.asarray(img[tuple(src_slice)])
src_pos += block
# function to merge images by concatenation along an axis:
def _merge_concat_along_axis(images, metadatas, merge_along_axis: str,
zarr_store: str,
namespace: str = "omio:merge",
zeropadding: bool = False,
verbose: bool = True):
"""
Concatenate multiple 5D image stacks along a specified OME axis and return a
merged image plus merged metadata, with optional zero padding and optional Zarr
output.
This routine implements OMIO's merge policy for images that are already in the
canonical 5D OME order (typically TZCYX) and whose metadata explicitly declares
`axes == "TZCYX"`. No axis repair or reshaping is attempted. The merge occurs by
concatenation along `merge_along_axis`, where each input may contribute an
arbitrary length greater than one on that axis.
Two validation and shape policies are supported:
Strict mode (zeropadding=False)
All non-merge axes must match exactly across inputs. The output shape equals
the common non-merge shape, and the merge axis length equals the sum of all
input lengths along that axis.
Zero padding mode (zeropadding=True)
Non-merge axes may differ across inputs. The output non-merge extents are set
to the per-axis maxima across all inputs. Each input is embedded into a
zero-initialized target block by writing only its existing source region
`0:src_shape[j]` on every non-merge axis. The merge axis is then concatenated
as in strict mode.
The merged metadata are created by combining `metadatas` according to
`_merge_metadata_sources(...)` and then updated to reflect the merged image shape.
Per-source provenance is recorded in `Annotations` under the provided `namespace`.
Output representation is controlled by `zarr_store`:
zarr_store is None
The merge is performed in NumPy, returning a NumPy ndarray. In strict mode,
inputs are concatenated directly. In zero padding mode, padded NumPy blocks
are allocated per input before concatenation.
zarr_store is "memory" or "disk"
The merge target is created as a Zarr array (in-memory store or
`{folder}/.omio_cache/<basename>.zarr`). Copying is performed incrementally
into the destination to avoid loading all data at once. In strict mode, blocks
are written in chunk-aligned slabs along the merge axis. In zero padding mode,
the destination is zero-initialized and only the valid source region is written
for each input, which implicitly leaves padded regions as zeros.
Due to current Zarr behavior in interactive environments, Zarr-backed sources are
materialized block-wise via `np.asarray(...)` during assignment into the output
Zarr, trading small temporary RAM usage for robustness.
Parameters
----------
images : sequence of array-like
Input image stacks. Each entry must be 5D and compatible with the declared
OME axes order. Entries may be NumPy arrays or Zarr arrays.
metadatas : sequence of dict
Metadata dictionaries corresponding one-to-one with `images`. Each dict must
declare `axes == "TZCYX"` (or the configured `_OME_AXES`) and should contain
provenance fields used by the merge metadata policy.
merge_along_axis : str
Axis label along which to concatenate (must be in `_ALLOWED_MERGE_AXES` and
present in `_OME_AXES`).
zarr_store : {None, "memory", "disk"}
Controls whether output is a NumPy array (None) or a Zarr array ("memory" or
"disk").
namespace : str, optional
Namespace prefix used when writing merge provenance into `Annotations`.
Default is "omio:merge".
zeropadding : bool, optional
If False, require exact non-merge axis matches. If True, allow mismatched
non-merge axes and pad each input to the maxima before concatenation.
Default is False.
verbose : bool, optional
If True, print diagnostic messages about shapes and progress.
Returns
-------
merged : np.ndarray or zarr.core.array.Array or None
The merged image. Returns None if validation fails or if Zarr output was
requested but Zarr is unavailable.
md_merged : dict or None
The merged metadata dictionary aligned with `merged`. Returns None if the
merge fails.
Notes
-----
* Inputs must already be 5D and OME-ordered; this function does not reorder axes.
* In Zarr mode, the output is written into an OMIO cache location when
`zarr_store="disk"`. Existing stores at that path are replaced.
* Zero padding is implemented by writing only existing source extents into a
zero-initialized destination, leaving the remaining regions as zeros.
"""
ok = _validate_merge_inputs_with_optional_padding(
images, metadatas,
merge_along_axis=merge_along_axis,
zeropadding=zeropadding,
context=f"merge_along_{merge_along_axis}")
if not ok:
return None, None
axis_idx = _AXIS_TO_INDEX[merge_along_axis]
if zeropadding:
max_shape_nonmerge, merged_shape, _ = _compute_merge_target_shapes(
images, merge_along_axis, context=f"merge_along_{merge_along_axis}")
if verbose:
print(f"Merging with zero padding along axis '{merge_along_axis}':")
print(f" max non-merge shape = {max_shape_nonmerge}")
print(f" merged shape = {merged_shape}")
if merged_shape is None:
if verbose:
print("Merge aborted due to shape computation failure.")
return None, None
else:
shape0 = tuple(images[0].shape)
merged_shape = list(shape0)
merged_shape[axis_idx] = int(sum(int(img.shape[axis_idx]) for img in images))
merged_shape = tuple(merged_shape)
max_shape_nonmerge = shape0
md_merged = _merge_metadata_sources(metadatas, namespace=namespace)
md_merged["axes"] = _OME_AXES
md_merged["shape"] = merged_shape
md_merged["SizeT"] = int(merged_shape[_AXIS_TO_INDEX["T"]])
md_merged["SizeZ"] = int(merged_shape[_AXIS_TO_INDEX["Z"]])
md_merged["SizeC"] = int(merged_shape[_AXIS_TO_INDEX["C"]])
md_merged["SizeY"] = int(merged_shape[_AXIS_TO_INDEX["Y"]])
md_merged["SizeX"] = int(merged_shape[_AXIS_TO_INDEX["X"]])
if zarr_store is None:
# NumPy path:
if not zeropadding:
merged = np.concatenate([np.asarray(img) for img in images], axis=axis_idx)
return merged, md_merged
# zeropadding=True: build padded blocks then concatenate:
padded = []
for image_i, img in enumerate(images):
src = np.asarray(img)
# build per input target shape:
out_shape = list(max_shape_nonmerge)
out_shape[axis_idx] = src.shape[axis_idx] # keep merge axis length per input
if verbose:
print(f" Padding image {image_i} of shape {src.shape} to target shape {tuple(out_shape)}...")
out = np.zeros(tuple(out_shape), dtype=src.dtype)
sl = [slice(None)] * 5
for j in range(5):
sl[j] = slice(0, src.shape[j])
out[tuple(sl)] = src
padded.append(out)
merged = np.concatenate(padded, axis=axis_idx)
return merged, md_merged
""" padded = []
for image_i, img in enumerate(images):
if verbose:
print(f" Padding image {image_i} of shape {tuple(img.shape)} to target non-merge shape {tuple(max_shape_nonmerge)}...")
src = np.asarray(img)
out = np.zeros(tuple(max_shape_nonmerge), dtype=src.dtype)
sl = [slice(None)] * 5
for j in range(5):
sl[j] = slice(0, src.shape[j])
# sanity check: src shape must fit into target non-merge shape
out[tuple(sl)] = src
padded.append(out)
merged = np.concatenate(padded, axis=axis_idx)
return merged, md_merged """
# Zarr output requested:
if zarr is None:
warnings.warn("Merge: zarr_store was requested but zarr is not available. Merge aborted.")
return None, None
chunks = compute_default_chunks(merged_shape, _OME_AXES)
folder0 = metadatas[0].get("original_parentfolder", ".")
base0 = os.path.splitext(metadatas[0].get("original_filename", "merge"))[0]
out_basename = f"{base0}_merged_{merge_along_axis}"
z_out = _zarr_open_for_merge_output(
zarr_store=zarr_store,
folder=folder0,
basename=out_basename,
shape=merged_shape,
dtype=images[0].dtype,
chunks=chunks)
""" start = 0
for img in images:
n = int(img.shape[axis_idx])
slicer = [slice(None)] * 5
slicer[axis_idx] = slice(start, start + n)
# when executed in Jupyter notebooks or Interactive Python environments,
# we get an asynchronous assignment error here with Zarr arrays if we
# try z_out[tuple(slicer)] = img directly. Therefore, we convert to NumPy first.
# (can't be solved otherwise withe current Zarr version as of 2025-12)
z_out[tuple(slicer)] = np.asarray(img)
start += n """
# z_out is zero initialized already, so "padding" is just writing the existing source region
start = 0
for img in images:
if zeropadding:
_copy_into_zarr_with_padding(z_out, img, out_start=start,
axis_idx=axis_idx,
target_nonmerge_shape=max_shape_nonmerge)
else:
_copy_into_zarr_chunk_aligned(z_out, img, out_start=start, axis_idx=axis_idx)
start += int(img.shape[axis_idx])
return z_out, md_merged
# function to merge folder-stacks with padding:
def _merge_folderstacks_with_padding(images, metadatas,
merge_along_axis: str,
zarr_store: str = None,
zeropadding: bool = True,
verbose: bool = True
) -> Tuple[Union[None, np.ndarray, "zarr.core.array.Array"], Union[None, dict]]:
"""
Merge multiple 5D folder stacks by concatenating along a chosen OME axis, with an
optional zero padding policy for mismatched non-merge dimensions and optional
materialization into Zarr.
This helper is intended for the common case where a folder contains multiple
stacks that should be combined into a single canonical 5D array in OME axis
order (TZCYX). The function enforces that all metadata declare `axes == "TZCYX"`
and that all inputs are 5D. No axis repair, reordering, or dimensional inference
is performed.
Merge policy
------------
* The output is constructed by concatenation along `merge_along_axis`.
* Non-merge axes can be handled in two ways:
zeropadding=False (strict)
All non-merge axis lengths must match exactly across all inputs. If any
mismatch is detected, the merge is aborted.
zeropadding=True (padding)
For each non-merge axis, the maximum size across all inputs is computed.
Each input stack is then embedded into a zero-initialized target array of
that padded shape by writing only the valid source region. Concatenation
is performed on these padded arrays, so missing regions remain zero.
Output materialization
----------------------
* If `zarr_store is None`, the merged result is returned as a NumPy ndarray.
* If `zarr_store` is not None, the merged NumPy result is written into a Zarr
array created by `_zarr_open_for_merge_output(...)` and the returned image is
that Zarr array.
Practical note
--------------
This merge is primarily meaningful for `merge_along_axis="T"` in workflows where
multiple time blocks belong to a single logical acquisition. Merging along "Z"
or "C" is allowed but assumes that the remaining axes correspond to compatible
acquisitions and that interpreting the concatenation as an extended Z stack or
channel axis is semantically correct.
Parameters
----------
images : sequence of array-like
Input image stacks. Each entry must be 5D (TZCYX). Entries may be NumPy
arrays or Zarr arrays, but padding requires materialization via
`np.asarray(...)`.
metadatas : sequence of dict
Metadata dictionaries corresponding one-to-one with `images`. Each must
declare `axes == "TZCYX"` (or `_OME_AXES`).
merge_along_axis : str
Axis label along which to concatenate. Must be in `_ALLOWED_MERGE_AXES`.
zarr_store : {None, "memory", "disk"}, optional
If None, return a NumPy array. Otherwise, write the merged result to a Zarr
store and return a Zarr array handle.
zeropadding : bool, optional
If True, pad mismatched non-merge axes to per-axis maxima using zeros before
concatenation. If False, require exact non-merge axis matches.
verbose : bool, optional
If True, print diagnostic progress and merge mode information.
Returns
-------
merged : np.ndarray or zarr.core.array.Array or None
The merged image. Returns None if validation fails or if Zarr output was
requested but Zarr is unavailable.
md_merged : dict or None
Metadata dictionary aligned with the returned merged image, including updated
shape and SizeT/SizeZ/SizeC/SizeY/SizeX fields and merge provenance stored
under the merge namespace.
"""
if merge_along_axis not in _ALLOWED_MERGE_AXES:
warnings.warn(
f"merge_folder_stacks: invalid merge_along_axis={merge_along_axis!r}. "
f"Allowed: {sorted(_ALLOWED_MERGE_AXES)}."
)
return None, None
if not images:
warnings.warn("merge_folder_stacks: no images to merge.")
return None, None
# path without zero-padding:
if not zeropadding:
# strict check: require identical sizes on all non merged axes
if verbose:
print(f"merge_folder_stacks: merging without zero-padding along axis '{merge_along_axis}'.")
axis_idx = _AXIS_TO_INDEX[merge_along_axis]
sh0 = tuple(images[0].shape)
for i, img in enumerate(images):
shi = tuple(img.shape)
for j in range(5):
if j == axis_idx:
continue
if shi[j] != sh0[j]:
print( "WARNING: merge_folder_stacks: shape mismatch on non merged axis. \n"
f" stack0={sh0}, stack{i}={shi}.\n"
" Set zeropadding=True to allow padding merge. Merge aborted.")
return None, None
# otherwise: path with zero-padding:
if verbose:
print(f"merge_folder_stacks: merging with zero-padding along axis '{merge_along_axis}'.")
# require correct axes and 5D:
for i, md in enumerate(metadatas):
if md.get("axes", None) != _OME_AXES:
warnings.warn(
f"merge_folder_stacks: expected axes '{_OME_AXES}' but got {md.get('axes', None)!r} at index {i}.\n"
" Merge aborted.")
return None, None
if len(tuple(images[i].shape)) != 5:
warnings.warn(
f"merge_folder_stacks: expected 5D arrays (TZCYX) but got shape {tuple(images[i].shape)} at index {i}.\n"
" Merge aborted.")
return None, None
axis_idx = _AXIS_TO_INDEX[merge_along_axis]
non_merge_idxs = [j for j in range(5) if j != axis_idx]
# determine max sizes for non merged axes:
max_sizes = list(images[0].shape)
for j in non_merge_idxs:
max_sizes[j] = max(int(img.shape[j]) for img in images)
# Build padded arrays
padded_arrays = []
for img in images:
src = np.asarray(img) # padding requires NumPy materialization
target_shape = list(src.shape)
for j in non_merge_idxs:
target_shape[j] = max_sizes[j]
target_shape = tuple(target_shape)
out = np.zeros(target_shape, dtype=src.dtype)
slicer = [slice(None)] * 5
for j in range(5):
slicer[j] = slice(0, src.shape[j])
out[tuple(slicer)] = src
padded_arrays.append(out)
# Now concat along merge axis
merged_np = np.concatenate(padded_arrays, axis=axis_idx)
md_merged = _merge_metadata_sources(metadatas, namespace="omio:merge_folderstacks")
md_merged["axes"] = _OME_AXES
md_merged["shape"] = merged_np.shape
md_merged["SizeT"] = int(merged_np.shape[_AXIS_TO_INDEX["T"]])
md_merged["SizeZ"] = int(merged_np.shape[_AXIS_TO_INDEX["Z"]])
md_merged["SizeC"] = int(merged_np.shape[_AXIS_TO_INDEX["C"]])
md_merged["SizeY"] = int(merged_np.shape[_AXIS_TO_INDEX["Y"]])
md_merged["SizeX"] = int(merged_np.shape[_AXIS_TO_INDEX["X"]])
if zarr_store is None:
return merged_np, md_merged
if zarr is None:
warnings.warn("merge_folder_stacks: zarr_store was requested but zarr is not available. Merge aborted.")
return None, None
chunks = compute_default_chunks(merged_np.shape, _OME_AXES)
folder0 = metadatas[0].get("original_parentfolder", ".")
base0 = os.path.splitext(metadatas[0].get("original_filename", "merge"))[0]
out_basename = f"{base0}_merged_folderstacks_{merge_along_axis}"
z_out = _zarr_open_for_merge_output(
zarr_store=zarr_store,
folder=folder0,
basename=out_basename,
shape=merged_np.shape,
dtype=merged_np.dtype,
chunks=chunks,
)
z_out[:] = merged_np
return z_out, md_merged
# function to dispatch to format-specific readers:
def _dispatch_read_file(path: str,
zarr_store: Union[None, str],
return_list: bool,
physicalsize_xyz: Union[None, Any],
pixelunit: str,
verbose: bool = True,
) -> Tuple[Any, Dict[str, Any]]:
"""
Dispatch a single microscopy file to the appropriate OMIO reader based on its
filename extension and return the loaded image and metadata.
This function selects one of OMIO's format specific readers and forwards common
configuration parameters such as voxel size overrides, unit normalization, Zarr
materialization mode, verbosity, and backward compatible list returns.
Supported formats and dispatch rules
------------------------------------
* TIFF family: OME TIFF (.ome.tif, .ome.tiff) and standard TIFF variants
(.tif, .tiff, .lsm) are read via `read_tif(...)`.
* Zeiss CZI: .czi is read via `read_czi(...)`.
* Thorlabs RAW: .raw is read via `read_thorlabs_raw(...)`.
Parameters
----------
path : str
Path to the input file to read.
zarr_store : {None, "memory", "disk"}
If None, the reader returns a NumPy array in RAM. If "memory" or "disk", the
reader materializes the result as a Zarr array backed by an in memory store
or an on disk cache store, respectively. The concrete behavior is determined
by the called reader.
return_list : bool
Forwarded to the reader for backward compatibility. If True, readers may
return `[image]` and `[metadata]` for non paginated inputs. Some readers may
return lists regardless of this flag for semantically ambiguous cases
(e.g. paginated TIFFs).
physicalsize_xyz : Any or None
Optional override for physical pixel sizes, forwarded to the reader. If
provided, the reader uses these values instead of metadata derived sizes
according to its own precedence policy.
pixelunit : str
Unit string forwarded to the reader for unit normalization and defaults.
verbose : bool, optional
If True, forward diagnostic progress output from the reader.
Returns
-------
image : Any
The loaded image, typically a NumPy ndarray or Zarr array, or a list of such
objects if the reader returns multiple stacks.
metadata : dict
Metadata dictionary aligned with the returned image, or a list of dicts if
the reader returns multiple stacks.
Raises
------
ValueError
If the file extension is not supported by the dispatch rules.
"""
lp = path.lower()
if _looks_like_ome_tif(lp) or _lower_ext(lp) in {".tif", ".tiff", ".lsm"}:
return read_tif(
path,
zarr_store=zarr_store,
return_list=return_list,
physicalsize_xyz=physicalsize_xyz,
pixelunit=pixelunit,
verbose=verbose)
if _lower_ext(lp) == ".czi":
return read_czi(
path,
zarr_store=zarr_store,
return_list=return_list,
physicalsize_xyz=physicalsize_xyz,
pixelunit=pixelunit,
verbose=verbose)
if _lower_ext(lp) == ".raw":
return read_thorlabs_raw(
path,
zarr_store=zarr_store,
return_list=return_list,
physicalsize_xyz=physicalsize_xyz,
pixelunit=pixelunit,
verbose=verbose)
raise ValueError(f"Unsupported file extension '{_lower_ext(lp)}' for path: {path}")
# functions to detect and collapse OME multifile series:
_UUID_FILENAME_RE = re.compile(r'FileName="([^"]+)"')
def _ome_referenced_basenames(tif_path: str) -> list[str]:
"""
Return list of basenames referenced via FileName="..." in OME-XML.
Does not trigger multifile loading.
"""
try:
with tifffile.TiffFile(tif_path, _multifile=False) as tif:
ome = tif.ome_metadata
except Exception:
return []
if not ome:
return []
refs = _UUID_FILENAME_RE.findall(ome)
return [os.path.basename(r) for r in refs]
class _UnionFind:
def __init__(self):
self.parent = {}
def find(self, x):
self.parent.setdefault(x, x)
if self.parent[x] != x:
self.parent[x] = self.find(self.parent[x])
return self.parent[x]
def union(self, a, b):
ra, rb = self.find(a), self.find(b)
if ra != rb:
self.parent[rb] = ra
def _collapse_ome_multifile_series(files: list[str], verbose: bool = True) -> list[str]:
"""
Keep only one representative per OME multifile series.
Groups files by OME-XML connectivity (connected components).
Works if only some member files contain OME-XML and if refs are partial.
"""
if not files:
return []
# Map basename -> all full paths seen (basename collisions are possible, keep list)
base_to_paths: dict[str, list[str]] = {}
for f in files:
base_to_paths.setdefault(os.path.basename(f), []).append(f)
uf = _UnionFind()
# Build connectivity graph: file_basename <-> referenced_basename
for f in files:
b = os.path.basename(f)
refs = _ome_referenced_basenames(f)
if not refs:
continue
for r in refs:
# Only union if the referenced file exists among discovered files
if r in base_to_paths:
uf.union(b, r)
# Collect components
comp: dict[str, set[str]] = {}
for b in base_to_paths.keys():
root = uf.find(b)
comp.setdefault(root, set()).add(b)
representatives: list[str] = []
skipped = 0
for root, members in comp.items():
if len(members) == 1:
# singletons: keep all their concrete paths (could be basename collisions)
b = next(iter(members))
representatives.extend(base_to_paths[b])
continue
# Multifile component: choose deterministic representative path
# Pick lexicographically smallest basename, then lexicographically smallest full path for that basename
members_sorted = sorted(members)
rep_base = members_sorted[0]
rep_path = sorted(base_to_paths[rep_base])[0]
representatives.append(rep_path)
# Skip all other members
for b in members_sorted[1:]:
skipped += len(base_to_paths[b])
if verbose:
print(
f"Detected OME multifile series with {sum(len(base_to_paths[b]) for b in members_sorted)} files "
f"({len(members_sorted)} unique basenames). Using representative: {os.path.basename(rep_path)}"
)
if verbose and skipped:
print(f"Skipped {skipped} files that belong to already detected OME multifile series.")
# Preserve original order as much as possible: sort representatives by their first occurrence in `files`
pos = {p: i for i, p in enumerate(files)}
representatives.sort(key=lambda p: pos.get(p, 10**12))
return representatives
# OMIO's main universal image reader:
[docs]
def imread(fname: Union[str, os.PathLike, List[Union[str, os.PathLike]]],
zarr_store: Union[None, str] = None,
return_list: bool = False,
recursive: bool = False,
folder_stacks: bool = False,
merge_folder_stacks: bool = False,
merge_multiple_files_in_folder: bool = False,
merge_along_axis: str = "T",
zeropadding: bool = True,
physicalsize_xyz: Union[None, Any] = None,
pixelunit: str = "micron",
collapse_ome_multifile_series: bool = True,
verbose: bool = True,
) -> Union[
Tuple[Any, Dict[str, Any]],
Tuple[List[Any], List[Dict[str, Any]]]]:
"""
Read microscopy images and folders into OMIO's canonical representation, with optional
folder stack handling and concatenation based merges.
This is OMIO's high level entry point. It accepts a single file, a list of files, or a
folder path. Supported input formats are TIFF family files (including OME TIFF and LSM),
Zeiss CZI, and Thorlabs RAW. For each file, the corresponding format specific reader is
selected automatically, metadata are standardized, and the returned image is normalized
to OME axis order TZCYX.
If `zarr_store` is set to "memory" or "disk", readers return a Zarr array instead of a
NumPy array. For "disk", Zarr outputs are created in a hidden cache folder `.omio_cache`
next to the source data. This is intended for large files where memory mapping and
chunked access are required downstream.
Folder input behavior
---------------------
If `fname` resolves to a folder, OMIO lists all supported image files inside the folder
(optionally recursive) and reads them in sorted order.
If `folder_stacks=True` or `merge_folder_stacks=True`, the folder is interpreted as one
member of a tagged folder stack family with names like `<TAG>_000`, `<TAG>_001`, etc.
OMIO derives `<TAG>_` from the provided folder name, finds all co folders with the same
tag in the parent directory, reads the first image file in each of these folders, and
returns either the list of stacks or a merged stack.
Merge behavior
--------------
Two merge modes are supported.
* `merge_multiple_files_in_folder=True` merges all images found in a folder by
concatenating along `merge_along_axis`. This is applied after reading all files from
that folder.
* `merge_folder_stacks=True` merges the tagged co folder stacks by concatenating along
`merge_along_axis`.
`merge_along_axis` must be one of {"T", "Z", "C"}. In merge modes, OMIO expects that all
inputs are already in OME order and have 5 dimensions (TZCYX). If `zeropadding=False`,
non merge axes must match exactly, otherwise the merge is aborted with a warning and a
None result. If `zeropadding=True`, non merge axes are padded with zeros up to the
maximum size across inputs before concatenation. The merge axis may have length greater
than one in each input; OMIO concatenates the full segments in the discovered order.
For merge outputs, metadata are merged with a provenance policy that records the inputs
under the `Annotations` namespace and uses stack 0 as the reference for physical size and
time increment fields.
Parameters
----------
fname : str, os.PathLike, or list of such
File path, folder path, or list of file paths to read.
zarr_store : {None, "memory", "disk"}, optional
Controls whether images are returned as NumPy arrays (None) or as materialized Zarr
arrays ("memory" or "disk"). Default is None.
return_list : bool, optional
If True, always return lists of images and metadata. If False, return a single image
and metadata for single input cases, otherwise lists. Default is False.
recursive : bool, optional
If True and `fname` is a folder, search recursively for supported image files.
Default is False.
folder_stacks : bool, optional
If True and `fname` is a folder, interpret it as a tagged folder stack member and
read the first image file from each tagged co folder. Default is False.
merge_folder_stacks : bool, optional
If True, interpret tagged folder stacks and merge them along `merge_along_axis`.
Default is False.
merge_multiple_files_in_folder : bool, optional
If True and `fname` is a folder, merge all files found in that folder along
`merge_along_axis`. Default is False.
merge_along_axis : {"T", "Z", "C"}, optional
Axis along which concatenation is performed in merge modes. Default is "T".
zeropadding : bool, optional
If True, allow merges with mismatched non merge axes by zero padding to maxima. If
False, require exact match on non merge axes. Default is True.
physicalsize_xyz : Any or None, optional
Optional voxel size override forwarded to the underlying readers. Default is None.
pixelunit : str, optional
Unit string forwarded to readers for unit normalization and defaults. Default is
"micron".
collapse_ome_multifile_series : bool, optional
If True, detect OME multifile series and keep only one representative file per
series to avoid duplicate loading. Default is True.
verbose : bool, optional
If True, print diagnostic progress messages. Default is True.
Returns
-------
image, metadata : (Any, dict) or (list[Any], list[dict])
For single non folder inputs and `return_list=False`, returns one image and one
metadata dict. For multi file inputs, folder reads, or `return_list=True`, returns
lists. Merge modes return a single merged image and merged metadata (or lists if
`return_list=True`). If a requested merge fails validation, returns None results
according to the calling branch.
Raises
------
ValueError
If `merge_along_axis` is not one of {"T", "Z", "C"}.
FileNotFoundError
If a requested file path does not exist or is not a file.
"""
if merge_along_axis not in _ALLOWED_MERGE_AXES:
raise ValueError(f"read: merge_along_axis must be one of {sorted(_ALLOWED_MERGE_AXES)}. "
f"Got: {merge_along_axis!r}")
allowed_ext = {".tif", ".tiff", ".lsm", ".czi", ".raw", ".ome.tif", ".ome.tiff"}
# TODO: maybe we shift this variable to a module-level global later
paths = _normalize_to_list(fname)
# folder input cases:
# sanity check:
if merge_folder_stacks:
if verbose:
print(f"merge_folder_stacks={merge_folder_stacks} ⟶ will read and merge from tagged folder stacks.")
if folder_stacks and not merge_folder_stacks:
if verbose:
print(f"folder_stacks={folder_stacks}, merge_folder_stacks={merge_folder_stacks} ⟶ will read from tagged folder stacks.")
if len(paths) == 1 and _is_dir(paths[0]):
folder = paths[0]
if folder_stacks or merge_folder_stacks:
# we expect folder to be one of the TAG_000 style folderstacks, thus, let's search for
# the other TAG_XXX co-folders:
folder_base = os.path.basename(folder)
folder_path_to_base = os.path.dirname(folder)
# first verify, that folder_base contains at least one underscore:
if "_" not in folder_base:
if verbose:
print(f" Could not detect <TAG>_ from folder name: {folder_base!r}.")
print(" Abort merging.")
return ([], []) if return_list else (None, {})
# extract tag:
tag = folder_base.split("_", 1)[0] + "_"
if tag is None:
if verbose:
print(f" Could not detect <TAG>_ from folder name: {folder_base!r}.")
print(" Abort merging.")
return ([], []) if return_list else (None, {})
else:
if verbose:
print(f"Detected folder stack tag: {tag!r}.")
tagfolders = []
for d in os.listdir(folder_path_to_base):
d_full = os.path.join(folder_path_to_base, d)
if not os.path.isdir(d_full):
continue
if d.startswith(tag):
tagfolders.append(d)
if not tagfolders:
if verbose:
print(f" folder_stacks={folder_stacks} or merge_folder_stacks={merge_folder_stacks} requested, but no co-folders with tag '{tag}' found.")
print(" Abort merging.")
return ([], []) if return_list else (None, {})
else:
# sort:
tagfolders = sorted(tagfolders)
# prepend folder-path_to_base to tagfolders' entries:
tagfolders_fullpaths = [os.path.join(folder_path_to_base, tf) for tf in tagfolders]
images = []
metadatas = []
for sf in tagfolders_fullpaths:
f0 = _first_image_file_in_folder(sf, allowed_ext=allowed_ext)
if f0 is None:
if verbose:
print(f" No valid image file found in folder stack: {sf!r}. Skipping.")
continue
img, md = _dispatch_read_file(
f0,
zarr_store=zarr_store,
return_list=False,
physicalsize_xyz=physicalsize_xyz,
pixelunit=pixelunit,
verbose=verbose)
# post-hoc OME metadata checkup and correction:
md = OME_metadata_checkup(md, verbose=verbose)
# update merged image stack and metadata lists:
images.append(img)
metadatas.append(md)
if merge_folder_stacks:
if not images:
if verbose:
print(" No valid images found in any of the folder stacks. Abort merging.")
return ([], []) if return_list else (None, {})
merged_img, merged_md = _merge_folderstacks_with_padding(images, metadatas,
merge_along_axis=merge_along_axis,
zarr_store=zarr_store,
zeropadding=zeropadding,
verbose=verbose)
# post-hoc OME metadata checkup and correction:
if merged_md is not None:
merged_md = OME_metadata_checkup(merged_md, verbose=verbose)
# return result:
if return_list:
return [merged_img], [merged_md]
return merged_img, merged_md
# return results:
if return_list:
return images, metadatas
if len(images) == 1:
return images[0], metadatas[0]
return images, metadatas
# default folder behavior: read all image files in folder:
files = _list_image_files_in_folder(folder, allowed_ext=allowed_ext, recursive=recursive)
if collapse_ome_multifile_series:
files = _collapse_ome_multifile_series(files, verbose=verbose)
if not files:
return ([], []) if return_list else (None, {})
images = []
metadatas = []
for f in files:
img, md = _dispatch_read_file(
f,
zarr_store=zarr_store,
physicalsize_xyz=physicalsize_xyz,
pixelunit=pixelunit,
return_list=False,
verbose=verbose)
images.append(img)
metadatas.append(md)
if merge_multiple_files_in_folder:
merged_img, merged_md = _merge_concat_along_axis(
images, metadatas,
merge_along_axis=merge_along_axis,
zarr_store=zarr_store,
namespace="omio:merge_multiple_files_in_folder",
zeropadding=zeropadding,
verbose=verbose)
if merged_img is None:
if return_list:
return [None], [None]
return None, None
if return_list:
return [merged_img], [merged_md]
return merged_img, merged_md
if return_list:
return images, metadatas
if len(images) == 1:
return images[0], metadatas[0]
return images, metadatas
# file input or list of files:
images = []
metadatas = []
for p in paths:
if not _is_file(p):
raise FileNotFoundError(f"Path does not exist or is not a file: {p}")
img, md = _dispatch_read_file(
p,
zarr_store=zarr_store,
return_list=False,
physicalsize_xyz=physicalsize_xyz,
pixelunit=pixelunit,
verbose=verbose)
images.append(img)
metadatas.append(md)
if return_list:
return images, metadatas
if len(images) == 1:
return images[0], metadatas[0]
return images, metadatas
# OMIO'S universal converter (=imreader + imwrite):
[docs]
def imconvert(fname: Union[str, os.PathLike, List[Union[str, os.PathLike]]],
zarr_store: Union[None, str] = None,
recursive: bool = False,
folder_stacks: bool = False,
merge_folder_stacks: bool = False,
merge_multiple_files_in_folder: bool = False,
merge_along_axis: str = "T",
collapse_ome_multifile_series: bool = True,
zeropadding: bool = True,
physicalsize_xyz: Union[None, Any] = None,
pixelunit: str = "micron",
compression_level: int = 3,
relative_path: Union[None, str] = "omio_converted",
overwrite: bool = False,
return_fnames: bool = False,
cleanup_cache: bool = True,
verbose: bool = True) -> Union[None, List[str]]:
"""
Convert microscopy image inputs to OME TIFF using OMIO's reader plus OME TIFF writer.
This function is a convenience wrapper around `imread(...)` followed by
`imwrite(...)`. It accepts a single file path, a list of file paths, or a
folder path, reads the input data into OMIO's canonical representation (OME ordered
axes TZCYX plus standardized metadata), and writes one OME TIFF per resulting image
stack.
Input path semantics (inherited from `imread(...)`)
---------------------------------------------------
Input handling and optional merges follow the same semantics as `imread(...)`:
folder reading can be recursive, tagged folder stacks can be interpreted as a sequence
of co folders, and merge operations can concatenate multiple stacks along a chosen OME
axis ("T", "Z", or "C"), optionally with zero padding on non merge axes.
The behavior depends on the type and structure of `fname`:
Single file path
The file is read according to its extension (TIFF, OME TIFF, LSM, CZI, or RAW),
converted to OMIO's internal representation, and written as a single OME TIFF.
List of file paths
Each file is read independently. By default, one OME TIFF per input file is
written. If merge options are enabled (for example
``merge_multiple_files_in_folder``), files may be concatenated before writing.
Folder path
By default, all supported image files in the folder are read, optionally
recursively if ``recursive=True``, and written as individual OME TIFF files.
Additional folder specific modes are available:
* ``folder_stacks=True``:
The folder is interpreted as one element of a tagged folder stack
(for example ``TAG_000``, ``TAG_001``). The first valid image file from each
tagged folder is read and written as a separate OME TIFF.
* ``merge_folder_stacks=True``:
Tagged folder stacks are read as above, but the resulting stacks are
concatenated along ``merge_along_axis`` and written as a single merged
OME TIFF.
* ``merge_multiple_files_in_folder=True``:
All image files found in the folder are concatenated along
``merge_along_axis`` and written as a single merged OME TIFF.
Merge behavior
--------------
Merge operations follow the same validation and padding rules as in `imread(...)`:
* Allowed merge axes are "T", "Z", and "C".
* If `zeropadding=False`, all non merge axes must match exactly.
* If `zeropadding=True`, non merge axes are padded with zeros to the maximum size
across inputs before concatenation.
Output behavior
---------------
The output location and naming follow `imwrite(...)`:
* OME TIFFs are written next to the input file or inside the input folder.
* If `relative_path` is provided, a subfolder is created under the chosen output
parent directory.
* When merge modes are used, output filenames may include an indicator suffix to
reflect merged content.
* If `overwrite=False`, existing files are not replaced and collision safe names
are generated.
Zarr handling and cache cleanup
-------------------------------
If `zarr_store` is "memory" or "disk", `imread(...)` may create Zarr arrays or
materialize intermediate Zarr stores under a hidden `.omio_cache` directory.
If `cleanup_cache=True`, this function removes the corresponding cache entries
after writing. Cache cleanup is skipped when `zarr_store=None`.
Parameters
----------
fname : str, os.PathLike, or list of such
File path, folder path, or list of file paths to convert.
zarr_store : {None, "memory", "disk"}, optional
Controls whether `imread(...)` returns NumPy arrays (None) or Zarr arrays
("memory" or "disk"). Default is None.
recursive : bool, optional
If True and `fname` is a folder, search recursively for supported image files.
Default is False.
folder_stacks : bool, optional
Interpret a tagged folder as part of a folder stack and read one image per
tagged subfolder. Default is False.
merge_folder_stacks : bool, optional
Merge tagged folder stacks along `merge_along_axis` and write a single OME TIFF.
Default is False.
merge_multiple_files_in_folder : bool, optional
Merge all image files found in a folder along `merge_along_axis` and write a
single OME TIFF. Default is False.
merge_along_axis : {"T", "Z", "C"}, optional
Axis along which concatenation is performed in merge modes. Default is "T".
collapse_ome_multifile_series : bool, optional
If True, detect OME multifile series and keep only one representative file per
series to avoid duplicate loading. Default is True.
zeropadding : bool, optional
Allow padding of non merge axes during merges. Default is True.
physicalsize_xyz : Any or None, optional
Optional voxel size override forwarded to the underlying readers. Default is None.
pixelunit : str, optional
Unit string forwarded to readers for unit normalization. Default is "micron".
compression_level : int, optional
Zlib compression level passed to `imwrite(...)`. Default is 3.
relative_path : str or None, optional
Optional relative subfolder under the output parent directory where OME TIFFs
are written. Default is "omio_converted".
overwrite : bool, optional
Control overwriting behavior for existing outputs. Default is False.
return_fnames : bool, optional
If True, return the list of written OME TIFF filenames. Default is False.
cleanup_cache : bool, optional
Remove `.omio_cache` entries after writing when Zarr output was used.
Default is True.
verbose : bool, optional
Print diagnostic progress messages. Default is True.
Returns
-------
None or list[str]
If `return_fnames=True`, returns a list of output OME TIFF paths.
Otherwise returns None.
Raises
------
ValueError
If invalid merge options are provided.
FileNotFoundError
If an input file does not exist.
Other exceptions
Reader and writer errors may propagate during I O or metadata handling.
"""
if verbose:
print(f"Converting to OME-TIFF: {fname!r}")
#print(f"Reading input...")
images, metadatas = imread(
fname=fname,
zarr_store=zarr_store,
recursive=recursive,
folder_stacks=folder_stacks,
merge_folder_stacks=merge_folder_stacks,
merge_multiple_files_in_folder=merge_multiple_files_in_folder,
merge_along_axis=merge_along_axis,
collapse_ome_multifile_series=collapse_ome_multifile_series,
zeropadding=zeropadding,
physicalsize_xyz=physicalsize_xyz,
pixelunit=pixelunit,
verbose=verbose)
#print(f"Writing OME-TIFF output...")
if images is None or metadatas is None:
if verbose:
print("No images or metadata to write. Conversion aborted.")
return None
fnames_written = imwrite(
fname=fname,
images=images,
metadatas=metadatas,
compression_level=compression_level,
relative_path=relative_path,
overwrite=overwrite,
indicate_merged_files=merge_multiple_files_in_folder or merge_folder_stacks,
return_fnames=True,
verbose=verbose)
""" print(f"Written {len(fnames_written)} OME-TIFF files:")
for f in fnames_written:
print(f" {f}") """
if cleanup_cache:
if zarr_store is not None:
#cleanup_omio_cache(fname, full_cleanup=False, verbose=verbose)
if os.path.isdir(str(fname)):
cleanup_omio_cache(fname, full_cleanup=True, verbose=verbose)
else:
cleanup_omio_cache(fname, full_cleanup=False, verbose=verbose)
else:
if verbose:
print(f"Skipping omio cache cleanup because zarr_store=None.")
if return_fnames:
return fnames_written
# %% BIDS BATCH CONVERTER
# helper function for name matching:
def _match_name(name: str, pattern: str, mode: str) -> bool:
"""
Match a string against a pattern using a selectable matching mode.
This helper provides a small, explicit abstraction over common name matching
strategies used throughout OMIO, for example when selecting files, folders,
or tagged stack components.
Supported matching modes
------------------------
* "startswith":
Return True if `name` starts with `pattern`, equivalent to
`name.startswith(pattern)`.
* "exact":
Return True if `name` and `pattern` are identical strings.
* "regex":
Interpret `pattern` as a regular expression and return True if
`re.match(pattern, name)` succeeds. The match is anchored at the beginning
of `name`, following Python's `re.match` semantics.
Parameters
----------
name : str
The string to be tested, typically a filename or folder name.
pattern : str
The pattern to match against `name`. Interpreted according to `mode`.
mode : {"startswith", "exact", "regex"}
Matching strategy to use.
Returns
-------
bool
True if the match succeeds under the selected mode, False otherwise.
Raises
------
ValueError
If `mode` is not one of the supported values {"startswith", "exact", "regex"}.
Notes
-----
This function does not perform any normalization (such as lowercasing) of
either `name` or `pattern`. Callers are responsible for ensuring consistent
string preprocessing when required.
"""
if mode == "startswith":
return name.startswith(pattern)
if mode == "exact":
return name == pattern
if mode == "regex":
return re.match(pattern, name) is not None
raise ValueError(f"_match_name: invalid mode={mode!r}. Allowed: 'startswith','exact','regex'.")
# OMIO's BIDS-like batch converter function:
[docs]
def bids_batch_convert(
fname: str, # must be a directory
sub: str, # e.g. "ID" (subject folder detection)
exp: str, # e.g. "TP000" (experiment folder detection)
exp_match_mode: str = "startswith", # "startswith" | "exact" | "regex"
tagfolder: str | None = None, # e.g. "TAG_" (if set: only tagged folders inside exp)
merge_multiple_files_in_folder: bool = False,
merge_tagfolders: bool = False, # if tagfolder is not None: merge TAGFOLDER_01..N into one output
merge_along_axis: str = "T",
collapse_ome_multifile_series: bool = True,
zeropadding: bool = True,
zarr_store: str | None = None,
recursive: bool = False,
physicalsize_xyz=None,
pixelunit: str = "micron",
compression_level: int = 3,
relative_path: str | None = "omio_converted",
overwrite: bool = False,
cleanup_cache: bool = True,
return_fnames: bool = False,
verbose: bool = True):
"""
Batch converter for a BIDS-like directory tree.
This function traverses a project root folder and converts image files found in a
subject and experiment hierarchy into OME-TIFF using OMIO’s reader and writer.
It supports two main discovery modes: direct conversion of image files located
inside experiment folders, or conversion and optional merging of tagged
subfolders (folder-stacks) inside experiment folders.
Abstract expected folder scheme
-------------------------------
The converter expects a project root that contains subject folders, which in turn
contain experiment folders. Depending on whether `tagfolder` is provided, an
experiment folder either contains image files directly, or contains multiple
tagfolders which contain the image files.
The schematic below uses ``<...>`` as placeholders for your chosen naming policy::
project_root (= fname)
├─ <sub*>
│ ├─ <exp*>
│ │ ├─ image_01.tif / image_01.ome.tif / image_01.lsm / image_01.czi / image_01.raw
│ ├─ <exp*>
│ │ ├─ image_01.tif / image_01.ome.tif / image_01.lsm / image_01.czi / image_01.raw
│ │ ├─ image_02.tif / image_02.ome.tif / image_02.lsm / image_02.czi / image_02.raw
│ │ └─ ...
│ ├─ <exp*>
│ │ ├─ <tagfolder*>01
│ │ │ ├─ image_01.tif / image_01.czi / image_01.raw / ...
│ │ │ └─ ...
│ │ ├─ <tagfolder*>02
│ │ │ ├─ image_02.tif / image_02.czi / image_02.raw / ...
│ │ │ └─ ...
│ │ └─ ...
│ └─ ...
└─ <sub*>
└─ ...
Where:
* ``<sub*>`` are subject folders detected by prefix matching with ``sub``.
For example, if ``sub="sub"``, then ``"sub-01"``, ``"sub01"``, ``"sub_01"``, and
``"sub-A"`` all match, because this function uses ``startswith(sub)`` only.
* ``<exp*>`` are experiment folders detected within each subject folder via ``exp`` and
``exp_match_mode`` (``"startswith"``, ``"exact"``, or ``"regex"``).
* ``<tagfolder*>`` are optional tagfolders detected within an experiment folder via
prefix matching with ``tagfolder`` (for example ``"TAG_"``).
If ``tagfolder`` is set, direct image files in ``<exp*>`` are ignored and only
tagfolders are processed.
Folder discovery and selection
------------------------------
The input ``fname`` must be a directory and is treated as the project root.
Subject detection:
* Every immediate subdirectory of ``fname`` whose name starts with ``sub`` is treated
as a subject folder. No additional validation is performed.
Experiment detection:
* Within each subject folder, every immediate subdirectory whose name matches ``exp``
under ``exp_match_mode`` is treated as an experiment folder.
Matching modes are:
* ``"startswith"``: folder name starts with ``exp``
* ``"exact"``: folder name equals ``exp``
* ``"regex"``: ``re.match(exp, foldername)`` succeeds
Conversion behavior inside each experiment folder
-------------------------------------------------
Two mutually exclusive modes exist depending on `tagfolder`.
Mode A: tagfolder is None (direct file conversion):
* The converter processes image files located directly in the experiment folder.
* If ``merge_multiple_files_in_folder=False``, every supported image file is
converted to its own OME-TIFF output.
* If ``merge_multiple_files_in_folder=True``, all supported image files in the
experiment folder are read and concatenated along ``merge_along_axis`` (with
optional ``zeropadding`` on non-merge axes) into one merged output.
Mode B: tagfolder is not None (tagged folder stacks):
* Direct image files in the experiment folder are ignored.
* The converter searches for tagfolders inside the experiment folder whose name
starts with ``tagfolder`` (for example ``"TAG_"``).
* If ``merge_tagfolders=False`` (default), each tagfolder is converted separately
and produces its own OME-TIFF output.
* If ``merge_tagfolders=True``, all tagfolders are read and merged into a single
output by reusing OMIO’s folder-stack logic. To keep output naming stable and
collision-free when provenance-driven naming is used, a synthetic provenance
name is injected into ``metadata["Annotations"]["original_filename"]``.
Input path semantics
--------------------
Only directory input is accepted:
* ``fname`` must be an existing directory and is treated as the project root.
* All outputs are written within the experiment scope determined by traversal.
Output placement and naming
---------------------------
Output placement follows OMIO’s writer conventions via ``imconvert()`` and
``imwrite()``:
* If ``relative_path`` is not None, outputs are written into a subfolder named
``relative_path`` under the relevant experiment folder (or under the experiment
folder when writing a merged tagfolder product).
* If ``relative_path`` is None, outputs are written directly into the experiment
folder.
* Per-stack output basenames are preferably derived from metadata provenance via
``Annotations["original_filename"]`` when present. Otherwise, a fallback basename
is derived from the corresponding folder name.
* If ``overwrite=False``, name collisions are resolved by appending an incrementing
suffix to the output filename.
Merging semantics
-----------------
* ``merge_along_axis`` must be one of {"T","Z","C"}.
* In merge operations, the merge axis segments are concatenated in discovery order.
* If ``zeropadding=True``, non-merge axes may differ between inputs and will be
padded with zeros to the maximum size across inputs before concatenation.
If ``zeropadding=False``, non-merge axes must match exactly or the merge is aborted.
Zarr and cache handling
-----------------------
* ``zarr_store`` controls whether intermediate data are represented as NumPy in RAM
or as Zarr arrays ("memory" or "disk") during reading and merging.
* If ``cleanup_cache=True`` and ``zarr_store`` is not None, the function removes the
per-input `.omio_cache` artifacts created during conversion once outputs are written.
Parameters
----------
fname : str
Project root directory (must exist).
sub : str
Prefix used to detect subject folders at the project root level.
exp : str
Pattern used to detect experiment folders within each subject.
exp_match_mode : {"startswith","exact","regex"}
Matching strategy for experiment folder selection.
tagfolder : str or None
If None, convert direct files in experiment folders. If set, only process
tagged subfolders inside experiment folders whose names start with `tagfolder`.
merge_multiple_files_in_folder : bool
If tagfolder is None, optionally merge all image files in an experiment folder
into a single output.
merge_tagfolders : bool
If tagfolder is set, optionally merge all detected tagfolders into a single output.
merge_along_axis : {"T","Z","C"}
Axis along which merges are performed.
collapse_ome_multifile_series : bool
If True, detect and collapse OME multifile series during reading to avoid
duplicate loading.
zeropadding : bool
If True, allow mismatched non-merge axes by padding with zeros before merging.
zarr_store : {None,"memory","disk"}
Intermediate representation for reading and merging.
recursive : bool
Passed through to the underlying folder readers for file discovery.
physicalsize_xyz : tuple or None
Optional override for physical voxel sizes.
pixelunit : str
Unit string for pixel size fields (default "micron").
compression_level : int
zlib compression level for OME-TIFF writing.
relative_path : str or None
Subfolder name for outputs under experiment folders. Default "omio_converted".
overwrite : bool
If True, existing output files may be overwritten. Otherwise, collision-safe
suffixing is used.
cleanup_cache : bool
If True, remove `.omio_cache` artifacts created during conversion.
return_fnames : bool
If True, return a list of all written output filenames.
verbose : bool
If True, print progress and diagnostic messages.
Returns
-------
list[str] or None
If `return_fnames=True`, returns a list of written OME-TIFF file paths.
Otherwise returns None. The list may be empty if nothing matched or all
conversions failed.
"""
if fname is None or not os.path.isdir(str(fname)):
raise ValueError(f"bids_batch_convert: fname must be an existing directory. Got: {fname!r}\n"
"Conversion aborted.")
if merge_along_axis not in _ALLOWED_MERGE_AXES:
raise ValueError(
f"bids_batch_convert: merge_along_axis must be one of {sorted(_ALLOWED_MERGE_AXES)}.\n"
f"Got: {merge_along_axis!r}\n"
"Conversion aborted.")
project = str(fname)
written_all = []
# subject folders: startswith(sub) only; OMIO policy: OMIO will treat all folders
# found here as subjects; thus, if the user is messy with their folder naming,
# they may get unexpected results.
subs = []
subjects_list = []
for d in sorted(os.listdir(project)):
full = os.path.join(project, d)
if os.path.isdir(full) and d.startswith(sub):
subs.append(full)
subjects_list.append(d)
if verbose:
print(f"OMIO batch processor received BIDS project named={os.path.basename(project)!r}")
print(f"in given root path={os.path.dirname(project)!r}.")
print(f"Detected subjects with provided subject tag={sub!r} are:")
for s in subjects_list:
print(f" {s}")
print(f"⟶ {len(subs)} subject(s)")
print(f"Will now look for experiment folders matching {exp!r} with mode={exp_match_mode!r} inside each subject.")
if not subs:
warnings.warn(f"[OMIO batch] No subject folders found in {project!r} starting with {sub!r}.")
if return_fnames:
return written_all
# loop over subjects:
for sub_path in subs:
# sub_path = subs[0] # for testing
sub_name = os.path.basename(sub_path)
if verbose:
print(f"\nBatch processing subject {sub_name}...")
# experiment folders inside subject:
exp_folders = []
for d in sorted(os.listdir(sub_path)):
full = os.path.join(sub_path, d)
if not os.path.isdir(full):
if verbose:
print(f" Not a directory: {full!r}. Skipping.")
continue
if _match_name(d, exp, exp_match_mode):
exp_folders.append(full)
if verbose:
print(f" {len(exp_folders)} matched experiment folder(s) with exp-tag {exp!r} found with mode={exp_match_mode!r}:")
for ef in exp_folders:
print(f" {os.path.basename(ef)!r}")
if not exp_folders:
if verbose:
print(f" No exp folders matched {exp!r} with mode={exp_match_mode!r}. Skipping subject.")
continue
# loop over experiments:
for exp_path in exp_folders:
# exp_path = exp_folders[0] # for testing
exp_name = os.path.basename(exp_path)
if verbose:
print(f" Processing '{exp_name}' exp folder...\n")
# default relative path per case:
rel_default = relative_path
# -------------------------
# Case A: no tagfolder -> direct files in exp_path
# -------------------------
if tagfolder is None:
try:
fnames_written = imconvert(
fname=exp_path,
zarr_store=zarr_store,
recursive=recursive,
folder_stacks=False,
merge_folder_stacks=False,
merge_multiple_files_in_folder=merge_multiple_files_in_folder,
merge_along_axis=merge_along_axis,
zeropadding=zeropadding,
physicalsize_xyz=physicalsize_xyz,
pixelunit=pixelunit,
compression_level=compression_level,
relative_path=rel_default,
overwrite=overwrite,
return_fnames=True,
cleanup_cache=cleanup_cache,
verbose=verbose)
if verbose:
print("\n")
if isinstance(fnames_written, list):
written_all.extend(fnames_written)
except Exception as e:
if verbose:
print(f" Conversion failed (direct files). Are there any image files in {exp_path!r}?\n"
f" Or did you forget to set tagfolder=?\n"
f" Error: {type(e).__name__}: {e}")
continue
# -------------------------
# Case B: tagfolder set -> only tagged folders inside exp_path
# -------------------------
tagfolders = []
for d in sorted(os.listdir(exp_path)):
full = os.path.join(exp_path, d)
if os.path.isdir(full) and d.startswith(tagfolder):
tagfolders.append(full)
if not tagfolders:
if verbose:
print(f" tagfolder={tagfolder!r} requested, but no tagfolders found. Skipping exp.")
continue
if verbose:
print(f" found {len(tagfolders)} tagfolder(s) starting with {tagfolder!r}")
rel_tag = relative_path
# -------------------------
# B1: default = each tagfolder gets its own output
# -------------------------
if not merge_tagfolders:
for tf in tagfolders:
tf_name = os.path.basename(tf)
if verbose:
print(f" {tf_name}: converting tagfolder...\n")
try:
fnames_written = imconvert(
fname=tf,
zarr_store=zarr_store,
recursive=recursive,
folder_stacks=False, # ⟵ important: we are already in a tagfolder!
merge_folder_stacks=False,
merge_multiple_files_in_folder=merge_multiple_files_in_folder,
merge_along_axis=merge_along_axis,
zeropadding=zeropadding,
physicalsize_xyz=physicalsize_xyz,
pixelunit=pixelunit,
compression_level=compression_level,
relative_path=rel_tag,
overwrite=overwrite,
return_fnames=True,
cleanup_cache=cleanup_cache)
if verbose:
print("\n")
if isinstance(fnames_written, list):
written_all.extend(fnames_written)
except Exception as e:
if verbose:
print(f" conversion failed (tagfolder).\n"
f" Error: {type(e).__name__}: {e}")
continue
# -------------------------
# B2: merge_tagfolders=True -> merge ALL tagfolders into ONE output
# Writer uses original_filename; for a merged product we inject a synthetic
# provenance name to avoid collisions and make output self-describing.
# -------------------------
try:
# Read and merge by reusing my imread TAG-folder logic:
merged_img, merged_md = imread(
fname=tagfolders[0], # imread expects one of the tagfolders; it auto-detects the tag
zarr_store=zarr_store,
return_list=False,
recursive=recursive,
folder_stacks=True,
merge_folder_stacks=True, # triggers reading of all tagfolders and merging
merge_multiple_files_in_folder=False,
merge_along_axis=merge_along_axis,
collapse_ome_multifile_series=collapse_ome_multifile_series,
zeropadding=zeropadding,
physicalsize_xyz=physicalsize_xyz,
pixelunit=pixelunit,
verbose=verbose)
if verbose:
print("\n")
if merged_img is None or merged_md is None:
if verbose:
print(f" {exp_name}: merge_tagfolders produced None. Skipping.")
continue
# Inject synthetic provenance name so writer can stay "original_filename-driven":
# This avoids depending on fname basename or exp folder name.
merged_md = dict(merged_md)
ann = merged_md.get("Annotations", {})
if not isinstance(ann, dict):
ann = {}
ann = dict(ann)
ann["original_filename"] = f"{sub_name}_{exp_name}_{tagfolder}merged.ome.tif"
merged_md["Annotations"] = ann
# Write merged output at exp level (not inside a tagfolder).
# We call writer with fname=exp_path to place output in exp scope.
fnames_written = imwrite(
fname=exp_path,
images=merged_img,
metadatas=merged_md,
compression_level=compression_level,
relative_path=relative_path if relative_path is not None else "merged",
overwrite=overwrite,
return_fnames=True,
verbose=verbose,
indicate_merged_files=True)
if isinstance(fnames_written, list):
written_all.extend(fnames_written)
if cleanup_cache and zarr_store is not None:
cleanup_omio_cache(exp_path, full_cleanup=False, verbose=verbose)
except Exception as e:
if verbose:
print(f" {exp_name}: conversion failed (merge_tagfolders). "
f" Error: {type(e).__name__}: {e}")
if verbose:
print(f"\nOMIO batch processing done. Written {len(written_all)} file(s).")
for f in written_all:
print(f" {f}")
if return_fnames:
return written_all
# %% END