Skip to content

Commit

Permalink
Merge pull request #466 from akira/signal
Browse files Browse the repository at this point in the history
Add api to send signal to worker nodes
  • Loading branch information
ananthakumaran authored Mar 13, 2022
2 parents 0e6bc69 + 3cf9dda commit f80d8c9
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 0 deletions.
18 changes: 18 additions & 0 deletions lib/exq/api.ex
Original file line number Diff line number Diff line change
Expand Up @@ -541,4 +541,22 @@ defmodule Exq.Api do
def retry_job(pid, jid) do
GenServer.call(pid, {:retry_job, jid})
end

@doc """
Send signal to the given node.
Expected args:
* `pid` - Exq.Api process
* `node_id` - node identifier
* `signal_name` - Name of the signal.
Supported Signals
* TSTP - unsubscibe from all queues
Returns:
* :ok
"""
def send_signal(pid, node_id, signal_name) do
GenServer.call(pid, {:send_signal, node_id, signal_name})
end
end
5 changes: 5 additions & 0 deletions lib/exq/api/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,11 @@ defmodule Exq.Api.Server do
{:reply, :ok, state}
end

def handle_call({:send_signal, node_id, signal_name}, _from, state) do
result = JobStat.node_signal(state.redis, state.namespace, node_id, signal_name)
{:reply, result, state}
end

def terminate(_reason, _state) do
:ok
end
Expand Down
15 changes: 15 additions & 0 deletions lib/exq/redis/job_stat.ex
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,21 @@ defmodule Exq.Redis.JobStat do
end
end

def node_signal(redis, namespace, node_id, signal_name) do
key = node_info_key(namespace, node_id)
signal_key = "#{key}-signals"

case Connection.qp(redis, [
["MULTI"],
["LPUSH", signal_key, signal_name],
["EXPIRE", signal_key, 60],
["EXEC"]
]) do
{:ok, ["OK", "QUEUED", "QUEUED", [_, 1]]} -> :ok
error -> error
end
end

def node_ids(redis, namespace) do
Connection.smembers!(redis, nodes_key(namespace))
end
Expand Down
6 changes: 6 additions & 0 deletions test/api_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,12 @@ defmodule ApiTest do
assert %Process{pid: ^my_pid_str} = processes
end

test "send signal" do
assert nil == JobStat.node_ping(:testredis, "test", %Node{identity: "host1", busy: 1})
assert :ok = Exq.Api.send_signal(Exq.Api, "host1", "TSTP")
assert "TSTP" == JobStat.node_ping(:testredis, "test", %Node{identity: "host1", busy: 1})
end

test "jobs when empty" do
assert {:ok, []} = Exq.Api.jobs(Exq.Api)
end
Expand Down

0 comments on commit f80d8c9

Please sign in to comment.