Skip to content

Commit

Permalink
Add metrics support
Browse files Browse the repository at this point in the history
- GetMetricsAsync()
- GetMetricsAsync(string)
  • Loading branch information
brianpursley committed Dec 18, 2024
1 parent 56545c4 commit 5f16e4f
Show file tree
Hide file tree
Showing 8 changed files with 338 additions and 12 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ jobs:
- name: Test (pgmq 1.0.0)
run: Npgmq.Test/scripts/run-tests.sh 1.0.0

- name: Test (pgmq 0.33.1)
run: Npgmq.Test/scripts/run-tests.sh 0.33.1

- name: Test (pgmq 0.31.0)
run: Npgmq.Test/scripts/run-tests.sh 0.31.0

Expand Down
6 changes: 3 additions & 3 deletions Npgmq.Example/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
var msg = await npgmq.ReadAsync<MyMessageType>("example_queue");
if (msg != null)
{
Console.WriteLine($"Read message with id {msg.MsgId}: Foo = {msg.Message?.Foo}, Bar = {msg.Message?.Bar}");
Console.WriteLine($"Read message with id {msg.MsgId} (EnqueuedAt = {msg.EnqueuedAt}, Vt = {msg.Vt}): Foo = {msg.Message?.Foo}, Bar = {msg.Message?.Bar}");
await npgmq.ArchiveAsync("example_queue", msg.MsgId);
}
}
Expand Down Expand Up @@ -59,13 +59,13 @@
var msg = await npgmq.ReadAsync<MyMessageType>("example_queue");
if (msg != null)
{
Console.WriteLine($"Read message with id {msg.MsgId}: Foo = {msg.Message?.Foo}, Bar = {msg.Message?.Bar}");
Console.WriteLine($"Read message with id {msg.MsgId} (EnqueuedAt = {msg.EnqueuedAt}, Vt = {msg.Vt}): Foo = {msg.Message?.Foo}, Bar = {msg.Message?.Bar}");
await npgmq.ArchiveAsync("example_queue", msg.MsgId);
}
msg = await npgmq.ReadAsync<MyMessageType>("example_queue");
if (msg != null)
{
Console.WriteLine($"Read message with id {msg.MsgId}: Foo = {msg.Message?.Foo}, Bar = {msg.Message?.Bar}");
Console.WriteLine($"Read message with id {msg.MsgId} (EnqueuedAt = {msg.EnqueuedAt}, Vt = {msg.Vt}): Foo = {msg.Message?.Foo}, Bar = {msg.Message?.Bar}");
await npgmq.ArchiveAsync("example_queue", msg.MsgId);
}
}
Expand Down
150 changes: 150 additions & 0 deletions Npgmq.Test/NpgmqClientTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,7 @@ public async Task PollAsync_should_wait_for_message_and_return_it()
// Assert
Assert.NotNull(msg);
Assert.True(msg.EnqueuedAt < DateTimeOffset.UtcNow);
Assert.Equal(TimeSpan.Zero, msg.EnqueuedAt.Offset);
Assert.Equal(1, msg.ReadCt);
msg.Message.ShouldDeepEqual(new TestMessage
{
Expand Down Expand Up @@ -387,6 +388,7 @@ public async Task PopAsync_should_read_and_delete_message()
Assert.NotNull(msg);
Assert.Equal(msgId, msg.MsgId);
Assert.True(msg.EnqueuedAt < DateTimeOffset.UtcNow);
Assert.Equal(TimeSpan.Zero, msg.EnqueuedAt.Offset);
Assert.Equal(0, msg.ReadCt);
msg.Message.ShouldDeepEqual(new TestMessage
{
Expand Down Expand Up @@ -477,7 +479,9 @@ public async Task ReadAsync_should_read_message()
Assert.NotNull(msg);
Assert.Equal(msgId, msg.MsgId);
Assert.True(msg.EnqueuedAt < DateTimeOffset.UtcNow);
Assert.Equal(TimeSpan.Zero, msg.EnqueuedAt.Offset);
Assert.True(msg.Vt > DateTimeOffset.UtcNow);
Assert.Equal(TimeSpan.Zero, msg.Vt.Offset);
Assert.Equal(1, msg.ReadCt);
msg.Message.ShouldDeepEqual(new TestMessage
{
Expand Down Expand Up @@ -507,7 +511,9 @@ public async Task ReadAsync_should_read_string_message()
Assert.NotNull(msg);
Assert.Equal(msgId, msg.MsgId);
Assert.True(msg.EnqueuedAt < DateTimeOffset.UtcNow);
Assert.Equal(TimeSpan.Zero, msg.EnqueuedAt.Offset);
Assert.True(msg.Vt > DateTimeOffset.UtcNow);
Assert.Equal(TimeSpan.Zero, msg.Vt.Offset);
Assert.Equal(1, msg.ReadCt);
msg.Message.ShouldDeepEqual("{\"Bar\": \"Test\", \"Baz\": \"2023-09-01T01:23:45-04:00\", \"Foo\": 123}");
}
Expand Down Expand Up @@ -797,4 +803,148 @@ public async Task SetVtAsync_should_change_vt_for_a_message()
Assert.NotNull(message2);
Assert.Equal(msgId, message2.MsgId);
}

[Fact]
public async Task GetMetricsAsync_should_return_metrics_for_a_single_queue()
{
// Arrange
await ResetTestQueueAsync();

var metrics1 = await _sut.GetMetricsAsync(TestQueueName);
await _sut.SendAsync(TestQueueName, new TestMessage { Foo = 1 });
await _sut.SendAsync(TestQueueName, new TestMessage { Foo = 2 });
var msgId3 = await _sut.SendAsync(TestQueueName, new TestMessage { Foo = 3 });
var msgId4 = await _sut.SendAsync(TestQueueName, new TestMessage { Foo = 4 });
await _sut.SendAsync(TestQueueName, new TestMessage { Foo = 5 });
await _sut.DeleteAsync(TestQueueName, msgId3);
await _sut.ArchiveAsync(TestQueueName, msgId4);

// Act
var metrics2 = await _sut.GetMetricsAsync(TestQueueName);
await _sut.ReadAsync<string>(TestQueueName);
var metrics3 = await _sut.GetMetricsAsync(TestQueueName);
await _sut.PurgeQueueAsync(TestQueueName);
var metrics4 = await _sut.GetMetricsAsync(TestQueueName);

// Assert
Assert.Equal(TestQueueName, metrics1.QueueName);
Assert.Equal(0, metrics1.QueueLength);
Assert.Null(metrics1.NewestMessageAge);
Assert.Null(metrics1.OldestMessageAge);
Assert.Equal(0, metrics1.TotalMessages);
Assert.True(metrics1.ScrapeTime < DateTimeOffset.UtcNow);
Assert.Equal(TimeSpan.Zero, metrics1.ScrapeTime.Offset);
if (await IsMinPgmqVersion("1.5.0"))
{
Assert.Equal(0, metrics1.QueueVisibleLength);
}
else
{
// The QueueVisibleLength metric was added in PGMQ 1.5.0, so it will be -1 if the version is older.
Assert.Equal(-1, metrics1.QueueVisibleLength);
}

Assert.Equal(TestQueueName, metrics2.QueueName);
Assert.Equal(3, metrics2.QueueLength);
Assert.True(metrics2.NewestMessageAge >= 0);
Assert.True(metrics2.OldestMessageAge >= 0);
if (await IsMinPgmqVersion("0.33.1"))
{
// There was a bug in PGMQ prior to 0.33.1 that caused the total messages to be incorrect,
// so we only want to check the result if the version is 0.33.1 or later.
Assert.Equal(5, metrics2.TotalMessages);
}
Assert.True(metrics2.ScrapeTime < DateTimeOffset.UtcNow);
Assert.Equal(TimeSpan.Zero, metrics1.ScrapeTime.Offset);
if (await IsMinPgmqVersion("1.5.0"))
{
Assert.Equal(3, metrics2.QueueVisibleLength);
}
else
{
// The QueueVisibleLength metric was added in PGMQ 1.5.0, so it will be -1 if the version is older.
Assert.Equal(-1, metrics2.QueueVisibleLength);
}

Assert.Equal(TestQueueName, metrics3.QueueName);
Assert.Equal(3, metrics3.QueueLength);
Assert.True(metrics3.NewestMessageAge >= 0);
Assert.True(metrics3.OldestMessageAge >= 0);
if (await IsMinPgmqVersion("0.33.1"))
{
// There was a bug in PGMQ prior to 0.33.1 that caused the total messages to be incorrect,
// so we only want to check the result if the version is 0.33.1 or later.
Assert.Equal(5, metrics2.TotalMessages);
}
Assert.True(metrics3.ScrapeTime < DateTimeOffset.UtcNow);
Assert.Equal(TimeSpan.Zero, metrics1.ScrapeTime.Offset);
if (await IsMinPgmqVersion("1.5.0"))
{
Assert.Equal(2, metrics3.QueueVisibleLength);
}
else
{
// The QueueVisibleLength metric was added in PGMQ 1.5.0, so it will be -1 if the version is older.
Assert.Equal(-1, metrics3.QueueVisibleLength);
}

Assert.Equal(TestQueueName, metrics4.QueueName);
Assert.Equal(0, metrics4.QueueLength);
Assert.Null(metrics1.NewestMessageAge);
Assert.Null(metrics1.OldestMessageAge);
if (await IsMinPgmqVersion("0.33.1"))
{
// There was a bug in PGMQ prior to 0.33.1 that caused the total messages to be incorrect,
// so we only want to check the result if the version is 0.33.1 or later.
Assert.Equal(5, metrics2.TotalMessages);
}
Assert.True(metrics4.ScrapeTime < DateTimeOffset.UtcNow);
Assert.Equal(TimeSpan.Zero, metrics1.ScrapeTime.Offset);
if (await IsMinPgmqVersion("1.5.0"))
{
Assert.Equal(0, metrics4.QueueVisibleLength);
}
else
{
// The QueueVisibleLength metric was added in PGMQ 1.5.0, so it will be -1 if the version is older.
Assert.Equal(-1, metrics4.QueueVisibleLength);
}
}

[Fact]
public async Task GetMetricsAsync_should_return_metrics_for_all_queues()
{
// Create some queues just for testing this function.
var testMetricsQueueName1 = TestQueueName + "_m1";
var testMetricsQueueName2 = TestQueueName + "_m2";
var testMetricsQueueName3 = TestQueueName + "_m3";
try
{
// Arrange
await ResetTestQueueAsync();
await _sut.CreateQueueAsync(testMetricsQueueName1);
await _sut.CreateQueueAsync(testMetricsQueueName2);
await _sut.CreateQueueAsync(testMetricsQueueName3);

await _sut.SendAsync(testMetricsQueueName1, new TestMessage { Foo = 1 });
await _sut.SendAsync(testMetricsQueueName1, new TestMessage { Foo = 2 });
await _sut.SendAsync(testMetricsQueueName1, new TestMessage { Foo = 3 });
await _sut.SendAsync(testMetricsQueueName2, new TestMessage { Foo = 4 });
await _sut.SendAsync(testMetricsQueueName2, new TestMessage { Foo = 5 });

// Act
var allMetrics = await _sut.GetMetricsAsync();

// Assert
Assert.Equal(3, allMetrics.Single(x => x.QueueName == testMetricsQueueName1).QueueLength);
Assert.Equal(2, allMetrics.Single(x => x.QueueName == testMetricsQueueName2).QueueLength);
Assert.Equal(0, allMetrics.Single(x => x.QueueName == testMetricsQueueName3).QueueLength);
}
finally
{
try { await _sut.DropQueueAsync(testMetricsQueueName1); } catch { /* ignored */ }
try { await _sut.DropQueueAsync(testMetricsQueueName2); } catch { /* ignored */ }
try { await _sut.DropQueueAsync(testMetricsQueueName3); } catch { /* ignored */ }
}
}
}
24 changes: 24 additions & 0 deletions Npgmq/DbDataReaderExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
using System.Data.Common;

namespace Npgmq;

internal static class DbDataReaderExtensions
{
/// <summary>
/// Get the ordinal of a column by name, or -1 if the column does not exist.
/// </summary>
/// <param name="reader">The reader.</param>
/// <param name="columnName">The column name.</param>
/// <returns>The ordinal position of the column, or -1 if it does not exist in the reader.</returns>
public static int TryGetOrdinal(this DbDataReader reader, string columnName)
{
try
{
return reader.GetOrdinal(columnName);
}
catch (IndexOutOfRangeException)
{
return -1;
}
}
}
13 changes: 13 additions & 0 deletions Npgmq/INpgmqClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -206,4 +206,17 @@ public interface INpgmqClient
/// <param name="msgId">The message ID.</param>
/// <param name="vtOffset">The number of seconds to be added to the current Vt.</param>
Task SetVtAsync(string queueName, long msgId, int vtOffset);

/// <summary>
/// Get metrics for all queues.
/// </summary>
/// <returns>A list of <see cref="NpgmqMetricsResult" /></returns>
Task<List<NpgmqMetricsResult>> GetMetricsAsync();

/// <summary>
/// Get metrics for a specific queue.
/// </summary>
/// <param name="queueName">The queue name.</param>
/// <returns>An <see cref="NpgmqMetricsResult" /></returns>
Task<NpgmqMetricsResult> GetMetricsAsync(string queueName);
}
91 changes: 86 additions & 5 deletions Npgmq/NpgmqClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -468,18 +468,99 @@ public async Task SetVtAsync(string queueName, long msgId, int vtOffset)
}
}

public async Task<List<NpgmqMetricsResult>> GetMetricsAsync()
{
try
{
var cmd = await _commandFactory.CreateAsync("SELECT * FROM pgmq.metrics_all();").ConfigureAwait(false);
await using (cmd.ConfigureAwait(false))
{
var reader = await cmd.ExecuteReaderAsync().ConfigureAwait(false);
await using (reader.ConfigureAwait(false))
{
return await ReadMetricsAsync(reader).ConfigureAwait(false);
}
}
}
catch (Exception ex)
{
throw new NpgmqException("Failed to get metrics.", ex);
}
}

public async Task<NpgmqMetricsResult> GetMetricsAsync(string queueName)
{
try
{
var cmd = await _commandFactory.CreateAsync("SELECT * FROM pgmq.metrics(@queue_name);").ConfigureAwait(false);
await using (cmd.ConfigureAwait(false))
{
cmd.Parameters.AddWithValue("@queue_name", queueName);
var reader = await cmd.ExecuteReaderAsync().ConfigureAwait(false);
await using (reader.ConfigureAwait(false))
{
var results = await ReadMetricsAsync(reader).ConfigureAwait(false);
return results.Count switch
{
1 => results.Single(),
0 => throw new NpgmqException($"Failed to get metrics for queue {queueName}. No data returned."),
_ => throw new NpgmqException($"Failed to get metrics for queue {queueName}. Multiple results returned.")
};
}
}
}
catch (Exception ex)
{
throw new NpgmqException($"Failed to get metrics for queue {queueName}.", ex);
}
}

private static async Task<List<NpgmqMetricsResult>> ReadMetricsAsync(DbDataReader reader)
{
var queueNameOrdinal = reader.GetOrdinal("queue_name");
var queueLengthOrdinal = reader.GetOrdinal("queue_length");
var newestMsgAgeSecOrdinal = reader.GetOrdinal("newest_msg_age_sec");
var oldestMsgAgeSecOrdinal = reader.GetOrdinal("oldest_msg_age_sec");
var totalMessagesOrdinal = reader.GetOrdinal("total_messages");
var scrapeTimeOrdinal = reader.GetOrdinal("scrape_time");
var queueVisibleLengthOrdinal = reader.TryGetOrdinal("queue_visible_length");

var results = new List<NpgmqMetricsResult>();
while (await reader.ReadAsync().ConfigureAwait(false))
{
results.Add(new NpgmqMetricsResult
{
QueueName = reader.GetString(queueNameOrdinal),
QueueLength = reader.GetInt64(queueLengthOrdinal),
NewestMessageAge = await reader.IsDBNullAsync(newestMsgAgeSecOrdinal) ? null : reader.GetInt32(newestMsgAgeSecOrdinal),
OldestMessageAge = await reader.IsDBNullAsync(oldestMsgAgeSecOrdinal) ? null : reader.GetInt32(oldestMsgAgeSecOrdinal),
TotalMessages = reader.GetInt64(totalMessagesOrdinal),
ScrapeTime = reader.GetDateTime(scrapeTimeOrdinal),
// visible_queue_length column was added in PGMQ 1.5.0, so we need to handle if it doesn't exist by setting -1 as the default.
QueueVisibleLength = queueVisibleLengthOrdinal >= 0 ? reader.GetInt64(queueVisibleLengthOrdinal) : -1
});
}
return results;
}

private static async Task<List<NpgmqMessage<T>>> ReadMessagesAsync<T>(DbDataReader reader) where T : class
{
var msgIdOrdinal = reader.GetOrdinal("msg_id");
var readCtOrdinal = reader.GetOrdinal("read_ct");
var enqueuedAtOrdinal = reader.GetOrdinal("enqueued_at");
var vtOrdinal = reader.GetOrdinal("vt");
var messageOrdinal = reader.GetOrdinal("message");

var result = new List<NpgmqMessage<T>>();
while (await reader.ReadAsync().ConfigureAwait(false))
{
result.Add(new NpgmqMessage<T>
{
MsgId = reader.GetInt64(0),
ReadCt = reader.GetInt32(1),
EnqueuedAt = reader.GetDateTime(2),
Vt = reader.GetDateTime(3),
Message = DeserializeMessage<T>(reader.GetString(4))
MsgId = reader.GetInt64(msgIdOrdinal),
ReadCt = reader.GetInt32(readCtOrdinal),
EnqueuedAt = reader.GetDateTime(enqueuedAtOrdinal),
Vt = reader.GetDateTime(vtOrdinal),
Message = DeserializeMessage<T>(reader.GetString(messageOrdinal))
});
}
return result;
Expand Down
Loading

0 comments on commit 5f16e4f

Please sign in to comment.