Source code for ewoksid12.tests.test_hdf5_to_ascii
from typing import List
import h5py
import pytest
from ..tasks.hdf5_to_ascii import Hdf5ToAscii
[docs]
@pytest.mark.parametrize(
"counters",
[
["counter1", "counter2"],
"all",
["counter2", "all"],
["counter2"],
],
)
def test_hdf5_to_spec(tmp_path, counters):
filename = tmp_path / "RAW_DATA" / "bliss_dataset.h5"
filename.parent.mkdir()
filename = str(filename)
output_filename = str(tmp_path / "PROCESSED_DATA" / "bliss_dataset.dat")
nscans = 3
nchannels = 10
with h5py.File(filename, "w") as nxroot:
for scan in range(1, nscans + 1):
_save_scan_content(nxroot, scan, nchannels)
inputs = {
"filename": filename,
"scan_numbers": list(range(1, nscans + 1)),
"output_filename": output_filename,
"counters": counters,
}
task = Hdf5ToAscii(inputs=inputs)
task.run()
output_filenames = [
str(tmp_path / "PROCESSED_DATA" / f"scan{i:03d}_bliss_dataset.dat")
for i in range(1, nscans + 1)
]
output_values = task.get_output_values()
assert output_values["output_filenames"] == output_filenames
counters = output_values["counters"]
_assert_ascii_content(output_filenames, counters, nchannels)
[docs]
def test_hdf5_to_spec_failed(tmp_path):
filename = tmp_path / "RAW_DATA" / "bliss_dataset.h5"
filename.parent.mkdir()
filename = str(filename)
output_filename = str(tmp_path / "PROCESSED_DATA" / "bliss_dataset.dat")
nscans = 1
nchannels = 10
with h5py.File(filename, "w") as nxroot:
for scan in range(1, nscans + 1):
_save_scan_content(nxroot, scan, nchannels)
inputs = {
"filename": filename,
"scan_numbers": list(range(1, nscans + 3)),
"output_filename": output_filename,
"counters": ["counter1", "counter2"],
"retry_timeout": 0.1,
}
task = Hdf5ToAscii(inputs=inputs)
with pytest.raises(
RuntimeError, match=r"^Failed scans \(see logs why\): \[2, 3\]$"
):
task.run()
output_filenames = [tmp_path / "PROCESSED_DATA" / "scan001_bliss_dataset.dat"]
counters = inputs["counters"]
_assert_ascii_content(output_filenames, counters, nchannels)
def _save_scan_content(nxroot: h5py.Group, scan: int, nchannels: int) -> None:
nxroot[f"/{scan}.1/start_time"] = "start_time"
nxroot[f"/{scan}.1/title"] = "timescan 0.1"
for counter in ["counter1", "counter2"]:
factor = 10 ** (int(counter[-1]) - 1)
nxroot[f"/{scan}.1/measurement/{counter}"] = [
scan + i / factor for i in range(nchannels)
]
nxroot[f"/{scan}.1/end_time"] = "end_time"
def _assert_ascii_content(
output_filenames: List[str],
counters: List[str],
nchannels: int,
):
for scan, filename in enumerate(output_filenames, 1):
with open(filename, "r") as f:
header = " ".join(counters)
expected_lines = [header]
for i in range(nchannels):
values = []
for counter in counters:
# Calculate factor based on the suffix of the counter name.
factor = 10 ** (int(counter[-1]) - 1)
value = scan + i / factor
values.append(str(value))
expected_lines.extend([" ".join(values)])
actual_lines = [s.rstrip() for s in f.readlines()]
assert actual_lines == expected_lines