Skip to content

Commit

Permalink
Implement LOQ rundetection rules (#327)
Browse files Browse the repository at this point in the history
* Add closer to final LOQ rules changes

* Formatting and linting commit

* Update OSIRIS Specification

* Finalise LOQ tests

* Formatting and linting commit

* Undo change to ingest.py as it is unneeded.

* Fix issues with the test_common_rules

* Fixup mypy

* Adjust the e2e tests to make them work

* Update rundetection/rules/loq_rules.py

Co-authored-by: Samuel <[email protected]>

* Update rundetection/rules/loq_rules.py

Co-authored-by: Samuel <[email protected]>

* Fix up the quick change to simplify _extract_run_number_from_filename

* Simplify find_path_for_run_number

* Update rundetection/rules/loq_rules.py

Co-authored-by: Samuel <[email protected]>

* Review responses

* Add extra needed extracts for script changes (adding extra metadata)

* Remove unneeded test

* Test a missed line in extracts selection function

---------

Co-authored-by: github-actions <[email protected]>
Co-authored-by: Samuel <[email protected]>
  • Loading branch information
3 people authored Oct 11, 2024
1 parent be1d924 commit 12e31c9
Show file tree
Hide file tree
Showing 14 changed files with 696 additions and 13 deletions.
8 changes: 6 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ version = "0.0.1"
requires-python = ">= 3.11"
dependencies = [
"pika==1.3.2",
"h5py==3.11.0"
"h5py==3.11.0",
"xmltodict==0.13.0",
"requests==2.32.3"
]


Expand All @@ -19,7 +21,9 @@ run-detection = "rundetection.run_detection:main"
formatting = [
"ruff==0.4.8",
"mypy==1.10.0",
"run-detection[test]"
"run-detection[test]",
"types-requests==2.32.0.20240914",
"types-xmltodict==0.14.0.20241009"
]

test = [
Expand Down
18 changes: 18 additions & 0 deletions rundetection/ingestion/extracts.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,22 @@ def skip_extract(job_request: JobRequest, _: Any) -> JobRequest:
return job_request


def loq_extract(job_request: JobRequest, dataset: Any) -> JobRequest:
    """
    Get the sample details and the cycle string for a LOQ run
    :param job_request: The job request
    :param dataset: The nexus file dataset
    :return: The updated job request
    """
    # Look the sample group up once instead of once per attribute.
    sample = dataset.get("sample")
    job_request.additional_values["cycle_string"] = get_cycle_string_from_path(job_request.filepath)
    job_request.additional_values["sample_thickness"] = sample.get("thickness")
    job_request.additional_values["sample_geometry"] = sample.get("shape")
    job_request.additional_values["sample_height"] = sample.get("height")
    job_request.additional_values["sample_width"] = sample.get("width")

    return job_request


def tosca_extract(job_request: JobRequest, _: Any) -> JobRequest:
"""
Add the cycle_string to the job request
Expand Down Expand Up @@ -137,6 +153,8 @@ def get_extraction_function(instrument: str) -> Callable[[JobRequest, Any], JobR
return tosca_extract
case "osiris":
return osiris_extract
case "loq":
return loq_extract
case _:
return skip_extract

Expand Down
2 changes: 2 additions & 0 deletions rundetection/ingestion/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,4 +102,6 @@ def get_run_title(nexus_path: Path) -> str:
:param nexus_path: Path - the nexus file path
:return: str - The title of the files run
"""
# Instead of using Ingest here and reusing code, we won't bother with loading too much of the file every time and
# JUST load the title instead of everything.
return ingest(nexus_path).experiment_title
27 changes: 27 additions & 0 deletions rundetection/rules/common_rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@
Module containing rule implementations for instrument shared rules
"""

import logging

from rundetection.job_requests import JobRequest
from rundetection.rules.rule import Rule

logger = logging.getLogger(__name__)


class EnabledRule(Rule[bool]):
"""
Expand All @@ -14,3 +18,26 @@ class EnabledRule(Rule[bool]):

def verify(self, job_request: JobRequest) -> None:
job_request.will_reduce = self._value


class NotAScatterFileError(Exception):
    """
    Signals that a file expected to be a SANS scatter run is not one.

    NOTE(review): not raised anywhere in this module as visible here — presumably
    raised by callers elsewhere; confirm before removing.
    """

    pass


class CheckIfScatterSANS(Rule[bool]):
    """
    Reject job requests that are not sample scatter runs.

    A reducible LOQ scatter run has "_SANS/TRANS" in its experiment title and is
    not an empty-can or direct-beam measurement.
    """

    def verify(self, job_request: JobRequest) -> None:
        if "_SANS/TRANS" not in job_request.experiment_title:
            job_request.will_reduce = False
            logger.error("Not a scatter run. Does not have _SANS/TRANS in the experiment title.")
        # If it has empty or direct in the title assume it is a direct run file instead of a
        # normal scatter. The four exact spellings are kept (rather than lower-casing the
        # title) to preserve the original matching behaviour.
        if any(marker in job_request.experiment_title for marker in ("empty", "EMPTY", "direct", "DIRECT")):
            job_request.will_reduce = False
            logger.error(
                "Although the title contains SANS/TRANS, it also contains 'empty' or 'direct' "
                "and is assumed to be an empty-can or direct run rather than a sample scatter."
            )
12 changes: 11 additions & 1 deletion rundetection/rules/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@

from typing import Any

from rundetection.rules.common_rules import EnabledRule
from rundetection.rules.common_rules import CheckIfScatterSANS, EnabledRule
from rundetection.rules.inter_rules import InterStitchRule
from rundetection.rules.loq_rules import LoqFindFiles, LoqUserFile
from rundetection.rules.mari_rules import MariMaskFileRule, MariStitchRule, MariWBVANRule
from rundetection.rules.osiris_rules import (
OsirisDefaultGraniteAnalyser,
Expand Down Expand Up @@ -59,6 +60,15 @@ def rule_factory(key_: str, value: T) -> Rule[Any]: # noqa: C901, PLR0911, PLR0
case "osirisreductionmode":
if isinstance(value, bool):
return OsirisReductionModeRule(value)
case "checkifscattersans":
if isinstance(value, bool):
return CheckIfScatterSANS(value)
case "loqfindfiles":
if isinstance(value, bool):
return LoqFindFiles(value)
case "loquserfile":
if isinstance(value, str):
return LoqUserFile(value)
case _:
raise MissingRuleError(f"Implementation of Rule: {key_} does not exist.")

Expand Down
165 changes: 165 additions & 0 deletions rundetection/rules/loq_rules.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
"""
Rules for LOQ
"""

from __future__ import annotations

import logging
import typing
from dataclasses import dataclass
from pathlib import Path

import requests
import xmltodict

from rundetection.rules.rule import Rule

if typing.TYPE_CHECKING:
from rundetection.job_requests import JobRequest

logger = logging.getLogger(__name__)


@dataclass
class SansFileData:
    """Minimal metadata for a single LOQ run as listed in the cycle journal."""

    # Full experiment title, e.g. "{sample}_{can}_SANS/TRANS".
    title: str
    # Run classification taken from the last underscore-separated section of the
    # title, e.g. "TRANS" or "SANS/TRANS".
    # NOTE(review): shadows the builtin `type`; renaming would break callers.
    type: str
    # Run number exactly as read from the journal (a string; consumed via int()
    # elsewhere in this module).
    run_number: str


def _extract_run_number_from_filename(filename: str) -> str:
    """
    Extract the run number from a LOQ filename such as "LOQ00100002.nxs".
    :param filename: The filename to parse
    :return: The run number with instrument prefix and zero padding removed
    """
    # removeprefix is used instead of lstrip("LOQ"): lstrip treats its argument as
    # a character set, so it would also eat leading L/O/Q characters that are not
    # part of the literal "LOQ" prefix.
    return filename.split(".")[0].removeprefix("LOQ").lstrip("0")


def _is_sample_transmission_file(sans_file: SansFileData, sample_title: str) -> bool:
    """Return True when *sans_file* is a TRANS run whose title mentions *sample_title*."""
    if sans_file.type != "TRANS":
        return False
    return sample_title in sans_file.title


def _is_sample_direct_file(sans_file: SansFileData) -> bool:
    """Return True when *sans_file* is a TRANS run flagged as a direct or empty beam run."""
    lowered_title = sans_file.title.lower()
    mentions_direct_beam = "direct" in lowered_title or "empty" in lowered_title
    return mentions_direct_beam and sans_file.type == "TRANS"


def _is_can_scatter_file(sans_file: SansFileData, can_title: str) -> bool:
    """Return True when *sans_file* is a SANS/TRANS run whose first title section equals *can_title*."""
    if sans_file.type != "SANS/TRANS":
        return False
    first_title_section = sans_file.title.split("_")[0]
    return first_title_section == can_title


def _is_can_transmission_file(sans_file: SansFileData, can_title: str) -> bool:
    """Return True when *sans_file* is a TRANS run whose title mentions *can_title*."""
    if sans_file.type != "TRANS":
        return False
    return can_title in sans_file.title


def _find_trans_file(sans_files: list[SansFileData], sample_title: str) -> SansFileData | None:
    """Return the first sample transmission file matching *sample_title*, or None."""
    matches = (
        candidate
        for candidate in sans_files
        if _is_sample_transmission_file(sans_file=candidate, sample_title=sample_title)
    )
    return next(matches, None)


def _find_direct_file(sans_files: list[SansFileData]) -> SansFileData | None:
    """Return the last direct/empty transmission file in *sans_files*, or None."""
    # Search from the end of the list so the latest matching run wins.
    matches = (candidate for candidate in reversed(sans_files) if _is_sample_direct_file(sans_file=candidate))
    return next(matches, None)


def _find_can_scatter_file(sans_files: list[SansFileData], can_title: str) -> SansFileData | None:
    """Return the first can scatter file matching *can_title*, or None."""
    matches = (
        candidate for candidate in sans_files if _is_can_scatter_file(sans_file=candidate, can_title=can_title)
    )
    return next(matches, None)


def _find_can_trans_file(sans_files: list[SansFileData], can_title: str) -> SansFileData | None:
    """Return the first can transmission file matching *can_title*, or None."""
    matches = (
        candidate for candidate in sans_files if _is_can_transmission_file(sans_file=candidate, can_title=can_title)
    )
    return next(matches, None)


def find_path_for_run_number(cycle_path: str, run_number: int) -> Path | None:
    """
    Find the nexus file for a run number inside the given cycle directory.
    :param cycle_path: Path to the cycle directory containing LOQ nexus files
    :param run_number: The run number to look for
    :return: The path to the file if it exists, otherwise None
    """
    run = str(run_number)
    # File names zero-pad the run number to an unknown width; 10 is just a magic
    # number for an unrealistically large maximum padding. Every width shorter than
    # the run number itself zfills to the same unpadded name, so start at that width
    # (capped at 10) instead of re-checking identical candidate paths.
    for width in range(min(len(run), 10), 11):
        candidate = Path(f"{cycle_path}/LOQ{run.zfill(width)}.nxs")
        if candidate.exists():
            return candidate
    return None


def grab_cycle_instrument_index(cycle: str) -> str:
    """
    Download the LOQ journal index XML for the given cycle.
    :param cycle: Cycle string of the form "cycle_<year>_<number>"
    :return: The journal XML document as text
    """
    _, year_part, number_part = cycle.split("_")
    journal_url = f"http://data.isis.rl.ac.uk/journals/ndxloq/journal_{year_part}_{number_part}.xml"
    response = requests.get(journal_url, timeout=5)
    return response.text


def create_list_of_files(job_request: JobRequest) -> list[SansFileData]:
    """
    Build the list of runs recorded in this job request's cycle journal.
    :param job_request: The job request; its "cycle_string" additional value selects the journal
    :return: The parsed journal entries, or [] (with will_reduce disabled) on a malformed title
    """
    cycle = job_request.additional_values["cycle_string"]
    xml = grab_cycle_instrument_index(cycle=cycle)
    cycle_run_info = xmltodict.parse(xml)
    entries = cycle_run_info["NXroot"]["NXentry"]
    # xmltodict collapses a single <NXentry> element into a dict instead of a list
    # of dicts; normalise so a one-run cycle doesn't iterate over the dict's keys.
    if isinstance(entries, dict):
        entries = [entries]
    list_of_files = []
    for run_info in entries:
        title_contents = run_info["title"]["#text"].split("_")
        run_number = run_info["run_number"]["#text"]
        if len(title_contents) in {2, 3}:
            file_type = title_contents[-1]
        else:
            # Any malformed title aborts the whole listing: the caller treats [] as "nothing usable".
            job_request.will_reduce = False
            logger.error("Run %s either doesn't contain a _ or is not an expected experiment title format.", run_info)
            return []
        list_of_files.append(SansFileData(title=run_info["title"]["#text"], type=file_type, run_number=run_number))
    return list_of_files


def strip_excess_files(sans_files: list[SansFileData], scatter_run_number: int) -> list[SansFileData]:
    """
    Return the prefix of *sans_files* whose run numbers precede the scatter run.
    :param sans_files: Journal entries in ascending run-number order
    :param scatter_run_number: The scatter run being processed
    :return: All entries before the first one with run number >= scatter_run_number
    """
    kept: list[SansFileData] = []
    for candidate in sans_files:
        if int(candidate.run_number) >= scatter_run_number:
            break
        kept.append(candidate)
    return kept


class LoqFindFiles(Rule[bool]):
    """Locate companion runs (transmission, can, direct) for a LOQ scatter run."""

    def verify(self, job_request: JobRequest) -> None:
        """
        Search the cycle journal for runs related to this scatter run and record
        their run numbers in the job request's additional values.
        :param job_request: The job request for the scatter run
        :return: None
        """
        # Expecting 3 values: "{sample title}_{can title}_{type}"
        title_parts = job_request.experiment_title.split("_")
        if len(title_parts) != 3:  # noqa: PLR2004
            job_request.will_reduce = False
            logger.error(
                f"Less or more than 3 sections to the experiment_title, probably missing Can Scatter title: "
                f"{job_request.experiment_title}"
            )
            return
        sample_title, can_title, ___ = title_parts
        sans_files = create_list_of_files(job_request)
        if sans_files == []:
            job_request.will_reduce = False
            logger.error("No files found for this cycle excluding this run.")
            return
        # Only consider runs recorded before this scatter run.
        sans_files = strip_excess_files(sans_files, scatter_run_number=job_request.run_number)

        job_request.additional_values["run_number"] = job_request.run_number

        trans_file = _find_trans_file(sans_files=sans_files, sample_title=sample_title)
        if trans_file is not None:
            job_request.additional_values["scatter_transmission"] = trans_file.run_number

        can_scatter = _find_can_scatter_file(sans_files=sans_files, can_title=can_title)
        if can_scatter is not None:
            job_request.additional_values["can_scatter"] = can_scatter.run_number

        # A can transmission is only recorded when its matching can scatter was also found.
        can_trans = _find_can_trans_file(sans_files=sans_files, can_title=can_title)
        if can_trans is not None and can_scatter is not None:
            job_request.additional_values["can_transmission"] = can_trans.run_number

        # The direct run is shared: recorded for the sample when a sample transmission
        # exists, and for the can when both can runs were found.
        direct_file = _find_direct_file(sans_files=sans_files)
        if direct_file is not None:
            if trans_file is not None:
                job_request.additional_values["scatter_direct"] = direct_file.run_number
            if can_scatter is not None and can_trans is not None:
                job_request.additional_values["can_direct"] = direct_file.run_number


class LoqUserFile(Rule[str]):
    """Attach the configured LOQ user file path to the job request."""

    def verify(self, job_request: JobRequest) -> None:
        # User files live under the shared /extras/loq directory.
        user_file_name = self._value
        job_request.additional_values["user_file"] = f"/extras/loq/{user_file_name}"
2 changes: 0 additions & 2 deletions rundetection/run_detection.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,14 +130,12 @@ def process_notifications(notification_queue: SimpleQueue[JobRequest]) -> None:
:param notification_queue: The notification queue
:return: None
"""
logger.info("Checking notification queue...")
while not notification_queue.empty():
detected_run = notification_queue.get()
logger.info("Sending notification for run: %s", detected_run.run_number)

with producer() as channel:
channel.basic_publish(EGRESS_QUEUE_NAME, "", detected_run.to_json_string().encode())
logger.info("Notification queue empty. Continuing...")


def write_readiness_probe_file() -> None:
Expand Down
5 changes: 4 additions & 1 deletion rundetection/specifications/loq_specification.json
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
{
"enabled": false
"enabled": true,
"checkifscattersans": true,
"loqfindfiles": true,
"loquserfile": "USER_LOQ_243B_M3_Changer_Xpress_Okesola__MERGED_log.toml"
}
2 changes: 1 addition & 1 deletion rundetection/specifications/osiris_specification.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"enabled": true,
"osiriscalibfilesandreflection": {"002": "00148587", "004": "00148587"},
"osiriscalibfilesandreflection": {"002": "00149059", "004": "00149060"},
"osirisreductionmode": false,
"osirisdefaultspectroscopy": true,
"osirisdefaultgraniteanalyser": true,
Expand Down
21 changes: 21 additions & 0 deletions test/ingestion/test_extracts.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from rundetection.ingestion.extracts import (
get_cycle_string_from_path,
get_extraction_function,
loq_extract,
mari_extract,
osiris_extract,
skip_extract,
Expand Down Expand Up @@ -59,6 +60,7 @@ def test_skip_extract(caplog: LogCaptureFixture):
("mari", "mari_extract"),
("tosca", "tosca_extract"),
("osiris", "osiris_extract"),
("loq", "loq_extract"),
],
)
def test_get_extraction_function(input_value, expected_function_name):
Expand Down Expand Up @@ -236,6 +238,25 @@ def test_osiris_extract_raises_on_bad_frequencies(job_request):
osiris_extract(job_request, dataset)


def test_loq_extract(job_request):
    """
    Test that loq_extract copies the cycle string and the sample geometry
    values from the dataset into the job request's additional values.
    """
    dataset = {
        "sample": {
            "thickness": 1.0,
            "shape": "Disc",
            "height": 8.0,
            "width": 8.0,
        }
    }
    # Patch the cycle-string helper so no real archive path is required.
    with patch("rundetection.ingestion.extracts.get_cycle_string_from_path", return_value="some string"):
        loq_extract(job_request, dataset)

    assert job_request.additional_values["cycle_string"] == "some string"
    assert job_request.additional_values["sample_thickness"] == 1.0
    assert job_request.additional_values["sample_geometry"] == "Disc"
    assert job_request.additional_values["sample_height"] == 8.0  # noqa: PLR2004
    assert job_request.additional_values["sample_width"] == 8.0  # noqa: PLR2004


def test_get_cycle_string_from_path_valid():
"""
Test get cycle string returns correct string
Expand Down
Loading

0 comments on commit 12e31c9

Please sign in to comment.