Source code for ewoksid12.tests.test_hdf5_to_ascii
from typing import List
import h5py
import pytest
from ..tasks.hdf5_to_ascii import Hdf5ToAscii
[docs]
def test_hdf5_to_spec(tmp_path):
filename = tmp_path / "RAW_DATA" / "bliss_dataset.h5"
filename.parent.mkdir()
filename = str(filename)
output_filename = str(tmp_path / "PROCESSED_DATA" / "bliss_dataset.dat")
nscans = 3
nchannels = 10
with h5py.File(filename, "w") as nxroot:
for scan in range(1, nscans + 1):
_save_scan_content(nxroot, scan, nchannels)
inputs = {
"filename": filename,
"scan_numbers": list(range(1, nscans + 1)),
"output_filename": output_filename,
}
task = Hdf5ToAscii(inputs=inputs)
task.run()
output_filenames = [
str(tmp_path / "PROCESSED_DATA" / f"scan{i:03d}_bliss_dataset.dat")
for i in range(1, nscans + 1)
]
assert task.get_output_values() == {"output_filenames": output_filenames}
_assert_ascii_content(output_filenames, nchannels)
[docs]
def test_hdf5_to_spec_failed(tmp_path):
filename = tmp_path / "RAW_DATA" / "bliss_dataset.h5"
filename.parent.mkdir()
filename = str(filename)
output_filename = str(tmp_path / "PROCESSED_DATA" / "bliss_dataset.dat")
nscans = 1
nchannels = 10
with h5py.File(filename, "w") as nxroot:
for scan in range(1, nscans + 1):
_save_scan_content(nxroot, scan, nchannels)
inputs = {
"filename": filename,
"scan_numbers": list(range(1, nscans + 3)),
"output_filename": output_filename,
"retry_timeout": 0.1,
}
task = Hdf5ToAscii(inputs=inputs)
with pytest.raises(
RuntimeError, match=r"^Failed scans \(see logs why\): \[2, 3\]$"
):
task.run()
output_filenames = [tmp_path / "PROCESSED_DATA" / "scan001_bliss_dataset.dat"]
_assert_ascii_content(output_filenames, nchannels)
def _save_scan_content(nxroot: h5py.Group, scan: int, nchannels: int) -> None:
nxroot[f"/{scan}.1/start_time"] = "start_time"
nxroot[f"/{scan}.1/title"] = "timescan 0.1"
nxroot[f"/{scan}.1/measurement/counter1"] = [scan + i for i in range(nchannels)]
nxroot[f"/{scan}.1/measurement/counter2"] = [
scan + i / 10 for i in range(nchannels)
]
nxroot[f"/{scan}.1/end_time"] = "end_time"
def _assert_ascii_content(output_filenames: List[str], nchannels: int):
for scan, filename in enumerate(output_filenames, 1):
with open(filename, "r") as f:
lines = [s.rstrip() for s in f.readlines()]
expected = ["counter1 counter2"]
expected += [f"{scan + i} {scan + i/10}" for i in range(nchannels)]
assert lines == expected