OrleansMessageRejection exception and Orleans stream messages stuck in azure storage queue #8540

Closed
iamsamcoder opened this issue Jul 10, 2023 · 18 comments

@iamsamcoder

I've encountered this a couple of times in the last 1.5 weeks. I'll deploy a new revision of my Orleans application, and within a couple of days silos become unavailable and messages become undeliverable on some instances. The problematic silos do not recover, and I have to restart the cluster to resolve the issue.

When 1 or more of the 9 silos get into this state where grain messages can't be delivered, Orleans stream messages pushed to the queue also get stuck until I restart the cluster (container app environment). The issue may have started shortly after the last deployment; the last few times it occurred, it seemed to follow shortly after a new release.

I'd appreciate some further guidance on tracking down the issue here.

Here are some further observations:

  • Silo running in Azure container app environment
  • Last revision was deployed 2023-07-07T20:24:08z
  • Very little load over the weekend; then on Sunday night, for no apparent reason, silos started terminating and messages could not be delivered from the Orleans storage queue.
  • Running Orleans 7.1.2
  • Silo exits with code 1
  • container logs
    • 2023-07-10T05:32:23.4393447Z
      • message: Container silo failed liveness probe, will be restarted
    • 2023-07-10T05:34:30.094215Z
      • message: Container 'silo' was terminated with exit code '1'
  • The most closely related exceptions
    • 2023-07-10T05:34:20.2993575Z -- Orleans.Runtime.OrleansMessageRejectionException
      • message:
      Exception while sending message: Orleans.Runtime.Messaging.ConnectionFailedException: Unable to connect to endpoint S100.100.0.120:11111:47972033. See InnerException
       ---> Orleans.Networking.Shared.SocketConnectionException: Unable to connect to 100.100.0.120:11111. Error: ConnectionRefused
         at Orleans.Networking.Shared.SocketConnectionFactory.ConnectAsync(EndPoint endpoint, CancellationToken cancellationToken) in /_/src/Orleans.Core/Networking/Shared/SocketConnectionFactory.cs:line 54
         at Orleans.Runtime.Messaging.ConnectionFactory.ConnectAsync(SiloAddress address, CancellationToken cancellationToken) in /_/src/Orleans.Core/Networking/ConnectionFactory.cs:line 61
         at Orleans.Runtime.Messaging.ConnectionManager.ConnectAsync(SiloAddress address, ConnectionEntry entry) in /_/src/Orleans.Core/Networking/ConnectionManager.cs:line 228
         --- End of inner exception stack trace ---
         at Orleans.Runtime.Messaging.ConnectionManager.ConnectAsync(SiloAddress address, ConnectionEntry entry) in /_/src/Orleans.Core/Networking/ConnectionManager.cs:line 228
         at Orleans.Runtime.Messaging.ConnectionManager.GetConnectionAsync(SiloAddress endpoint) in /_/src/Orleans.Core/Networking/ConnectionManager.cs:line 108
         at Orleans.Runtime.Messaging.MessageCenter.<SendMessage>g__SendAsync|30_0(MessageCenter messageCenter, ValueTask`1 connectionTask, Message msg) in /_/src/Orleans.Runtime/Messaging/MessageCenter.cs:line 231
      
    • 2023-07-10T05:34:23.1327681Z -- System.ObjectDisposedException at Orleans.Serialization.Serializers.CodecProvider.GetServiceOrCreateInstance
      • message:
      Cannot access a disposed object.
      Object name: 'IServiceProvider'.
      
    • 2023-07-10T05:37:08.2696465Z -- System.InvalidOperationException at Orleans.Runtime.ActivationData.StartDeactivating
      • message:
      Calling DeactivateOnIdle from within OnActivateAsync is not supported
      

As of 2023-07-10T20:48:15.9391397Z I am still seeing Orleans.Runtime.OrleansMessageRejectionException, and there are 34k Orleans messages stuck in queue-1.

@ghost ghost added the Needs: triage 🔍 label Jul 10, 2023
@iamsamcoder
Author

iamsamcoder commented Jul 14, 2023

I have been able to make further observations on another incident of delayed processing of Orleans stream messages.

Today, I observed that 1 of 18 silos was unable to activate grains to process pubsub messages from Orleans streams (Azure storage queue). From the time the silo started until it was shut down (we scale on a schedule), it was not able to process a single Orleans stream message. The other 17 silos processed stream messages; only this one could not.

This silo was able to process ordinary Orleans grain messages coming from other silos or clients. Once it was shut down, another silo was able to process the stream messages.

I'm attaching logs of the OrleansMessageRejection exceptions. Again, I'm running Orleans in Azure Container Apps on version 7.1.2.
silo-exceptions-delayed-stream-messages-23-07-14.csv

Here's a sample message for one of the OrleansMessageRejection errors

Forwarding failed: tried to forward message Request [S100.100.0.111:11111:48337284 sys.svc.stream.agent/100.100.0.111:11111@48337284+AzureQueueProvider_1_azurequeueprovider-6-0xC0000006]->[S100.100.0.7:11111:48250883 pubsubrendezvous/AzureQueueProvider/ProjectionStream/8c24921ff0ee4620a95f05813e48fb14] Orleans.Streams.IPubSubRendezvousGrain.RegisterProducer(AzureQueueProvider/ProjectionStream/8c24921ff0ee4620a95f05813e48fb14, sys.svc.stream.agent/100.100.0.111:11111@48337284+AzureQueueProvider_1_azurequeueprovider-6-0xC0000006) #7342[ForwardCount=2] for 2 times after "Failed to register activation in grain directory." to invalid activation. Rejecting now.

@ReubenBond
Member

Are you able to share your configuration?

@iamsamcoder
Author

Thanks @ReubenBond !

Here's my Orleans configuration:

    public static class OrleansStartup
    {
        public static IHostBuilder UseOrleans2(this IHostBuilder hostBuilder)
        {
            return hostBuilder.UseOrleans((ctx, siloBuilder) =>
            {
                string AZURE_STORAGE_CONNECTION_STRING = ctx.Configuration["AZURE_STORAGE_CONNECTION_STRING"];

                Log.Logger.Debug($"Configure Orleans with azure storage account: {AZURE_STORAGE_CONNECTION_STRING}");

                siloBuilder.Services.AddSerializer(b => b.CommonSiteDocsSerialization());

                var collectionAgeMinutes = GetCollectionAgeMinutes(ctx);
                Log.Logger.Information("Setting grain collection age to {CollectionAgeMinutes} seconds", collectionAgeMinutes);

                siloBuilder.Configure<GrainCollectionOptions>(options =>
                {
                    // docs here: http://sergeybykov.github.io/orleans/Documentation/clusters_and_clients/configuration_guide/activation_garbage_collection.html
                    // set the value of CollectionAge for all grains
                    options.CollectionAge = TimeSpan.FromSeconds(collectionAgeMinutes);
                });

                SetupOrleansStreams(siloBuilder, AZURE_STORAGE_CONNECTION_STRING, ctx.Configuration);
                SetupCommonOrleans(siloBuilder, AZURE_STORAGE_CONNECTION_STRING);

                if (!ctx.HostingEnvironment.IsProduction())
                {
                    var testIp = ctx.Configuration["testIp"];
                    SetupOrleansDevelopment(siloBuilder, testIp, ctx.Configuration);
                }
                else
                {
                    Log.Logger.Information("Initializing Orleans in non-development mode");

                    siloBuilder
                        .Configure<ClusterOptions>(options =>
                        {
                            options.ClusterId = ctx.Configuration["OrleansClusterId"] ?? "Cluster";
                            options.ServiceId = "Service";
                            Log.Logger.Information("Initializing cluster with {ServiceId} and {ClusterId}", options.ServiceId, options.ClusterId);
                        })
                        .ConfigureEndpoints(
                            hostname: Dns.GetHostName(),
                            siloPort: 11_111, 
                            gatewayPort: 30_000)
                        .ConfigureLogging(logging =>
                        {
                            logging.AddConsole(options => options.LogToStandardErrorThreshold = LogLevel.Error);
                            logging.AddSerilog();
                        });
                }

            });
        }

        private static int GetCollectionAgeMinutes(HostBuilderContext ctx)
        {
            // The minimum must be > 60 seconds according to Orleans docs.
            var collectionAge = ctx.Configuration["GrainCollectionAgeSeconds"] ?? "300";
            if (int.TryParse(collectionAge, out int collectionAgeSeconds))
            {
                return collectionAgeSeconds > 60 ? collectionAgeSeconds : 65;
            }

            return 300; // default to 5 minutes
        }

        private static void SetupCommonOrleans(ISiloBuilder siloBuilder, string AZURE_STORAGE_CONNECTION_STRING)
        {
            siloBuilder
                .UseAzureStorageClustering(opts => opts.ConfigureTableServiceClient(AZURE_STORAGE_CONNECTION_STRING))
                .AddAzureBlobGrainStorageAsDefault(options =>
                {
                    options.ConfigureBlobServiceClient(AZURE_STORAGE_CONNECTION_STRING);
                })
                .AddAzureBlobGrainStorage(name: "documentIntegrationState",
                    configureOptions: options =>
                    {
                        options.ConfigureBlobServiceClient(AZURE_STORAGE_CONNECTION_STRING);
                    })
                .AddAzureBlobGrainStorage(name: "projectionState",
                    configureOptions: options =>
                    {
                        options.ConfigureBlobServiceClient(AZURE_STORAGE_CONNECTION_STRING);
                    })
                .AddLogStorageBasedLogConsistencyProvider("LogStorage");
        }

        private static void SetupOrleansDevelopment(ISiloBuilder siloBuilder, string testIp, IConfiguration config)
        {
            Log.Logger.Information("Initializing Orleans in development mode");
            siloBuilder
                .Configure<ClusterOptions>(options =>
                {
                    options.ClusterId = config["OrleansClusterId"] ?? "Cluster";
                    options.ServiceId = "Service";
                    Log.Logger.Information("Initializing cluster with {ServiceId} and {ClusterId}", options.ServiceId, options.ClusterId);
                })
                .Configure<EndpointOptions>(options =>
                {
                    // Port to use for Silo-to-Silo
                    options.SiloPort = 11111;
                    // Port to use for the gateway
                    options.GatewayPort = 30000;
                    // IP Address to advertise in the cluster
                    options.AdvertisedIPAddress = IPAddress.Parse(testIp);
                    // The socket used by the gateway will bind to this endpoint
                    options.GatewayListeningEndpoint = new IPEndPoint(IPAddress.Any, 30000);
                    // The socket used for silo-to-silo traffic will bind to this endpoint
                    options.SiloListeningEndpoint = new IPEndPoint(IPAddress.Any, 11111);
                })
                ;
        }

        private static void SetupOrleansStreams(ISiloBuilder siloBuilder, string AZURE_STORAGE_CONNECTION_STRING, IConfiguration config)
        {
            siloBuilder.AddAzureQueueStreams("AzureQueueProvider", configurator =>
            {
                configurator.ConfigureAzureQueue(builder => builder.Configure(options =>
                {
                    options.CommonStreamQueueOptions(AZURE_STORAGE_CONNECTION_STRING, config);
                }));
                configurator.ConfigureCacheSize(1024);
                configurator.ConfigurePullingAgent(ob => ob.Configure(options =>
                {
                    options.GetQueueMsgsTimerPeriod = TimeSpan.FromMilliseconds(200);
                    options.BatchContainerBatchSize = 256;
                }));
            });
            siloBuilder.AddAzureBlobGrainStorage("PubSubStore",
                options => options.ConfigureBlobServiceClient(AZURE_STORAGE_CONNECTION_STRING));
        }

    }

@ReubenBond
Member

Nothing jumps out at me; maybe @benjaminpetit will have an idea when he gets back.

Do you know why DeactivateOnIdle is being called during activation (referring to the original message)?

The serialization exception looks like it occurred after the silo was shut down.

@iamsamcoder
Author

Thanks @ReubenBond ! I'm not sure why that DeactivateOnIdle was called; however, we have not seen it again. We continue to see Orleans.Runtime.OrleansMessageRejectionException exceptions starting when our cluster scales up from 6 to 18 silos on a schedule-based scale rule. The exceptions persist until the cluster scales back down at the end of the scheduled window.

This seems to only impact Orleans streams (Azure storage queue stream provider).

@benjaminpetit
Member

It seems that one silo was still trying to get information about another silo that wasn't alive anymore. Directory cache poisoning?

@iamsamcoder
Author

Hi @benjaminpetit,
Are you referring to the DeactivateOnIdle error or the MessageRejectionException as being due to directory cache poisoning?

To add further context, we only see MessageRejectionException errors during scale out of our cluster. We have a custom scale provider that scales out on a schedule to meet our peak load. The schedule is for 4 hours. Once the cluster scales back in, the MessageRejectionException errors go away and the stream messages stuck in storage queues are processed.

These message rejection exceptions are only happening for some of the Orleans stream messages, not the standard grain messages from a cluster client or from other grains.

I'm trying to determine where the error is triggered. It may be our grain code, but the grains activate and handle the stream message after the cluster scales back in to 6, from 18 at the peak. Does the MessageRejectionException get triggered before or during our grain OnActivate code?

The message rejection exception message we are seeing most often is

Forwarding failed: tried to forward message Request [S100.100.0.109:11111:49546881 sys.svc.stream.agent/100.100.0.109:11111@49546881+AzureQueueProvider_1_azurequeueprovider-3-0x60000003]->[S100.100.0.159:11111:49546816 pubsubrendezvous/AzureQueueProvider/ProjectionStream/1fc0b5415e5f476d961e4f773ca272d9] Orleans.Streams.IPubSubRendezvousGrain.RegisterProducer(AzureQueueProvider/ProjectionStream/1fc0b5415e5f476d961e4f773ca272d9, sys.svc.stream.agent/100.100.0.109:11111@49546881+AzureQueueProvider_1_azurequeueprovider-3-0x60000003) #17300[ForwardCount=2] for 2 times after "Failed to register activation in grain directory." to invalid activation. Rejecting now.

@benjaminpetit
Member

I was referring to the MessageRejectionException.

The streaming infrastructure uses some internal grains, called PubSubRendezvousGrain. Here it seems the directory is in a bad state, and the cluster isn't able to create a new activation of the PubSubRendezvousGrain for some streams.

It would be interesting to see whether you have more directory-related logs.

Also, when you scale your cluster up, do you see any silos dying in the meantime?

@iamsamcoder
Author

Thanks @benjaminpetit.

Where would we find more directory logs? What setting would we need to see them? I'm assuming they are at debug level?
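
In the meantime, here is what we are planning to try: raising the log level for the directory components. This is just a sketch, and it assumes the grain directory classes log under categories starting with Orleans.Runtime.GrainDirectory (that category prefix is our guess, so please correct us if it's wrong):

siloBuilder.ConfigureLogging(logging =>
{
    // Raise only the (assumed) grain directory categories to Debug;
    // everything else keeps its current level.
    logging.AddFilter("Orleans.Runtime.GrainDirectory", LogLevel.Debug);
});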

We do not see silos dying while scaling up, but it does take time for the nodes to be initialized.

@iamsamcoder
Author

Hi @benjaminpetit,

We continue to see these Orleans.Runtime.OrleansMessageRejectionException errors, and only in relation to Orleans.Streams.IPubSubRendezvousGrain. They only occur after an automated scale-up of silo instances and resolve after scaling back down.

This causes delays in processing mission-critical messages. We are developing an alternative solution that migrates the processes depending on Orleans streams to Azure Functions, but we are hoping we can find a solution to this.

Can you provide any guidance on this? Do you know of other users who have Orleans streams reliability issues with clusters that periodically scale up and down?

Thank you very much!

@tanordheim

I believe I just saw this in one of our clusters as well. We did not have an auto-scale event; this just happened out of the blue, with a sudden spike in silo failures/restarts and the message mentioned by @iamsamcoder being logged repeatedly. I saw no clear reason why, and since I didn't know about this issue at the time, I did a full restart of all deployments involved in the cluster, which seems to have remediated the issue for us.

Looking at our telemetry afterwards, it looks like there was a problem with a single queue; we run SQS streaming with 4 queues for the affected stream provider, and we only saw one of those queues get backlogged during the incident. Once restarted, all remaining messages in that queue were delivered.

@tanordheim

tanordheim commented Oct 11, 2023

After some digging today, we are definitely being bitten by this. We quite often see streams start failing with this kind of error, unrelated to scale-ups/downs or rolling deployments. I don't know right now whether we have had any other types of silo failures around that time causing a silo restart; I'll continue digging tomorrow.

What is consistent is that once a stream starts logging this, it can never recover without restarting the silo that owns the stream/queue. This always fixes it; the silo that takes over the responsibility chugs along just fine.

The errors we typically see when this starts happening are:

RegisterAsStreamProducer failed

Orleans.Runtime.OrleansMessageRejectionException: Forwarding failed: tried to forward message Request [S10.36.26.146:11111:56055436 sys.client/hosted-10.36.26.146:11111@56055436]->[S10.36.21.240:11111:56054790 pubsubrendezvous/internal-charger-states-stream/device:charger-state/2e467989e70240a09b955e9a2fb11a5a] Orleans.Streams.IPubSubRendezvousGrain.RegisterProducer(internal-charger-states-stream/device:charger-state/2e467989e70240a09b955e9a2fb11a5a, sys.svc.stream.agent/10.36.26.146:11111@56055436+internal-charger-states-stream_1_internal-charger-states-stream-8-0xAAAAAAB0) #84373[ForwardCount=2] for 2 times after "Failed to register activation in grain directory." to invalid activation. Rejecting now. 
   at Orleans.Serialization.Invocation.ResponseCompletionSource.GetResult(Int16 token) in /_/src/Orleans.Serialization/Invocation/ResponseCompletionSource.cs:line 90
...omitted Polly retry grain call filter stack frames...
   at Orleans.Runtime.OutgoingCallInvoker`1.Invoke() in /_/src/Orleans.Core/Runtime/OutgoingCallInvoker.cs:line 129
   at Orleans.Runtime.ActivityPropagationGrainCallFilter.Process(IGrainCallContext context, Activity activity) in /_/src/Orleans.Core/Diagnostics/ActivityPropagationGrainCallFilter.cs:line 75
   at Orleans.Runtime.OutgoingCallInvoker`1.Invoke() in /_/src/Orleans.Core/Runtime/OutgoingCallInvoker.cs:line 129
   at Orleans.Runtime.GrainReferenceRuntime.InvokeMethodWithFiltersAsync[TResult](GrainReference reference, IInvokable request, InvokeMethodOptions options) in /_/src/Orleans.Core/Runtime/GrainReferenceRuntime.cs:line 93
   at Orleans.Streams.StreamPubSubImpl.RegisterProducer(QualifiedStreamId streamId, GrainId streamProducer) in /_/src/Orleans.Streaming/PubSub/StreamPubSubImpl.cs:line 37
   at Orleans.Streams.PersistentStreamPullingAgent.PubsubRegisterProducer(IStreamPubSub pubSub, QualifiedStreamId streamId, GrainId meAsStreamProducer, ILogger logger) in /_/src/Orleans.Streaming/PersistentStreams/PersistentStreamPullingAgent.cs:line 827

As it currently stands, I can't find a good way to detect from within the silo that this has happened so that I could terminate it and have it restart, so we are relying on manual monitoring to unblock queues that provide important data to our grains.

We could consider swapping to a Redis grain directory here instead if this is suspected to be a grain directory issue. Though looking at #8632, that does not seem to be straightforward, as - as far as I understand - you need to replace the default directory unless you can add a [GrainDirectory] annotation to the grain.

It's also worth noting that we have millions of concurrent grains, and the only grain for which we ever really see "Failed to register activation in grain directory." outside of a silo crash/rolling restart - and then only as transient errors where retries succeed - is the PubSubRendezvous grain.

@tanordheim

I don't want to prematurely celebrate, but I think this problem went away when I changed the grain directory of the IPubSubRendezvousGrain to use Redis instead of the default directory. We have been seeing this issue many times a day since we started pushing anything but test data through our streams, but after changing the grain directory this has not occurred a single time today.

Due to #8632 this wasn't super straightforward, but I managed to solve it by adding a named Redis grain directory and registering a custom Orleans.Runtime.GrainDirectory.IGrainDirectoryResolver in the service collection, like this:

public class PubSubRendezvousGrainUseRedisGrainDirectoryResolver : IGrainDirectoryResolver
{
    private readonly IServiceProvider _services;

    public PubSubRendezvousGrainUseRedisGrainDirectoryResolver(IServiceProvider services)
    {
        _services = services;
    }

    public bool TryResolveGrainDirectory(GrainType grainType, GrainProperties properties, out IGrainDirectory grainDirectory)
    {
        if (grainType.Value.ToString() != "pubsubrendezvous")
        {
            grainDirectory = default!;
            return false;
        }

        grainDirectory = _services.GetRequiredServiceByName<IGrainDirectory>("redis-grain-directory");
        return true;
    }
}

services.AddSingleton<IGrainDirectoryResolver, PubSubRendezvousGrainUseRedisGrainDirectoryResolver>();
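
For completeness, the named directory that the resolver looks up was registered roughly like this (a sketch from memory, using the Microsoft.Orleans.GrainDirectory.Redis package; the directory name and Redis connection string are placeholders, and the exact options shape may differ by package version):

siloBuilder.AddRedisGrainDirectory(
    "redis-grain-directory",
    // Placeholder Redis endpoint; we point this at our actual Redis instance.
    options => options.ConfigurationOptions = ConfigurationOptions.Parse("my-redis-host:6379"));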

I guess this points to it being a grain directory issue and not directly a streaming issue, but as already mentioned the only grain in our cluster having this issue was the IPubSubRendezvousGrain.

@iamsamcoder
Author

@tanordheim That is great to hear! I hope it has resolved the issue.

We didn't try a different directory for the pubsub grains; that is interesting. We were in prod and this issue was causing significant delays, so we migrated off Orleans streams to Service Bus and Azure Function apps.

This is helpful to know for the future, perhaps we can try Orleans streams again. Thank you for sharing!

@oising
Contributor

oising commented Oct 13, 2023

Just an aside, but this looks suspicious... @iamsamcoder

siloBuilder.Configure<GrainCollectionOptions>(options =>
{
    // docs here: http://sergeybykov.github.io/orleans/Documentation/clusters_and_clients/configuration_guide/activation_garbage_collection.html
    // set the value of CollectionAge for all grains
    options.CollectionAge = TimeSpan.FromSeconds(collectionAgeMinutes);
})

FromSeconds against collectionAgeMinutes? This could contribute to a very spammy DHT.
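
Something along these lines would make the unit unambiguous (just a sketch; GetCollectionAgeSeconds is a hypothetical rename of the existing helper, which already returns seconds):

// Hypothetical rename so the variable name matches the unit; behavior unchanged.
var collectionAgeSeconds = GetCollectionAgeSeconds(ctx);
siloBuilder.Configure<GrainCollectionOptions>(options =>
{
    options.CollectionAge = TimeSpan.FromSeconds(collectionAgeSeconds);
});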

@iamsamcoder
Author

Good observation @oising ! The naming should have been refactored. I've confirmed that my default of 5 minutes (300 s) is being used. Would that still be a concern for a spammy DHT?

We lowered the collection age because memory was becoming an issue for us. However, we plan to increase our machine resources.

@ReubenBond
Member

We've merged #8696 & #8704 to fix this issue. We will create a release shortly.

@ReubenBond ReubenBond self-assigned this Nov 3, 2023
@ReubenBond
Member

The v7.2.3 release, which aims to fix this, is now available, so I will close this issue. Please open a new issue and reference this one if you still encounter the problem: https://github.com/dotnet/orleans/releases/tag/v7.2.3

@github-actions github-actions bot locked and limited conversation to collaborators Dec 4, 2023