Source code for ewoksid12.tasks.hdf5_to_ascii

from __future__ import annotations

import logging
import os
from typing import Any

from blissdata.h5api import dynamic_hdf5
from ewokscore.task import Task

from .io import save_as_ascii

logger = logging.getLogger(__name__)


[docs] class Hdf5ToAscii( Task, input_names=["filename", "output_dir"], optional_input_names=[ "scan_numbers", "retry_timeout", "retry_period", "counters", "has_subscan", ], output_names=["counters", "output_filenames"], ): """Save 1D data from Bliss HDF5 scans in ID12 ASCII files."""
[docs] def run(self) -> None: filename: str = self.inputs.filename scan_numbers: list[int] | None = self.get_input_value("scan_numbers", None) retry_timeout: float | None = self.get_input_value("retry_timeout", None) retry_period: float | None = self.get_input_value("retry_period", 1) raw_counters: list[str] | str = self.get_input_value("counters", "all") has_subscan: bool = self.get_input_value("has_subscan", False) # Convert counters to a list if it is a string. if isinstance(raw_counters, str): raw_counters = [raw_counters] output_dir: str = self.inputs.output_dir if output_dir: os.makedirs(output_dir, exist_ok=True) output_filenames: list[str] = [] scans_counters: list[list[str]] = [] failed_scans: list[str] = [] with dynamic_hdf5.File( filename, retry_timeout=retry_timeout, retry_period=retry_period ) as nxroot: scan_names: list[str] if scan_numbers: scan_names = [f"{scannr}.1" for scannr in scan_numbers] if has_subscan: scan_names.extend([f"{scannr}.2" for scannr in scan_numbers]) else: # Convert HDF5 file top-level groups to a list of strings. scan_names = list(nxroot) for scan_name in scan_names: scan_number, subscan_number = scan_name.split(".") try: _ = nxroot[f"/{scan_name}/end_time"] # wait for scan to finish measurement = nxroot[f"/{scan_name}/measurement"] data: dict[str, Any] = {} # Create a fresh list of counters for each scan. scan_counters: list[str] = list(raw_counters) # Replace all occurrence of "all". if "all" in scan_counters: while "all" in scan_counters: scan_counters.remove("all") scan_counters.extend(measurement.keys()) # Remove duplicates while preserving order. scan_counters = list(dict.fromkeys(scan_counters)) for scan_counter in scan_counters: try: dataset = measurement[scan_counter] except Exception as e: logger.warning( "Skipping counter '%s' in scan %s: %s (%s)", scan_counter, scan_name, str(e), e.__class__.__name__, ) continue if dataset.ndim == 1: data[scan_counter] = dataset[()] except Exception as e: failed_scans.append(scan_number) logger.error( "Processing scan %s::/%s failed (%s)", filename, scan_name, e ) continue if not data: # Scan has no data to save continue stem = os.path.splitext(os.path.basename(filename))[0] output_filename = ( f"scan{int(scan_number):03d}_{subscan_number}_{stem}.dat" ) save_as_ascii(os.path.join(output_dir, output_filename), data) output_filenames.append(output_filename) scans_counters.append(scan_counters) if failed_scans: raise RuntimeError(f"Failed scans (see logs why): {failed_scans}") self.outputs.counters = scans_counters self.outputs.output_filenames = output_filenames