From ec91519952e6cae82e8831f77cdd62b76eb24eb3 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Fri, 11 Oct 2024 12:41:17 -0700 Subject: [PATCH 01/18] Ensure "in process" metric is always decremented. The previous code would leave a request marked as "in process" even if the service, for example, timed out waiting for a response. This introduced a permanent offset into the metric that could only be cleared by rebooting the fan-out service. The new code is exception-safe and should not have this problem. --- src/main.py | 41 +++++++++++++++++++---------------------- 1 file changed, 19 insertions(+), 22 deletions(-) diff --git a/src/main.py b/src/main.py index a6ef077..a9626c7 100644 --- a/src/main.py +++ b/src/main.py @@ -122,36 +122,33 @@ async def knative_request( Information such as some fields of the next visit message to identify this request and to log with. """ - in_process_requests_gauge.inc() - - result = await client.post( - knative_serving_url, - headers=headers, - data=body, # type:ignore - timeout=None, - ) - - logging.info( - f"nextVisit {info} status code {result.status_code} for initial request {result.content}" - ) - - ''' - if result.status_code == 502 or result.status_code == 503: - logging.info( - f"retry after status code {result.status_code} for nextVisit {info}" - ) - retry_result = await client.post( + with in_process_requests_gauge.track_inprogress(): + result = await client.post( knative_serving_url, headers=headers, data=body, # type:ignore timeout=None, ) + logging.info( - f"nextVisit {info} retried request {retry_result.content}" + f"nextVisit {info} status code {result.status_code} for initial request {result.content}" ) - ''' - in_process_requests_gauge.dec() + ''' + if result.status_code == 502 or result.status_code == 503: + logging.info( + f"retry after status code {result.status_code} for nextVisit {info}" + ) + retry_result = await client.post( + knative_serving_url, + headers=headers, + data=body, # type:ignore + 
timeout=None, + ) + logging.info( + f"nextVisit {info} retried request {retry_result.content}" + ) + ''' async def main() -> None: From be49a3255fc167c14d9928f655b7d5da90284e78 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Fri, 11 Oct 2024 13:29:41 -0700 Subject: [PATCH 02/18] Factor out handling of per-instrument metrics. Grouping the Gauges together, and indexing them by instrument name, lets us remove a large amount of boilerplate code in both initializing the Gauges and passing them around. --- src/main.py | 94 ++++++++++++++++++++++------------------------------- 1 file changed, 38 insertions(+), 56 deletions(-) diff --git a/src/main.py b/src/main.py index a9626c7..1d1423b 100644 --- a/src/main.py +++ b/src/main.py @@ -97,6 +97,34 @@ def detector_load(conf: dict, instrument: str) -> list[int]: return active_detectors +@dataclasses.dataclass(frozen=True) +class Metrics: + """A container for all metrics associated with a specific instrument. + + Parameters + ---------- + instrument : `str` + The instrument whose metrics are held by this object. 
+ """ + + instrument: str + """The instrument whose metrics are held by this object (`str`).""" + total_received: Gauge + """The number of incoming messages processed by this instance (`prometheus_client.Gauge`).""" + in_process: Gauge + """The number of fanned-out messages currently being processed (`prometheus_client.Gauge`).""" + + def __init__(self, instrument): + super().__setattr__("instrument", instrument) + word_instrument = instrument.lower().replace(" ", "_") + super().__setattr__("total_received", + Gauge(word_instrument + "_next_visit_messages", + f"next visit messages with {instrument} as instrument")) + super().__setattr__("in_process", + Gauge(word_instrument + "_prompt_processing_in_process_requests", + f"{instrument} in process requests for next visit")) + + @REQUEST_TIME.time() async def knative_request( in_process_requests_gauge, @@ -152,6 +180,7 @@ async def knative_request( async def main() -> None: + supported_instruments = ["LATISS", "LSSTCam", "LSSTComCam", "LSSTComCamSim", "HSC", ] # Get environment variables detector_config_file = os.environ["DETECTOR_CONFIG_FILE"] @@ -204,47 +233,7 @@ async def main() -> None: sasl_plain_password=sasl_password, ) - latiss_gauge = Gauge( - "latiss_next_visit_messages", "next visit nessages with latiss as instrument" - ) - lsstcam_gauge = Gauge( - "lsstcam_next_visit_messages", "next visit nessages with lsstcam as instrument" - ) - lsstcomcam_gauge = Gauge( - "lsstcomcam_next_visit_messages", - "next visit nessages with lsstcomcam as instrument", - ) - lsstcomcamsim_gauge = Gauge( - "lsstcomcamsim_next_visit_messages", - "next visit nessages with lsstcomcamsim as instrument", - ) - hsc_gauge = Gauge( - "hsc_next_visit_messages", "next visit nessages with hsc as instrument" - ) - hsc_in_process_requests_gauge = Gauge( - "hsc_prompt_processing_in_process_requests", - "hsc in process requests for next visit", - ) - - latiss_in_process_requests_gauge = Gauge( - "latiss_prompt_processing_in_process_requests", - 
"latiss in process requests for next visit", - ) - - lsstcam_in_process_requests_gauge = Gauge( - "lsstcam_prompt_processing_in_process_requests", - "lsstcam in process requests for next visit", - ) - - lsstcomcam_in_process_requests_gauge = Gauge( - "lsstcomcam_prompt_processing_in_process_requests", - "lsstcomcam in process requests for next visit", - ) - - lsstcomcamsim_in_process_requests_gauge = Gauge( - "lsstcomcamsim_prompt_processing_in_process_requests", - "lsstcomcamsim in process requests for next visit", - ) + gauges = {inst: Metrics(inst) for inst in supported_instruments} await consumer.start() @@ -319,7 +308,7 @@ async def main() -> None: match next_visit_message_updated.instrument: case "LATISS": - latiss_gauge.inc() + gauges["LATISS"].total_received.inc() fan_out_message_list = ( next_visit_message_updated.add_detectors( dataclasses.asdict(next_visit_message_updated), @@ -327,9 +316,8 @@ async def main() -> None: ) ) knative_serving_url = latiss_knative_serving_url - in_process_requests_gauge = latiss_in_process_requests_gauge case "LSSTComCamSim": - lsstcomcamsim_gauge.inc() + gauges["LSSTComCamSim"].total_received.inc() fan_out_message_list = ( next_visit_message_updated.add_detectors( dataclasses.asdict(next_visit_message_updated), @@ -338,7 +326,6 @@ async def main() -> None: ) ) knative_serving_url = lsstcomcamsim_knative_serving_url - in_process_requests_gauge = lsstcomcamsim_in_process_requests_gauge case "LSSTComCam": logging.info(f"Ignore LSSTComCam message {next_visit_message_updated}" " as the prompt service for this is not yet deployed.") @@ -352,7 +339,7 @@ async def main() -> None: # upload.py test. 
match next_visit_message_updated.salIndex: case 999: # HSC datasets from using upload_from_repo.py - hsc_gauge.inc() + gauges["HSC"].total_received.inc() fan_out_message_list = ( next_visit_message_updated.add_detectors( dataclasses.asdict(next_visit_message_updated), @@ -360,9 +347,8 @@ async def main() -> None: ) ) knative_serving_url = hsc_knative_serving_url - in_process_requests_gauge = hsc_in_process_requests_gauge case 59134: # HSC upload.py test dataset - hsc_gauge.inc() + gauges["HSC"].total_received.inc() fan_out_message_list = ( next_visit_message_updated.add_detectors( dataclasses.asdict(next_visit_message_updated), @@ -370,9 +356,8 @@ async def main() -> None: ) ) knative_serving_url = hsc_knative_serving_url - in_process_requests_gauge = hsc_in_process_requests_gauge case 59142: # HSC upload.py test dataset - hsc_gauge.inc() + gauges["HSC"].total_received.inc() fan_out_message_list = ( next_visit_message_updated.add_detectors( dataclasses.asdict(next_visit_message_updated), @@ -380,9 +365,8 @@ async def main() -> None: ) ) knative_serving_url = hsc_knative_serving_url - in_process_requests_gauge = hsc_in_process_requests_gauge case 59150: # HSC upload.py test dataset - hsc_gauge.inc() + gauges["HSC"].total_received.inc() fan_out_message_list = ( next_visit_message_updated.add_detectors( dataclasses.asdict(next_visit_message_updated), @@ -390,9 +374,8 @@ async def main() -> None: ) ) knative_serving_url = hsc_knative_serving_url - in_process_requests_gauge = hsc_in_process_requests_gauge case 59160: # HSC upload.py test dataset - hsc_gauge.inc() + gauges["HSC"].total_received.inc() fan_out_message_list = ( next_visit_message_updated.add_detectors( dataclasses.asdict(next_visit_message_updated), @@ -400,7 +383,6 @@ async def main() -> None: ) ) knative_serving_url = hsc_knative_serving_url - in_process_requests_gauge = hsc_in_process_requests_gauge case _: raise Exception( f"no matching case for instrument {next_visit_message_updated.instrument}." 
@@ -425,7 +407,7 @@ async def main() -> None: task = asyncio.create_task( knative_request( - in_process_requests_gauge, + gauges[fan_out_message["instrument"]].in_process, client, knative_serving_url, headers, From e4cb582bd86e1e2fbd902fa0fb4f1fb5cefef5d6 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Mon, 14 Oct 2024 13:46:12 -0700 Subject: [PATCH 03/18] Configure supported instruments. This commit begins the process of removing specific instrument references from the source code. However, some references still remain. --- src/main.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/main.py b/src/main.py index 1d1423b..0a488b6 100644 --- a/src/main.py +++ b/src/main.py @@ -180,9 +180,8 @@ async def knative_request( async def main() -> None: - supported_instruments = ["LATISS", "LSSTCam", "LSSTComCam", "LSSTComCamSim", "HSC", ] - # Get environment variables + supported_instruments = os.environ["SUPPORTED_INSTRUMENTS"].split() detector_config_file = os.environ["DETECTOR_CONFIG_FILE"] kafka_cluster = os.environ["KAFKA_CLUSTER"] group_id = os.environ["CONSUMER_GROUP"] From b4f49e7ae2ff658b1a8502a67a4e5a0db45e4538 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Mon, 14 Oct 2024 11:30:55 -0700 Subject: [PATCH 04/18] Factor detector.yaml out of Docker image. The configuration is now handled in Phalanx, and can be changed without rebuilding the container. 
--- src/detector.yaml | 329 ---------------------------------------------- src/main.py | 8 +- 2 files changed, 4 insertions(+), 333 deletions(-) delete mode 100644 src/detector.yaml diff --git a/src/detector.yaml b/src/detector.yaml deleted file mode 100644 index 16a9981..0000000 --- a/src/detector.yaml +++ /dev/null @@ -1,329 +0,0 @@ -LATISS: - detectors: - 0: True -LSSTComCam: - detectors: - 0: True - 1: True - 2: True - 3: True - 4: True - 5: True - 6: True - 7: True - 8: True -LSSTCam: - detectors: - 0: False - 1: False - 2: False - 3: False - 4: False - 5: False - 6: False - 7: False - 8: False - 9: False - 10: False - 11: False - 12: False - 13: False - 14: False - 15: False - 16: False - 17: False - 18: False - 19: False - 20: False - 21: False - 22: False - 23: False - 24: False - 25: False - 26: False - 27: False - 28: False - 29: False - 30: False - 31: False - 32: False - 33: False - 34: False - 35: False - 36: False - 37: False - 38: False - 39: False - 40: False - 41: False - 42: False - 43: False - 44: False - 45: False - 46: False - 47: False - 48: False - 49: False - 50: False - 51: False - 52: False - 53: False - 54: False - 55: False - 56: False - 57: False - 58: False - 59: False - 60: False - 61: False - 62: False - 63: False - 64: False - 65: False - 66: False - 67: False - 68: False - 69: False - 70: False - 71: False - 72: False - 73: False - 74: False - 75: False - 76: False - 77: False - 78: False - 79: False - 80: False - 81: False - 82: False - 83: False - 84: False - 85: False - 86: False - 87: False - 88: False - 89: False - 90: False - 91: False - 92: False - 93: False - 94: False - 95: False - 96: False - 97: False - 98: False - 99: False - 100: False - 101: False - 102: False - 103: False - 104: False - 105: False - 106: False - 107: False - 108: False - 109: False - 110: False - 111: False - 112: False - 113: False - 114: False - 115: False - 116: False - 117: False - 118: False - 119: False - 120: False - 121: False - 122: False - 
123: False - 124: False - 125: False - 126: False - 127: False - 128: False - 129: False - 130: False - 131: False - 132: False - 133: False - 134: False - 135: False - 136: False - 137: False - 138: False - 139: False - 140: False - 141: False - 142: False - 143: False - 144: False - 145: False - 146: False - 147: False - 148: False - 149: False - 150: False - 151: False - 152: False - 153: False - 154: False - 155: False - 156: False - 157: False - 158: False - 159: False - 160: False - 161: False - 162: False - 163: False - 164: False - 165: False - 166: False - 167: False - 168: False - 169: False - 170: False - 171: False - 172: False - 173: False - 174: False - 175: False - 176: False - 177: False - 178: False - 179: False - 180: False - 181: False - 182: False - 183: False - 184: False - 185: False - 186: False - 187: False - 188: False -HSC: - detectors: - 0: True - 1: True - 2: True - 3: True - 4: True - 5: True - 6: True - 7: True - 8: True - 9: False - 10: True - 11: True - 12: True - 13: True - 14: True - 15: True - 16: True - 17: True - 18: True - 19: True - 20: True - 21: True - 22: True - 23: True - 24: True - 25: True - 26: True - 27: True - 28: True - 29: True - 30: True - 31: True - 32: True - 33: True - 34: True - 35: True - 36: True - 37: True - 38: True - 39: True - 40: True - 41: True - 42: True - 43: True - 44: True - 45: True - 46: True - 47: True - 48: True - 49: True - 50: True - 51: True - 52: True - 53: True - 54: True - 55: True - 56: True - 57: True - 58: True - 59: True - 60: True - 61: True - 62: True - 63: True - 64: True - 65: True - 66: True - 67: True - 68: True - 69: True - 70: True - 71: True - 72: True - 73: True - 74: True - 75: True - 76: True - 77: True - 78: True - 79: True - 80: True - 81: True - 82: True - 83: True - 84: True - 85: True - 86: True - 87: True - 88: True - 89: True - 90: True - 91: True - 92: True - 93: True - 94: True - 95: True - 96: True - 97: True - 98: True - 99: True - 100: True - 101: True - 102: 
True - 103: True -HSC-TEST-59134: - detectors: - 0: True - 4: True - 5: True -HSC-TEST-59142: - detectors: - 0: True - 5: True - 11: True -HSC-TEST-59150: - detectors: - 50: True - 58: True -HSC-TEST-59160: - detectors: - 43: True - 51: True diff --git a/src/main.py b/src/main.py index 0a488b6..9a514d9 100644 --- a/src/main.py +++ b/src/main.py @@ -85,15 +85,15 @@ def detector_load(conf: dict, instrument: str) -> list[int]: The instrument to load detectors for. Yields ------ - active_detectors : `list` + active_detectors : `list` [`int`] The active detectors for the instrument. """ detectors = conf[instrument]["detectors"] active_detectors: list[int] = [] - for k, v in detectors.items(): - if v: - active_detectors.append(k) + for detector, active in detectors.items(): + if active: + active_detectors.append(int(detector)) return active_detectors From 4a0de43417373bb308344caaf342eea5287bc3d2 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Mon, 14 Oct 2024 13:50:45 -0700 Subject: [PATCH 05/18] Rename detector config to instrument config. The old format was called a "detector" config despite having room for more fields. Calling it an instrument config makes it more natural to add other instrument-specific information to the file. 
--- src/main.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main.py b/src/main.py index 9a514d9..65a23f3 100644 --- a/src/main.py +++ b/src/main.py @@ -182,7 +182,7 @@ async def knative_request( async def main() -> None: # Get environment variables supported_instruments = os.environ["SUPPORTED_INSTRUMENTS"].split() - detector_config_file = os.environ["DETECTOR_CONFIG_FILE"] + instrument_config_file = os.environ["INSTRUMENT_CONFIG_FILE"] kafka_cluster = os.environ["KAFKA_CLUSTER"] group_id = os.environ["CONSUMER_GROUP"] topic = os.environ["NEXT_VISIT_TOPIC"] @@ -205,7 +205,7 @@ async def main() -> None: logging.basicConfig(stream=sys.stdout, level=logging.INFO) logging.basicConfig(stream=sys.stderr, level=logging.WARNING) - conf = yaml.safe_load(Path(detector_config_file).read_text()) + conf = yaml.safe_load(Path(instrument_config_file).read_text()) # list based on keys in config. Data class latiss_active_detectors = detector_load(conf, "LATISS") From 2bf8eecfbcc2b54a8f2ab8952b62783553b4066e Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Wed, 16 Oct 2024 10:26:04 -0700 Subject: [PATCH 06/18] Transpose detector config. The original config was organized instrument->detectors->detector, which required the entire config to be overridden if only one per-instrument property was environment-dependent. The new config is organized detectors->instrument->detector, which allows non-detector configs to be changed without touching or duplicating the detectors. --- src/main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main.py b/src/main.py index 65a23f3..3079ee1 100644 --- a/src/main.py +++ b/src/main.py @@ -89,7 +89,7 @@ def detector_load(conf: dict, instrument: str) -> list[int]: The active detectors for the instrument. 
""" - detectors = conf[instrument]["detectors"] + detectors = conf["detectors"][instrument] active_detectors: list[int] = [] for detector, active in detectors.items(): if active: From ddd1df0cc99e831c9c33857bbefff2813c690388 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Mon, 14 Oct 2024 13:55:19 -0700 Subject: [PATCH 07/18] Move Knative URLs from envvars to instrument map. This consolidates the instrument-specific configs in one place. --- src/main.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/main.py b/src/main.py index 3079ee1..24b6521 100644 --- a/src/main.py +++ b/src/main.py @@ -189,11 +189,6 @@ async def main() -> None: offset = os.environ["OFFSET"] expire = float(os.environ["MESSAGE_EXPIRATION"]) kafka_schema_registry_url = os.environ["KAFKA_SCHEMA_REGISTRY_URL"] - latiss_knative_serving_url = os.environ["LATISS_KNATIVE_SERVING_URL"] - lsstcomcam_knative_serving_url = os.environ["LSSTCOMCAM_KNATIVE_SERVING_URL"] - lsstcomcamsim_knative_serving_url = os.environ["LSSTCOMCAMSIM_KNATIVE_SERVING_URL"] - lsstcam_knative_serving_url = os.environ["LSSTCAM_KNATIVE_SERVING_URL"] - hsc_knative_serving_url = os.environ["HSC_KNATIVE_SERVING_URL"] # kafka auth sasl_username = os.environ["SASL_USERNAME"] @@ -207,6 +202,12 @@ async def main() -> None: conf = yaml.safe_load(Path(instrument_config_file).read_text()) + latiss_knative_serving_url = conf["knative-urls"]["LATISS"] + lsstcomcam_knative_serving_url = conf["knative-urls"]["LSSTComCam"] + lsstcomcamsim_knative_serving_url = conf["knative-urls"]["LSSTComCamSim"] + lsstcam_knative_serving_url = conf["knative-urls"]["LSSTCam"] + hsc_knative_serving_url = conf["knative-urls"]["HSC"] + # list based on keys in config. 
Data class latiss_active_detectors = detector_load(conf, "LATISS") lsstcomcam_active_detectors = detector_load(conf, "LSSTComCam") From e1608ab453f34143ec7434333ef0045e0df3e6f5 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Mon, 14 Oct 2024 13:57:15 -0700 Subject: [PATCH 08/18] Factor out per-instrument configuration. Grouping the detectors and URLs together, and indexing them by instrument name, lets us remove most instrument-specific branches. The exceptions are the HSC Cosmos mini-datasets, which still need special treatment. --- src/main.py | 100 ++++++++++++++++++++++++++++++---------------------- 1 file changed, 57 insertions(+), 43 deletions(-) diff --git a/src/main.py b/src/main.py index 24b6521..0e9cf41 100644 --- a/src/main.py +++ b/src/main.py @@ -1,4 +1,5 @@ import asyncio +import collections.abc import dataclasses import datetime import json @@ -73,28 +74,53 @@ def add_detectors( return message_list -def detector_load(conf: dict, instrument: str) -> list[int]: - """Load active instrument detectors from yaml configiration file of - true false values for each detector. +@dataclasses.dataclass(frozen=True) +class InstrumentConfig: + """The configuration used for sending messages to a specific instrument. Parameters ---------- conf : `dict` - The instrument configuration from the yaml file. - instrument: `str` - The instrument to load detectors for. - Yields - ------ - active_detectors : `list` [`int`] - The active detectors for the instrument. + A hierarchical instrument configuration, whose keys are instruments. + instrument : `str` + The instrument to configure. 
""" - detectors = conf["detectors"][instrument] - active_detectors: list[int] = [] - for detector, active in detectors.items(): - if active: - active_detectors.append(int(detector)) - return active_detectors + instrument: str + """The instrument whose metrics are held by this object (`str`).""" + url: str + """The address of the Knative Serving instance for this instrument (`str`).""" + detectors: collections.abc.Sequence[int] + """The active detectors for this instrument (sequence [`int`]).""" + + def __init__(self, conf, instrument): + super().__setattr__("instrument", instrument) + super().__setattr__("url", conf["knative-urls"][instrument]) + super().__setattr__("detectors", self.detector_load(conf, instrument)) + + @staticmethod + def detector_load(conf: dict, instrument: str) -> list[int]: + """Load active instrument detectors from yaml configiration file of + true false values for each detector. + + Parameters + ---------- + conf : `dict` + The instrument configuration from the yaml file. + instrument : `str` + The instrument to load detectors for. + + Returns + ------- + active_detectors : `list` [`int`] + The active detectors for the instrument. + """ + detectors = conf["detectors"][instrument] + active_detectors: list[int] = [] + for detector, active in detectors.items(): + if active: + active_detectors.append(int(detector)) + return active_detectors @dataclasses.dataclass(frozen=True) @@ -201,23 +227,12 @@ async def main() -> None: logging.basicConfig(stream=sys.stderr, level=logging.WARNING) conf = yaml.safe_load(Path(instrument_config_file).read_text()) - - latiss_knative_serving_url = conf["knative-urls"]["LATISS"] - lsstcomcam_knative_serving_url = conf["knative-urls"]["LSSTComCam"] - lsstcomcamsim_knative_serving_url = conf["knative-urls"]["LSSTComCamSim"] - lsstcam_knative_serving_url = conf["knative-urls"]["LSSTCam"] - hsc_knative_serving_url = conf["knative-urls"]["HSC"] - - # list based on keys in config. 
Data class - latiss_active_detectors = detector_load(conf, "LATISS") - lsstcomcam_active_detectors = detector_load(conf, "LSSTComCam") - lsstcam_active_detectors = detector_load(conf, "LSSTCam") - hsc_active_detectors = detector_load(conf, "HSC") + instruments = {inst: InstrumentConfig(conf, inst) for inst in supported_instruments} # These four groups are for the small dataset used in the upload.py test - hsc_active_detectors_59134 = detector_load(conf, "HSC-TEST-59134") - hsc_active_detectors_59142 = detector_load(conf, "HSC-TEST-59142") - hsc_active_detectors_59150 = detector_load(conf, "HSC-TEST-59150") - hsc_active_detectors_59160 = detector_load(conf, "HSC-TEST-59160") + hsc_active_detectors_59134 = InstrumentConfig.detector_load(conf, "HSC-TEST-59134") + hsc_active_detectors_59142 = InstrumentConfig.detector_load(conf, "HSC-TEST-59142") + hsc_active_detectors_59150 = InstrumentConfig.detector_load(conf, "HSC-TEST-59150") + hsc_active_detectors_59160 = InstrumentConfig.detector_load(conf, "HSC-TEST-59160") # Start Prometheus endpoint start_http_server(8000) @@ -312,20 +327,19 @@ async def main() -> None: fan_out_message_list = ( next_visit_message_updated.add_detectors( dataclasses.asdict(next_visit_message_updated), - latiss_active_detectors, + instruments["LATISS"].detectors, ) ) - knative_serving_url = latiss_knative_serving_url + knative_serving_url = instruments["LATISS"].url case "LSSTComCamSim": gauges["LSSTComCamSim"].total_received.inc() fan_out_message_list = ( next_visit_message_updated.add_detectors( dataclasses.asdict(next_visit_message_updated), - # Just use ComCam active detector config. 
- lsstcomcam_active_detectors, + instruments["LSSTComCamSim"].detectors, ) ) - knative_serving_url = lsstcomcamsim_knative_serving_url + knative_serving_url = instruments["LSSTComCamSim"].url case "LSSTComCam": logging.info(f"Ignore LSSTComCam message {next_visit_message_updated}" " as the prompt service for this is not yet deployed.") @@ -343,10 +357,10 @@ async def main() -> None: fan_out_message_list = ( next_visit_message_updated.add_detectors( dataclasses.asdict(next_visit_message_updated), - hsc_active_detectors, + instruments["HSC"].detectors, ) ) - knative_serving_url = hsc_knative_serving_url + knative_serving_url = instruments["HSC"].url case 59134: # HSC upload.py test dataset gauges["HSC"].total_received.inc() fan_out_message_list = ( @@ -355,7 +369,7 @@ async def main() -> None: hsc_active_detectors_59134, ) ) - knative_serving_url = hsc_knative_serving_url + knative_serving_url = instruments["HSC"].url case 59142: # HSC upload.py test dataset gauges["HSC"].total_received.inc() fan_out_message_list = ( @@ -364,7 +378,7 @@ async def main() -> None: hsc_active_detectors_59142, ) ) - knative_serving_url = hsc_knative_serving_url + knative_serving_url = instruments["HSC"].url case 59150: # HSC upload.py test dataset gauges["HSC"].total_received.inc() fan_out_message_list = ( @@ -373,7 +387,7 @@ async def main() -> None: hsc_active_detectors_59150, ) ) - knative_serving_url = hsc_knative_serving_url + knative_serving_url = instruments["HSC"].url case 59160: # HSC upload.py test dataset gauges["HSC"].total_received.inc() fan_out_message_list = ( @@ -382,7 +396,7 @@ async def main() -> None: hsc_active_detectors_59160, ) ) - knative_serving_url = hsc_knative_serving_url + knative_serving_url = instruments["HSC"].url case _: raise Exception( f"no matching case for instrument {next_visit_message_updated.instrument}." 
From 089c8e932eb9b4e730b1817c69bd76e988431a30 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Mon, 14 Oct 2024 16:04:07 -0700 Subject: [PATCH 09/18] Standardize special-casing of HSC Cosmos uploads. These datasets require specific detector combinations, but these combinations are uniquely identifiable by visit ID. --- src/main.py | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/src/main.py b/src/main.py index 0e9cf41..a733ff6 100644 --- a/src/main.py +++ b/src/main.py @@ -229,10 +229,8 @@ async def main() -> None: conf = yaml.safe_load(Path(instrument_config_file).read_text()) instruments = {inst: InstrumentConfig(conf, inst) for inst in supported_instruments} # These four groups are for the small dataset used in the upload.py test - hsc_active_detectors_59134 = InstrumentConfig.detector_load(conf, "HSC-TEST-59134") - hsc_active_detectors_59142 = InstrumentConfig.detector_load(conf, "HSC-TEST-59142") - hsc_active_detectors_59150 = InstrumentConfig.detector_load(conf, "HSC-TEST-59150") - hsc_active_detectors_59160 = InstrumentConfig.detector_load(conf, "HSC-TEST-59160") + hsc_upload_detectors = {visit: InstrumentConfig.detector_load(conf, f"HSC-TEST-{visit}") + for visit in {59134, 59142, 59150, 59160}} # Start Prometheus endpoint start_http_server(8000) @@ -366,7 +364,7 @@ async def main() -> None: fan_out_message_list = ( next_visit_message_updated.add_detectors( dataclasses.asdict(next_visit_message_updated), - hsc_active_detectors_59134, + hsc_upload_detectors[59134], ) ) knative_serving_url = instruments["HSC"].url @@ -375,7 +373,7 @@ async def main() -> None: fan_out_message_list = ( next_visit_message_updated.add_detectors( dataclasses.asdict(next_visit_message_updated), - hsc_active_detectors_59142, + hsc_upload_detectors[59142], ) ) knative_serving_url = instruments["HSC"].url @@ -384,7 +382,7 @@ async def main() -> None: fan_out_message_list = ( next_visit_message_updated.add_detectors( 
dataclasses.asdict(next_visit_message_updated), - hsc_active_detectors_59150, + hsc_upload_detectors[59150], ) ) knative_serving_url = instruments["HSC"].url @@ -393,7 +391,7 @@ async def main() -> None: fan_out_message_list = ( next_visit_message_updated.add_detectors( dataclasses.asdict(next_visit_message_updated), - hsc_active_detectors_59160, + hsc_upload_detectors[59160], ) ) knative_serving_url = instruments["HSC"].url From 4c44f7d4838481ce8f2ba39845d5dfe9f2dacacf Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Mon, 14 Oct 2024 15:38:37 -0700 Subject: [PATCH 10/18] Fix typing errors in NextVisitModel. Coordinates are pairs of floats, not ints, and not all values are strings. --- src/main.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/src/main.py b/src/main.py index a733ff6..b43fa43 100644 --- a/src/main.py +++ b/src/main.py @@ -31,7 +31,7 @@ class NextVisitModel: instrument: str groupId: str coordinateSystem: int - position: typing.List[int] + position: list[float] startTime: float rotationSystem: int cameraAngle: float @@ -47,7 +47,7 @@ def add_detectors( self, message: dict, active_detectors: list, - ) -> list[dict[str, str]]: + ) -> list[dict[str, typing.Any]]: """Adds and duplicates next visit messages for fanout. Parameters @@ -56,12 +56,13 @@ def add_detectors( The next visit message. active_detectors: `list` The active detectors for an instrument. - Yields - ------ - message_list : `list` + + Returns + ------- + message_list : `list` [`dict`] The message list for fan out. """ - message_list: list[dict[str, str]] = [] + message_list: list[dict[str, typing.Any]] = [] for active_detector in active_detectors: temp_message = message.copy() temp_message["detector"] = active_detector From b9a5de6000dfed763410398b04b3fb5e72b53dac Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Mon, 14 Oct 2024 15:56:07 -0700 Subject: [PATCH 11/18] Factor out fan-out submission info. 
Collecting the target Knative URL and the fanned-out messages into a single object makes it easy to factor out the shared code. --- src/main.py | 43 +++++++++++++++++++++++++++---------------- 1 file changed, 27 insertions(+), 16 deletions(-) diff --git a/src/main.py b/src/main.py index b43fa43..b0fa7a5 100644 --- a/src/main.py +++ b/src/main.py @@ -152,6 +152,17 @@ def __init__(self, instrument): f"{instrument} in process requests for next visit")) +@dataclasses.dataclass(frozen=True) +class Submission: + """The batched requests to be submitted to a Knative instance. + """ + + url: str + """The address of the Knative Serving instance to send requests to (`str`).""" + fan_out_messages: collections.abc.Collection[dict[str, typing.Any]] + """The messages to send to ``url`` (collection [`dict`]).""" + + @REQUEST_TIME.time() async def knative_request( in_process_requests_gauge, @@ -323,22 +334,22 @@ async def main() -> None: match next_visit_message_updated.instrument: case "LATISS": gauges["LATISS"].total_received.inc() - fan_out_message_list = ( + send_info = Submission( + instruments["LATISS"].url, next_visit_message_updated.add_detectors( dataclasses.asdict(next_visit_message_updated), instruments["LATISS"].detectors, ) ) - knative_serving_url = instruments["LATISS"].url case "LSSTComCamSim": gauges["LSSTComCamSim"].total_received.inc() - fan_out_message_list = ( + send_info = Submission( + instruments["LSSTComCamSim"].url, next_visit_message_updated.add_detectors( dataclasses.asdict(next_visit_message_updated), instruments["LSSTComCamSim"].detectors, ) ) - knative_serving_url = instruments["LSSTComCamSim"].url case "LSSTComCam": logging.info(f"Ignore LSSTComCam message {next_visit_message_updated}" " as the prompt service for this is not yet deployed.") @@ -353,49 +364,49 @@ async def main() -> None: match next_visit_message_updated.salIndex: case 999: # HSC datasets from using upload_from_repo.py gauges["HSC"].total_received.inc() - fan_out_message_list = ( + 
send_info = Submission( + instruments["HSC"].url, next_visit_message_updated.add_detectors( dataclasses.asdict(next_visit_message_updated), instruments["HSC"].detectors, ) ) - knative_serving_url = instruments["HSC"].url case 59134: # HSC upload.py test dataset gauges["HSC"].total_received.inc() - fan_out_message_list = ( + send_info = Submission( + instruments["HSC"].url, next_visit_message_updated.add_detectors( dataclasses.asdict(next_visit_message_updated), hsc_upload_detectors[59134], ) ) - knative_serving_url = instruments["HSC"].url case 59142: # HSC upload.py test dataset gauges["HSC"].total_received.inc() - fan_out_message_list = ( + send_info = Submission( + instruments["HSC"].url, next_visit_message_updated.add_detectors( dataclasses.asdict(next_visit_message_updated), hsc_upload_detectors[59142], ) ) - knative_serving_url = instruments["HSC"].url case 59150: # HSC upload.py test dataset gauges["HSC"].total_received.inc() - fan_out_message_list = ( + send_info = Submission( + instruments["HSC"].url, next_visit_message_updated.add_detectors( dataclasses.asdict(next_visit_message_updated), hsc_upload_detectors[59150], ) ) - knative_serving_url = instruments["HSC"].url case 59160: # HSC upload.py test dataset gauges["HSC"].total_received.inc() - fan_out_message_list = ( + send_info = Submission( + instruments["HSC"].url, next_visit_message_updated.add_detectors( dataclasses.asdict(next_visit_message_updated), hsc_upload_detectors[59160], ) ) - knative_serving_url = instruments["HSC"].url case _: raise Exception( f"no matching case for instrument {next_visit_message_updated.instrument}." 
@@ -407,7 +418,7 @@ async def main() -> None: "source": topic, } - for fan_out_message in fan_out_message_list: + for fan_out_message in send_info.fan_out_messages: data = fan_out_message data_json = json.dumps(data) @@ -422,7 +433,7 @@ async def main() -> None: knative_request( gauges[fan_out_message["instrument"]].in_process, client, - knative_serving_url, + send_info.url, headers, body, str(info), From d195be5a4fc7021ae069ccd4a25bdfcabdaf0633 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Mon, 14 Oct 2024 16:39:26 -0700 Subject: [PATCH 12/18] Factor duplicate fan-out code into separate functions. The HSC Cosmos datasets still need special casing, but the relevant function and switch branch can be simply deleted once HSC support is phased out. --- src/main.py | 122 +++++++++++++++++++++++----------------------------- 1 file changed, 55 insertions(+), 67 deletions(-) diff --git a/src/main.py b/src/main.py index b0fa7a5..2162e72 100644 --- a/src/main.py +++ b/src/main.py @@ -163,6 +163,48 @@ class Submission: """The messages to send to ``url`` (collection [`dict`]).""" +def fan_out(next_visit, inst_config): + """Prepare fanned-out messages for sending to the Prompt Processing service. + + Parameters + ---------- + next_visit : `NextVisitModel` + The nextVisit message to fan out. + inst_config : `InstrumentConfig` + The configuration information for the active instrument. + + Returns + ------- + fanned_out : `Submission` + The submission information for the fanned-out messages. + """ + return Submission(inst_config.url, + next_visit.add_detectors(dataclasses.asdict(next_visit), inst_config.detectors) + ) + + +def fan_out_hsc(next_visit, inst_config, detectors): + """Prepare fanned-out messages for HSC upload.py. + + Parameters + ---------- + next_visit : `NextVisitModel` + The nextVisit message to fan out. + inst_config : `InstrumentConfig` + The configuration information for the active instrument. + detectors : collection [`int`] + The detectors to send. 
+ + Returns + ------- + fanned_out : `Submission` + The submission information for the fanned-out messages. + """ + return Submission(inst_config.url, + next_visit.add_detectors(dataclasses.asdict(next_visit), detectors) + ) + + @REQUEST_TIME.time() async def knative_request( in_process_requests_gauge, @@ -332,30 +374,8 @@ async def main() -> None: ) match next_visit_message_updated.instrument: - case "LATISS": - gauges["LATISS"].total_received.inc() - send_info = Submission( - instruments["LATISS"].url, - next_visit_message_updated.add_detectors( - dataclasses.asdict(next_visit_message_updated), - instruments["LATISS"].detectors, - ) - ) - case "LSSTComCamSim": - gauges["LSSTComCamSim"].total_received.inc() - send_info = Submission( - instruments["LSSTComCamSim"].url, - next_visit_message_updated.add_detectors( - dataclasses.asdict(next_visit_message_updated), - instruments["LSSTComCamSim"].detectors, - ) - ) - case "LSSTComCam": - logging.info(f"Ignore LSSTComCam message {next_visit_message_updated}" - " as the prompt service for this is not yet deployed.") - continue - case "LSSTCam": - logging.info(f"Ignore LSSTCam message {next_visit_message_updated}" + case "LSSTComCam" | "LSSTCam" as instrument: + logging.info(f"Ignore {instrument} message {next_visit_message_updated}" " as the prompt service for this is not yet deployed.") continue case "HSC": @@ -364,51 +384,19 @@ async def main() -> None: match next_visit_message_updated.salIndex: case 999: # HSC datasets from using upload_from_repo.py gauges["HSC"].total_received.inc() - send_info = Submission( - instruments["HSC"].url, - next_visit_message_updated.add_detectors( - dataclasses.asdict(next_visit_message_updated), - instruments["HSC"].detectors, - ) - ) - case 59134: # HSC upload.py test dataset - gauges["HSC"].total_received.inc() - send_info = Submission( - instruments["HSC"].url, - next_visit_message_updated.add_detectors( - dataclasses.asdict(next_visit_message_updated), - hsc_upload_detectors[59134], 
- ) - ) - case 59142: # HSC upload.py test dataset - gauges["HSC"].total_received.inc() - send_info = Submission( - instruments["HSC"].url, - next_visit_message_updated.add_detectors( - dataclasses.asdict(next_visit_message_updated), - hsc_upload_detectors[59142], - ) - ) - case 59150: # HSC upload.py test dataset - gauges["HSC"].total_received.inc() - send_info = Submission( - instruments["HSC"].url, - next_visit_message_updated.add_detectors( - dataclasses.asdict(next_visit_message_updated), - hsc_upload_detectors[59150], - ) - ) - case 59160: # HSC upload.py test dataset + send_info = fan_out(next_visit_message_updated, instruments["HSC"]) + case visit if visit in hsc_upload_detectors: # upload.py test datasets gauges["HSC"].total_received.inc() - send_info = Submission( - instruments["HSC"].url, - next_visit_message_updated.add_detectors( - dataclasses.asdict(next_visit_message_updated), - hsc_upload_detectors[59160], - ) - ) + send_info = fan_out_hsc(next_visit_message_updated, instruments["HSC"], + hsc_upload_detectors[visit]) + case _: + raise RuntimeError("No matching case for HSC " + f"salIndex {next_visit_message_updated.salIndex}") + case instrument if instrument in supported_instruments: + gauges[instrument].total_received.inc() + send_info = fan_out(next_visit_message_updated, instruments[instrument]) case _: - raise Exception( + raise RuntimeError( f"no matching case for instrument {next_visit_message_updated.instrument}." ) From 08ff768d9a4ca110d7199425df2ed7f450609794 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Mon, 14 Oct 2024 16:46:54 -0700 Subject: [PATCH 13/18] Simplify NextVisitModel.add_detectors. The old add_detectors was an instance method implemented like a static method. This commit changes it to a proper instance method, since that is the existing usage pattern. 
--- src/main.py | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/src/main.py b/src/main.py index 2162e72..469fe5a 100644 --- a/src/main.py +++ b/src/main.py @@ -45,15 +45,12 @@ class NextVisitModel: def add_detectors( self, - message: dict, active_detectors: list, ) -> list[dict[str, typing.Any]]: - """Adds and duplicates next visit messages for fanout. + """Adds and duplicates this message for fanout. Parameters ---------- - message: `str` - The next visit message. active_detectors: `list` The active detectors for an instrument. @@ -62,6 +59,7 @@ def add_detectors( message_list : `list` [`dict`] The message list for fan out. """ + message = dataclasses.asdict(self) message_list: list[dict[str, typing.Any]] = [] for active_detector in active_detectors: temp_message = message.copy() @@ -178,9 +176,7 @@ def fan_out(next_visit, inst_config): fanned_out : `Submission` The submission information for the fanned-out messages. """ - return Submission(inst_config.url, - next_visit.add_detectors(dataclasses.asdict(next_visit), inst_config.detectors) - ) + return Submission(inst_config.url, next_visit.add_detectors(inst_config.detectors)) def fan_out_hsc(next_visit, inst_config, detectors): @@ -200,9 +196,7 @@ def fan_out_hsc(next_visit, inst_config, detectors): fanned_out : `Submission` The submission information for the fanned-out messages. """ - return Submission(inst_config.url, - next_visit.add_detectors(dataclasses.asdict(next_visit), detectors) - ) + return Submission(inst_config.url, next_visit.add_detectors(detectors)) @REQUEST_TIME.time() From db16cb4332e1b049776ac0251ff7ba000bda1cfd Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Tue, 15 Oct 2024 11:20:23 -0700 Subject: [PATCH 14/18] Factor message-handling loop into individual functions. This change makes the loop much easier to read and edit. However, the functions need to take an inordinate number of "global" variables, so they could use some further factoring.
--- src/main.py | 303 +++++++++++++++++++++++++++++++++------------------- 1 file changed, 194 insertions(+), 109 deletions(-) diff --git a/src/main.py b/src/main.py index 469fe5a..ed213b4 100644 --- a/src/main.py +++ b/src/main.py @@ -43,6 +43,40 @@ class NextVisitModel: totalCheckpoints: int private_sndStamp: float + @staticmethod + def from_raw_message(message: dict[str, typing.Any]): + """Factory creating a NextVisitModel from an unpacked message. + + Parameters + ---------- + message : `dict` [`str`] + A mapping containing message fields. + + Returns + ------- + model : `NextVisitModel` + An object containing the fields in the message. + """ + # Message may contain fields that aren't in NextVisitModel + return NextVisitModel( + salIndex=message["salIndex"], + scriptSalIndex=message["scriptSalIndex"], + instrument=message["instrument"], + groupId=message["groupId"], + coordinateSystem=message["coordinateSystem"], + position=message["position"], + startTime=message["startTime"], + rotationSystem=message["rotationSystem"], + cameraAngle=message["cameraAngle"], + filters=message["filters"], + dome=message["dome"], + duration=message["duration"], + nimages=message["nimages"], + survey=message["survey"], + totalCheckpoints=message["totalCheckpoints"], + private_sndStamp=message["private_sndStamp"], + ) + def add_detectors( self, active_detectors: list, @@ -161,6 +195,102 @@ class Submission: """The messages to send to ``url`` (collection [`dict`]).""" +class UnsupportedMessageError(RuntimeError): + """Exception raised if there is no Prompt Processing instance for a given + nextVisit message. + """ + pass + + +def is_handleable(message: dict[str, typing.Any], expire: float) -> bool: + """Test whether a nextVisit message has enough data to be handled by + fan-out. + + This function emits explanatory logs as a side effect. + + Parameters + ---------- + message : `dict` [`str`] + An unpacked mapping of message fields. 
+ expire : `float` + The maximum age, in seconds, that a message can still be handled. + + Returns + ------- + handleable : `bool` + `True` if the message can be processed, `False` otherwise. + """ + if not message["instrument"]: + logging.info("Message does not have an instrument. Assuming it's not an observation.") + return False + + # efdStamp is visit publication, in seconds since 1970-01-01 UTC + if message["private_efdStamp"]: + published = message["private_efdStamp"] + age = round(time.time() - published) # Microsecond precision is distracting + if age > expire: + logging.warning("Message published on %s UTC is %s old, ignoring.", + time.ctime(published), + datetime.timedelta(seconds=age) + ) + return False + else: + logging.warning("Message does not have private_efdStamp, can't determine age.") + return True + + +def make_fanned_out_messages(message: NextVisitModel, + instruments: collections.abc.Mapping[str, InstrumentConfig], + gauges: collections.abc.Mapping[str, Metrics], + hsc_upload_detectors: collections.abc.Mapping[int, + collections.abc.Collection[int]] + ) -> Submission: + """Create appropriate fanned-out messages for an incoming message. + + Parameters + ---------- + message : `NextVisitModel` + The message to fan out. + instruments : mapping [`str`, `InstrumentConfig`] + A mapping from instrument name to configuration information. + gauges : mapping [`str`, `Metrics`] + A mapping from instrument name to metrics for that instrument. + hsc_upload_detectors : mapping [`int`, collection [`int`]] + A mapping from HSC-Cosmos visit to the supported detectors for that visit. + + Returns + ------- + send_info : `Submission` + The fanned out messages, along with where to send them. + + Raises + ------ + UnsupportedMessageError + Raised if ``message`` cannot be fanned-out or sent.
+ """ + match message.instrument: + case "LSSTComCam" | "LSSTCam" as instrument: + raise UnsupportedMessageError(f"Ignore {instrument} message {message}" + " as the prompt service for this is not yet deployed.") + case "HSC": + # HSC has extra active detector configurations just for the + # upload.py test. + match message.salIndex: + case 999: # HSC datasets from using upload_from_repo.py + gauges["HSC"].total_received.inc() + return fan_out(message, instruments["HSC"]) + case visit if visit in hsc_upload_detectors: # upload.py test datasets + gauges["HSC"].total_received.inc() + return fan_out_hsc(message, instruments["HSC"], hsc_upload_detectors[visit]) + case _: + raise UnsupportedMessageError(f"No matching case for HSC salIndex {message.salIndex}") + case instrument if instrument in instruments: + gauges[instrument].total_received.inc() + return fan_out(message, instruments[instrument]) + case _: + raise UnsupportedMessageError(f"no matching case for instrument {message.instrument}.") + + def fan_out(next_visit, inst_config): """Prepare fanned-out messages for sending to the Prompt Processing service. @@ -199,6 +329,61 @@ def fan_out_hsc(next_visit, inst_config, detectors): return Submission(inst_config.url, next_visit.add_detectors(detectors)) +def dispatch_fanned_out_messages(client: httpx.AsyncClient, + topic: str, + tasks: collections.abc.MutableSet[asyncio.Task], + send_info: Submission, + gauges: collections.abc.Mapping[str, Metrics], + ): + """Package and send the fanned-out messages to Prompt Processing. + + Parameters + ---------- + client : `httpx.AsyncClient` + The client to which to upload the messages. + topic : `str` + The topic to which to upload the messages. + tasks : set [`asyncio.Task`] + Collection for holding the requests. + send_info : `Submission` + The data and address to submit. + gauges : mapping [`str`, `Metrics`] + A mapping from instrument name to metrics for that instrument. 
+ """ + try: + attributes = { + "type": "com.example.kafka", + "source": topic, + } + + for fan_out_message in send_info.fan_out_messages: + data = fan_out_message + data_json = json.dumps(data) + + logging.info(f"data after json dump {data_json}") + event = CloudEvent(attributes, data_json) + headers, body = to_structured(event) + info = { + key: data[key] for key in ["instrument", "groupId", "detector"] + } + + task = asyncio.create_task( + knative_request( + gauges[fan_out_message["instrument"]].in_process, + client, + send_info.url, + headers, + body, + str(info), + ) + ) + tasks.add(task) + task.add_done_callback(tasks.discard) + + except ValueError: + logging.exception("Error while sending fanned-out messages.") + + @REQUEST_TIME.time() async def knative_request( in_process_requests_gauge, @@ -311,122 +496,22 @@ async def main() -> None: while True: # run continously async for msg in consumer: - next_visit_message_initial = await deserializer.deserialize( data=msg.value ) - logging.info(f"message deserialized {next_visit_message_initial}") - - if not next_visit_message_initial["message"]["instrument"]: - logging.info("Message does not have an instrument. 
Assuming " - "it's not an observation.") + if not is_handleable(next_visit_message_initial["message"], expire): continue - # efdStamp is visit publication, in seconds since 1970-01-01 UTC - if next_visit_message_initial["message"]["private_efdStamp"]: - published = next_visit_message_initial["message"]["private_efdStamp"] - age = round(time.time() - published) # Microsecond precision is distracting - if age > expire: - logging.warning("Message published on %s UTC is %s old, ignoring.", - time.ctime(published), - datetime.timedelta(seconds=age) - ) - continue - else: - logging.warning("Message does not have private_efdStamp, can't determine age.") - - next_visit_message_updated = NextVisitModel( - salIndex=next_visit_message_initial["message"]["salIndex"], - scriptSalIndex=next_visit_message_initial["message"][ - "scriptSalIndex" - ], - instrument=next_visit_message_initial["message"]["instrument"], - groupId=next_visit_message_initial["message"]["groupId"], - coordinateSystem=next_visit_message_initial["message"][ - "coordinateSystem" - ], - position=next_visit_message_initial["message"]["position"], - startTime=next_visit_message_initial["message"]["startTime"], - rotationSystem=next_visit_message_initial["message"][ - "rotationSystem" - ], - cameraAngle=next_visit_message_initial["message"][ - "cameraAngle" - ], - filters=next_visit_message_initial["message"]["filters"], - dome=next_visit_message_initial["message"]["dome"], - duration=next_visit_message_initial["message"]["duration"], - nimages=next_visit_message_initial["message"]["nimages"], - survey=next_visit_message_initial["message"]["survey"], - totalCheckpoints=next_visit_message_initial["message"][ - "totalCheckpoints" - ], - private_sndStamp=next_visit_message_initial["message"][ - "private_sndStamp" - ], + next_visit_message_updated = NextVisitModel.from_raw_message( + next_visit_message_initial["message"] ) - - match next_visit_message_updated.instrument: - case "LSSTComCam" | "LSSTCam" as instrument: 
- logging.info(f"Ignore {instrument} message {next_visit_message_updated}" - " as the prompt service for this is not yet deployed.") - continue - case "HSC": - # HSC has extra active detector configurations just for the - # upload.py test. - match next_visit_message_updated.salIndex: - case 999: # HSC datasets from using upload_from_repo.py - gauges["HSC"].total_received.inc() - send_info = fan_out(next_visit_message_updated, instruments["HSC"]) - case visit if visit in hsc_upload_detectors: # upload.py test datasets - gauges["HSC"].total_received.inc() - send_info = fan_out_hsc(next_visit_message_updated, instruments["HSC"], - hsc_upload_detectors[visit]) - case _: - raise RuntimeError("No matching case for HSC " - f"salIndex {next_visit_message_updated.salIndex}") - case instrument if instrument in supported_instruments: - gauges[instrument].total_received.inc() - send_info = fan_out(next_visit_message_updated, instruments[instrument]) - case _: - raise RuntimeError( - f"no matching case for instrument {next_visit_message_updated.instrument}." 
- ) - - try: - attributes = { - "type": "com.example.kafka", - "source": topic, - } - - for fan_out_message in send_info.fan_out_messages: - data = fan_out_message - data_json = json.dumps(data) - - logging.info(f"data after json dump {data_json}") - event = CloudEvent(attributes, data_json) - headers, body = to_structured(event) - info = { - key: data[key] for key in ["instrument", "groupId", "detector"] - } - - task = asyncio.create_task( - knative_request( - gauges[fan_out_message["instrument"]].in_process, - client, - send_info.url, - headers, - body, - str(info), - ) - ) - tasks.add(task) - task.add_done_callback(tasks.discard) - - except ValueError as e: - logging.info("Error ", e) - + send_info = make_fanned_out_messages(next_visit_message_updated, + instruments, + gauges, + hsc_upload_detectors, + ) + dispatch_fanned_out_messages(client, topic, tasks, send_info, gauges) finally: await consumer.stop() From 293d5d49f365f65712a5a5891589c2be65e7b577 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Tue, 15 Oct 2024 11:28:37 -0700 Subject: [PATCH 15/18] Remove special-casing of LSSTComCam/LSSTCam. The original behavior was to crash and restart the entire fan-out service if it got a message it didn't know how to handle. If the message loop instead catches the corresponding exception, then there is no need to specifically exempt the LSST cameras. With this commit, it's now safe to configure the SUPPORTED_INSTRUMENTS env variable. --- src/main.py | 38 +++++++++++++++++++------------------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/src/main.py b/src/main.py index ed213b4..72119b0 100644 --- a/src/main.py +++ b/src/main.py @@ -269,9 +269,6 @@ def make_fanned_out_messages(message: NextVisitModel, Raised if ``message`` cannot be fanned-out or sent. 
""" match message.instrument: - case "LSSTComCam" | "LSSTCam" as instrument: - raise UnsupportedMessageError(f"Ignore {instrument} message {message}" - " as the prompt service for this is not yet deployed.") case "HSC": # HSC has extra active detector configurations just for the # upload.py test. @@ -496,22 +493,25 @@ async def main() -> None: while True: # run continously async for msg in consumer: - next_visit_message_initial = await deserializer.deserialize( - data=msg.value - ) - logging.info(f"message deserialized {next_visit_message_initial}") - if not is_handleable(next_visit_message_initial["message"], expire): - continue - - next_visit_message_updated = NextVisitModel.from_raw_message( - next_visit_message_initial["message"] - ) - send_info = make_fanned_out_messages(next_visit_message_updated, - instruments, - gauges, - hsc_upload_detectors, - ) - dispatch_fanned_out_messages(client, topic, tasks, send_info, gauges) + try: + next_visit_message_initial = await deserializer.deserialize( + data=msg.value + ) + logging.info(f"message deserialized {next_visit_message_initial}") + if not is_handleable(next_visit_message_initial["message"], expire): + continue + + next_visit_message_updated = NextVisitModel.from_raw_message( + next_visit_message_initial["message"] + ) + send_info = make_fanned_out_messages(next_visit_message_updated, + instruments, + gauges, + hsc_upload_detectors, + ) + dispatch_fanned_out_messages(client, topic, tasks, send_info, gauges) + except UnsupportedMessageError: + logging.exception("Could not process message, continuing.") finally: await consumer.stop() From 55ab86a4bf73a35217ec767cfaa01eeb7fa13c31 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Wed, 16 Oct 2024 14:44:21 -0700 Subject: [PATCH 16/18] Add a debug logging flag to the service. This makes it possible to change logging levels from the configuration. 
This commit also replaces the double `logging.basicConfig` calls of the original code; `basicConfig` only has an effect the first time it's called. --- src/main.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/main.py b/src/main.py index 72119b0..ff24d27 100644 --- a/src/main.py +++ b/src/main.py @@ -453,8 +453,10 @@ async def main() -> None: security_protocol = os.environ["SECURITY_PROTOCOL"] # Logging config - logging.basicConfig(stream=sys.stdout, level=logging.INFO) - logging.basicConfig(stream=sys.stderr, level=logging.WARNING) + if os.environ.get("DEBUG_LOGS") == "true": + logging.basicConfig(stream=sys.stdout, level=logging.DEBUG) + else: + logging.basicConfig(stream=sys.stdout, level=logging.INFO) conf = yaml.safe_load(Path(instrument_config_file).read_text()) instruments = {inst: InstrumentConfig(conf, inst) for inst in supported_instruments} From 0b6faa23660d3ee6a4b99ac4f0f6d388f338fbb8 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Wed, 16 Oct 2024 17:25:06 -0700 Subject: [PATCH 17/18] Make max outgoing connections configurable. The code previously used the default of 100, which is much less than the number of concurrent images we will need to handle with LSSTCam. 
--- src/main.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/main.py b/src/main.py index ff24d27..244b943 100644 --- a/src/main.py +++ b/src/main.py @@ -445,6 +445,7 @@ async def main() -> None: offset = os.environ["OFFSET"] expire = float(os.environ["MESSAGE_EXPIRATION"]) kafka_schema_registry_url = os.environ["KAFKA_SCHEMA_REGISTRY_URL"] + max_outgoing = int(os.environ["MAX_FAN_OUT_MESSAGES"]) # kafka auth sasl_username = os.environ["SASL_USERNAME"] @@ -484,7 +485,8 @@ async def main() -> None: tasks = set() - async with httpx.AsyncClient() as client: + limits = httpx.Limits(max_connections=max_outgoing) + async with httpx.AsyncClient(limits=limits) as client: try: # Setup kafka schema registry connection and deserialzer From d6b361a65a19e8bd1272496d8ec193b4fc1edfb4 Mon Sep 17 00:00:00 2001 From: Krzysztof Findeisen Date: Wed, 16 Oct 2024 17:39:37 -0700 Subject: [PATCH 18/18] Fix deprecation warning in Dockerfile. --- Dockerfile | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Dockerfile b/Dockerfile index 38b6a9e..f86a71e 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,8 +1,8 @@ FROM python:3.10-slim-buster -ENV PYTHONUNBUFFERED True +ENV PYTHONUNBUFFERED=True -#ENV PYTHONASYINCIODEBUG 1 +#ENV PYTHONASYINCIODEBUG=1 WORKDIR /app