Source code for ewoksid12.tasks.hdf5_to_ascii

import os
import logging
from typing import Optional, Sequence

from ewokscore.task import Task
from blissdata.h5api import dynamic_hdf5

from .io import save_as_ascii

logger = logging.getLogger(__name__)


[docs] class Hdf5ToAscii( Task, input_names=["filename", "output_filename"], optional_input_names=["scan_numbers", "retry_timeout", "retry_period"], output_names=["output_filenames"], ): """Save 1D data from Bliss HDF5 scans in ID12 ASCII files."""
[docs] def run(self): filename: str = self.inputs.filename scan_numbers: Optional[Sequence[int]] = self.get_input_value( "scan_numbers", None ) retry_timeout: Optional[float] = self.get_input_value("retry_timeout", None) retry_period: Optional[float] = self.get_input_value("retry_period", 1) output_filename: str = self.inputs.output_filename dirname = os.path.dirname(output_filename) if dirname: os.makedirs(dirname, exist_ok=True) basename = os.path.basename(output_filename) output_filenames = list() failed_scans = list() with dynamic_hdf5.File( filename, retry_timeout=retry_timeout, retry_period=retry_period ) as nxroot: if scan_numbers: scan_names = [f"{scannr}.1" for scannr in scan_numbers] else: scan_names = nxroot for scan_name in scan_names: scan_number = int(float(scan_name)) try: _ = nxroot[f"/{scan_name}/end_time"] # wait for scan to finish measurement = nxroot[f"/{scan_name}/measurement"] data = dict() for name in measurement: dataset = measurement[name] if dataset.ndim == 1: data[name] = dataset[()] except Exception as e: failed_scans.append(scan_number) logger.error( "Processing scan %s::/%s failed (%s)", filename, scan_name, e ) continue if not data: # Scan has no data to save continue scan_output_filename = os.path.join( dirname, f"scan{scan_number:03d}_{basename}" ) save_as_ascii(scan_output_filename, data) output_filenames.append(scan_output_filename) if failed_scans: raise RuntimeError(f"Failed scans (see logs why): {failed_scans}") self.outputs.output_filenames = output_filenames