Skip to content

Commit

Permalink
Fixed queries on ResultTable
Browse files Browse the repository at this point in the history
  • Loading branch information
lemaitre-aneo committed Mar 6, 2023
1 parent 8c61f15 commit 5a5ec26
Showing 1 changed file with 43 additions and 24 deletions.
67 changes: 43 additions & 24 deletions Adaptors/MongoDB/src/ResultTable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,11 @@ public IAsyncEnumerable<Result> GetResults(string sessionId,
var sessionHandle = sessionProvider_.Get();
var resultCollection = resultCollectionProvider_.Get();

var ids = keys.Select(id => Result.GenerateId(sessionId,
id))
.ToList();
return resultCollection.AsQueryable(sessionHandle)
.Where(model => keys.Contains(model.Name) && model.SessionId == sessionId)
.Where(model => ids.Contains(model.Id))
.ToAsyncEnumerable(cancellationToken);
}

Expand All @@ -145,8 +148,12 @@ public async Task<IEnumerable<ResultStatusCount>> AreResultsAvailableAsync(strin
var sessionHandle = sessionProvider_.Get();
var resultCollection = resultCollectionProvider_.Get();

var ids = keys.Select(id => Result.GenerateId(sessionId,
id))
.ToList();

return await resultCollection.AsQueryable(sessionHandle)
.Where(model => model.SessionId == sessionId && keys.Contains(model.Name))
.Where(model => ids.Contains(model.Id))
.GroupBy(model => model.Status)
.Select(models => new ResultStatusCount(models.Key,
models.Count()))
Expand Down Expand Up @@ -198,14 +205,14 @@ public async Task SetResult(string sessionId,
key);
var resultCollection = resultCollectionProvider_.Get();

var res = await resultCollection
.UpdateOneAsync(Builders<Result>.Filter.Where(model => model.Name == key && model.OwnerTaskId == ownerTaskId && model.SessionId == sessionId),
Builders<Result>.Update.Set(model => model.Status,
ResultStatus.Completed)
.Set(model => model.Data,
smallPayload),
cancellationToken: cancellationToken)
.ConfigureAwait(false);
var res = await resultCollection.UpdateOneAsync(Builders<Result>.Filter.Where(model => model.Id == Result.GenerateId(sessionId,
key) && model.OwnerTaskId == ownerTaskId),
Builders<Result>.Update.Set(model => model.Status,
ResultStatus.Completed)
.Set(model => model.Data,
smallPayload),
cancellationToken: cancellationToken)
.ConfigureAwait(false);
if (res.ModifiedCount == 0)
{
throw new ResultNotFoundException($"Key '{key}' not found");
Expand All @@ -228,7 +235,8 @@ public async Task SetResult(string sessionId,

var resultCollection = resultCollectionProvider_.Get();

var res = await resultCollection.UpdateOneAsync(Builders<Result>.Filter.Where(model => model.Name == key && model.OwnerTaskId == ownerTaskId),
var res = await resultCollection.UpdateOneAsync(Builders<Result>.Filter.Where(model => model.Id == Result.GenerateId(sessionId,
key) && model.OwnerTaskId == ownerTaskId),
Builders<Result>.Update.Set(model => model.Status,
ResultStatus.Completed),
cancellationToken: cancellationToken)
Expand All @@ -240,7 +248,7 @@ public async Task SetResult(string sessionId,
}

/// <inheritdoc />
public async Task<IEnumerable<GetResultStatusReply.Types.IdStatus>> GetResultStatus(IEnumerable<string> ids,
public async Task<IEnumerable<GetResultStatusReply.Types.IdStatus>> GetResultStatus(IEnumerable<string> keys,
string sessionId,
CancellationToken cancellationToken = default)
{
Expand All @@ -249,9 +257,12 @@ public async Task SetResult(string sessionId,
var sessionHandle = sessionProvider_.Get();
var resultCollection = resultCollectionProvider_.Get();

var ids = keys.Select(id => Result.GenerateId(sessionId,
id))
.ToList();

return await resultCollection.AsQueryable(sessionHandle)
.Where(model => ids.Contains(model.Name) && model.SessionId == sessionId)
.Where(model => ids.Contains(model.Id))
.Select(model => new GetResultStatusReply.Types.IdStatus
{
ResultId = model.Name,
Expand Down Expand Up @@ -288,16 +299,23 @@ public async Task ChangeResultOwnership(string

var resultCollection = resultCollectionProvider_.Get();

await resultCollection.BulkWriteAsync(requests.Select(r => new UpdateManyModel<Result>(Builders<Result>.Filter.And(Builders<Result>.Filter.In(model => model.Name,
r.Keys),
Builders<Result>.Filter
.Eq(model => model.OwnerTaskId,
oldTaskId),
Builders<Result>.Filter
.Eq(model => model.SessionId,
sessionId)),
Builders<Result>.Update.Set(model => model.OwnerTaskId,
r.NewTaskId))),
await resultCollection.BulkWriteAsync(requests.Select(r =>
{
var ids = r.Keys.Select(id => Result.GenerateId(sessionId,
id))
.ToList();
return new UpdateManyModel<Result>(Builders<Result>.Filter.And(Builders<Result>.Filter.In(model => model.Id,
ids),
Builders<Result>.Filter
.Eq(model
=> model.OwnerTaskId,
oldTaskId),
Builders<Result>.Filter
.Eq(model => model.SessionId,
sessionId)),
Builders<Result>.Update.Set(model => model.OwnerTaskId,
r.NewTaskId));
}),
cancellationToken: cancellationToken)
.ConfigureAwait(false);
}
Expand All @@ -315,7 +333,8 @@ public async Task DeleteResult(string session,
key);
var resultCollection = resultCollectionProvider_.Get();

var result = await resultCollection.DeleteOneAsync(model => model.Name == key && model.SessionId == session,
var result = await resultCollection.DeleteOneAsync(model => model.Id == Result.GenerateId(session,
key),
cancellationToken)
.ConfigureAwait(false);

Expand Down

0 comments on commit 5a5ec26

Please sign in to comment.