Skip to content

Commit

Permalink
Fine-grained lock
Browse files Browse the repository at this point in the history
  • Loading branch information
ursm committed Jul 8, 2024
1 parent 8e711a5 commit 2d32e82
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 26 deletions.
15 changes: 10 additions & 5 deletions lib/fetch/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,6 @@ module Fetch
class Client
include Singleton

def initialize
@pool = ConnectionPool.new
end

def fetch(resource, method: :get, headers: [], body: nil, redirect: :follow, _redirected: false)
uri = URI.parse(resource)
req = Net::HTTP.const_get(method.capitalize).new(uri)
Expand All @@ -45,7 +41,8 @@ def fetch(resource, method: :get, headers: [], body: nil, redirect: :follow, _re
req.body = body
end

res = @pool.with_connection(uri) { _1.request(req) }
# @type var uri: URI::HTTP
res = pool.with_connection(uri) { _1.request(req) }

case res
when Net::HTTPRedirection
Expand All @@ -66,6 +63,14 @@ def fetch(resource, method: :get, headers: [], body: nil, redirect: :follow, _re

private

def pool
if pool = Thread.current.thread_variable_get(:fetch_connection_pool)
pool
else
Thread.current.thread_variable_set :fetch_connection_pool, ConnectionPool.new
end
end

def to_response(url, res, redirected)
Response.new(
url: url.to_str,
Expand Down
14 changes: 5 additions & 9 deletions lib/fetch/connection_pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def with_connection(uri, &block)

def acquire(uri)
@mutex.synchronize {
entry = @connections[key(uri)]
entry = @connections[uri.origin]

if entry
entry.in_use = true
Expand All @@ -46,7 +46,7 @@ def acquire(uri)
http.use_ssl = uri.scheme == 'https'
http.keep_alive_timeout = Fetch.config.keep_alive_timeout

@connections[key(uri)] = Entry.new(connection: http, in_use: true)
@connections[uri.origin] = Entry.new(connection: http, in_use: true)

http.start
}
Expand All @@ -58,7 +58,7 @@ def acquire(uri)

def release(uri)
@mutex.synchronize do
if entry = @connections[key(uri)]
if entry = @connections[uri.origin]
entry.in_use = false
entry.last_used = Time.now
end
Expand All @@ -67,20 +67,16 @@ def release(uri)

def sweep
@mutex.synchronize do
@connections.each do |key, entry|
@connections.each do |origin, entry|
next if entry.in_use

if entry.last_used + Fetch.config.connection_max_idle_time < Time.now
entry.connection.finish

@connections.delete key
@connections.delete origin
end
end
end
end

def key(uri)
"#{Thread.current.object_id}/#{uri.origin}".freeze
end
end
end
1 change: 1 addition & 0 deletions sig/fetch/client.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ module Fetch
private

def initialize: () -> void
def pool: () -> ConnectionPool
def to_response: (string, Net::HTTPResponse, bool) -> Response
end
end
1 change: 0 additions & 1 deletion sig/fetch/connection_pool.rbs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,5 @@ module Fetch
def acquire: (URI::HTTP) -> Net::HTTP
def release: (URI::HTTP) -> void
def sweep: () -> void
def key: (URI::HTTP) -> String
end
end
11 changes: 0 additions & 11 deletions spec/fetch/connection_pool_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,4 @@
expect(conn1).not_to eq(conn2)
end
end

example 'thread safety' do
pool = Fetch::ConnectionPool.new

Fetch.config.with connection_max_idle_time: 10 do
conn1 = pool.with_connection(uri, &:itself)
conn2 = Thread.new { pool.with_connection(uri, &:itself) }.value

expect(conn1).not_to eq(conn2)
end
end
end

0 comments on commit 2d32e82

Please sign in to comment.