This repository has been archived by the owner on Dec 7, 2018. It is now read-only.

Blocking Pub/Sub (Is this gem dead?) #22

Open
eppdot opened this issue May 8, 2017 · 2 comments


eppdot commented May 8, 2017

I want to subscribe to Redis, but the blocking Redis subscribe call appears to block the whole actor. I therefore tried celluloid-redis, but I can't get it working. The following actor blocks inside subscribe and does not handle a TerminationRequest in its mailbox.

  require 'celluloid/redis'

  class RedisObserver
    include Celluloid

    finalizer :unsubscribe

    def initialize
      @redis = ::Redis.new(driver: :celluloid)

      @redis.config(:set, "notify-keyspace-events", "EA")

      after(1) { async.subscribe }

      puts "initialized"
    end

    def subscribe
      puts "starting"

      @redis.psubscribe("__key*__:*") do |on|
        on.psubscribe do |pattern, total|
          puts "Subscribed to ##{pattern} (#{total} subscriptions)"
        end

        on.pmessage do |pattern, channel, message|
          puts "#{pattern} ##{channel}: #{message}"
        end

        on.punsubscribe do |pattern, total|
          puts "Unsubscribed from ##{pattern} (#{total} subscriptions)"
        end
      end

      puts "started"
    end

    def unsubscribe
      puts "unsubscribe"
      begin
        @redis.unsubscribe
      rescue
        puts $!
      end
    end
  end

eppdot changed the title from "Is this gem dead?" to "Blocking Pub/Sub (Is this gem dead?)" on May 8, 2017

tarcieri (Member) commented May 8, 2017

To answer your original "Is this gem dead?" question: yes, this gem is no longer actively maintained. Perhaps that should be reflected in the README.

eppdot (Author) commented May 8, 2017

@tarcieri thank you very much. What would you say is the best way to have an actor that subscribes to Redis pub/sub? Right now I start a separate Thread that subscribes to Redis, so the blocking call only blocks that thread, e.g.:

  class RedisObserver
    include Celluloid
    finalizer :stop

    def initialize
      @redis = Redis.new
      @redis.config(:set, "notify-keyspace-events", "EA")

      after(1) { async.start }
    end

    def start
      return if @thread

      puts "starting"
      @thread = Thread.new do
        puts "Subscribe"
        @redis.psubscribe("__key*__:*") do |on|
          on.psubscribe do |pattern, total|
            puts "Subscribed to ##{pattern} (#{total} subscriptions)"
          end

          on.pmessage do |pattern, channel, message|
            puts "#{pattern} ##{channel}: #{message}"
          end

          on.punsubscribe do |pattern, total|
            puts "Unsubscribed from ##{pattern} (#{total} subscriptions)"
          end
        end
        puts "subscription terminated"
      end
      @thread.abort_on_exception = true

      puts "started"
    end

    def stop
      begin
        puts "close redis connection"
        # Thread.kill @thread
        @redis.close
        puts "punsubscribed"
      rescue => e
        puts "ERRRRRRRROR #{e}"
      end
    end
  end

When I kill the thread at the end, the Redis server still has an open connection, so I guess @redis is not being garbage collected. When I call @redis.close instead, the on.psubscribe hook fires again inside the thread, so I guess the Redis client reconnects and resubscribes. The only way I have found to unsubscribe correctly is to also listen on a side channel (redis-observer-cmd) and publish a please-exit event to it; the thread handles that event and unsubscribes from within the psubscribe block, as described in the redis docs. This is necessary because the thread holds the Redis client's lock for as long as psubscribe is running.
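
For anyone reading along, the side-channel approach could look roughly like the sketch below. The class name `KeyspaceListener` and the exact `please-exit` payload are my own choices for illustration, not part of any gem. The client is passed in and is assumed to behave like a redis-rb connection (`psubscribe` with an `on` handler object, and `punsubscribe` callable from inside the subscription block).

```ruby
# Sketch only: a blocking subscriber loop that can be stopped through a
# control channel. KeyspaceListener is a hypothetical helper, not a gem API.
class KeyspaceListener
  CONTROL_CHANNEL = "redis-observer-cmd" # side channel from the post above
  EXIT_MESSAGE    = "please-exit"        # illustrative payload

  # client:  object with the redis-rb subscription interface (assumed)
  # pattern: keyspace notification pattern to listen on
  # handler: block called with (channel, message) for every real event
  def initialize(client, pattern: "__key*__:*", &handler)
    @client  = client
    @pattern = pattern
    @handler = handler
  end

  # Blocks the calling thread until EXIT_MESSAGE arrives on CONTROL_CHANNEL.
  def run
    # A pattern without glob characters matches only that exact channel,
    # so the control channel can share the same psubscribe call.
    @client.psubscribe(@pattern, CONTROL_CHANNEL) do |on|
      on.pmessage do |_pattern, channel, message|
        if channel == CONTROL_CHANNEL && message == EXIT_MESSAGE
          # Unsubscribing from inside the block lets psubscribe return.
          @client.punsubscribe
        else
          @handler.call(channel, message)
        end
      end
    end
  end
end
```

In the actor, the worker thread would call something like `KeyspaceListener.new(@redis) { |channel, message| ... }.run`, and the finalizer would publish `please-exit` to `redis-observer-cmd` from a second Redis connection and then join the thread. A second connection is needed because the subscribed one is busy inside `psubscribe` and cannot issue `publish`.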
