Skip to content

Commit

Permalink
Merge pull request #80 from rabbitmq/add_lz4
Browse files Browse the repository at this point in the history
Implement lz4 codec example
  • Loading branch information
lukebakken authored Mar 3, 2022
2 parents a702c19 + 604870b commit 8dd8a23
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 1 deletion.
11 changes: 11 additions & 0 deletions Examples/CompressCodecs/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
Sub Entry compress codecs
---

By default the client implements only `None` and `Gzip` see [here](https://github.com/rabbitmq/rabbitmq-stream-dotnet-client#sub-entries-batching) for more details.


You need to implement `ICompressionCodec` interface to create a new codec.
Here you can find all the missing implementations:
- [lz4](./lz4)
- Snappy not implemented yet
- Zstd not implemented yet
19 changes: 19 additions & 0 deletions Examples/CompressCodecs/lz4/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
Lz4 Compress codec
---

You need to add https://github.com/MiloszKrajewski/K4os.Compression.LZ4 as dependency


How to use:

- Register the codec
```csharp
StreamCompressionCodecs.RegisterCodec<StreamLz4Codec>(CompressionType.Lz4);
```

- Send messages:
```csharp
var messages = new List<Message>();
...
await producerLog.Send(1, messages, CompressionType.Lz4);
```
64 changes: 64 additions & 0 deletions Examples/CompressCodecs/lz4/StreamLz4Codec.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
public class StreamLz4Codec : ICompressionCodec
{
private ReadOnlySequence<byte> compressedReadOnlySequence;

private static int WriteUInt32(Span<byte> span, uint value)
{
BinaryPrimitives.WriteUInt32BigEndian(span, value);
return 4;
}

private static int Write(Span<byte> span, ReadOnlySequence<byte> msg)
{
msg.CopyTo(span);
return (int) msg.Length;
}

public void Compress(List<Message> messages)
{
MessagesCount = messages.Count;
UnCompressedSize = messages.Sum(msg => 4 + msg.Size);
var messagesSource = new Span<byte>(new byte[UnCompressedSize]);
var offset = 0;
foreach (var msg in messages)
{
offset += WriteUInt32(messagesSource.Slice(offset), (uint) msg.Size);
offset += msg.Write(messagesSource.Slice(offset));
}

using var source = new MemoryStream(messagesSource.ToArray());
using var destination = new MemoryStream();
var settings = new LZ4EncoderSettings
{
ChainBlocks = false
};
using (var target = LZ4Stream.Encode(destination, settings, false))
{
source.CopyTo(target);
}
compressedReadOnlySequence = new ReadOnlySequence<byte>(destination.ToArray());
}

public int Write(Span<byte> span)
{
return Write(span, compressedReadOnlySequence);
}

public int CompressedSize => (int) compressedReadOnlySequence.Length;

public int UnCompressedSize { get; private set; }
public int MessagesCount { get; private set; }

public CompressionType CompressionType => CompressionType.Lz4;

public ReadOnlySequence<byte> UnCompress(ReadOnlySequence<byte> source, uint dataLen, uint unCompressedDataSize)
{
using var target = new MemoryStream();
using (var sourceDecode = LZ4Stream.Decode(new MemoryStream(source.ToArray())))
{
sourceDecode.CopyTo(target);
}

return new ReadOnlySequence<byte>(target.ToArray());
}
}
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,9 @@ await producer.Send(publishingId, subEntryMessages, CompressionType.Gzip);
messages.Clear();
```

Not all the compressions are implemented by defaults, to avoid to many dependencies.
See the table:

| Compression | Description | Provided by client |
|------------------------|----------------|--------------------|
| CompressionType.None | No compression | yes |
Expand All @@ -239,7 +242,8 @@ messages.Clear();
| CompressionType.Snappy | Snappy | No |
| CompressionType.Zstd | Zstd | No |

See the section: "Implement a Custom Compression Codec" for more details.
You can add missing codecs with `StreamCompressionCodecs.RegisterCodec` api.
See [Examples/CompressCodecs](./Examples/CompressCodecs) for `Lz4`,`Snappy` and `Zstd` implementations.

### Deduplication
[See here for more details](https://rabbitmq.github.io/rabbitmq-stream-java-client/snapshot/htmlsingle/#outbound-message-deduplication)
Expand Down

0 comments on commit 8dd8a23

Please sign in to comment.