Rewrite dataloader to use Fiber#transfer #4625

Merged: 14 commits, Dec 11, 2023
lib/graphql/dataloader.rb (232 changes: 100 additions & 132 deletions)
@@ -118,7 +118,12 @@ def with(source_class, *batch_args, **batch_kwargs)
   #
   # @return [void]
   def yield
-    Fiber.yield
+    if use_fiber_resume?
+      Fiber.yield
+    else
+      parent_fiber = Thread.current[:parent_fiber]
+      parent_fiber.transfer
+    end
     nil
   end
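The heart of the change is already visible in this first hunk: when no compatible fiber scheduler is installed, pausing no longer uses `Fiber.yield` (which returns control to whoever called `resume`) but transfers control directly to a recorded parent fiber. A minimal standalone sketch of the difference between the two styles (plain Ruby, not graphql-ruby code):

    # resume/yield: control bounces implicitly between caller and fiber.
    child = Fiber.new do
      puts "child: part 1"
      Fiber.yield            # suspends; control returns to the resumer
      puts "child: part 2"
    end
    child.resume             # prints "child: part 1"
    child.resume             # prints "child: part 2"

    # transfer: control moves only where it is explicitly sent.
    main = Fiber.current
    worker = Fiber.new do
      puts "worker: part 1"
      main.transfer          # hand control back by naming the target
      puts "worker: part 2"
      main.transfer
    end
    worker.transfer          # prints "worker: part 1"
    worker.transfer          # prints "worker: part 2"

With `transfer`, a fiber that finishes without transferring away hands control to the root fiber, which is why the new `spawn_fiber` (below) ends each fiber with an explicit `parent_fiber.transfer(true)`.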

@@ -167,120 +172,121 @@ def run_isolated
     end
   end
 
   # @api private Move along, move along
   def run
     if @nonblocking && !Fiber.scheduler
       raise "`nonblocking: true` requires `Fiber.scheduler`, assign one with `Fiber.set_scheduler(...)` before executing GraphQL."
     end
-    # At a high level, the algorithm is:
-    #
-    #  A) Inside Fibers, run jobs from the queue one-by-one
-    #    - When one of the jobs yields to the dataloader (`Fiber.yield`), then that fiber will pause
-    #    - In that case, if there are still pending jobs, a new Fiber will be created to run jobs
-    #    - Continue until all jobs have been _started_ by a Fiber. (Any number of those Fibers may be waiting to be resumed, after their data is loaded)
-    #  B) Once all known jobs have been run until they are complete or paused for data, run all pending data sources.
-    #    - Similarly, create a Fiber to consume pending sources and tell them to load their data.
-    #    - If one of those Fibers pauses, then create a new Fiber to continue working through remaining pending sources.
-    #    - When a source causes another source to become pending, run the newly-pending source _first_, since it's a dependency of the previous one.
-    #  C) After all pending sources have been completely loaded (there are no more pending sources), resume any Fibers that were waiting for data.
-    #    - Those Fibers assume that source caches will have been populated with the data they were waiting for.
-    #    - Those Fibers may request data from a source again, in which case they will yield and be added to a new pending fiber list.
-    #  D) Once all pending fibers have been resumed once, return to `A` above.
-    #
-    # For whatever reason, the best implementation I could find was to order the steps `[D, A, B, C]`, with a special case for skipping `D`
-    # on the first pass. I just couldn't find a better way to write the loops in a way that was DRY and easy to read.
-    #
-    pending_fibers = []
-    next_fibers = []
-    pending_source_fibers = []
-    next_source_fibers = []
-    first_pass = true
-
-    while first_pass || (f = pending_fibers.shift)
-      if first_pass
-        first_pass = false
-      else
-        # These fibers were previously waiting for sources to load data,
-        # resume them. (They might wait again, in which case, re-enqueue them.)
-        resume(f)
-        if f.alive?
-          next_fibers << f
-        end
-      end
-
-      while @pending_jobs.any?
-        # Create a Fiber to consume jobs until one of the jobs yields
-        # or jobs run out
-        f = spawn_fiber {
-          while (job = @pending_jobs.shift)
-            job.call
-          end
-        }
-        resume(f)
-        # In this case, the job yielded. Queue it up to run again after
-        # we load whatever it's waiting for.
-        if f.alive?
-          next_fibers << f
-        end
-      end
-
-      if pending_fibers.empty?
-        # Now, run all Sources which have become pending _before_ resuming GraphQL execution.
-        # Sources might queue up other Sources, which is fine -- those will also run before resuming execution.
-        #
-        # This is where an evented approach would be even better -- can we tell which
-        # fibers are ready to continue, and continue execution there?
-        #
-        if (first_source_fiber = create_source_fiber)
-          pending_source_fibers << first_source_fiber
-        end
-
-        while pending_source_fibers.any?
-          while (outer_source_fiber = pending_source_fibers.pop)
-            resume(outer_source_fiber)
-            if outer_source_fiber.alive?
-              next_source_fibers << outer_source_fiber
-            end
-            if (next_source_fiber = create_source_fiber)
-              pending_source_fibers << next_source_fiber
-            end
-          end
-          join_queues(pending_source_fibers, next_source_fibers)
-          next_source_fibers.clear
-        end
-        # Move newly-enqueued Fibers on to the list to be resumed.
-        # Clear out the list of next-round Fibers, so that
-        # any Fibers that pause can be put on it.
-        join_queues(pending_fibers, next_fibers)
-        next_fibers.clear
-      end
-    end
-
-    if @pending_jobs.any?
-      raise "Invariant: #{@pending_jobs.size} pending jobs"
-    elsif pending_fibers.any?
-      raise "Invariant: #{pending_fibers.size} pending fibers"
-    elsif next_fibers.any?
-      raise "Invariant: #{next_fibers.size} next fibers"
-    end
+    job_fibers = []
+    next_job_fibers = []
+    source_fibers = []
+    next_source_fibers = []
+    first_pass = true
+
+    manager = spawn_fiber do
+      while first_pass || job_fibers.any?
+        first_pass = false
+
+        while (f = job_fibers.shift || spawn_job_fiber)
+          if f.alive?
+            finished = run_fiber(f)
+            if !finished
+              next_job_fibers << f
+            end
+          end
+        end
+
+        if job_fibers.empty?
+          any_pending_sources = true
+          while any_pending_sources
+            while (f = source_fibers.shift || spawn_source_fiber)
+              if f.alive?
+                finished = run_fiber(f)
+                if !finished
+                  next_source_fibers << f
+                end
+              end
+            end
+            join_queues(source_fibers, next_source_fibers)
+            any_pending_sources = @source_cache.each_value.any? { |group_sources| group_sources.each_value.any?(&:pending?) }
+          end
+        end
+        join_queues(job_fibers, next_job_fibers)
+      end
+    end
+
+    run_fiber(manager)
+  rescue UncaughtThrowError => e
+    throw e.tag, e.value
   end
 
+  def run_fiber(f)
+    if use_fiber_resume?
+      f.resume
+    else
+      f.transfer
+    end
+  end
+
+  def spawn_fiber
+    fiber_vars = get_fiber_variables
+    parent_fiber = use_fiber_resume? ? nil : Fiber.current
+    Fiber.new(blocking: !@nonblocking) {
+      set_fiber_variables(fiber_vars)
+      Thread.current[:parent_fiber] = parent_fiber
+      yield
+      # With `.transfer`, you have to explicitly pass back to the parent --
+      # if the fiber is allowed to terminate normally, control is passed to the main fiber instead.
+      if parent_fiber
+        parent_fiber.transfer(true)
+      else
+        true
+      end
+    }
+  end
+
+  def get_fiber_state
+    fiber_locals = {}
+
+    Thread.current.keys.each do |fiber_var_key|
+      # This variable should be fresh in each new fiber
+      if fiber_var_key != :__graphql_runtime_info
+        fiber_locals[fiber_var_key] = Thread.current[fiber_var_key]
+      end
+    end
+
+    fiber_locals
+  end
+
+  def set_fiber_state(state)
+    state.each { |k, v| Thread.current[k] = v }
+  end
+
-  def join_queues(previous_queue, next_queue)
-    if @nonblocking
-      Fiber.scheduler.run
-      next_queue.select!(&:alive?)
-    end
-    previous_queue.concat(next_queue)
-  end
-
   private
 
-  # If there are pending sources, return a fiber for running them.
-  # Otherwise, return `nil`.
-  #
-  # @return [Fiber, nil]
-  def create_source_fiber
+  def join_queues(prev_queue, new_queue)
+    @nonblocking && Fiber.scheduler.run
+    prev_queue.concat(new_queue)
+    new_queue.clear
+  end
+
+  def use_fiber_resume?
+    Fiber.respond_to?(:scheduler) &&
+      (
+        (defined?(::DummyScheduler) && Fiber.scheduler.is_a?(::DummyScheduler)) ||
+        (defined?(::Evt) && ::Evt::Scheduler.singleton_class::BACKENDS.any? { |be| Fiber.scheduler.is_a?(be) }) ||
+        (defined?(::Libev) && Fiber.scheduler.is_a?(::Libev::Scheduler))
+      )
+  end
+
+  def spawn_job_fiber
+    if @pending_jobs.any?
+      spawn_fiber do
+        while job = @pending_jobs.shift
+          job.call
+        end
+      end
+    end
+  end
+
+  def spawn_source_fiber
     pending_sources = nil
     @source_cache.each_value do |source_by_batch_params|
       source_by_batch_params.each_value do |source|
@@ -292,48 +298,10 @@ def create_source_fiber
       end
 
     if pending_sources
-      # By passing the whole array into this Fiber, it's possible that we set ourselves up for a bunch of no-ops.
-      # For example, if you have sources `[a, b, c]`, and `a` is loaded, then `b` yields to wait for `d`, then
-      # the next fiber would be dispatched with `[c, d]`. It would fulfill `c`, then `d`, then eventually
-      # the previous fiber would start up again. `c` would no longer be pending, but it would still receive `.run_pending_keys`.
-      # That method is short-circuited since it isn't pending any more, but it's still a waste.
-      #
-      # This design could probably be improved by maintaining a `@pending_sources` queue which is shared by the fibers,
-      # similar to `@pending_jobs`. That way, when a fiber is resumed, it would never pick up work that was finished by a different fiber.
-      source_fiber = spawn_fiber do
+      spawn_fiber do
         pending_sources.each(&:run_pending_keys)
       end
     end
-
-    source_fiber
   end
-
-  def resume(fiber)
-    fiber.resume
-  rescue UncaughtThrowError => e
-    throw e.tag, e.value
-  end
-
-  # Copies the thread local vars into the fiber thread local vars. Many
-  # gems (such as RequestStore, MiniRacer, etc.) rely on thread local vars
-  # to keep track of execution context, and without this they do not
-  # behave as expected.
-  #
-  # @see https://github.com/rmosolgo/graphql-ruby/issues/3449
-  def spawn_fiber
-    fiber_vars = get_fiber_variables
-
-    if @nonblocking
-      Fiber.new(blocking: false) do
-        set_fiber_variables(fiber_vars)
-        yield
-      end
-    else
-      Fiber.new do
-        set_fiber_variables(fiber_vars)
-        yield
-      end
-    end
-  end
 end
 end
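Taken together, the new `run` replaces the old resume-based bookkeeping with a single long-lived "manager" fiber: job fibers transfer control back to their parent when they pause, and the manager decides whether to spawn more job fibers, run source fibers, or re-enter parked fibers. A hedged, self-contained sketch of that round-based shape (illustrative names only, not the graphql-ruby API):

    # Each job runs in its own fiber; a job "pauses" by transferring to its
    # parent. Parked fibers are re-entered on the next round, after the point
    # where a real dataloader would run its pending sources.
    def run_rounds(jobs)
      manager = Fiber.current
      parked = jobs.map do |job|
        Fiber.new do
          Thread.current[:parent_fiber] = manager  # fiber-local, like spawn_fiber above
          job.call
          manager.transfer(true)                   # finished: say so explicitly
        end
      end
      until parked.empty?
        still_parked = []
        parked.each do |f|
          finished = f.transfer                    # run the fiber until it transfers back
          still_parked << f unless finished
        end
        # ...a real implementation would load pending batch data here...
        parked = still_parked
      end
    end

    pause = -> { Thread.current[:parent_fiber].transfer }  # like Dataloader#yield
    run_rounds([
      -> { puts "a: before data"; pause.call; puts "a: after data" },
      -> { puts "b: no pause needed" },
    ])
    # a: before data
    # b: no pause needed
    # a: after data

Note how the return value of `transfer` stands in for the old `f.alive?` check: a fiber that transfers `true` back to the manager is done, while anything else means it parked itself.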
spec/graphql/dataloader/async_dataloader_spec.rb (17 changes: 8 additions & 9 deletions)
@@ -85,11 +85,10 @@ def wait_for(tag:, wait:)
   end
 
   def with_scheduler
-    prev_scheduler = Fiber.scheduler
     Fiber.set_scheduler(scheduler_class.new)
     yield
   ensure
-    Fiber.set_scheduler(prev_scheduler)
+    Fiber.set_scheduler(nil)
   end
 
   module AsyncDataloaderAssertions
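One detail worth noting in this helper change: instead of restoring a saved scheduler, teardown now calls `Fiber.set_scheduler(nil)`. Under Ruby's fiber-scheduler contract, unsetting the scheduler invokes `#close` on the outgoing one, which gives it a chance to drain any fibers that are still scheduled, so each example starts from a clean slate. A hedged sketch of the behavior being relied on (`scheduler_class` here is the spec's let-binding above):

    Fiber.set_scheduler(scheduler_class.new)

    ran = false
    Fiber.schedule do
      sleep 0.01             # suspends via the scheduler instead of blocking the thread
      ran = true
    end

    Fiber.set_scheduler(nil) # closes the scheduler, draining scheduled fibers
    raise "never ran" unless ran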
@@ -240,13 +239,13 @@ def self.included(child_class)
     include AsyncDataloaderAssertions
   end
 
-  # if RUBY_ENGINE == "ruby" && !ENV["GITHUB_ACTIONS"]
-  #   describe "With libev_scheduler" do
-  #     require "libev_scheduler"
-  #     let(:scheduler_class) { Libev::Scheduler }
-  #     include AsyncDataloaderAssertions
-  #   end
-  # end
+  if RUBY_ENGINE == "ruby" && !ENV["GITHUB_ACTIONS"]
+    describe "With libev_scheduler" do
+      require "libev_scheduler"
+      let(:scheduler_class) { Libev::Scheduler }
+      include AsyncDataloaderAssertions
+    end
+  end
 
   describe "with evt" do
     require "evt"
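For context, the scenario these scheduler variations all exercise looks roughly like this (a hedged sketch; the schema and source names are illustrative, not from the PR):

    require "graphql"

    class SleepSource < GraphQL::Dataloader::Source
      def fetch(keys)
        sleep(0.1)   # under a fiber scheduler, other work proceeds during this wait
        keys.map { |k| "loaded:#{k}" }
      end
    end

    class QueryType < GraphQL::Schema::Object
      field :slow, String do
        argument :key, String
      end

      def slow(key:)
        context.dataloader.with(SleepSource).load(key)
      end
    end

    class TestSchema < GraphQL::Schema
      query QueryType
      use GraphQL::Dataloader::AsyncDataloader
    end

    Fiber.set_scheduler(scheduler_class.new)  # Evt, Libev, etc., as above
    result = TestSchema.execute('{ a: slow(key: "a") b: slow(key: "b") }')
    Fiber.set_scheduler(nil)

The shared `AsyncDataloaderAssertions` then check that waits like these overlap instead of running back-to-back, regardless of which scheduler backend is installed.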