
feat: delete records that are no longer in the source during the bulk process #18

Merged
merged 10 commits on Jul 25, 2024
35 changes: 35 additions & 0 deletions .github/workflows/ci.yml
@@ -14,6 +14,8 @@ env:
IMAGE_NAME: ghcr.io/klantinteractie-servicesysteem/kiss-elastic-sync # lowercase

jobs:


build:
runs-on: ubuntu-latest

@@ -76,3 +78,36 @@ jobs:
tags: ${{env.IMAGE_NAME}}:${{ env.RELEASE }},${{env.IMAGE_NAME}}:latest
cache-from: type=gha
cache-to: type=gha,mode=max


test:
runs-on: ubuntu-latest
permissions:
contents: read
issues: read
checks: write
pull-requests: write
packages: write

steps:
- uses: actions/checkout@v3
- uses: actions/setup-dotnet@v3
with:
dotnet-version: '8.0.x'
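# run the integration tests; the /p:CollectCoverage, /p:CoverletOutput and
# /p:CoverletOutputFormat properties assume the coverlet.msbuild package is
# referenced by the test project, and produce an lcov report in ./testresults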
- run: dotnet test test/Kiss.Elastic.Sync.IntegrationTest -c Release --logger trx --results-directory ./testresults -v n /p:CollectCoverage=true /p:CoverletOutput=./testresults/ /p:CoverletOutputFormat=lcov

- name: Publish Test Results
uses: EnricoMi/publish-unit-test-result-action@v2
if: always()
with:
action_fail: true
files: |
testresults/*.trx
testresults/*.xml

# - name: Publish Coverage
# if: github.event_name == 'pull_request'
# uses: romeovs/[email protected]
# with:
# lcov-file: ./testresults/coverage.info
# github-token: ${{ secrets.GITHUB_TOKEN }}
6 changes: 6 additions & 0 deletions Kiss.Elastic.Sync.sln
@@ -13,6 +13,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution
.github\workflows\ci.yml = .github\workflows\ci.yml
EndProjectSection
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Kiss.Elastic.Sync.IntegrationTest", "test\Kiss.Elastic.Sync.IntegrationTest\Kiss.Elastic.Sync.IntegrationTest.csproj", "{A911527B-A4CE-4103-A67B-BB578D2E9C60}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -27,6 +29,10 @@ Global
{B1BD4D15-D7CD-4583-98FC-F63BD62B5B26}.Debug|Any CPU.Build.0 = Debug|Any CPU
{B1BD4D15-D7CD-4583-98FC-F63BD62B5B26}.Release|Any CPU.ActiveCfg = Release|Any CPU
{B1BD4D15-D7CD-4583-98FC-F63BD62B5B26}.Release|Any CPU.Build.0 = Release|Any CPU
{A911527B-A4CE-4103-A67B-BB578D2E9C60}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{A911527B-A4CE-4103-A67B-BB578D2E9C60}.Debug|Any CPU.Build.0 = Debug|Any CPU
{A911527B-A4CE-4103-A67B-BB578D2E9C60}.Release|Any CPU.ActiveCfg = Release|Any CPU
{A911527B-A4CE-4103-A67B-BB578D2E9C60}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
160 changes: 130 additions & 30 deletions src/Kiss.Elastic.Sync/ElasticBulkClient.cs
@@ -1,11 +1,18 @@
using System.Net.Http.Headers;
using System.Text.Json;
using System.Text.Json.Nodes;
using Elastic.Clients.Elasticsearch;
using Elastic.Transport;
using HttpMethod = System.Net.Http.HttpMethod;

namespace Kiss.Elastic.Sync
{
internal sealed class ElasticBulkClient : IDisposable
public sealed class ElasticBulkClient : IDisposable
{
const long MaxBytesForBulk = 50_000_000;
const byte NewLine = (byte)'\n';
const int MaxPageSizeForScrolling = 10_000;

private static JsonElement GetFieldMapping()
{
using var str = Helpers.GetEmbedded("field.json") ?? Stream.Null;
@@ -16,8 +23,10 @@ private static JsonElement GetFieldMapping()
private static readonly JsonElement s_fieldMapping = GetFieldMapping();

private readonly HttpClient _httpClient;
private readonly ElasticsearchClient _elasticsearchClient;
private readonly int _scrollPageSize;

public ElasticBulkClient(Uri baseUri, string username, string password)
public ElasticBulkClient(Uri baseUri, string username, string password, int scrollPageSize = MaxPageSizeForScrolling)
{
var handler = new HttpClientHandler();
handler.ClientCertificateOptions = ClientCertificateOption.Manual;
@@ -29,12 +38,14 @@ public ElasticBulkClient(Uri baseUri, string username, string password)
_httpClient = new HttpClient(handler);
_httpClient.BaseAddress = baseUri;
_httpClient.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Basic", Helpers.EncodeCredential(username, password));
var clientSettings = new ElasticsearchClientSettings(baseUri)
.Authentication(new BasicAuthentication(username, password));
_elasticsearchClient = new ElasticsearchClient(clientSettings);
_scrollPageSize = scrollPageSize;
}

public void Dispose() => _httpClient?.Dispose();



public static ElasticBulkClient Create()
{
var elasticBaseUrl = Helpers.GetRequiredEnvironmentVariable("ELASTIC_BASE_URL");
@@ -59,41 +70,42 @@ public async Task<string> IndexBulk(IAsyncEnumerable<KissEnvelope> envelopes, st
});

if (!await EnsureIndex(bron, indexName, completionFields, token)) return indexName;

var existingIds = await GetExistingIds(indexName, token);

await using var enumerator = envelopes.GetAsyncEnumerator(token);
var hasNext = await enumerator.MoveNextAsync();
const long MaxLength = 50 * 1000 * 1000;
const byte NewLine = (byte)'\n';
while (hasNext)

// we need to continue sending requests to Elasticsearch until:
// - there are no more records from the source left to process
// - there are no more records left to delete
// we enter this loop multiple times if there are so many records in the source
// that the total size of the request would become too large if we sent it to Elasticsearch in one go.
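// for reference, each bulk action consists of one or two newline-delimited JSON lines,
// e.g. (illustrative index name and ids):
//   { "index": { "_index": "search-my_index", "_id": "2" } }
//   { ... the source document for id 2 ... }
//   { "delete": { "_index": "search-my_index", "_id": "1" } }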
while (hasNext || existingIds.Count > 0)
{
long written = 0;
using var content = new PushStreamContent(async (stream) =>
{
using var writer = new Utf8JsonWriter(stream);

while (hasNext && written < MaxLength)
// here we loop through the records from the source and add them to the index,
// as long as the request size is not too large
while (hasNext && written < MaxBytesForBulk)
{
writer.WriteStartObject();
writer.WritePropertyName("index");
writer.WriteStartObject();
writer.WriteString("_index", indexName);
writer.WriteString("_id", enumerator.Current.Id);
writer.WriteEndObject();
writer.WriteEndObject();
await writer.FlushAsync(token);
written += writer.BytesCommitted;
writer.Reset();
stream.WriteByte(NewLine);
written++;

enumerator.Current.WriteTo(writer, bron);
await writer.FlushAsync(token);
written += writer.BytesCommitted;
writer.Reset();
stream.WriteByte(NewLine);
written++;

existingIds.Remove(enumerator.Current.Id);
written += await WriteIndexStatement(writer, stream, indexName, bron, enumerator.Current, token);
hasNext = await enumerator.MoveNextAsync();
}

// once there are no more records from the source left to process,
// and there is still room in the request to add delete statements,
// write a delete statement for each id that is no longer in the source
while (!hasNext && existingIds.Count > 0 && written < MaxBytesForBulk)
{
var existingId = existingIds.First();
existingIds.Remove(existingId);
written += await WriteDeleteStatement(stream, writer, indexName, existingId, token);
}
});
content.Headers.ContentType = new MediaTypeHeaderValue("application/json");
using var request = new HttpRequestMessage(HttpMethod.Post, "_bulk")
@@ -156,7 +168,53 @@ private async Task<bool> EnsureIndex(string bron, string indexName, IReadOnlyLis
return putResponse.IsSuccessStatusCode;
}

private JsonObject MapCompletionFields(IReadOnlyList<string> completionFields)
private async Task<HashSet<string>> GetExistingIds(string indexName, CancellationToken token)
{
// 1 minute is used in the elasticsearch examples
var scrollDuration = TimeSpan.FromMinutes(1);
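// the scroll context is kept alive on the server for this duration between requests;
// each scroll call below renews the keep-alive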
var result = new HashSet<string>();

var searchResponse = await _elasticsearchClient.SearchAsync<object>(x => x
.Index(indexName)
// we don't need any stored fields
.StoredFields(Array.Empty<string>())
// we don't need the source documents
.Source(new(false))
.Size(_scrollPageSize)
// scrolling is the most efficient way to loop through big result sets
.Scroll(scrollDuration),
token);

var scrollId = searchResponse.ScrollId;
var hits = searchResponse.Hits;

while (scrollId is not null && hits.Count > 0)
{
foreach (var id in hits.Select(x => x.Id).OfType<string>())
{
result.Add(id);
}

// get the next result set by specifying the scrollId we got previously
var scrollResponse = await _elasticsearchClient.ScrollAsync<object>(new ScrollRequest {
ScrollId = scrollId,
Scroll = scrollDuration,
}, token);

scrollId = scrollResponse.ScrollId;
hits = scrollResponse.Hits;
}

if (scrollId is not null)
{
// it's best practice to clear the active scroll when you are done
await _elasticsearchClient.ClearScrollAsync(x => x.ScrollId(scrollId), token);
}

return result;
}

private static JsonObject MapCompletionFields(IReadOnlyList<string> completionFields)
{
if (!completionFields.Any()) return JsonObject.Create(s_fieldMapping)!;
var split = completionFields.Select(x => x.Split("."));
@@ -181,10 +239,52 @@ private JsonObject MapCompletionFields(IReadOnlyList<string> completionFields)
}
}
var value = MapCompletionFields(fields);
properties[group.Key] = value;
}

return result;
}

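// writes the two NDJSON lines for a single document: the action line
// ({ "index": { "_index": ..., "_id": ... } }) followed by the document itself,
// and returns the number of bytes written to the stream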
private static async Task<long> WriteIndexStatement(Utf8JsonWriter writer, Stream stream, string indexName, string bron, KissEnvelope envelope, CancellationToken token)
{
long written = 0;
writer.WriteStartObject();
writer.WritePropertyName("index"u8);
writer.WriteStartObject();
writer.WriteString("_index"u8, indexName);
writer.WriteString("_id"u8, envelope.Id);
writer.WriteEndObject();
writer.WriteEndObject();
await writer.FlushAsync(token);
written += writer.BytesCommitted;
writer.Reset();
stream.WriteByte(NewLine);
written++;

envelope.WriteTo(writer, bron);
await writer.FlushAsync(token);
written += writer.BytesCommitted;
writer.Reset();
stream.WriteByte(NewLine);
written++;
return written;
}

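// writes the single NDJSON action line that deletes one document by id,
// and returns the number of bytes written to the stream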
private static async Task<long> WriteDeleteStatement(Stream stream, Utf8JsonWriter writer, string indexName, string existingId, CancellationToken token)
{
writer.WriteStartObject();
writer.WritePropertyName("delete"u8);
writer.WriteStartObject();
writer.WriteString("_index"u8, indexName);
writer.WriteString("_id"u8, existingId);
writer.WriteEndObject();
writer.WriteEndObject();
await writer.FlushAsync(token);
var written = writer.BytesCommitted;
writer.Reset();
stream.WriteByte(NewLine);
written++;
return written;
}
}
}
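
For reference, the end-to-end flow this class now supports looks roughly like the sketch below. This is not part of the diff: envelopes stands in for a hypothetical source stream, and Create() reads its connection settings from environment variables, of which only ELASTIC_BASE_URL is visible above.

// sketch: stream records from the source into Elasticsearch; IndexBulk indexes
// everything in the stream and deletes whatever is in the index but not in the stream
using var bulkClient = ElasticBulkClient.Create();
IAsyncEnumerable<KissEnvelope> envelopes = GetEnvelopesFromSource(); // hypothetical helper
var indexName = await bulkClient.IndexBulk(envelopes, "my_bron", [], CancellationToken.None);
// indexName is now "search-my_bron": new records added, changed ones overwritten, stale ones deleted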
1 change: 1 addition & 0 deletions src/Kiss.Elastic.Sync/Kiss.Elastic.Sync.csproj
@@ -22,6 +22,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Elastic.Clients.Elasticsearch" Version="8.14.6" />
<PackageReference Include="Microsoft.VisualStudio.Azure.Containers.Tools.Targets" Version="1.20.1" />
<PackageReference Include="System.IdentityModel.Tokens.Jwt" Version="7.5.1" />
</ItemGroup>
70 changes: 70 additions & 0 deletions test/Kiss.Elastic.Sync.IntegrationTest/ElasticBulkClientTests.cs
@@ -0,0 +1,70 @@

using System.Text.Json;
using Elastic.Clients.Elasticsearch;
using Kiss.Elastic.Sync.IntegrationTest.Infrastructure;
using Testcontainers.Elasticsearch;

namespace Kiss.Elastic.Sync.IntegrationTest
{
public class ElasticBulkClientTests(ElasticFixture fixture) : IClassFixture<ElasticFixture>
{
const string IndexWithoutPrefix = "my_index";
const string IndexWithPrefix = $"search-{IndexWithoutPrefix}";

[Fact]
public async Task Bulk_insert_works_for_inserts_updates_and_deletes()
{
using var bulkClient = new ElasticBulkClient(
fixture.BaseUri,
ElasticsearchBuilder.DefaultUsername,
ElasticsearchBuilder.DefaultPassword,
1 // page size of 1 so we also test if scrolling works correctly
);

var elastic = new ElasticsearchClient(fixture.BaseUri);

// index some records and assert that we get the same records back from Elasticsearch
await BulkIndexRecordsAndAssertOutput(bulkClient, elastic, new ()
{
["1"] = "first record to be deleted",
["2"] = "second record to be updated",
["3"] = "third record to remain the same",
});

// index a new set of records, with:
// - one record from the first set left out
// - an updated record
// - an unchanged record
// - a new record
// and assert that we get exactly those records back from Elasticsearch
await BulkIndexRecordsAndAssertOutput(bulkClient, elastic, new()
{
["2"] = "second record with update",
["3"] = "third record to remain the same",
["4"] = "fourth record which is new",
});
}

private static async Task BulkIndexRecordsAndAssertOutput(ElasticBulkClient bulkClient, ElasticsearchClient elastic, Dictionary<string, string> expectedRecords)
{
var envelopes = expectedRecords
.Select(Map)
.AsAsyncEnumerable();

await bulkClient.IndexBulk(envelopes, IndexWithoutPrefix, [], default);

var refreshResponse = await elastic.Indices.RefreshAsync(IndexWithPrefix);
Assert.True(refreshResponse.IsSuccess());

var searchResponse = await elastic.SearchAsync<KissEnvelope>(IndexWithPrefix);
Assert.True(searchResponse.IsSuccess());

var actualRecords = searchResponse.Hits.ToDictionary(x => x.Id!, x => x.Source.Title!);

Assert.Equal(expectedRecords, actualRecords);
}

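// maps a key/value pair onto a KissEnvelope: the serialized pair becomes the source
// document, the value its title and the key its id (the third constructor
// argument is unused in this test, so it is left null)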
private static KissEnvelope Map(KeyValuePair<string, string> x)
=> new(JsonDocument.Parse(JsonSerializer.Serialize(x)).RootElement, x.Value, null, x.Key);
}
}