Skip to content

Commit

Permalink
Various fixes (#30)
Browse files Browse the repository at this point in the history
Docker build
* Cleanup apt
* Add iex.sh to allow using observer in prod
* Improve cache

Neurow
* Improve main SSE loop
* Improve monitoring on connections
* Export memory usage

Load test
* Allow configuring the User-Agent
* Support ping
* Various fixes
  • Loading branch information
bpaquet authored Oct 2, 2024
1 parent b2ff68c commit f20de8d
Show file tree
Hide file tree
Showing 17 changed files with 194 additions and 52 deletions.
1 change: 1 addition & 0 deletions load_test/config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ config :logger, :console,
# Port and number of simulated users. Each value falls back to the literal
# default when the corresponding environment variable is not set.
config :load_test,
  port: String.to_integer(System.get_env("PORT", "2999")),
  nb_user: String.to_integer(System.get_env("NB_USER", "1"))

# SSE-side settings: User-Agent header value, timeout, subscribe URL and JWT
# issuer, each overridable through its SSE_* environment variable.
config :load_test,
  sse_user_agent: System.get_env("SSE_USER_AGENT", "neurow_load_test/1.0"),
  sse_timeout: String.to_integer(System.get_env("SSE_TIMEOUT", "15000")),
  sse_url: System.get_env("SSE_URL", "http://localhost:4000/v1/subscribe"),
  sse_jwt_issuer: System.get_env("SSE_JWT_ISSUER", "test_issuer1")
Expand Down
9 changes: 7 additions & 2 deletions load_test/lib/load_test/main.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ defmodule LoadTest.Main do
:sse_jwt_secret,
:sse_jwt_audience,
:sse_jwt_expiration,
:sse_user_agent,
:publish_url,
:publish_timeout,
:publish_jwt_issuer,
Expand All @@ -32,6 +33,7 @@ defmodule LoadTest.Main do

{:ok, sse_timeout} = Application.fetch_env(:load_test, :sse_timeout)
{:ok, sse_url} = Application.fetch_env(:load_test, :sse_url)
{:ok, sse_user_agent} = Application.fetch_env(:load_test, :sse_user_agent)
{:ok, sse_jwt_issuer} = Application.fetch_env(:load_test, :sse_jwt_issuer)
{:ok, sse_jwt_secret} = Application.fetch_env(:load_test, :sse_jwt_secret)
{:ok, sse_jwt_audience} = Application.fetch_env(:load_test, :sse_jwt_audience)
Expand All @@ -57,6 +59,7 @@ defmodule LoadTest.Main do
context = %InjectionContext{
sse_timeout: sse_timeout,
sse_url: sse_url,
sse_user_agent: sse_user_agent,
sse_jwt_issuer: sse_jwt_issuer,
sse_jwt_secret: JOSE.JWK.from_oct(sse_jwt_secret),
sse_jwt_audience: sse_jwt_audience,
Expand Down Expand Up @@ -108,7 +111,9 @@ defmodule LoadTest.Main do

Task.await(sse_task, :infinity)

run_virtual_user(context)
Task.Supervisor.start_child(LoadTest.TaskSupervisor, fn ->
run_virtual_user(context)
end)
end

def start_publisher(context, user_name, topic, messages) do
Expand Down Expand Up @@ -160,7 +165,7 @@ defmodule LoadTest.Main do
end

@impl true
def handle_info({_, :ok}, state) do
def handle_info(_, state) do
{:noreply, state}
end
end
2 changes: 1 addition & 1 deletion load_test/lib/load_test/user/publisher.ex
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ defmodule LoadTest.User.Publisher do
start_time: start_time
}

Logger.info(fn ->
Logger.debug(fn ->
"publisher_#{state.user_name}: Start publishing #{length(messages)} messages to #{state.publish_url}, topic #{topic}"
end)

Expand Down
29 changes: 20 additions & 9 deletions load_test/lib/load_test/user/sse.ex
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ defmodule SseUser do
signed = JOSE.JWT.sign(context.sse_jwt_secret, jws, jwt)
{%{alg: :jose_jws_alg_hmac}, compact_signed} = JOSE.JWS.compact(signed)

[{["Authorization"], "Bearer #{compact_signed}"}]
[
{["Authorization"], "Bearer #{compact_signed}"},
{["User-Agent"], context.sse_user_agent}
]
end

def run(context, user_name, topic, expected_messages) do
Expand All @@ -52,16 +55,15 @@ defmodule SseUser do
user_name: user_name,
start_time: :os.system_time(:millisecond),
all_messages: length(expected_messages),
current_message: -1,
current_message: 0,
url: url,
sse_timeout: context.sse_timeout,
start_publisher_callback: fn ->
LoadTest.Main.start_publisher(context, user_name, topic, expected_messages)
end
}

# Adding a padding message for the connection message
wait_for_messages(state, request_id, ["" | expected_messages])
wait_for_messages(state, request_id, expected_messages)
end

defp wait_for_messages(state, request_id, [first_message | remaining_messages]) do
Expand All @@ -81,18 +83,26 @@ defmodule SseUser do
if msg =~ "event: ping" do
wait_for_messages(state, request_id, [first_message | remaining_messages])
else
check_message(state, msg, first_message)
if check_message(state, msg, first_message) == :error do
:ok = :httpc.cancel_request(request_id)
raise("#{header(state)} Message check error")
end

state = Map.put(state, :current_message, state.current_message + 1)
wait_for_messages(state, request_id, remaining_messages)
end

{:http, {_, :stream_start, headers}} ->
{~c"x-sse-server", server} = List.keyfind(headers, ~c"x-sse-server", 0)

Logger.info(fn ->
Logger.debug(fn ->
"#{header(state)} Connected, waiting: #{length(remaining_messages) + 1} messages, url #{state.url}, remote server: #{server}"
end)

state.start_publisher_callback.()

wait_for_messages(state, request_id, [first_message | remaining_messages])

msg ->
Logger.error("#{header(state)} Unexpected message #{inspect(msg)}")
:ok = :httpc.cancel_request(request_id)
Expand All @@ -108,9 +118,6 @@ defmodule SseUser do
:ok = :httpc.cancel_request(request_id)
raise("#{header(state)} Timeout waiting for message")
end

state = Map.put(state, :current_message, state.current_message + 1)
wait_for_messages(state, request_id, remaining_messages)
end

defp wait_for_messages(state, request_id, []) do
Expand Down Expand Up @@ -139,17 +146,21 @@ defmodule SseUser do

if message == expected_message do
Stats.inc_msg_received_ok()
:ok
else
Stats.inc_msg_received_unexpected_message()

Logger.error(
"#{header(state)} Received unexpected message on url #{state.url}: #{inspect(received_message)} instead of #{expected_message}"
)

:error
end
rescue
e ->
Logger.error("#{header(state)} #{inspect(e)}")
Stats.inc_msg_received_error()
:error
end
end
end
14 changes: 13 additions & 1 deletion load_test/lib/stats.ex
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,16 @@ defmodule Stats do
Gauge.set([name: :messages, labels: [:published, :ok]], 0)
Gauge.set([name: :messages, labels: [:published, :error]], 0)

Gauge.declare(
name: :memory_usage,
help: "Memory usage"
)

Periodic.start_link(
run: fn -> Summary.reset(name: :propagation_delay) end,
run: fn ->
set_memory_usage()
Summary.reset(name: :propagation_delay)
end,
every: :timer.seconds(10)
)
end
Expand Down Expand Up @@ -87,4 +95,8 @@ defmodule Stats do
def observe_propagation(delay) do
Summary.observe([name: :propagation_delay], delay)
end

# Samples the allocator statistics exposed by recon and stores the result in
# the :memory_usage gauge.
# NOTE(review): per the recon documentation, `:usage` is the ratio of used
# over allocated memory (0.0..1.0), not a byte count — confirm this is the
# value the dashboards expect.
defp set_memory_usage do
  usage = :recon_alloc.memory(:usage)
  Gauge.set([name: :memory_usage], usage)
end
end
3 changes: 2 additions & 1 deletion load_test/mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ defmodule LoadTest.MixProject do
{:uuid, "~> 1.1"},
{:finch, "~> 0.18"},
{:jose, "~> 1.11"},
{:jiffy, "~> 1.1"}
{:jiffy, "~> 1.1"},
{:observer_cli, "~> 1.7"}
]
end
end
2 changes: 2 additions & 0 deletions load_test/mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
"mint": {:hex, :mint, "1.6.2", "af6d97a4051eee4f05b5500671d47c3a67dac7386045d87a904126fd4bbcea2e", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1 or ~> 0.2.0 or ~> 1.0", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "5ee441dffc1892f1ae59127f74afe8fd82fda6587794278d924e4d90ea3d63f9"},
"nimble_options": {:hex, :nimble_options, "1.1.1", "e3a492d54d85fc3fd7c5baf411d9d2852922f66e69476317787a7b2bb000a61b", [:mix], [], "hexpm", "821b2470ca9442c4b6984882fe9bb0389371b8ddec4d45a9504f00a66f650b44"},
"nimble_pool": {:hex, :nimble_pool, "1.1.0", "bf9c29fbdcba3564a8b800d1eeb5a3c58f36e1e11d7b7fb2e084a643f645f06b", [:mix], [], "hexpm", "af2e4e6b34197db81f7aad230c1118eac993acc0dae6bc83bac0126d4ae0813a"},
"observer_cli": {:hex, :observer_cli, "1.7.5", "cf73407c40ba3933a4be8be5cdbfcd647a7ec24b49f1d75e912ae1f2e58bc5d4", [:mix, :rebar3], [{:recon, "~> 2.5.5", [hex: :recon, repo: "hexpm", optional: false]}], "hexpm", "872cf8e833a3a71ebd05420692678ec8aaede8fd96c805a4687398f6b23a3014"},
"parent": {:hex, :parent, "0.12.1", "495c4386f06de0df492e0a7a7199c10323a55e9e933b27222060dd86dccd6d62", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "2ab589ef1f37bfcedbfb5ecfbab93354972fb7391201b8907a866dadd20b39d1"},
"plug": {:hex, :plug, "1.16.1", "40c74619c12f82736d2214557dedec2e9762029b2438d6d175c5074c933edc9d", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2 or ~> 2.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "a13ff6b9006b03d7e33874945b2755253841b238c34071ed85b0e86057f8cddc"},
"plug_cowboy": {:hex, :plug_cowboy, "2.7.1", "87677ffe3b765bc96a89be7960f81703223fe2e21efa42c125fcd0127dd9d6b2", [:mix], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:cowboy_telemetry, "~> 0.3", [hex: :cowboy_telemetry, repo: "hexpm", optional: false]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "02dbd5f9ab571b864ae39418db7811618506256f6d13b4a45037e5fe78dc5de3"},
Expand All @@ -21,6 +22,7 @@
"prometheus_plugs": {:hex, :prometheus_plugs, "1.1.5", "25933d48f8af3a5941dd7b621c889749894d8a1082a6ff7c67cc99dec26377c5", [:mix], [{:accept, "~> 0.1", [hex: :accept, repo: "hexpm", optional: false]}, {:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: false]}, {:prometheus_ex, "~> 1.1 or ~> 2.0 or ~> 3.0", [hex: :prometheus_ex, repo: "hexpm", optional: false]}, {:prometheus_process_collector, "~> 1.1", [hex: :prometheus_process_collector, repo: "hexpm", optional: true]}], "hexpm", "0273a6483ccb936d79ca19b0ab629aef0dba958697c94782bb728b920dfc6a79"},
"quantile_estimator": {:hex, :quantile_estimator, "0.2.1", "ef50a361f11b5f26b5f16d0696e46a9e4661756492c981f7b2229ef42ff1cd15", [:rebar3], [], "hexpm", "282a8a323ca2a845c9e6f787d166348f776c1d4a41ede63046d72d422e3da946"},
"ranch": {:hex, :ranch, "1.8.0", "8c7a100a139fd57f17327b6413e4167ac559fbc04ca7448e9be9057311597a1d", [:make, :rebar3], [], "hexpm", "49fbcfd3682fab1f5d109351b61257676da1a2fdbe295904176d5e521a2ddfe5"},
"recon": {:hex, :recon, "2.5.6", "9052588e83bfedfd9b72e1034532aee2a5369d9d9343b61aeb7fbce761010741", [:mix, :rebar3], [], "hexpm", "96c6799792d735cc0f0fd0f86267e9d351e63339cbe03df9d162010cefc26bb0"},
"telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"},
"uuid": {:hex, :uuid, "1.1.8", "e22fc04499de0de3ed1116b770c7737779f226ceefa0badb3592e64d5cfb4eb9", [:mix], [], "hexpm", "c790593b4c3b601f5dc2378baae7efaf5b3d73c4c6456ba85759905be792f2ac"},
}
30 changes: 29 additions & 1 deletion load_test/terraform/instances.tf
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,14 @@ echo "instances:
- users
- propagation_delay_sum
- propagation_delay_count
- memory_usage
- erlang_vm_memory_atom_bytes_total
- erlang_vm_memory_bytes_total
- erlang_vm_memory_dets_tables
- erlang_vm_memory_ets_tables
- erlang_vm_memory_processes_bytes_total
- erlang_vm_memory_system_bytes_total
- erlang_vm_process_count
" >> /etc/datadog-agent/conf.d/prometheus.d/conf.yaml
service datadog-agent restart
EOF
Expand Down Expand Up @@ -160,6 +168,24 @@ resource "aws_security_group_rule" "neurow_load_test_outbound" {
security_group_id = aws_security_group.neurow_load_test.id
}

# Ingress rule on the load-test security group: accepts TCP traffic on port
# 2999 when it originates from the internal load balancer's security group.
resource "aws_security_group_rule" "neurow_load_test_inbound_lb" {
type = "ingress"
from_port = 2999
to_port = 2999
protocol = "tcp"
source_security_group_id = aws_security_group.internal_lb.id
security_group_id = aws_security_group.neurow_load_test.id
}

# Ingress rule on the load-test security group: accepts TCP traffic on port
# 2999 from members of the same security group (source and target group are
# both neurow_load_test).
resource "aws_security_group_rule" "neurow_load_test_inbound_self" {
type = "ingress"
from_port = 2999
to_port = 2999
protocol = "tcp"
source_security_group_id = aws_security_group.neurow_load_test.id
security_group_id = aws_security_group.neurow_load_test.id
}

resource "aws_autoscaling_group" "neurow_load_test" {
name = "${var.resource_prefix}-load-test"
desired_capacity = var.desired_capacity
Expand All @@ -180,5 +206,7 @@ resource "aws_autoscaling_group" "neurow_load_test" {
version = aws_launch_template.neurow_load_test.latest_version
}

health_check_type = "EC2"
target_group_arns = [aws_lb_target_group.internal.arn]

health_check_type = "ELB"
}
49 changes: 49 additions & 0 deletions load_test/terraform/nlb_internal.tf
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# Security group attached to the internal load balancer. It is created in the
# VPC of the first public subnet; its rules are declared as separate
# aws_security_group_rule resources.
resource "aws_security_group" "internal_lb" {
name = "${var.resource_prefix}-lb-load-test"
vpc_id = data.aws_subnet.first_public.vpc_id
}

# Egress rule for the load balancer security group: allows outgoing TCP
# traffic on port 2999 to any IPv4 destination.
resource "aws_security_group_rule" "internal_lb_outbound" {
type = "egress"
from_port = 2999
to_port = 2999
protocol = "tcp"
cidr_blocks = ["0.0.0.0/0"]
security_group_id = aws_security_group.internal_lb.id
}

# Internal network load balancer for the load-test instances. It is placed in
# the subnets listed in var.public_subnet_ids and protected by the
# internal_lb security group.
resource "aws_lb" "internal" {
name = "${var.resource_prefix}-load-test"
internal = true
load_balancer_type = "network"

subnets = var.public_subnet_ids
security_groups = [aws_security_group.internal_lb.id]

# Cross-zone load balancing is explicitly turned off.
enable_cross_zone_load_balancing = false
}

# TCP listener on port 2999 of the internal load balancer; every connection
# is forwarded to the internal target group.
resource "aws_lb_listener" "internal" {
load_balancer_arn = aws_lb.internal.arn
port = "2999"
protocol = "TCP"

default_action {
type = "forward"
target_group_arn = aws_lb_target_group.internal.arn
}
}

# Target group receiving TCP traffic on port 2999 from the internal load
# balancer listener.
resource "aws_lb_target_group" "internal" {
name = "${var.resource_prefix}-load-test"
port = 2999
protocol = "TCP"
vpc_id = data.aws_subnet.first_public.vpc_id

# Targets are probed over HTTP on /metrics, port 2999, with an interval of 10.
health_check {
path = "/metrics"
port = 2999
protocol = "HTTP"
interval = 10
}
}
2 changes: 1 addition & 1 deletion load_test/terraform/variables.tf
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ variable "max_size" {

variable "nb_users" {
type = number
default = 7500
default = 20000
}

variable "neurow_revision" {
Expand Down
16 changes: 10 additions & 6 deletions neurow/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
ARG BUILDER_IMAGE=elixir:1.17-slim
ARG BUILDER_IMAGE=elixir
ARG VERSION=1.17-slim

FROM ${BUILDER_IMAGE} AS builder
FROM ${BUILDER_IMAGE}:${VERSION} AS builder

RUN apt-get update \
&& apt-get install -y --no-install-recommends git build-essential ca-certificates \
Expand All @@ -14,23 +15,26 @@ ENV MIX_ENV=prod
RUN mix local.hex --force

COPY mix.exs mix.lock /app/
RUN mix deps.get
RUN mix deps.get \
&& mix deps.compile

COPY config /app/config/
COPY lib /app/lib/

ARG GIT_COMMIT_SHA1=no_commit
RUN mix release

FROM ${BUILDER_IMAGE}
FROM ${BUILDER_IMAGE}:${VERSION}

RUN apt-get update \
&& apt-get install -y --no-install-recommends curl dnsutils
&& apt-get install -y --no-install-recommends curl dnsutils \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/*

RUN mkdir /app
WORKDIR /app

COPY start.sh /start.sh
COPY start.sh iex.sh /
COPY --from=builder /app/_build/prod/rel/neurow /app/

ENV RELEASE_TMP=/tmp/
Expand Down
8 changes: 8 additions & 0 deletions neurow/iex.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#!/bin/sh -e

# Runs the release's `remote` command (/app/bin/neurow remote).
# When POD_IP is set, the release distribution mode and node name are exported
# first so the command addresses the node as neurow@<POD_IP>.
# NOTE(review): presumably mirrors the node naming used by start.sh — confirm.
if [ -n "$POD_IP" ]; then
  RELEASE_DISTRIBUTION="name"
  RELEASE_NODE="neurow@${POD_IP}"
  export RELEASE_DISTRIBUTION RELEASE_NODE
fi

/app/bin/neurow remote
Loading

0 comments on commit f20de8d

Please sign in to comment.