Skip to content

Commit

Permalink
Replace websocket gem - old one has issues with ws.on :open firing un…
Browse files Browse the repository at this point in the history
  • Loading branch information
anthotsang committed Jul 31, 2023
1 parent 8e304fa commit bc8bb5a
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 46 deletions.
13 changes: 8 additions & 5 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,23 @@ PATH
bigdecimal (~> 2.0.0)
eventmachine (~> 1.2)
thread (~> 0.2.2)
websocket-client-simple (~> 0.3)
websocket-eventmachine-client (~> 1.3)

GEM
remote: https://rubygems.org/
specs:
bigdecimal (2.0.3)
event_emitter (0.2.6)
eventmachine (1.2.7)
eventmachine (1.2.7-x64-mingw32)
thread (0.2.2)
websocket (1.2.9)
websocket-client-simple (0.6.1)
event_emitter
websocket
websocket-eventmachine-base (1.2.0)
eventmachine (~> 1.0)
websocket (~> 1.0)
websocket-native (~> 1.0)
websocket-eventmachine-client (1.3.0)
websocket-eventmachine-base (~> 1.0)
websocket-native (1.0.0)

PLATFORMS
ruby
Expand Down
2 changes: 1 addition & 1 deletion intrinio-realtime.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ Gem::Specification.new do |spec|
spec.require_paths = ["lib"]

spec.add_dependency "eventmachine", '~> 1.2'
spec.add_dependency "websocket-client-simple", '~> 0.3'
spec.add_dependency "websocket-eventmachine-client", '~> 1.3'
spec.add_dependency "thread", '~> 0.2.2'
spec.add_dependency "bigdecimal", '~> 2.0.0'
end
79 changes: 39 additions & 40 deletions lib/intrinio-realtime.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#require 'http'
require 'net/http'
require 'eventmachine'
require 'websocket-client-simple'
require 'websocket-eventmachine-client'

module Intrinio
module Realtime
Expand Down Expand Up @@ -144,7 +144,7 @@ def initialize(options, on_trade, on_quote)

@api_key = options[:api_key]
raise "API Key was formatted invalidly." if @api_key && !valid_api_key?(@api_key)

unless @api_key
@username = options[:username]
@password = options[:password]
Expand Down Expand Up @@ -178,7 +178,7 @@ def initialize(options, on_trade, on_quote)
raise "Invalid channels to join: #{bad_channels}" unless bad_channels.empty?

if options[:logger] == false
@logger = nil
@logger = nil
elsif !options[:logger].nil?
@logger = options[:logger]
else
Expand All @@ -196,30 +196,30 @@ def initialize(options, on_trade, on_quote)
def provider
@provider
end

def join(*channels)
channels = parse_channels(channels)
nonconforming = channels.select{|x| !x.is_a?(String)}
return error("Invalid channels to join: #{nonconforming}") unless nonconforming.empty?

@channels.concat(channels)
@channels.uniq!
debug "Joining channels #{channels}"

refresh_channels()
end

def leave(*channels)
channels = parse_channels(channels)
nonconforming = channels.find{|x| !x.is_a?(String)}
return error("Invalid channels to leave: #{nonconforming}") unless nonconforming.empty?

channels.each{|c| @channels.delete(c)}
debug "Leaving channels #{channels}"

refresh_channels()
end

def leave_all
@channels = []
debug "Leaving all channels"
Expand All @@ -230,7 +230,7 @@ def connect
raise "Must be run from within an EventMachine run loop" unless EM.reactor_running?
return warn("Already connected!") if @ready
debug "Connecting..."

catch :fatal do
begin
@closing = false
Expand All @@ -243,7 +243,7 @@ def connect
end
end
end

def disconnect
EM.cancel_timer(@selfheal_timer) if @selfheal_timer
@ready = false
Expand Down Expand Up @@ -272,7 +272,7 @@ def on_trade(on_trade)
def on_quote(on_quote)
@on_quote = on_quote
end

private

def queue_message(message)
Expand Down Expand Up @@ -417,11 +417,11 @@ def refresh_token
@token = response.body
debug "Token refreshed"
end
def auth_url

def auth_url
url = ""

case @provider
case @provider
when REALTIME then url = "https://realtime-mx.intrinio.com/auth"
when DELAYED_SIP then url = "https://realtime-delayed-sip.intrinio.com/auth"
when NASDAQ_BASIC then url = "https://realtime-nasdaq-basic.intrinio.com/auth"
Expand All @@ -443,7 +443,7 @@ def api_auth_url(url)
"#{url}api_key=#{@api_key}"
end

def socket_url
def socket_url
case @provider
when REALTIME then "wss://realtime-mx.intrinio.com/socket/websocket?vsn=1.0.0&token=#{@token}&#{CLIENT_INFO_HEADER_KEY}=#{CLIENT_INFO_HEADER_VALUE}&#{MESSAGE_VERSION_HEADER_KEY}=#{MESSAGE_VERSION_HEADER_VALUE}"
when DELAYED_SIP then "wss://realtime-delayed-sip.intrinio.com/socket/websocket?vsn=1.0.0&token=#{@token}&#{CLIENT_INFO_HEADER_KEY}=#{CLIENT_INFO_HEADER_VALUE}&#{MESSAGE_VERSION_HEADER_KEY}=#{MESSAGE_VERSION_HEADER_VALUE}"
Expand Down Expand Up @@ -476,11 +476,11 @@ def refresh_websocket
headers[:headers] = {}
headers[CLIENT_INFO_HEADER_KEY] = CLIENT_INFO_HEADER_VALUE
headers[MESSAGE_VERSION_HEADER_KEY] = MESSAGE_VERSION_HEADER_VALUE
@ws = ws = WebSocket::Client::Simple.connect(socket_url, headers)
@ws = ws = WebSocket::EventMachine::Client.connect(uri: socket_url, headers: headers)

me.send :info, "Connection opening"

ws.on :open do
ws.onopen do
me.send :info, "Connection established"
me.send :ready, true
if PROVIDERS.include?(me.send(:provider))
Expand All @@ -489,8 +489,7 @@ def refresh_websocket
me.send :stop_self_heal
end

ws.on :message do |frame|
data_message = frame.data
ws.onmessage do |data_message|
#me.send :debug, "Message: #{data_message}"
begin
unless data_message.nil?
Expand All @@ -500,24 +499,24 @@ def refresh_websocket
me.send :error, "Error adding message to queue: #{data_message} #{e}"
end
end
ws.on :close do |e|

ws.onclose do |code, reason|
me.send :ready, false
me.send :info, "Connection closing...: #{e}"
me.send :info, "Connection closing...: #{reason}"
me.send :try_self_heal
end

ws.on :error do |e|
ws.onerror do |e|
me.send :ready, false
me.send :error, "Connection error: #{e}"
me.send :try_self_heal
end
end

def refresh_channels
return unless @ready
debug "Refreshing channels"

# Join new channels
new_channels = @channels - @joined_channels
new_channels.each do |channel|
Expand All @@ -527,7 +526,7 @@ def refresh_channels
@ws.send(msg)
info "Joined #{channel}"
end

# Leave old channels
old_channels = @joined_channels - @channels
old_channels.each do |channel|
Expand All @@ -537,31 +536,31 @@ def refresh_channels
@ws.send(msg)
info "Left #{channel}"
end

@channels.uniq!
@joined_channels = Array.new(@channels)
debug "Current channels: #{@channels}"
end

def try_self_heal
return if @closing
debug "Attempting to self-heal"

time = @selfheal_backoffs.first
@selfheal_backoffs.delete_at(0) if @selfheal_backoffs.count > 1

EM.cancel_timer(@selfheal_timer) if @selfheal_timer

@selfheal_timer = EM.add_timer(time/1000) do
connect()
end
end

def stop_self_heal
EM.cancel_timer(@selfheal_timer) if @selfheal_timer
@selfheal_backoffs = Array.new(SELF_HEAL_BACKOFFS)
end

def ready(val)
@ready = val
end
Expand All @@ -571,34 +570,34 @@ def debug(message)
@logger.debug(message) rescue
nil
end

def info(message)
message = "IntrinioRealtime | #{message}"
@logger.info(message) rescue
nil
end

def error(message)
message = "IntrinioRealtime | #{message}"
@logger.error(message) rescue
nil
end

def fatal(message)
message = "IntrinioRealtime | #{message}"
@logger.fatal(message) rescue
EM.stop_event_loop
throw :fatal
nil
end

def parse_channels(channels)
channels.flatten!
channels.uniq!
channels.compact!
channels
end

def join_binary_message(channel)
if (channel == "lobby") && (@trades_only == false)
return [74, 0, 36, 70, 73, 82, 69, 72, 79, 83, 69].pack('C*') #74, not trades only, "$FIREHOSE"
Expand Down

0 comments on commit bc8bb5a

Please sign in to comment.