Skip to content

Commit

Permalink
Apply locking to fix join race conditions. Fixes #787 (#788)
Browse files Browse the repository at this point in the history
* Apply locking to fix  join race conditions. Fixes #787

* Add better test thread issue test as suggested by @JakenVeina

---------

Co-authored-by: Jake Meiergerd <[email protected]>
  • Loading branch information
RolandPheasant and JakenVeina authored Dec 11, 2023
1 parent 2c0f459 commit 67a8e74
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 5 deletions.
40 changes: 40 additions & 0 deletions src/DynamicData.Tests/Cache/InnerJoinFixtureRaceCondition.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,46 @@ public void LetsSeeWhetherWeCanRandomlyHitARaceCondition()
ids.InnerJoin(itemsCache.Connect(), x => x.Id, (_, thing) => thing).Subscribe((z) => { }, ex => { }, () => { });
}

// See https://github.com/reactivemarbles/DynamicData/issues/787
[Fact]
public void LetsSeeWhetherWeCanRandomlyHitADifferentRaceCondition()
{
using var leftSource = new SourceCache<Thing, long>(thing => thing.Id);
using var rightSource = new SourceCache<Thing, long>(thing => thing.Id);

var resultStream = ObservableCacheEx.InnerJoin(
left: leftSource.Connect(),
right: rightSource.Connect(),
rightKeySelector: rightThing => rightThing.Id,
(keys, leftThing, rightThing) => new Thing()
{
Id = keys.leftKey,
Name = $"{leftThing.Name} x {rightThing.Name}"
});

using var leftThingGenerator = BeginGeneratingThings(leftSource, "Left");
using var rightThingGenerator = BeginGeneratingThings(rightSource, "Left");

for (var i = 0; i < 100; ++i)
{
using var subscription = resultStream.Subscribe();
}

IDisposable BeginGeneratingThings(SourceCache<Thing, long> source, string namePrefix)
// Generate items infinitely. The runtime of the test is limited by the .Subscribe() loop.
=> Observable.Range(1, int.MaxValue, ThreadPoolScheduler.Instance)
.Subscribe(id =>
{
source.AddOrUpdate(new Thing()
{
Id = id,
Name = $"{namePrefix}Thing #{id}"
});
// Start removing items after the first 100, to keep the overhead of calling .Subscribe() down.
source.RemoveKey(id - 100);
});
}

public class Thing
{
public long Id { get; set; }
Expand Down
5 changes: 4 additions & 1 deletion src/DynamicData/Cache/Internal/FullJoin.cs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ public IObservable<IChangeSet<TDestination, TLeftKey>> Run() => Observable.Creat
return joinedCache.CaptureChanges();
});

return new CompositeDisposable(leftLoader.Merge(rightLoader).SubscribeSafe(observer), leftCache, rightCache);
lock (locker)
{
return new CompositeDisposable(leftLoader.Merge(rightLoader).SubscribeSafe(observer), leftCache, rightCache);
}
});
}
9 changes: 6 additions & 3 deletions src/DynamicData/Cache/Internal/InnerJoin.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ internal class InnerJoin<TLeft, TLeftKey, TRight, TRightKey, TDestination>(IObse
{
foreach (var change in changes.ToConcreteType())
{
var leftCurent = change.Current;
var leftCurrent = change.Current;
var rightLookup = rightGrouped.Lookup(change.Key);

if (rightLookup.HasValue)
Expand All @@ -52,7 +52,7 @@ internal class InnerJoin<TLeft, TLeftKey, TRight, TRightKey, TDestination>(IObse
case ChangeReason.Update:
foreach (var keyvalue in rightLookup.Value.KeyValues)
{
joinedCache.AddOrUpdate(_resultSelector((change.Key, keyvalue.Key), leftCurent, keyvalue.Value), (change.Key, keyvalue.Key));
joinedCache.AddOrUpdate(_resultSelector((change.Key, keyvalue.Key), leftCurrent, keyvalue.Value), (change.Key, keyvalue.Key));
}

break;
Expand Down Expand Up @@ -117,6 +117,9 @@ internal class InnerJoin<TLeft, TLeftKey, TRight, TRightKey, TDestination>(IObse
return joinedCache.CaptureChanges();
});

return new CompositeDisposable(leftLoader.Merge(rightLoader).SubscribeSafe(observer), leftCache, rightCache, rightShare.Connect());
lock (locker)
{
return new CompositeDisposable(leftLoader.Merge(rightLoader).SubscribeSafe(observer), leftCache, rightCache, rightShare.Connect());
}
});
}
5 changes: 4 additions & 1 deletion src/DynamicData/Cache/Internal/LeftJoin.cs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,9 @@ public IObservable<IChangeSet<TDestination, TLeftKey>> Run() => Observable.Creat
return joined.CaptureChanges();
});

return new CompositeDisposable(leftLoader.Merge(rightLoader).SubscribeSafe(observer), leftCache, rightCache);
lock (locker)
{
return new CompositeDisposable(leftLoader.Merge(rightLoader).SubscribeSafe(observer), leftCache, rightCache);
}
});
}

0 comments on commit 67a8e74

Please sign in to comment.