diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 6b9a82b..710eeb6 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -60,6 +60,9 @@ jobs: - name: Test (pgmq 1.0.0) run: Npgmq.Test/scripts/run-tests.sh 1.0.0 + - name: Test (pgmq 0.33.1) + run: Npgmq.Test/scripts/run-tests.sh 0.33.1 + - name: Test (pgmq 0.31.0) run: Npgmq.Test/scripts/run-tests.sh 0.31.0 diff --git a/Npgmq.Example/Program.cs b/Npgmq.Example/Program.cs index d0166fe..49c31d4 100644 --- a/Npgmq.Example/Program.cs +++ b/Npgmq.Example/Program.cs @@ -27,7 +27,7 @@ var msg = await npgmq.ReadAsync("example_queue"); if (msg != null) { - Console.WriteLine($"Read message with id {msg.MsgId}: Foo = {msg.Message?.Foo}, Bar = {msg.Message?.Bar}"); + Console.WriteLine($"Read message with id {msg.MsgId} (EnqueuedAt = {msg.EnqueuedAt}, Vt = {msg.Vt}): Foo = {msg.Message?.Foo}, Bar = {msg.Message?.Bar}"); await npgmq.ArchiveAsync("example_queue", msg.MsgId); } } @@ -59,13 +59,13 @@ var msg = await npgmq.ReadAsync("example_queue"); if (msg != null) { - Console.WriteLine($"Read message with id {msg.MsgId}: Foo = {msg.Message?.Foo}, Bar = {msg.Message?.Bar}"); + Console.WriteLine($"Read message with id {msg.MsgId} (EnqueuedAt = {msg.EnqueuedAt}, Vt = {msg.Vt}): Foo = {msg.Message?.Foo}, Bar = {msg.Message?.Bar}"); await npgmq.ArchiveAsync("example_queue", msg.MsgId); } msg = await npgmq.ReadAsync("example_queue"); if (msg != null) { - Console.WriteLine($"Read message with id {msg.MsgId}: Foo = {msg.Message?.Foo}, Bar = {msg.Message?.Bar}"); + Console.WriteLine($"Read message with id {msg.MsgId} (EnqueuedAt = {msg.EnqueuedAt}, Vt = {msg.Vt}): Foo = {msg.Message?.Foo}, Bar = {msg.Message?.Bar}"); await npgmq.ArchiveAsync("example_queue", msg.MsgId); } } diff --git a/Npgmq.Test/NpgmqClientTest.cs b/Npgmq.Test/NpgmqClientTest.cs index ca3baf9..8e84aa0 100644 --- a/Npgmq.Test/NpgmqClientTest.cs +++ b/Npgmq.Test/NpgmqClientTest.cs @@ -277,6 +277,7 @@ public async Task PollAsync_should_wait_for_message_and_return_it() // Assert Assert.NotNull(msg); Assert.True(msg.EnqueuedAt < DateTimeOffset.UtcNow); + Assert.Equal(TimeSpan.Zero, msg.EnqueuedAt.Offset); Assert.Equal(1, msg.ReadCt); msg.Message.ShouldDeepEqual(new TestMessage { @@ -387,6 +388,7 @@ public async Task PopAsync_should_read_and_delete_message() Assert.NotNull(msg); Assert.Equal(msgId, msg.MsgId); Assert.True(msg.EnqueuedAt < DateTimeOffset.UtcNow); + Assert.Equal(TimeSpan.Zero, msg.EnqueuedAt.Offset); Assert.Equal(0, msg.ReadCt); msg.Message.ShouldDeepEqual(new TestMessage { @@ -477,7 +479,9 @@ public async Task ReadAsync_should_read_message() Assert.NotNull(msg); Assert.Equal(msgId, msg.MsgId); Assert.True(msg.EnqueuedAt < DateTimeOffset.UtcNow); + Assert.Equal(TimeSpan.Zero, msg.EnqueuedAt.Offset); Assert.True(msg.Vt > DateTimeOffset.UtcNow); + Assert.Equal(TimeSpan.Zero, msg.Vt.Offset); Assert.Equal(1, msg.ReadCt); msg.Message.ShouldDeepEqual(new TestMessage { @@ -507,7 +511,9 @@ public async Task ReadAsync_should_read_string_message() Assert.NotNull(msg); Assert.Equal(msgId, msg.MsgId); Assert.True(msg.EnqueuedAt < DateTimeOffset.UtcNow); + Assert.Equal(TimeSpan.Zero, msg.EnqueuedAt.Offset); Assert.True(msg.Vt > DateTimeOffset.UtcNow); + Assert.Equal(TimeSpan.Zero, msg.Vt.Offset); Assert.Equal(1, msg.ReadCt); msg.Message.ShouldDeepEqual("{\"Bar\": \"Test\", \"Baz\": \"2023-09-01T01:23:45-04:00\", \"Foo\": 123}"); } @@ -797,4 +803,148 @@ public async Task SetVtAsync_should_change_vt_for_a_message() Assert.NotNull(message2); Assert.Equal(msgId, message2.MsgId); } + + [Fact] + public async Task GetMetricsAsync_should_return_metrics_for_a_single_queue() + { + // Arrange + await ResetTestQueueAsync(); + + var metrics1 = await _sut.GetMetricsAsync(TestQueueName); + await _sut.SendAsync(TestQueueName, new TestMessage { Foo = 1 }); + await _sut.SendAsync(TestQueueName, new TestMessage { Foo = 2 }); + var msgId3 = await _sut.SendAsync(TestQueueName, new TestMessage { Foo = 3 }); + var msgId4 = await _sut.SendAsync(TestQueueName, new TestMessage { Foo = 4 }); + await _sut.SendAsync(TestQueueName, new TestMessage { Foo = 5 }); + await _sut.DeleteAsync(TestQueueName, msgId3); + await _sut.ArchiveAsync(TestQueueName, msgId4); + + // Act + var metrics2 = await _sut.GetMetricsAsync(TestQueueName); + await _sut.ReadAsync(TestQueueName); + var metrics3 = await _sut.GetMetricsAsync(TestQueueName); + await _sut.PurgeQueueAsync(TestQueueName); + var metrics4 = await _sut.GetMetricsAsync(TestQueueName); + + // Assert + Assert.Equal(TestQueueName, metrics1.QueueName); + Assert.Equal(0, metrics1.QueueLength); + Assert.Null(metrics1.NewestMessageAge); + Assert.Null(metrics1.OldestMessageAge); + Assert.Equal(0, metrics1.TotalMessages); + Assert.True(metrics1.ScrapeTime < DateTimeOffset.UtcNow); + Assert.Equal(TimeSpan.Zero, metrics1.ScrapeTime.Offset); + if (await IsMinPgmqVersion("1.5.0")) + { + Assert.Equal(0, metrics1.QueueVisibleLength); + } + else + { + // The QueueVisibleLength metric was added in PGMQ 1.5.0, so it will be -1 if the version is older. + Assert.Equal(-1, metrics1.QueueVisibleLength); + } + + Assert.Equal(TestQueueName, metrics2.QueueName); + Assert.Equal(3, metrics2.QueueLength); + Assert.True(metrics2.NewestMessageAge >= 0); + Assert.True(metrics2.OldestMessageAge >= 0); + if (await IsMinPgmqVersion("0.33.1")) + { + // There was a bug in PGMQ prior to 0.33.1 that caused the total messages to be incorrect, + // so we only want to check the result if the version is 0.33.1 or later. + Assert.Equal(5, metrics2.TotalMessages); + } + Assert.True(metrics2.ScrapeTime < DateTimeOffset.UtcNow); + Assert.Equal(TimeSpan.Zero, metrics1.ScrapeTime.Offset); + if (await IsMinPgmqVersion("1.5.0")) + { + Assert.Equal(3, metrics2.QueueVisibleLength); + } + else + { + // The QueueVisibleLength metric was added in PGMQ 1.5.0, so it will be -1 if the version is older. + Assert.Equal(-1, metrics2.QueueVisibleLength); + } + + Assert.Equal(TestQueueName, metrics3.QueueName); + Assert.Equal(3, metrics3.QueueLength); + Assert.True(metrics3.NewestMessageAge >= 0); + Assert.True(metrics3.OldestMessageAge >= 0); + if (await IsMinPgmqVersion("0.33.1")) + { + // There was a bug in PGMQ prior to 0.33.1 that caused the total messages to be incorrect, + // so we only want to check the result if the version is 0.33.1 or later. + Assert.Equal(5, metrics2.TotalMessages); + } + Assert.True(metrics3.ScrapeTime < DateTimeOffset.UtcNow); + Assert.Equal(TimeSpan.Zero, metrics1.ScrapeTime.Offset); + if (await IsMinPgmqVersion("1.5.0")) + { + Assert.Equal(2, metrics3.QueueVisibleLength); + } + else + { + // The QueueVisibleLength metric was added in PGMQ 1.5.0, so it will be -1 if the version is older. + Assert.Equal(-1, metrics3.QueueVisibleLength); + } + + Assert.Equal(TestQueueName, metrics4.QueueName); + Assert.Equal(0, metrics4.QueueLength); + Assert.Null(metrics1.NewestMessageAge); + Assert.Null(metrics1.OldestMessageAge); + if (await IsMinPgmqVersion("0.33.1")) + { + // There was a bug in PGMQ prior to 0.33.1 that caused the total messages to be incorrect, + // so we only want to check the result if the version is 0.33.1 or later. + Assert.Equal(5, metrics2.TotalMessages); + } + Assert.True(metrics4.ScrapeTime < DateTimeOffset.UtcNow); + Assert.Equal(TimeSpan.Zero, metrics1.ScrapeTime.Offset); + if (await IsMinPgmqVersion("1.5.0")) + { + Assert.Equal(0, metrics4.QueueVisibleLength); + } + else + { + // The QueueVisibleLength metric was added in PGMQ 1.5.0, so it will be -1 if the version is older. + Assert.Equal(-1, metrics4.QueueVisibleLength); + } + } + + [Fact] + public async Task GetMetricsAsync_should_return_metrics_for_all_queues() + { + // Create some queues just for testing this function. + var testMetricsQueueName1 = TestQueueName + "_m1"; + var testMetricsQueueName2 = TestQueueName + "_m2"; + var testMetricsQueueName3 = TestQueueName + "_m3"; + try + { + // Arrange + await ResetTestQueueAsync(); + await _sut.CreateQueueAsync(testMetricsQueueName1); + await _sut.CreateQueueAsync(testMetricsQueueName2); + await _sut.CreateQueueAsync(testMetricsQueueName3); + + await _sut.SendAsync(testMetricsQueueName1, new TestMessage { Foo = 1 }); + await _sut.SendAsync(testMetricsQueueName1, new TestMessage { Foo = 2 }); + await _sut.SendAsync(testMetricsQueueName1, new TestMessage { Foo = 3 }); + await _sut.SendAsync(testMetricsQueueName2, new TestMessage { Foo = 4 }); + await _sut.SendAsync(testMetricsQueueName2, new TestMessage { Foo = 5 }); + + // Act + var allMetrics = await _sut.GetMetricsAsync(); + + // Assert + Assert.Equal(3, allMetrics.Single(x => x.QueueName == testMetricsQueueName1).QueueLength); + Assert.Equal(2, allMetrics.Single(x => x.QueueName == testMetricsQueueName2).QueueLength); + Assert.Equal(0, allMetrics.Single(x => x.QueueName == testMetricsQueueName3).QueueLength); + } + finally + { + try { await _sut.DropQueueAsync(testMetricsQueueName1); } catch { /* ignored */ } + try { await _sut.DropQueueAsync(testMetricsQueueName2); } catch { /* ignored */ } + try { await _sut.DropQueueAsync(testMetricsQueueName3); } catch { /* ignored */ } + } + } } diff --git a/Npgmq/DbDataReaderExtensions.cs b/Npgmq/DbDataReaderExtensions.cs new file mode 100644 index 0000000..b73ab66 --- /dev/null +++ b/Npgmq/DbDataReaderExtensions.cs @@ -0,0 +1,24 @@ +using System.Data.Common; + +namespace Npgmq; + +internal static class DbDataReaderExtensions +{ + /// + /// Get the ordinal of a column by name, or -1 if the column does not exist. + /// + /// The reader. + /// The column name. + /// The ordinal position of the column, or -1 if it does not exist in the reader. + public static int TryGetOrdinal(this DbDataReader reader, string columnName) + { + try + { + return reader.GetOrdinal(columnName); + } + catch (IndexOutOfRangeException) + { + return -1; + } + } +} \ No newline at end of file diff --git a/Npgmq/INpgmqClient.cs b/Npgmq/INpgmqClient.cs index a00d5b0..d37a646 100644 --- a/Npgmq/INpgmqClient.cs +++ b/Npgmq/INpgmqClient.cs @@ -206,4 +206,17 @@ public interface INpgmqClient /// The message ID. /// The number of seconds to be added to the current Vt. Task SetVtAsync(string queueName, long msgId, int vtOffset); + + /// + /// Get metrics for all queues. + /// + /// A list of + Task> GetMetricsAsync(); + + /// + /// Get metrics for a specific queue. + /// + /// The queue name. + /// An + Task GetMetricsAsync(string queueName); } \ No newline at end of file diff --git a/Npgmq/NpgmqClient.cs b/Npgmq/NpgmqClient.cs index e06f526..210501f 100644 --- a/Npgmq/NpgmqClient.cs +++ b/Npgmq/NpgmqClient.cs @@ -468,18 +468,99 @@ public async Task SetVtAsync(string queueName, long msgId, int vtOffset) } } + public async Task> GetMetricsAsync() + { + try + { + var cmd = await _commandFactory.CreateAsync("SELECT * FROM pgmq.metrics_all();").ConfigureAwait(false); + await using (cmd.ConfigureAwait(false)) + { + var reader = await cmd.ExecuteReaderAsync().ConfigureAwait(false); + await using (reader.ConfigureAwait(false)) + { + return await ReadMetricsAsync(reader).ConfigureAwait(false); + } + } + } + catch (Exception ex) + { + throw new NpgmqException("Failed to get metrics.", ex); + } + } + + public async Task GetMetricsAsync(string queueName) + { + try + { + var cmd = await _commandFactory.CreateAsync("SELECT * FROM pgmq.metrics(@queue_name);").ConfigureAwait(false); + await using (cmd.ConfigureAwait(false)) + { + cmd.Parameters.AddWithValue("@queue_name", queueName); + var reader = await cmd.ExecuteReaderAsync().ConfigureAwait(false); + await using (reader.ConfigureAwait(false)) + { + var results = await ReadMetricsAsync(reader).ConfigureAwait(false); + return results.Count switch + { + 1 => results.Single(), + 0 => throw new NpgmqException($"Failed to get metrics for queue {queueName}. No data returned."), + _ => throw new NpgmqException($"Failed to get metrics for queue {queueName}. Multiple results returned.") + }; + } + } + } + catch (Exception ex) + { + throw new NpgmqException($"Failed to get metrics for queue {queueName}.", ex); + } + } + + private static async Task> ReadMetricsAsync(DbDataReader reader) + { + var queueNameOrdinal = reader.GetOrdinal("queue_name"); + var queueLengthOrdinal = reader.GetOrdinal("queue_length"); + var newestMsgAgeSecOrdinal = reader.GetOrdinal("newest_msg_age_sec"); + var oldestMsgAgeSecOrdinal = reader.GetOrdinal("oldest_msg_age_sec"); + var totalMessagesOrdinal = reader.GetOrdinal("total_messages"); + var scrapeTimeOrdinal = reader.GetOrdinal("scrape_time"); + var queueVisibleLengthOrdinal = reader.TryGetOrdinal("queue_visible_length"); + + var results = new List(); + while (await reader.ReadAsync().ConfigureAwait(false)) + { + results.Add(new NpgmqMetricsResult + { + QueueName = reader.GetString(queueNameOrdinal), + QueueLength = reader.GetInt64(queueLengthOrdinal), + NewestMessageAge = await reader.IsDBNullAsync(newestMsgAgeSecOrdinal) ? null : reader.GetInt32(newestMsgAgeSecOrdinal), + OldestMessageAge = await reader.IsDBNullAsync(oldestMsgAgeSecOrdinal) ? null : reader.GetInt32(oldestMsgAgeSecOrdinal), + TotalMessages = reader.GetInt64(totalMessagesOrdinal), + ScrapeTime = reader.GetDateTime(scrapeTimeOrdinal), + // visible_queue_length column was added in PGMQ 1.5.0, so we need to handle if it doesn't exist by setting -1 as the default. + QueueVisibleLength = queueVisibleLengthOrdinal >= 0 ? reader.GetInt64(queueVisibleLengthOrdinal) : -1 + }); + } + return results; + } + private static async Task>> ReadMessagesAsync(DbDataReader reader) where T : class { + var msgIdOrdinal = reader.GetOrdinal("msg_id"); + var readCtOrdinal = reader.GetOrdinal("read_ct"); + var enqueuedAtOrdinal = reader.GetOrdinal("enqueued_at"); + var vtOrdinal = reader.GetOrdinal("vt"); + var messageOrdinal = reader.GetOrdinal("message"); + var result = new List>(); while (await reader.ReadAsync().ConfigureAwait(false)) { result.Add(new NpgmqMessage { - MsgId = reader.GetInt64(0), - ReadCt = reader.GetInt32(1), - EnqueuedAt = reader.GetDateTime(2), - Vt = reader.GetDateTime(3), - Message = DeserializeMessage(reader.GetString(4)) + MsgId = reader.GetInt64(msgIdOrdinal), + ReadCt = reader.GetInt32(readCtOrdinal), + EnqueuedAt = reader.GetDateTime(enqueuedAtOrdinal), + Vt = reader.GetDateTime(vtOrdinal), + Message = DeserializeMessage(reader.GetString(messageOrdinal)) }); } return result; diff --git a/Npgmq/NpgmqMetricsResult.cs b/Npgmq/NpgmqMetricsResult.cs new file mode 100644 index 0000000..efa89b7 --- /dev/null +++ b/Npgmq/NpgmqMetricsResult.cs @@ -0,0 +1,46 @@ +namespace Npgmq; + +/// +/// Metrics data for a queue. +/// +public class NpgmqMetricsResult +{ + /// + /// Name of the queue. + /// + public string QueueName { get; set; } = null!; + + /// + /// Number of messages in the queue. + /// + public long QueueLength { get; set; } + + /// + /// Age, in seconds, of the newest message in the queue. + /// + public int? NewestMessageAge { get; set; } + + /// + /// Age, in seconds, of the oldest message in the queue. + /// + public int? OldestMessageAge { get; set; } + + /// + /// Total number of messages that have been in the queue. + /// + public long TotalMessages { get; set; } + + /// + /// When the metric was scraped. + /// + public DateTimeOffset ScrapeTime { get; set; } + + /// + /// Number of visible messages in the queue. + /// + /// + /// This field was added in PGMQ 1.5.0. + /// When using an earlier version of PGMQ, this field will be set to -1. + /// + public long QueueVisibleLength { get; set; } +} \ No newline at end of file diff --git a/README.md b/README.md index 7b53b90..283f251 100644 --- a/README.md +++ b/README.md @@ -8,10 +8,19 @@ A .NET client for [Postgres Message Queue](https://github.com/tembo-io/pgmq) (PG ## Compatibility -| PGMQ Version | Compatibility | -|----------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| 1.5.0+ | Fully supported | -| 0.31.0 - 1.4.5 | The following method overloads are *not* supported, since they were introduced in PGMQ 1.5.0:
- `SendAsync(string, T, DateTimeOffset)`
- `SendBatchAsync(string, IEnumerable, DateTimeOffset)` | +Npgmq is compatible with PGMQ version 1.5.0 and later. + +### Backward Compatibility + +An attempt is made to maintain compatibility with previous versions of PGMQ, but there may be limitations when using Npgmq with older versions of PGMQ. + +Known limitations when using Npgmq with older versions of PGMQ: +* The `SendAsync(string, T, DateTimeOffset)` and `SendBatchAsync(string, IEnumerable, DateTimeOffset)` method overloads will throw an exception when used with PGMQ < 1.5.0. +* `NpgmqMetricResult.VisibleQueueLength` will always be -1 when used with PGMQ < 1.5.0. +* `GetMetricsAsync()` and `GetMetricsAsync(string)` require PGMQ 0.33.1 or later. +* PGMQ < 0.31.0 is not supported. + +See the [build workflow](.github/workflows/build.yml) for the versions of PGMQ that are tested. ## Installation To install the package via [Nuget](https://www.nuget.org/packages/Npgmq/), run the following command: