-
-
Notifications
You must be signed in to change notification settings - Fork 27
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add MessagePack item serializer and validator #628
base: main
Are you sure you want to change the base?
Conversation
And fix issues found during testing
This change applies only to types in which leading zeros are insignificant.
The parser mapping ByteVector to MsgpackItem can be seen as a not injective morphism, that is, there are many ByteVectors that will map to the same MsgpackItem. Because of this, we cannot possibly guarantee that `serialize(parse(bs))` is fixpoint for an arbitrary `bs`. However, currently implemented serializers *are* injective (if we exclude the Timestamp format family as it can be represented with Extension types) and so, we can guarantee `serialize(parse(bs)) == bs` if `bs` is a member of a subset of ByteVector that is emitted by a serializer. In other words, the following code will be true for any `bs` if `serialize` is injective and we ignore the Timestamp type family: ``` val first = serialize(parse(bs)) val second = serialize(parse(first)) first == second ``` This test makes sure that the above holds.
- There was very little performance difference between serializers so the `fast` serializer was entirely scrapped. - The current serializer buffers the output in 4KiB segments before emitting it. This change brought a significant speedup.
The initial concept of having two serializers was dropped as they ran at a nearly identical time 😃. These are the current benchmarks on i7-8550U:
Validation seems to slow things down a bit and I think that maybe it is possible to make it a little faster. |
msgpack/src/main/scala/fs2/data/msgpack/low/internal/ItemSerializer.scala
Outdated
Show resolved
Hide resolved
msgpack/src/main/scala/fs2/data/msgpack/low/internal/ItemSerializer.scala
Show resolved
Hide resolved
msgpack/src/main/scala/fs2/data/msgpack/low/internal/ItemValidator.scala
Outdated
Show resolved
Hide resolved
Thanks a ton for this new contribution. I had a first look at it and left a few comments. But it looks great already. 👏 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good overall 👏 , left only some smaller comments.
.compile | ||
.string |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If I'm not mistaken, this destroys all chunking, you'll work with one gigantic Chunk
– not what real life code should usually do. I think you can simply combine those two streams roughly along the lines of
fs2.io
.readClassLoaderResource[SyncIO]("twitter_msgpack.txt", 4096)
.through(fs2.text.utf8.decode)
.map(str => Chunk.byteVector(ByteVector.fromHex(str).get))
.unchunks
.through(fs2.data.msgpack.low.items[SyncIO])
.compile
.toList
.unsafeRunSync()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because the twitter_msgpack.txt
file contained space separated hex representations of bytes, the fs2.text.utf8.decode
pipe could slice some bytes in half between two chunks. E.g. "A1 B2 C3"
would be converted into Chunk(0x0a, 0x1b)
and Chunk(0x02, 0xc3)
if our byte stream had a chunk size equal to 4.
We could remove spaces from input data but even then we would rely on the chunk size being a multiple of two which I think is a bit fragile and hard to debug.
Instead, I converted the test data into a raw binary form which is loaded into memory as a chunked stream before benchmarks are run. The implementation looks a bit clunky but I'm not aware of any better way to achieve this.
msgpack/src/main/scala/fs2/data/msgpack/low/internal/ItemSerializer.scala
Outdated
Show resolved
Hide resolved
msgpack/src/main/scala/fs2/data/msgpack/low/internal/ItemSerializer.scala
Outdated
Show resolved
Hide resolved
msgpack/src/main/scala/fs2/data/msgpack/low/internal/ItemSerializer.scala
Outdated
Show resolved
Hide resolved
The serializer itself was corrected in 041e135
MessagePack Arrays and Maps can hold up to 2^32 - 1 items which is more than the `Int` type can represent without negative values.
Also drop the `fitsIn` function as we now use `Long`s instead of `Int`s and so we don't need to compare unsigned values.
Hi @jarmuszz. I am not sure anymore what the state of this PR is. Is it ready to be re-reviewed or do you still want to address some comments or improvements? |
Hi @satabin, I'll try to address these comments this weekend. I've been trying to combine work with full-time studies which turned out to be more time sinking than I imagined 😃. I hope that in the following weeks I'll be working on the fs2-data-msgpack project in a more regular fashion since I'm resigning from the job. |
instead of relying on conversion from hex representation via scodec. This also makes it easier to load input data into a properly chunked stream.
Fixes the benchmark and reflects changes made in 989ec8a.
Hi @jarmuszz. Sorry I am replying late again. What is the state of this PR? Do you still plan on working on some parts? Or is it ready to be merged when approved? |
Hi, I think it is ready to be merged if approved. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed it again, and it looks good to me as a first shot. Thanks again @jarmuszz!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Found a few nitpick places, but will take care of fixing them myself the next days, no action required. Good work!
Stream | ||
.emits(cases) | ||
.evalMap { case (lhs, rhs) => | ||
Stream | ||
.emit(lhs) | ||
.through(low.toBinary[F]) | ||
.compile | ||
.drain | ||
.redeem(expect.same(_, rhs), _ => failure(s"Expected error for item ${lhs}")) | ||
} | ||
.compile | ||
.foldMonoid |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can just delegate to validation
here:
Stream | |
.emits(cases) | |
.evalMap { case (lhs, rhs) => | |
Stream | |
.emit(lhs) | |
.through(low.toBinary[F]) | |
.compile | |
.drain | |
.redeem(expect.same(_, rhs), _ => failure(s"Expected error for item ${lhs}")) | |
} | |
.compile | |
.foldMonoid | |
validation[F](cases.map { case (i, t) => List(i) -> t }*) |
if (processed == pre) | ||
success | ||
else | ||
failure(s"Serializer should be fixpoint for ${pre} but it emitted ${processed}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if (processed == pre) | |
success | |
else | |
failure(s"Serializer should be fixpoint for ${pre} but it emitted ${processed}") | |
expect(processed == pre, s"Serializer should be fixpoint for ${pre} but it emitted ${processed}") |
This pull request brings serialization and validation to the
msgpack.low
module.Serialization is provided for the outside use through
tobinary: Pipe[F, MsgpackItem, Byte]
andtoNonValidatedBinary: Pipe[F, MsgpackItem, Byte]
functions. Akin to how thecbor.low
serialization is constructed, serialization without validation can potentialy produce malformed data.Validation API is also exposed through
validated: Pipe[F, MsgpackItem, MsgpackItem]
.