Skip to content

Commit

Permalink
feat(wadm)!: Switch wadm events to limits-based stream and create a n…
Browse files Browse the repository at this point in the history
…ew sourcing stream for EventConsumer

Signed-off-by: Joonas Bergius <[email protected]>
  • Loading branch information
joonas authored and thomastaylor312 committed Jul 2, 2024
1 parent 8f3efe3 commit 8bdaba7
Show file tree
Hide file tree
Showing 10 changed files with 183 additions and 211 deletions.
16 changes: 11 additions & 5 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@ MAKEFLAGS += --no-builtin-rules
MAKEFLAGS += --no-print-directory
MAKEFLAGS += -S

OS_NAME := $(shell uname -s | tr '[:upper:]' '[:lower:]')
ifeq ($(OS_NAME),darwin)
NC_FLAGS := -czt
else
NC_FLAGS := -Czt
endif

.DEFAULT: help

CARGO ?= cargo
Expand Down Expand Up @@ -70,7 +77,7 @@ build-docker: ## Build wadm docker image
CARGO_TEST_TARGET ?=

test:: ## Run tests
ifeq ($(shell nc -czt -w1 127.0.0.1 4222 || echo fail),fail)
ifeq ($(shell nc $(NC_FLAGS) -w1 127.0.0.1 4222 || echo fail),fail)
$(DOCKER) run --rm -d --name wadm-test -p 127.0.0.1:4222:4222 nats:2.10 -js
$(CARGO) test $(CARGO_TEST_TARGET) -- --nocapture
$(DOCKER) stop wadm-test
Expand All @@ -79,7 +86,7 @@ else
endif

test-e2e:: ## Run e2e tests
ifeq ($(shell nc -czt -w1 127.0.0.1 4222 || echo fail),fail)
ifeq ($(shell nc $(NC_FLAGS) -w1 127.0.0.1 4222 || echo fail),fail)
@$(MAKE) build
@# Reenable this once we've enabled all tests
@# RUST_BACKTRACE=1 $(CARGO) test --test e2e_multitenant --features _e2e_tests -- --nocapture
Expand All @@ -91,7 +98,7 @@ else
endif

test-individual-e2e:: ## Runs an individual e2e test based on the WADM_E2E_TEST env var
ifeq ($(shell nc -czt -w1 127.0.0.1 4222 || echo fail),fail)
ifeq ($(shell nc $(NC_FLAGS) -w1 127.0.0.1 4222 || echo fail),fail)
@$(MAKE) build
RUST_BACKTRACE=1 $(CARGO) test --test $(WADM_E2E_TEST) --features _e2e_tests -- --nocapture
else
Expand All @@ -106,9 +113,8 @@ endif
stream-cleanup: ## Removes all streams that wadm creates
-$(NATS) stream del wadm_commands --force
-$(NATS) stream del wadm_events --force
-$(NATS) stream del wadm_event_consumer --force
-$(NATS) stream del wadm_notify --force
-$(NATS) stream del wadm_mirror --force
-$(NATS) stream del wadm_multitenant_mirror --force
-$(NATS) stream del wadm_status --force
-$(NATS) stream del KV_wadm_state --force
-$(NATS) stream del KV_wadm_manifests --force
Expand Down
2 changes: 1 addition & 1 deletion crates/wadm/src/consumers/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use super::{CreateConsumer, ScopedMessage};
use crate::events::*;

/// The name of the durable NATS stream and consumer that contains incoming lattice events
pub const EVENTS_CONSUMER_PREFIX: &str = "wadm_events";
pub const EVENTS_CONSUMER_PREFIX: &str = "wadm_event_consumer";

/// A stream of all events of a lattice, consumed from a durable NATS stream and consumer
pub struct EventConsumer {
Expand Down
8 changes: 4 additions & 4 deletions crates/wadm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use std::time::Duration;
pub mod commands;
pub mod consumers;
pub mod events;
pub mod mirror;
pub mod nats_utils;
pub mod publisher;
pub mod scaler;
Expand All @@ -26,9 +25,10 @@ pub const DEFAULT_MULTITENANT_EVENTS_TOPIC: &str = "*.wasmbus.evt.*.>";
pub const DEFAULT_COMMANDS_TOPIC: &str = "wadm.cmd.*";
/// Default topic to listen to for all status updates. wadm.status.<lattice_id>.<manifest_name>
pub const DEFAULT_STATUS_TOPIC: &str = "wadm.status.*.*";
/// The default listen topic for the merged wadm events stream. This topic is an amalgamation of
/// wasmbus.evt topics plus the wadm.internal topics
pub const DEFAULT_WADM_EVENTS_TOPIC: &str = "wadm.evt.*";
/// Default topic to listen to for all wadm event updates
pub const DEFAULT_WADM_EVENTS_TOPIC: &str = "wadm.evt.*.>";
/// Default internal wadm event consumer listen topic for the merged wadm and wasmbus events stream.
pub const DEFAULT_WADM_EVENT_CONSUMER_TOPIC: &str = "wadm_event_consumer.evt.*.>";
/// Managed by annotation used for labeling things properly in wadm
pub const MANAGED_BY_ANNOTATION: &str = "wasmcloud.dev/managed-by";
/// Identifier for managed by annotation. This is the value [`MANAGED_BY_ANNOTATION`] is set to
Expand Down
152 changes: 0 additions & 152 deletions crates/wadm/src/mirror/mod.rs

This file was deleted.

11 changes: 9 additions & 2 deletions crates/wadm/src/server/notifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,27 @@ impl<P: Publisher> ManifestNotifier<P> {
}

#[instrument(level = "trace", skip(self))]
async fn send_event(&self, lattice_id: &str, event: Event) -> anyhow::Result<()> {
async fn send_event(
&self,
lattice_id: &str,
event_subject_key: &str,
event: Event,
) -> anyhow::Result<()> {
let event: CloudEvent = event.try_into()?;
// NOTE(thomastaylor312): A future improvement could be retries here
trace!("Sending notification event");
self.publisher
.publish(
serde_json::to_vec(&event)?,
Some(&format!("{}.{lattice_id}", self.prefix)),
Some(&format!("{}.{lattice_id}.{event_subject_key}", self.prefix)),
)
.await
}

pub async fn deployed(&self, lattice_id: &str, manifest: Manifest) -> anyhow::Result<()> {
self.send_event(
lattice_id,
"manifest_published",
Event::ManifestPublished(ManifestPublished { manifest }),
)
.await
Expand All @@ -48,6 +54,7 @@ impl<P: Publisher> ManifestNotifier<P> {
pub async fn undeployed(&self, lattice_id: &str, name: &str) -> anyhow::Result<()> {
self.send_event(
lattice_id,
"manifest_unpublished",
Event::ManifestUnpublished(ManifestUnpublished {
name: name.to_owned(),
}),
Expand Down
Loading

0 comments on commit 8bdaba7

Please sign in to comment.