diff --git a/apps/transport/lib/transport_web/plugs/router.ex b/apps/transport/lib/transport_web/plugs/router.ex index d1cbfb54a9..4ee9e686e6 100644 --- a/apps/transport/lib/transport_web/plugs/router.ex +++ b/apps/transport/lib/transport_web/plugs/router.ex @@ -2,7 +2,7 @@ defmodule TransportWeb.Plugs.Router do use Plug.Router plug(TransportWeb.Plugs.HealthCheck, at: "/health-check") - plug(TransportWeb.Plugs.Halt, if: {Transport.Application, :worker_only?}, message: "UP (WORKER-ONLY)") + plug(TransportWeb.Plugs.WorkerHealthcheck, if: {Transport.Application, :worker_only?}) plug(:match) plug(:dispatch) diff --git a/apps/transport/lib/transport_web/plugs/worker_healthcheck.ex b/apps/transport/lib/transport_web/plugs/worker_healthcheck.ex new file mode 100644 index 0000000000..269386d259 --- /dev/null +++ b/apps/transport/lib/transport_web/plugs/worker_healthcheck.ex @@ -0,0 +1,82 @@ +defmodule TransportWeb.Plugs.WorkerHealthcheck do + @moduledoc """ + A plug for the worker. + It can be conditionally enabled by passing an `:if` condition that will be evaluated. + + It displays: + - when the app was started + - the last attempt for Oban jobs + - if the system is healthy + + The system is considered healthy if the app was started recently or + if Oban attempted jobs recently. + """ + import Plug.Conn + + @app_start_waiting_delay {20, :minute} + @oban_max_delay_since_last_attempt {60, :minute} + + def init(options), do: options + + def call(conn, opts) do + {mod, fun} = opts[:if] + + if apply(mod, fun, []) do + store_last_attempted_at_delay_metric() + status_code = if healthy_state?(), do: 200, else: 503 + + conn + |> put_resp_content_type("text/plain") + |> send_resp(status_code, """ + UP (WORKER-ONLY) + App start time: #{app_start_datetime()} + App started recently?: #{app_started_recently?()} + Oban last attempt: #{oban_last_attempted_at()} + Oban attempted jobs recently?: #{oban_attempted_jobs_recently?()} + Healthy state?: #{healthy_state?()} + """) + |> halt() + else + conn + end + end + + def store_last_attempted_at_delay_metric do + value = DateTime.diff(oban_last_attempted_at(), DateTime.utc_now(), :second) + Appsignal.add_distribution_value("oban.last_attempted_at_delay", value) + end + + def healthy_state? do + app_started_recently?() or oban_attempted_jobs_recently?() + end + + def app_started_recently? do + {delay, unit} = @app_start_waiting_delay + DateTime.diff(DateTime.utc_now(), app_start_datetime(), unit) < delay + end + + def app_start_datetime do + Transport.Cache.fetch(app_start_datetime_cache_key_name(), fn -> DateTime.utc_now() end, expire: nil) + end + + def app_start_datetime_cache_key_name, do: "#{__MODULE__}::app_start_datetime" + + def oban_attempted_jobs_recently? do + {delay, unit} = @oban_max_delay_since_last_attempt + DateTime.after?(oban_last_attempted_at(), DateTime.add(DateTime.utc_now(), -delay, unit)) + end + + def oban_last_attempted_at do + %Postgrex.Result{rows: [[delay]]} = + DB.Repo.query!(""" + SELECT MAX(attempted_at) + FROM oban_jobs + WHERE state = 'completed' + """) + + case delay do + nil -> DateTime.new!(~D[1970-01-01], ~T[00:00:00.000], "Etc/UTC") + %NaiveDateTime{} = nt -> DateTime.from_naive!(nt, "Etc/UTC") + end + end +end diff --git a/apps/transport/test/transport_web/plugs/worker_healthcheck_test.exs b/apps/transport/test/transport_web/plugs/worker_healthcheck_test.exs new file mode 100644 index 0000000000..ce6da59f9d --- /dev/null +++ b/apps/transport/test/transport_web/plugs/worker_healthcheck_test.exs @@ -0,0 +1,129 @@ +defmodule TransportWeb.Plugs.WorkerHealthcheckTest do + # async: false is required because we use real in-memory caching in these tests, + # and we swap application config (shared state) + use TransportWeb.ConnCase, async: false + alias TransportWeb.Plugs.WorkerHealthcheck + + @cache_name Transport.Cache.Cachex.cache_name() + @cache_key WorkerHealthcheck.app_start_datetime_cache_key_name() + + setup do + # Use a real in-memory cache for these tests to test the caching mecanism + old_value = Application.fetch_env!(:transport, :cache_impl) + Application.put_env(:transport, :cache_impl, Transport.Cache.Cachex) + + on_exit(fn -> + Application.put_env(:transport, :cache_impl, old_value) + Cachex.reset(@cache_name) + end) + + Ecto.Adapters.SQL.Sandbox.checkout(DB.Repo) + end + + describe "healthy_state?" do + test "app was started recently, no Oban jobs" do + assert WorkerHealthcheck.app_started_recently?() + refute WorkerHealthcheck.oban_attempted_jobs_recently?() + assert WorkerHealthcheck.healthy_state?() + end + + test "app was not started recently, Oban jobs have not been attempted recently" do + datetime = DateTime.add(DateTime.utc_now(), -30, :minute) + Cachex.put(@cache_name, @cache_key, datetime) + + refute WorkerHealthcheck.app_started_recently?() + refute WorkerHealthcheck.oban_attempted_jobs_recently?() + refute WorkerHealthcheck.healthy_state?() + end + + test "app was not started recently, Oban jobs have been attempted recently" do + datetime = DateTime.add(DateTime.utc_now(), -30, :minute) + Cachex.put(@cache_name, @cache_key, datetime) + + # A completed job was attempted 55 minutes ago + Transport.Jobs.ResourceUnavailableJob.new(%{resource_id: 1}) + |> Oban.insert!() + |> Ecto.Changeset.change(attempted_at: DateTime.add(DateTime.utc_now(), -55, :minute), state: "completed") + |> DB.Repo.update!() + + refute WorkerHealthcheck.app_started_recently?() + assert WorkerHealthcheck.oban_attempted_jobs_recently?() + assert WorkerHealthcheck.healthy_state?() + end + end + + describe "app_started_recently?" do + test "value is set when executed for the first time" do + assert {:ok, false} == Cachex.exists?(@cache_name, @cache_key) + # Calling for the first time creates the key + assert WorkerHealthcheck.app_started_recently?() + assert {:ok, true} == Cachex.exists?(@cache_name, @cache_key) + + # Calling again does not refresh the initial value + start_datetime = WorkerHealthcheck.app_start_datetime() + WorkerHealthcheck.app_started_recently?() + assert start_datetime == WorkerHealthcheck.app_start_datetime() + + # Key does not expire + assert {:ok, nil} == Cachex.ttl(@cache_name, @cache_key) + end + + test "acceptable delay is 20 minutes" do + # Just right + datetime = DateTime.add(DateTime.utc_now(), -19, :minute) + Cachex.put(@cache_name, @cache_key, datetime) + + assert WorkerHealthcheck.app_started_recently?() + + # Too long ago + datetime = DateTime.add(DateTime.utc_now(), -21, :minute) + Cachex.put(@cache_name, @cache_key, datetime) + refute WorkerHealthcheck.app_started_recently?() + end + end + + describe "oban_attempted_jobs_recently?" do + test "job attempted recently" do + # Attempted less than 60 minutes ago + Transport.Jobs.ResourceUnavailableJob.new(%{resource_id: 1}) + |> Oban.insert!() + |> Ecto.Changeset.change(attempted_at: DateTime.add(DateTime.utc_now(), -59, :minute), state: "completed") + |> DB.Repo.update!() + + assert WorkerHealthcheck.oban_attempted_jobs_recently?() + end + + test "job attempted too long ago" do + # Attempted more than 60 minutes ago + Transport.Jobs.ResourceUnavailableJob.new(%{resource_id: 1}) + |> Oban.insert!() + |> Ecto.Changeset.change(attempted_at: DateTime.add(DateTime.utc_now(), -61, :minute), state: "completed") + |> DB.Repo.update!() + + refute WorkerHealthcheck.oban_attempted_jobs_recently?() + end + end + + describe "call" do + test "healthy system", %{conn: conn} do + assert WorkerHealthcheck.app_started_recently?() + refute WorkerHealthcheck.oban_attempted_jobs_recently?() + assert WorkerHealthcheck.healthy_state?() + + assert conn |> WorkerHealthcheck.call(if: {__MODULE__, :plug_enabled?}) |> text_response(200) + end + + test "unhealthy system", %{conn: conn} do + datetime = DateTime.add(DateTime.utc_now(), -30, :minute) + Cachex.put(@cache_name, @cache_key, datetime) + + refute WorkerHealthcheck.app_started_recently?() + refute WorkerHealthcheck.oban_attempted_jobs_recently?() + refute WorkerHealthcheck.healthy_state?() + + assert conn |> WorkerHealthcheck.call(if: {__MODULE__, :plug_enabled?}) |> text_response(503) + end + end + + def plug_enabled?, do: true +end