From bc8bb5a535a37348d7cda18ec41a74dd20b4979f Mon Sep 17 00:00:00 2001 From: Anthony Tsang Date: Tue, 1 Aug 2023 00:06:34 +0900 Subject: [PATCH] Replace websocket gem - old one has issues with ws.on :open firing unreliably (see: https://github.com/ruby-jp/websocket-client-simple/issues/17) --- Gemfile.lock | 13 ++++--- intrinio-realtime.gemspec | 2 +- lib/intrinio-realtime.rb | 79 +++++++++++++++++++-------------------- 3 files changed, 48 insertions(+), 46 deletions(-) diff --git a/Gemfile.lock b/Gemfile.lock index 9b13fe6..8a3f3c0 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -5,20 +5,23 @@ PATH bigdecimal (~> 2.0.0) eventmachine (~> 1.2) thread (~> 0.2.2) - websocket-client-simple (~> 0.3) + websocket-eventmachine-client (~> 1.3) GEM remote: https://rubygems.org/ specs: bigdecimal (2.0.3) - event_emitter (0.2.6) eventmachine (1.2.7) eventmachine (1.2.7-x64-mingw32) thread (0.2.2) websocket (1.2.9) - websocket-client-simple (0.6.1) - event_emitter - websocket + websocket-eventmachine-base (1.2.0) + eventmachine (~> 1.0) + websocket (~> 1.0) + websocket-native (~> 1.0) + websocket-eventmachine-client (1.3.0) + websocket-eventmachine-base (~> 1.0) + websocket-native (1.0.0) PLATFORMS ruby diff --git a/intrinio-realtime.gemspec b/intrinio-realtime.gemspec index ac780c4..e87c10a 100644 --- a/intrinio-realtime.gemspec +++ b/intrinio-realtime.gemspec @@ -14,7 +14,7 @@ Gem::Specification.new do |spec| spec.require_paths = ["lib"] spec.add_dependency "eventmachine", '~> 1.2' - spec.add_dependency "websocket-client-simple", '~> 0.3' + spec.add_dependency "websocket-eventmachine-client", '~> 1.3' spec.add_dependency "thread", '~> 0.2.2' spec.add_dependency "bigdecimal", '~> 2.0.0' end diff --git a/lib/intrinio-realtime.rb b/lib/intrinio-realtime.rb index c074eaf..d6654b5 100644 --- a/lib/intrinio-realtime.rb +++ b/lib/intrinio-realtime.rb @@ -3,7 +3,7 @@ #require 'http' require 'net/http' require 'eventmachine' -require 'websocket-client-simple' +require 'websocket-eventmachine-client' module Intrinio module Realtime @@ -144,7 +144,7 @@ def initialize(options, on_trade, on_quote) @api_key = options[:api_key] raise "API Key was formatted invalidly." if @api_key && !valid_api_key?(@api_key) - + unless @api_key @username = options[:username] @password = options[:password] @@ -178,7 +178,7 @@ def initialize(options, on_trade, on_quote) raise "Invalid channels to join: #{bad_channels}" unless bad_channels.empty? if options[:logger] == false - @logger = nil + @logger = nil elsif !options[:logger].nil? @logger = options[:logger] else @@ -196,30 +196,30 @@ def initialize(options, on_trade, on_quote) def provider @provider end - + def join(*channels) channels = parse_channels(channels) nonconforming = channels.select{|x| !x.is_a?(String)} return error("Invalid channels to join: #{nonconforming}") unless nonconforming.empty? - + @channels.concat(channels) @channels.uniq! debug "Joining channels #{channels}" - + refresh_channels() end - + def leave(*channels) channels = parse_channels(channels) nonconforming = channels.find{|x| !x.is_a?(String)} return error("Invalid channels to leave: #{nonconforming}") unless nonconforming.empty? - + channels.each{|c| @channels.delete(c)} debug "Leaving channels #{channels}" - + refresh_channels() end - + def leave_all @channels = [] debug "Leaving all channels" @@ -230,7 +230,7 @@ def connect raise "Must be run from within an EventMachine run loop" unless EM.reactor_running? return warn("Already connected!") if @ready debug "Connecting..." - + catch :fatal do begin @closing = false @@ -243,7 +243,7 @@ def connect end end end - + def disconnect EM.cancel_timer(@selfheal_timer) if @selfheal_timer @ready = false @@ -272,7 +272,7 @@ def on_trade(on_trade) def on_quote(on_quote) @on_quote = on_quote end - + private def queue_message(message) @@ -417,11 +417,11 @@ def refresh_token @token = response.body debug "Token refreshed" end - - def auth_url + + def auth_url url = "" - case @provider + case @provider when REALTIME then url = "https://realtime-mx.intrinio.com/auth" when DELAYED_SIP then url = "https://realtime-delayed-sip.intrinio.com/auth" when NASDAQ_BASIC then url = "https://realtime-nasdaq-basic.intrinio.com/auth" @@ -443,7 +443,7 @@ def api_auth_url(url) "#{url}api_key=#{@api_key}" end - def socket_url + def socket_url case @provider when REALTIME then "wss://realtime-mx.intrinio.com/socket/websocket?vsn=1.0.0&token=#{@token}&#{CLIENT_INFO_HEADER_KEY}=#{CLIENT_INFO_HEADER_VALUE}&#{MESSAGE_VERSION_HEADER_KEY}=#{MESSAGE_VERSION_HEADER_VALUE}" when DELAYED_SIP then "wss://realtime-delayed-sip.intrinio.com/socket/websocket?vsn=1.0.0&token=#{@token}&#{CLIENT_INFO_HEADER_KEY}=#{CLIENT_INFO_HEADER_VALUE}&#{MESSAGE_VERSION_HEADER_KEY}=#{MESSAGE_VERSION_HEADER_VALUE}" @@ -476,11 +476,11 @@ def refresh_websocket headers[:headers] = {} headers[CLIENT_INFO_HEADER_KEY] = CLIENT_INFO_HEADER_VALUE headers[MESSAGE_VERSION_HEADER_KEY] = MESSAGE_VERSION_HEADER_VALUE - @ws = ws = WebSocket::Client::Simple.connect(socket_url, headers) + @ws = ws = WebSocket::EventMachine::Client.connect(uri: socket_url, headers: headers) me.send :info, "Connection opening" - ws.on :open do + ws.onopen do me.send :info, "Connection established" me.send :ready, true if PROVIDERS.include?(me.send(:provider)) @@ -489,8 +489,7 @@ def refresh_websocket me.send :stop_self_heal end - ws.on :message do |frame| - data_message = frame.data + ws.onmessage do |data_message| #me.send :debug, "Message: #{data_message}" begin unless data_message.nil? @@ -500,24 +499,24 @@ def refresh_websocket me.send :error, "Error adding message to queue: #{data_message} #{e}" end end - - ws.on :close do |e| + + ws.onclose do |code, reason| me.send :ready, false - me.send :info, "Connection closing...: #{e}" + me.send :info, "Connection closing...: #{reason}" me.send :try_self_heal end - ws.on :error do |e| + ws.onerror do |e| me.send :ready, false me.send :error, "Connection error: #{e}" me.send :try_self_heal end end - + def refresh_channels return unless @ready debug "Refreshing channels" - + # Join new channels new_channels = @channels - @joined_channels new_channels.each do |channel| @@ -527,7 +526,7 @@ def refresh_channels @ws.send(msg) info "Joined #{channel}" end - + # Leave old channels old_channels = @joined_channels - @channels old_channels.each do |channel| @@ -537,31 +536,31 @@ def refresh_channels @ws.send(msg) info "Left #{channel}" end - + @channels.uniq! @joined_channels = Array.new(@channels) debug "Current channels: #{@channels}" end - + def try_self_heal return if @closing debug "Attempting to self-heal" - + time = @selfheal_backoffs.first @selfheal_backoffs.delete_at(0) if @selfheal_backoffs.count > 1 - + EM.cancel_timer(@selfheal_timer) if @selfheal_timer - + @selfheal_timer = EM.add_timer(time/1000) do connect() end end - + def stop_self_heal EM.cancel_timer(@selfheal_timer) if @selfheal_timer @selfheal_backoffs = Array.new(SELF_HEAL_BACKOFFS) end - + def ready(val) @ready = val end @@ -571,19 +570,19 @@ def debug(message) @logger.debug(message) rescue nil end - + def info(message) message = "IntrinioRealtime | #{message}" @logger.info(message) rescue nil end - + def error(message) message = "IntrinioRealtime | #{message}" @logger.error(message) rescue nil end - + def fatal(message) message = "IntrinioRealtime | #{message}" @logger.fatal(message) rescue @@ -591,14 +590,14 @@ def fatal(message) throw :fatal nil end - + def parse_channels(channels) channels.flatten! channels.uniq! channels.compact! channels end - + def join_binary_message(channel) if (channel == "lobby") && (@trades_only == false) return [74, 0, 36, 70, 73, 82, 69, 72, 79, 83, 69].pack('C*') #74, not trades only, "$FIREHOSE"