Skip to content

Commit

Permalink
feat: add some basic json pipes to parse and split (#42)
Browse files Browse the repository at this point in the history
  • Loading branch information
Seddryck authored Sep 1, 2024
1 parent e90fc69 commit c7dcb0c
Show file tree
Hide file tree
Showing 6 changed files with 220 additions and 0 deletions.
39 changes: 39 additions & 0 deletions Streamistry.Core/Pipes/Mappers/JsonPathPlucker.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
using System;
using System.Collections.Generic;
using System.Linq.Expressions;
using System.Text;
using System.Text.Json.Nodes;
using System.Threading.Tasks;
using Json.Path;

namespace Streamistry.Pipes.Mappers;
public abstract class BaseJsonPathPlucker<TJson, T> : Mapper<TJson, T> where TJson : JsonNode
{
public BaseJsonPathPlucker(IChainablePipe<TJson> upstream, string path)
: base(upstream, (x) => GetValue(x, JsonPath.Parse(path)))
{ }

protected static T? GetValue(TJson? node, JsonPath path)
{
var matches = path.Evaluate(node).Matches;
if (matches.Count==0)
return default;
if (matches[0].Value.TryGetValue<T>(out var value))
return value;
return default;
}
}

public class JsonPathPlucker<T> : BaseJsonPathPlucker<JsonObject, T>
{
public JsonPathPlucker(IChainablePipe<JsonObject> upstream, string path)
: base(upstream, path)
{ }
}

public class JsonPathArrayPlucker<T> : BaseJsonPathPlucker<JsonArray, T>
{
public JsonPathArrayPlucker(IChainablePipe<JsonArray> upstream, string path)
: base(upstream, path)
{ }
}
29 changes: 29 additions & 0 deletions Streamistry.Core/Pipes/Parsers/JsonArrayParser.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Text.Json.Nodes;
using System.Threading.Tasks;

namespace Streamistry.Pipes.Parsers;
public class JsonArrayParser : StringParser<JsonArray>
{
public JsonArrayParser(IChainablePipe<string> upstream)
: base(upstream, TryParse)
{ }

private static bool TryParse(string? text, out JsonArray? value)
{
value = null;
if (text == null)
return true;

try
{
value = (JsonArray)JsonNode.Parse(text)!;
return true;
}
catch
{ return false; }
}
}
29 changes: 29 additions & 0 deletions Streamistry.Core/Pipes/Parsers/JsonObjectParser.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Text.Json;
using System.Text.Json.Nodes;
using System.Threading.Tasks;

namespace Streamistry.Pipes.Parsers;
public class JsonObjectParser : StringParser<JsonObject>
{
public JsonObjectParser(IChainablePipe<string> upstream)
: base(upstream, new ParserDelegate<string, JsonObject>(TryParse))
{ }

private static bool TryParse(string? text, out JsonObject? value)
{
value = null;
if (text == null)
return true;
try
{
value = (JsonObject)JsonNode.Parse(text)!;
return true;
}
catch
{ return false; }
}
}
24 changes: 24 additions & 0 deletions Streamistry.Core/Pipes/Splitters/JsonArraySplitter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection.Metadata.Ecma335;
using System.Text;
using System.Text.Json.Nodes;
using System.Threading.Tasks;

namespace Streamistry.Pipes.Splitters;
internal class JsonArraySplitter : Splitter<JsonArray, JsonObject>
{
public JsonArraySplitter(IChainablePipe<JsonArray> upstream)
: base(upstream, x => [..Split(x)])
{ }

private static IEnumerable<JsonObject?>? Split(JsonArray? array)
{
if (array is null)
yield return null;
else
foreach (var item in array!)
yield return (JsonObject?)item;
}
}
1 change: 1 addition & 0 deletions Streamistry.Core/Streamistry.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="JsonPath.Net" Version="1.1.4" />
<PackageReference Include="MethodBoundaryAspect.Fody" Version="2.0.149" />
<PackageReference Include="Mono.Cecil" Version="0.11.5" />
</ItemGroup>
Expand Down
98 changes: 98 additions & 0 deletions Streamistry.Testing/JsonTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
using System;
using System.Collections.Generic;
using System.Data;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using NUnit.Framework;
using Streamistry.Pipes.Sinks;
using Streamistry.Pipes.Sources;
using Streamistry.Pipes.Mappers;
using System.Text.Json.Nodes;
using Streamistry.Pipes.Parsers;
using Streamistry.Pipes.Splitters;

namespace Streamistry.Testing;
public class JsonTests
{
private const string JsonFirst = @"
{
""user"": {
""name"": ""Albert Einstein"",
""age"": 30,
""contact"": {
""email"": ""[email protected]"",
""phone"": ""123-456-7890""
}
}
}";

private const string JsonSecond = @"
{
""user"": {
""name"": ""Nikola Tesla"",
""age"": 35,
""contact"": {
""email"": ""[email protected]"",
""phone"": ""321-654-0987""
}
}
}";

private const string JsonThird = @"
{
""user"": {
""name"": ""John von Neumann"",
""age"": 72,
""contact"": {
""phone"": ""456-123-0987""
}
}
}";

[Test]
[TestCase(JsonFirst, "[email protected]")]
[TestCase(JsonThird, null)]
public void JsonPathPlucker_ValidPath_ExistingValue(string jsonString, string? email)
{
var pipeline = new Pipeline<JsonObject>();
var plucker = new JsonPathPlucker<string>(pipeline, "$.user.contact.email");
var sink = new MemorySink<string>(plucker);
plucker.Emit((JsonObject)JsonNode.Parse(jsonString)!);

Assert.That(sink.State, Has.Count.EqualTo(1));
Assert.That(sink.State.First(), Is.EqualTo(email));
}

[Test]
public void JsonObjectParser_ValidString_ExistingValue()
{
var source = new EnumerableSource<string>([JsonFirst, JsonSecond, JsonThird]);
var pipeline = new Pipeline(source);
var parser = new JsonObjectParser(source);
var plucker = new JsonPathPlucker<string>(parser, "$.user.contact.email");
var sink = new MemorySink<string>(plucker);
pipeline.Start();

Assert.That(sink.State, Has.Count.EqualTo(3));
Assert.That(sink.State.First(), Is.EqualTo("[email protected]"));
Assert.That(sink.State.Last(), Is.Null);
}

[Test]
public void JsonArrayParser_ValidString_ExistingValue()
{
var array = $"[{JsonFirst}, {JsonSecond}, {JsonThird}]";
var source = new EnumerableSource<string>([array]);
var pipeline = new Pipeline(source);
var parser = new JsonArrayParser(source);
var splitter = new JsonArraySplitter(parser);
var plucker = new JsonPathPlucker<string>(splitter, "$.user.contact.email");
var sink = new MemorySink<string>(plucker);
pipeline.Start();

Assert.That(sink.State, Has.Count.EqualTo(3));
Assert.That(sink.State.First(), Is.EqualTo("[email protected]"));
Assert.That(sink.State.Last(), Is.Null);
}
}

0 comments on commit c7dcb0c

Please sign in to comment.