Skip to content

Commit

Permalink
Release 5.0 (intrinio#18)
Browse files Browse the repository at this point in the history
* format changes

* no heartbeat

* working

* use the provider list

* version bump
  • Loading branch information
ssnyder-intrinio authored Jun 8, 2023
1 parent e96d8e6 commit dbc196b
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 57 deletions.
2 changes: 1 addition & 1 deletion Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: .
specs:
intrinio-realtime (4.2.0)
intrinio-realtime (5.0.0)
bigdecimal (~> 1.4.0)
eventmachine (~> 1.2)
thread (~> 0.2.2)
Expand Down
2 changes: 1 addition & 1 deletion intrinio-realtime.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ $LOAD_PATH.unshift(lib) unless $LOAD_PATH.include?(lib)

Gem::Specification.new do |spec|
spec.name = "intrinio-realtime"
spec.version = "4.2.0"
spec.version = "5.0.0"
spec.authors = ["Intrinio"]
spec.email = ["[email protected]"]
spec.description = %q{Intrinio Ruby SDK for Real-Time Stock Prices}
Expand Down
166 changes: 111 additions & 55 deletions lib/intrinio-realtime.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,25 @@

module Intrinio
module Realtime
HEARTBEAT_TIME = 3
SELF_HEAL_BACKOFFS = [0, 100, 500, 1000, 2000, 5000].freeze
REALTIME = "REALTIME".freeze
MANUAL = "MANUAL".freeze
DELAYED_SIP = "DELAYED_SIP".freeze
NASDAQ_BASIC = "NASDAQ_BASIC".freeze
NO_SUBPROVIDER = "NONE".freeze
CTA_A = "CTA_A".freeze
CTA_B = "CTA_B".freeze
UTP = "UTP".freeze
OTC = "OTC".freeze
IEX = "IEX".freeze
PROVIDERS = [REALTIME, MANUAL, DELAYED_SIP, NASDAQ_BASIC].freeze
SUBPROVIDERS = [NO_SUBPROVIDER, CTA_A, CTA_B, UTP, OTC, NASDAQ_BASIC, IEX].freeze
ASK = "Ask".freeze
BID = "Bid".freeze
CLIENT_INFO_HEADER_KEY = "Client-Information".freeze
CLIENT_INFO_HEADER_VALUE = "IntrinioRealtimeRubySDKv5.0".freeze
MESSAGE_VERSION_HEADER_KEY = "UseNewEquitiesFormat".freeze
MESSAGE_VERSION_HEADER_VALUE = "v2".freeze

def self.connect(options, on_trade, on_quote)
EM.run do
Expand All @@ -25,12 +35,15 @@ def self.connect(options, on_trade, on_quote)
end

class Trade
def initialize(symbol, price, size, timestamp, total_volume)
def initialize(symbol, price, size, timestamp, total_volume, subprovider, market_center, condition)
@symbol = symbol
@price = price
@size = size
@timestamp = timestamp
@total_volume = total_volume
@subprovider = subprovider
@market_center = market_center
@condition = condition
end

def symbol
Expand All @@ -53,18 +66,33 @@ def total_volume
@total_volume
end

def subprovider
@subprovider
end

def market_center
@market_center
end

def condition
@condition
end

def to_s
[@symbol, @price, @size, @timestamp, @total_volume].join(",")
[@symbol, @price, @size, @timestamp, @total_volume, @subprovider, @market_center, @condition].join(",")
end
end

class Quote
def initialize(type, symbol, price, size, timestamp)
def initialize(type, symbol, price, size, timestamp, subprovider, market_center, condition)
@type = type
@symbol = symbol
@price = price
@size = size
@timestamp = timestamp
@subprovider = subprovider
@market_center = market_center
@condition = condition
end

def type
Expand All @@ -87,8 +115,20 @@ def timestamp
@timestamp
end

def subprovider
@subprovider
end

def market_center
@market_center
end

def condition
@condition
end

def to_s
[@symbol, @type, @price, @size, @timestamp].join(",")
[@symbol, @type, @price, @size, @timestamp, @subprovider, @market_center, @condition].join(",")
end
end

Expand All @@ -115,7 +155,7 @@ def initialize(options, on_trade, on_quote)
unless @provider
@provider = REALTIME
end
raise "Provider must be 'REALTIME' or 'MANUAL'" unless PROVIDERS.include?(@provider)
raise "Provider must be 'REALTIME', 'DELAYED_SIP', 'NASDAQ_BASIC', or 'MANUAL'" unless PROVIDERS.include?(@provider)

@ip_address = options[:ip_address]
raise "Missing option ip_address while in MANUAL mode." if @provider == MANUAL and (@ip_address.nil? || @ip_address.empty?)
Expand Down Expand Up @@ -148,7 +188,6 @@ def initialize(options, on_trade, on_quote)

@ready = false
@joined_channels = []
@heartbeat_timer = nil
@selfheal_timer = nil
@selfheal_backoffs = Array.new(SELF_HEAL_BACKOFFS)
@ws = nil
Expand Down Expand Up @@ -206,7 +245,6 @@ def connect
end

def disconnect
EM.cancel_timer(@heartbeat_timer) if @heartbeat_timer
EM.cancel_timer(@selfheal_timer) if @selfheal_timer
@ready = false
@closing = true
Expand Down Expand Up @@ -257,38 +295,71 @@ def parse_float32(data)
data.map { |i| [sprintf('%02x',i)].pack('H2') }.join.unpack('e').first
end

def parse_trade(data, start_index, symbol_length)
symbol = data[start_index + 2, symbol_length].map!{|c| c.chr}.join
price = parse_float32(data[start_index + 2 + symbol_length, 4])
size = parse_uint32(data[start_index + 6 + symbol_length, 4])
timestamp = parse_uint64(data[start_index + 10 + symbol_length, 8])
total_volume = parse_uint32(data[start_index + 18 + symbol_length, 4])
return Trade.new(symbol, price, size, timestamp, total_volume)
def parse_subprovider(byte)
case byte
when 0
NO_SUBPROVIDER
when 1
CTA_A
when 2
CTA_B
when 3
UTP
when 4
OTC
when 5
NASDAQ_BASIC
when 6
IEX
else
IEX
end
end

def parse_quote(data, start_index, symbol_length, msg_type)
def parse_trade(data, start_index)
symbol_length = data[start_index + 2]
condition_length = data[start_index + 26 + symbol_length]
symbol = data[start_index + 3, symbol_length].map!{|c| c.chr}.join
price = parse_float32(data[start_index + 6 + symbol_length, 4])
size = parse_uint32(data[start_index + 10 + symbol_length, 4])
timestamp = parse_uint64(data[start_index + 14 + symbol_length, 8])
total_volume = parse_uint32(data[start_index + 22 + symbol_length, 4])
subprovider = parse_subprovider(data[start_index + 3 + symbol_length])
market_center = data[start_index + 4 + symbol_length, 2].pack("C*").encode!('UTF-8', 'UTF-16LE')
condition = if condition_length > 0 then data[start_index + 27 + symbol_length, condition_length].map!{|c| c.chr}.join else "" end

return Trade.new(symbol, price, size, timestamp, total_volume, subprovider, market_center, condition)
end

def parse_quote(data, start_index, msg_type)
symbol_length = data[start_index + 2]
condition_length = data[start_index + 22 + symbol_length]
type = case when msg_type == 1 then ASK when msg_type == 2 then BID end
symbol = data[start_index + 2, symbol_length].map!{|c| c.chr}.join
price = parse_float32(data[start_index + 2 + symbol_length, 4])
size = parse_uint32(data[start_index + 6 + symbol_length, 4])
timestamp = parse_uint64(data[start_index + 10 + symbol_length, 8])
return Quote.new(type, symbol, price, size, timestamp)
symbol = data[start_index + 3, symbol_length].map!{|c| c.chr}.join
price = parse_float32(data[start_index + 6 + symbol_length, 4])
size = parse_uint32(data[start_index + 10 + symbol_length, 4])
timestamp = parse_uint64(data[start_index + 14 + symbol_length, 8])
subprovider = parse_subprovider(data[start_index + 3 + symbol_length])
market_center = data[start_index + 4 + symbol_length, 2].pack("C*").encode!('UTF-8', 'UTF-16LE')
condition = if condition_length > 0 then data[start_index + 23 + symbol_length, condition_length].map!{|c| c.chr}.join else "" end

return Quote.new(type, symbol, price, size, timestamp, subprovider, market_center, condition)
end

def handle_message(data, start_index)
msg_type = data[start_index]
symbol_length = data[start_index + 1]
msg_length = data[start_index + 1]
case msg_type
when 0 then
trade = parse_trade(data, start_index, symbol_length)
trade = parse_trade(data, start_index)
@on_trade.call(trade)
return start_index + 22 + symbol_length
return start_index + msg_length
when 1 || 2 then
quote = parse_quote(data, start_index, symbol_length, msg_type)
quote = parse_quote(data, start_index, msg_type)
@on_quote.call(quote)
return start_index + 18 + symbol_length
return start_index + msg_length
end
return start_index
return start_index + msg_length
end

def handle_data
Expand Down Expand Up @@ -332,7 +403,7 @@ def refresh_token
http.use_ssl = true if (auth_url.include?("https"))
http.start
request = Net::HTTP::Get.new(uri.request_uri)
request.add_field("Client-Information", "IntrinioRealtimeRubySDKv4.2")
request.add_field(CLIENT_INFO_HEADER_KEY, CLIENT_INFO_HEADER_VALUE)

unless @api_key
request.basic_auth(@username, @password)
Expand Down Expand Up @@ -374,10 +445,10 @@ def api_auth_url(url)

def socket_url
case @provider
when REALTIME then "wss://realtime-mx.intrinio.com/socket/websocket?vsn=1.0.0&token=#{@token}"
when DELAYED_SIP then "wss://realtime-delayed-sip.intrinio.com/socket/websocket?vsn=1.0.0&token=#{@token}"
when NASDAQ_BASIC then "wss://realtime-nasdaq-basic.intrinio.com/socket/websocket?vsn=1.0.0&token=#{@token}"
when MANUAL then "ws://" + @ip_address + "/socket/websocket?vsn=1.0.0&token=#{@token}"
when REALTIME then "wss://realtime-mx.intrinio.com/socket/websocket?vsn=1.0.0&token=#{@token}&#{CLIENT_INFO_HEADER_KEY}=#{CLIENT_INFO_HEADER_VALUE}&#{MESSAGE_VERSION_HEADER_KEY}=#{MESSAGE_VERSION_HEADER_VALUE}"
when DELAYED_SIP then "wss://realtime-delayed-sip.intrinio.com/socket/websocket?vsn=1.0.0&token=#{@token}&#{CLIENT_INFO_HEADER_KEY}=#{CLIENT_INFO_HEADER_VALUE}&#{MESSAGE_VERSION_HEADER_KEY}=#{MESSAGE_VERSION_HEADER_VALUE}"
when NASDAQ_BASIC then "wss://realtime-nasdaq-basic.intrinio.com/socket/websocket?vsn=1.0.0&token=#{@token}&#{CLIENT_INFO_HEADER_KEY}=#{CLIENT_INFO_HEADER_VALUE}&#{MESSAGE_VERSION_HEADER_KEY}=#{MESSAGE_VERSION_HEADER_VALUE}"
when MANUAL then "ws://" + @ip_address + "/socket/websocket?vsn=1.0.0&token=#{@token}&#{CLIENT_INFO_HEADER_KEY}=#{CLIENT_INFO_HEADER_VALUE}&#{MESSAGE_VERSION_HEADER_KEY}=#{MESSAGE_VERSION_HEADER_VALUE}"
end
end

Expand All @@ -400,17 +471,21 @@ def refresh_websocket
@threads = []
@stop = false
@thread_quantity.times {@threads << Thread.new{handle_data}}

@ws = ws = WebSocket::Client::Simple.connect(socket_url)

headers = {}
headers[:headers] = {}
headers[CLIENT_INFO_HEADER_KEY] = CLIENT_INFO_HEADER_VALUE
headers[MESSAGE_VERSION_HEADER_KEY] = MESSAGE_VERSION_HEADER_VALUE
@ws = ws = WebSocket::Client::Simple.connect(socket_url, headers)

me.send :info, "Connection opening"

ws.on :open do
me.send :info, "Connection established"
me.send :ready, true
if [REALTIME, MANUAL].include?(me.send(:provider))
if PROVIDERS.include?(me.send(:provider))
me.send :refresh_channels
end
me.send :start_heartbeat
me.send :stop_self_heal
end

Expand Down Expand Up @@ -468,32 +543,13 @@ def refresh_channels
debug "Current channels: #{@channels}"
end

def start_heartbeat
EM.cancel_timer(@heartbeat_timer) if @heartbeat_timer
@heartbeat_timer = EM.add_periodic_timer(HEARTBEAT_TIME) do
if msg = heartbeat_msg()
@ws.send(msg)
debug "Heartbeat #{msg}"
end
end
end

def heartbeat_msg
""
end

def stop_heartbeat
EM.cancel_timer(@heartbeat_timer) if @heartbeat_timer
end

def try_self_heal
return if @closing
debug "Attempting to self-heal"

time = @selfheal_backoffs.first
@selfheal_backoffs.delete_at(0) if @selfheal_backoffs.count > 1

EM.cancel_timer(@heartbeat_timer) if @heartbeat_timer
EM.cancel_timer(@selfheal_timer) if @selfheal_timer

@selfheal_timer = EM.add_timer(time/1000) do
Expand Down

0 comments on commit dbc196b

Please sign in to comment.