feat: delete records that are no longer in the source during the bulk process #18

Merged
10 commits merged on Jul 25, 2024
35 changes: 35 additions & 0 deletions .github/workflows/ci.yml
@@ -14,6 +14,8 @@ env:
IMAGE_NAME: ghcr.io/klantinteractie-servicesysteem/kiss-elastic-sync # lowercase

jobs:


  build:
    runs-on: ubuntu-latest

@@ -76,3 +78,36 @@ jobs:
          tags: ${{env.IMAGE_NAME}}:${{ env.RELEASE }},${{env.IMAGE_NAME}}:latest
          cache-from: type=gha
          cache-to: type=gha,mode=max


  test:
    runs-on: ubuntu-latest
    permissions:
      contents: read
      issues: read
      checks: write
      pull-requests: write
      packages: write

    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-dotnet@v3
        with:
          dotnet-version: '8.0.x'
      - run: dotnet test test/Kiss.Elastic.Sync.IntegrationTest -c Release --logger trx --results-directory ./testresults -v n /p:CollectCoverage=true /p:CoverletOutput=./testresults/ /p:CoverletOutputFormat=lcov

      - name: Publish Test Results
        uses: EnricoMi/publish-unit-test-result-action@v2
        if: always()
        with:
          action_fail: true
          files: |
            testresults/*.trx
            testresults/*.xml

      # - name: Publish Coverage
      #   if: github.event_name == 'pull_request'
      #   uses: romeovs/[email protected]
      #   with:
      #     lcov-file: ./testresults/coverage.info
      #     github-token: ${{ secrets.GITHUB_TOKEN }}
6 changes: 6 additions & 0 deletions Kiss.Elastic.Sync.sln
@@ -13,6 +13,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution
.github\workflows\ci.yml = .github\workflows\ci.yml
EndProjectSection
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Kiss.Elastic.Sync.IntegrationTest", "test\Kiss.Elastic.Sync.IntegrationTest\Kiss.Elastic.Sync.IntegrationTest.csproj", "{A911527B-A4CE-4103-A67B-BB578D2E9C60}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -27,6 +29,10 @@ Global
{B1BD4D15-D7CD-4583-98FC-F63BD62B5B26}.Debug|Any CPU.Build.0 = Debug|Any CPU
{B1BD4D15-D7CD-4583-98FC-F63BD62B5B26}.Release|Any CPU.ActiveCfg = Release|Any CPU
{B1BD4D15-D7CD-4583-98FC-F63BD62B5B26}.Release|Any CPU.Build.0 = Release|Any CPU
{A911527B-A4CE-4103-A67B-BB578D2E9C60}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{A911527B-A4CE-4103-A67B-BB578D2E9C60}.Debug|Any CPU.Build.0 = Debug|Any CPU
{A911527B-A4CE-4103-A67B-BB578D2E9C60}.Release|Any CPU.ActiveCfg = Release|Any CPU
{A911527B-A4CE-4103-A67B-BB578D2E9C60}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
201 changes: 172 additions & 29 deletions src/Kiss.Elastic.Sync/ElasticBulkClient.cs
@@ -1,11 +1,15 @@
using System.Net.Http.Headers;
using System.Text;
using System.Text.Json;
using System.Text.Json.Nodes;

namespace Kiss.Elastic.Sync
{
internal sealed class ElasticBulkClient : IDisposable
public sealed class ElasticBulkClient : IDisposable
{
const long MaxBytes = 50_000_000;
const byte NewLine = (byte)'\n';

private static JsonElement GetFieldMapping()
{
using var str = Helpers.GetEmbedded("field.json") ?? Stream.Null;
@@ -33,8 +37,6 @@ public ElasticBulkClient(Uri baseUri, string username, string password)

public void Dispose() => _httpClient?.Dispose();



public static ElasticBulkClient Create()
{
var elasticBaseUrl = Helpers.GetRequiredEnvironmentVariable("ELASTIC_BASE_URL");
@@ -59,41 +61,34 @@ public async Task<string> IndexBulk(IAsyncEnumerable<KissEnvelope> envelopes, st
});

if (!await EnsureIndex(bron, indexName, completionFields, token)) return indexName;

var existingIds = await GetExistingIds(indexName, token);

await using var enumerator = envelopes.GetAsyncEnumerator(token);
var hasNext = await enumerator.MoveNextAsync();
const long MaxLength = 50 * 1000 * 1000;
const byte NewLine = (byte)'\n';
while (hasNext)

while (hasNext || existingIds.Count > 0)
{
long written = 0;
using var content = new PushStreamContent(async (stream) =>
{
using var writer = new Utf8JsonWriter(stream);

while (hasNext && written < MaxLength)
// write an index statement in the bulk document for all records from the source
while (hasNext && written < MaxBytes)
{
writer.WriteStartObject();
writer.WritePropertyName("index");
writer.WriteStartObject();
writer.WriteString("_index", indexName);
writer.WriteString("_id", enumerator.Current.Id);
writer.WriteEndObject();
writer.WriteEndObject();
await writer.FlushAsync(token);
written += writer.BytesCommitted;
writer.Reset();
stream.WriteByte(NewLine);
written++;

enumerator.Current.WriteTo(writer, bron);
await writer.FlushAsync(token);
written += writer.BytesCommitted;
writer.Reset();
stream.WriteByte(NewLine);
written++;

existingIds.Remove(enumerator.Current.Id);
written = await WriteIndexStatement(writer, stream, indexName, bron, enumerator.Current, token);
hasNext = await enumerator.MoveNextAsync();
}

// write a delete statement for all ids that are no longer in the source
while (!hasNext && existingIds.Count > 0 && written < MaxBytes)
{
var existingId = existingIds.First();
existingIds.Remove(existingId);
written += await WriteDeleteStatement(stream, writer, indexName, existingId, token);
}
});
content.Headers.ContentType = new MediaTypeHeaderValue("application/json");
using var request = new HttpRequestMessage(HttpMethod.Post, "_bulk")
@@ -156,7 +151,85 @@ private async Task<bool> EnsureIndex(string bron, string indexName, IReadOnlyLis
return putResponse.IsSuccessStatusCode;
}

private JsonObject MapCompletionFields(IReadOnlyList<string> completionFields)
private async Task<HashSet<string>> GetExistingIds(string indexName, CancellationToken token)
{
using var request = new HttpRequestMessage(HttpMethod.Post, $"{indexName}/_search?scroll=1m");
request.Content = new StringContent("""
{
"query" : {
"match_all" : {}
},
"stored_fields": []
}
""", Encoding.UTF8, "application/json");
var result = new HashSet<string>();
using var response = await _httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, token);
if (!response.IsSuccessStatusCode)
{
await Helpers.LogResponse(response, token);
return result;
}
using var stream = await response.Content.ReadAsStreamAsync(token);
using var doc = await JsonDocument.ParseAsync(stream, cancellationToken: token);

if (!TryGetHits(doc, out var hits, out var total, out var scrollId))
{
return [];
}

foreach (var item in hits)
{
result.Add(item);
}

while (result.Count < total && !string.IsNullOrWhiteSpace(scrollId))
{
using var scrollRequest = new HttpRequestMessage(HttpMethod.Post, "_search/scroll");
scrollRequest.Content = new StringContent($$"""
{
"scroll": "1m",
"scroll_id": "{{scrollId}}"
}
""", Encoding.UTF8, "application/json");
using var scrollResponse = await _httpClient.SendAsync(scrollRequest, HttpCompletionOption.ResponseHeadersRead, token);
if (!scrollResponse.IsSuccessStatusCode)
{
await Helpers.LogResponse(scrollResponse, token);
return result;
}
using var scrollStream = await scrollResponse.Content.ReadAsStreamAsync(token);
using var scrollDoc = await JsonDocument.ParseAsync(scrollStream, cancellationToken: token);

if (!TryGetHits(scrollDoc, out hits, out total, out scrollId))
{
return result;
}

foreach (var item in hits)
{
result.Add(item);
}
}

if (!string.IsNullOrWhiteSpace(scrollId))
{
using var deleteRequest = new HttpRequestMessage(HttpMethod.Delete, "_search/scroll");
deleteRequest.Content = new StringContent($$"""
{
"scroll_id": "{{scrollId}}"
}
""", Encoding.UTF8, "application/json");
using var deleteResponse = await _httpClient.SendAsync(deleteRequest, HttpCompletionOption.ResponseHeadersRead, token);
if (!deleteResponse.IsSuccessStatusCode)
{
await Helpers.LogResponse(deleteResponse, token);
}
}

return result;
}

private static JsonObject MapCompletionFields(IReadOnlyList<string> completionFields)
{
if (!completionFields.Any()) return JsonObject.Create(s_fieldMapping)!;
var split = completionFields.Select(x => x.Split("."));
@@ -181,10 +254,80 @@ private JsonObject MapCompletionFields(IReadOnlyList<string> completionFields)
}
}
var value = MapCompletionFields(fields);
properties[group.Key] = value;
properties[group.Key] = value;
}

return result;
}

private static async Task<long> WriteIndexStatement(Utf8JsonWriter writer, Stream stream, string indexName, string bron, KissEnvelope envelope, CancellationToken token)
{
long written = 0;
writer.WriteStartObject();
writer.WritePropertyName("index"u8);
writer.WriteStartObject();
writer.WriteString("_index"u8, indexName);
writer.WriteString("_id"u8, envelope.Id);
writer.WriteEndObject();
writer.WriteEndObject();
await writer.FlushAsync(token);
written += writer.BytesCommitted;
writer.Reset();
stream.WriteByte(NewLine);
written++;

envelope.WriteTo(writer, bron);
await writer.FlushAsync(token);
written += writer.BytesCommitted;
writer.Reset();
stream.WriteByte(NewLine);
written++;
return written;
}

private static async Task<long> WriteDeleteStatement(Stream stream, Utf8JsonWriter writer, string indexName, string existingId, CancellationToken token)
{
writer.WriteStartObject();
writer.WritePropertyName("delete"u8);
writer.WriteStartObject();
writer.WriteString("_index"u8, indexName);
writer.WriteString("_id"u8, existingId);
writer.WriteEndObject();
writer.WriteEndObject();
await writer.FlushAsync(token);
var written = writer.BytesCommitted;
writer.Reset();
stream.WriteByte(NewLine);
written++;
return written;
}

private static bool TryGetHits(JsonDocument document, out IEnumerable<string> ids, out int total, out string scrollId)
{
if (!document.RootElement.TryGetProperty("hits"u8, out var hits)
|| !hits.TryGetProperty("total"u8, out var totalP)
|| !totalP.TryGetProperty("value"u8, out var totalV)
|| !totalV.TryGetInt32(out total)
|| !hits.TryGetProperty("hits"u8, out hits)
|| hits.ValueKind != JsonValueKind.Array
|| !document.RootElement.TryGetProperty("_scroll_id"u8, out var scrId)
|| scrId.ValueKind != JsonValueKind.String)
{
ids = [];
total = 0;
scrollId = "";
return false;
}

ids = hits.EnumerateArray()
.Select(x => x.TryGetProperty("_id"u8, out var id) && id.ValueKind == JsonValueKind.String
? id.GetString()
: null)
.OfType<string>();

scrollId = scrId.GetString() ?? "";

return true;
}
}
}
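
The net effect of the change in this file is that a single `_bulk` request can now carry both index actions (for every record the source still yields) and delete actions (for ids that the scroll query found in the index but that are no longer in the source). Below is a sketch of the resulting NDJSON body, with an illustrative index name and ids; the exact fields of each document line depend on `KissEnvelope.WriteTo`:

```csharp
// Illustrative only: the shape of the NDJSON body streamed to POST /_bulk.
// Index name and ids are made up; the document lines are whatever KissEnvelope.WriteTo emits.
// Note that delete actions carry no document line, per the Elasticsearch bulk API.
const string exampleBulkBody =
    """
    {"index":{"_index":"search-my_index","_id":"2"}}
    {"comment":"document source written by KissEnvelope.WriteTo"}
    {"index":{"_index":"search-my_index","_id":"4"}}
    {"comment":"document source written by KissEnvelope.WriteTo"}
    {"delete":{"_index":"search-my_index","_id":"1"}}
    """;
```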
76 changes: 76 additions & 0 deletions test/Kiss.Elastic.Sync.IntegrationTest/ElasticBulkClientTests.cs
@@ -0,0 +1,76 @@

using System.Text.Json;
using Elastic.Clients.Elasticsearch;
using Kiss.Elastic.Sync.IntegrationTest.Infrastructure;
using Testcontainers.Elasticsearch;

namespace Kiss.Elastic.Sync.IntegrationTest
{
public class ElasticBulkClientTests(ElasticFixture fixture) : IClassFixture<ElasticFixture>
{
[Fact]
public async Task Bulk_insert_works_for_inserts_updates_and_deletes()
{
const string IndexWithoutPrefix = "my_index";
const string IndexWithPrefix = $"search-{IndexWithoutPrefix}";

using var bulkClient = new ElasticBulkClient(
fixture.BaseUri,
ElasticsearchBuilder.DefaultUsername,
ElasticsearchBuilder.DefaultPassword
);

var elastic = new ElasticsearchClient(fixture.BaseUri);

var expectedInitialRecords = new Dictionary<string, string>
{
["1"] = "first record to be deleted",
["2"] = "second record to be updated",
["3"] = "third record to remain the same",
};

var expectedUpdatedRecords = new Dictionary<string, string>
{
["2"] = "second record with update",
["3"] = "third record to remain the same",
["4"] = "fourth record which is new",
};

var initialEnvelopes = expectedInitialRecords
.Select(Map)
.AsAsyncEnumerable();

var updatedEnvelopes = expectedUpdatedRecords
.Select(Map)
.AsAsyncEnumerable();

// Bulk index the initial values
await bulkClient.IndexBulk(initialEnvelopes, IndexWithoutPrefix, [], default);

Assert.True((await elastic.Indices.RefreshAsync(IndexWithPrefix)).IsSuccess());

var firstSearchResponse = await elastic.SearchAsync<KissEnvelope>(IndexWithPrefix);
Assert.True(firstSearchResponse.IsSuccess());

var actualInitialRecords = firstSearchResponse.Hits.ToDictionary(x => x.Id!, x => x.Source.Title!);

Assert.Equivalent(expectedInitialRecords, actualInitialRecords);


// Bulk index the updated values
await bulkClient.IndexBulk(updatedEnvelopes, IndexWithoutPrefix, [], default);

Assert.True((await elastic.Indices.RefreshAsync(IndexWithPrefix)).IsSuccess());

var secondSearchResponse = await elastic.SearchAsync<KissEnvelope>(IndexWithPrefix);
Assert.True(secondSearchResponse.IsSuccess());

var actualUpdatedRecords = secondSearchResponse.Hits.ToDictionary(x => x.Id!, x => x.Source.Title!);

Assert.Equal(expectedUpdatedRecords, actualUpdatedRecords);
}

private static KissEnvelope Map(KeyValuePair<string, string> x)
=> new(JsonDocument.Parse(JsonSerializer.Serialize(x)).RootElement, x.Value, null, x.Key);
}
}
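
The test relies on an `AsAsyncEnumerable` extension method that comes from the test project's `Infrastructure` folder and is not part of this diff. A minimal sketch of such a helper, purely hypothetical (the actual implementation may differ):

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical sketch; the real helper lives in Kiss.Elastic.Sync.IntegrationTest.Infrastructure.
public static class EnumerableExtensions
{
    public static async IAsyncEnumerable<T> AsAsyncEnumerable<T>(this IEnumerable<T> source)
    {
        foreach (var item in source)
        {
            yield return item;  // hand each item to the consumer
            await Task.Yield(); // keep the enumeration genuinely asynchronous
        }
    }
}
```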