diff --git a/lib/Cro/HTTP/Client.pm6 b/lib/Cro/HTTP/Client.pm6 index f4b4514..2776f53 100644 --- a/lib/Cro/HTTP/Client.pm6 +++ b/lib/Cro/HTTP/Client.pm6 @@ -10,10 +10,12 @@ use Cro::HTTP::LogTimelineSchema; use Cro::HTTP::Request; use Cro::HTTP::RequestSerializer; use Cro::HTTP::ResponseParser; +use Cro::HTTP2::Frame; use Cro::HTTP2::FrameParser; use Cro::HTTP2::FrameSerializer; use Cro::HTTP2::RequestSerializer; use Cro::HTTP2::ResponseParser; +use Cro::HTTP2::GeneralParser; use Cro::TCP; use Cro::TLS; use Cro::Uri; @@ -163,6 +165,10 @@ class Cro::HTTP::Client { method close() { $!in.done } } + my class GoAwayRetry is Exception { + has $.goaway-exception; + } + my class Pipeline2 { has Lock $!lock = Lock.new; has Bool $.secure; @@ -174,18 +180,7 @@ class Cro::HTTP::Client { has $!next-stream-id = 1; has %!outstanding-stream-responses{Int}; - submethod BUILD(:$!secure!, :$!host!, :$!port!, :$!in!, :$out!, :$go-away-supply!) { - $go-away-supply.tap: -> $last-processed-sid { - $!dead = True; - $!lock.protect: { - for %!outstanding-stream-responses.kv -> $sid, $vow { - if $sid > $last-processed-sid { - %!outstanding-stream-responses{$sid}:delete; - $vow.break(X::AdHoc.new(message => 'GoAway packet received')); - } - } - } - } + submethod BUILD(:$!secure!, :$!host!, :$!port!, :$!in!, :$out!) { $!tap = supply { whenever $out -> $response { @@ -195,6 +190,22 @@ class Cro::HTTP::Client { self.break-all-responses(X::AdHoc.new(message => 'Connection to server lost')); } QUIT { + when X::Cro::HTTP2::GoAway { + $!dead = True; + $!lock.protect: { + for %!outstanding-stream-responses.kv -> $sid, $vow { + if $sid > .last-processed-sid { + %!outstanding-stream-responses{$sid}:delete; + if .code == NO_ERROR { + $vow.break(GoAwayRetry.new(goaway-exception => $_)); + } + else { + $vow.break($_); + } + } + } + } + } default { $!dead = True; self.break-all-responses($_); @@ -339,6 +350,10 @@ class Cro::HTTP::Client { #| Request timeout policy. has Cro::Policy::Timeout $.timeout-policy; + #| How often should we retry to send a request when the server answered + #| with a NO_ERROR GoAway packet? + has $.http2-goaway-retries; + has $!persistent; has $!connection-cache = ConnectionCache.new; @@ -357,7 +372,8 @@ class Cro::HTTP::Client { :$http-proxy, :$https-proxy, :$!follow = $DEFAULT-MAX-REDIRECTS, :%!auth, :$!http, :$!persistent = True, :$!ca, :$!push-promises = False, - :ssl(:%!tls), :$timeout, :$!user-agent = 'Cro') { + :ssl(:%!tls), :$timeout, :$!http2-goaway-retries = 1, + :$!user-agent = 'Cro') { if $cookie-jar ~~ Bool { $!cookie-jar = Cro::HTTP::Client::CookieJar.new; } @@ -577,123 +593,145 @@ class Cro::HTTP::Client { Promise(supply { my $request-start-time = now; my $conn-timeout = $timeout-policy.get-timeout(0, 'connection'); - whenever self!get-pipeline($proxy-url // $parsed-url, $http, $conn-timeout, $request-log, ca => %options, tls => %options // %options, :$enable-push) -> $pipeline { - # Handle connection persistence. - if $pipeline !~~ Pipeline2 { - unless self.persistent || $request-object.has-header('connection') { - $request-object.append-header('Connection', 'close'); + my $goaway-retries = self ?? $!http2-goaway-retries !! %options // 1; + my Supplier $retry-supplier .= new; + my $retry-supply = $retry-supplier.Supply; + sub do-request-on-pipeline() { + whenever self!get-pipeline($proxy-url // $parsed-url, $http, $conn-timeout, $request-log, ca => %options, tls => %options // %options, :$enable-push) -> $pipeline { + + # Handle connection persistence. + if $pipeline !~~ Pipeline2 { + unless self.persistent || $request-object.has-header('connection') { + $request-object.append-header('Connection', 'close'); + } } - } - # Set up any timeout for receiving the response headers. - my $timeout = $timeout-policy.get-timeout(now - $request-start-time, 'headers'); - my Bool $headers-kept = False; - if $timeout !~~ Inf { - whenever Promise.in($timeout) { - die X::Cro::HTTP::Client::Timeout.new(phase => 'headers', uri => $url) unless $headers-kept; + # Set up any timeout for receiving the response headers. + my $timeout = $timeout-policy.get-timeout(now - $request-start-time, 'headers'); + my Bool $headers-kept = False; + if $timeout !~~ Inf { + whenever Promise.in($timeout) { + die X::Cro::HTTP::Client::Timeout.new(phase => 'headers', uri => $url) unless $headers-kept || $pipeline.dead; + } } - } - # Send the request. - whenever $pipeline.send-request($request-object) { - $headers-kept = True; - QUIT { $request-log.end } - - # Consider adding the connection back into the cache to use it - # again. - if self && $!persistent { - unless .http-version eq '1.0' || (.header('connection') // '').lc eq 'close' { - $!connection-cache.add-pipeline($pipeline); + # Send the request. + whenever $pipeline.send-request($request-object) { + $headers-kept = True; + QUIT { + $request-log.end; + when GoAwayRetry { + if $goaway-retries > 0 && !$headers-kept { + $retry-supplier.emit: True; + } + else { + .goaway-exception.rethrow; + } + } } - } - else { - $pipeline.close; - } - # If there's a body timeout, enforce it. Note that we need to detach - # this from the current supply, since it outlives it. - my $body-timeout = $timeout-policy.get-timeout(now - $request-start-time, 'body'); - if $body-timeout != Inf { - my $response-to-timeout = $_; - Promise.in($body-timeout).then: { $response-to-timeout.cancel } - } + # Consider adding the connection back into the cache to use it + # again. + if self && $!persistent { + unless .http-version eq '1.0' || (.header('connection') // '').lc eq 'close' { + $!connection-cache.add-pipeline($pipeline); + } + } + else { + $pipeline.close; + } - # Set request object for received response. - .request = $request-object; - .request.http-version = $pipeline ~~ Pipeline2 ?? '2' !! '1.1'; - - # Pick next steps according to response. - if 200 <= .status < 400 || .status == 101 { - my $follow; - if self { - $follow = %options // $!follow // $DEFAULT-MAX-REDIRECTS; - } else { - $follow = %options // $DEFAULT-MAX-REDIRECTS; + # If there's a body timeout, enforce it. Note that we need to detach + # this from the current supply, since it outlives it. + my $body-timeout = $timeout-policy.get-timeout(now - $request-start-time, 'body'); + if $body-timeout != Inf { + my $response-to-timeout = $_; + Promise.in($body-timeout).then: { $response-to-timeout.cancel } } - if .status ⊂ $redirect-codes && ($follow !=== False) { - my $remain = $follow === True ?? 4 !! $follow.Int - 1; - if $remain < 0 { - $request-log.end; - die X::Cro::HTTP::Client::TooManyRedirects.new; - } - my $new-method = .status == 302 | 303 ?? 'GET' !! $method; - my %new-opts = %options; - %new-opts = $remain; - if .status == 302 | 303 { - %new-opts:delete; - %new-opts:delete; - %new-opts:delete; + + # Set request object for received response. + .request = $request-object; + .request.http-version = $pipeline ~~ Pipeline2 ?? '2' !! '1.1'; + + # Pick next steps according to response. + if 200 <= .status < 400 || .status == 101 { + my $follow; + if self { + $follow = %options // $!follow // $DEFAULT-MAX-REDIRECTS; + } else { + $follow = %options // $DEFAULT-MAX-REDIRECTS; } - my $new-url = $parsed-url.add(Cro::Uri::HTTP.parse-ref(.header('location'))); - %new-opts = $request-log; - Cro::HTTP::LogTimeline::Redirected.log($request-log, :status(.status), :url($new-url)); - my $req = self.request($new-method, $new-url, %new-opts); - CATCH { $request-log.end; } - whenever $req { - QUIT { $request-log.end; } + if .status ⊂ $redirect-codes && ($follow !=== False) { + my $remain = $follow === True ?? 4 !! $follow.Int - 1; + if $remain < 0 { + $request-log.end; + die X::Cro::HTTP::Client::TooManyRedirects.new; + } + my $new-method = .status == 302 | 303 ?? 'GET' !! $method; + my %new-opts = %options; + %new-opts = $remain; + if .status == 302 | 303 { + %new-opts:delete; + %new-opts:delete; + %new-opts:delete; + } + my $new-url = $parsed-url.add(Cro::Uri::HTTP.parse-ref(.header('location'))); + %new-opts = $request-log; + Cro::HTTP::LogTimeline::Redirected.log($request-log, :status(.status), :url($new-url)); + my $req = self.request($new-method, $new-url, %new-opts); + CATCH { $request-log.end; } + whenever $req { + QUIT { $request-log.end; } + $request-log.end; + .emit; + done; + }; + } else { + if self && $.cookie-jar.defined { + $.cookie-jar.add-from-response($_, $parsed-url); + } $request-log.end; .emit; done; - }; - } else { - if self && $.cookie-jar.defined { - $.cookie-jar.add-from-response($_, $parsed-url); } - $request-log.end; - .emit; - done; - } - } elsif 400 <= .status < 500 { - my $auth; - if self { - $auth = %options // %!auth; - } else { - $auth = %options // {}; - } - if .status == 401 && (%options:exists) { - my %opts = %options; - %opts:delete; - %opts = $request-log; - Cro::HTTP::LogTimeline::AuthorizationRequested.log($request-log); - CATCH { $request-log.end; } - whenever self.request($method, $parsed-url, %opts) { - QUIT { $request-log.end; } + } elsif 400 <= .status < 500 { + my $auth; + if self { + $auth = %options // %!auth; + } else { + $auth = %options // {}; + } + if .status == 401 && (%options:exists) { + my %opts = %options; + %opts:delete; + %opts = $request-log; + Cro::HTTP::LogTimeline::AuthorizationRequested.log($request-log); + CATCH { $request-log.end; } + whenever self.request($method, $parsed-url, %opts) { + QUIT { $request-log.end; } + $request-log.end; + .emit; + done; + }; + } else { + Cro::HTTP::LogTimeline::ErrorResponse.log($request-log, :status(.status)); $request-log.end; - .emit; - done; - }; - } else { + die X::Cro::HTTP::Error::Client.new(response => $_); + } + } elsif .status >= 500 { Cro::HTTP::LogTimeline::ErrorResponse.log($request-log, :status(.status)); $request-log.end; - die X::Cro::HTTP::Error::Client.new(response => $_); + die X::Cro::HTTP::Error::Server.new(response => $_); } - } elsif .status >= 500 { - Cro::HTTP::LogTimeline::ErrorResponse.log($request-log, :status(.status)); - $request-log.end; - die X::Cro::HTTP::Error::Server.new(response => $_); } } } + + whenever $retry-supply { + $goaway-retries--; + do-request-on-pipeline(); + } + do-request-on-pipeline(); }) } @@ -808,15 +846,9 @@ class Cro::HTTP::Client { } push @parts, self.choose-connector($secure); - my $go-away-supply; - sub create-response-parser(*%params) { - my $res-parser = Cro::HTTP2::ResponseParser.new(|%params); - $go-away-supply = $res-parser.go-away-supply; - return $res-parser; - } if $http eq '2' { push @parts, Cro::HTTP2::FrameParser.new(:client); - push @parts, create-response-parser(:$enable-push); + push @parts, Cro::HTTP2::ResponseParser.new(:$enable-push); } elsif $http eq '1.1' || !$secure || !$supports-alpn { push @parts, Cro::HTTP::ResponseParser.new(); @@ -825,7 +857,7 @@ class Cro::HTTP::Client { push @parts, Cro::ConnectionConditional.new( { (.alpn-result // '') eq 'h2' } => [ Cro::HTTP2::FrameParser.new(:client), - create-response-parser() + Cro::HTTP2::ResponseParser.new() ], Cro::HTTP::ResponseParser.new() ); @@ -862,7 +894,7 @@ class Cro::HTTP::Client { }; $version-decision.then: -> $version { $version.result eq '2' - ?? Pipeline2.new(:$secure, :$host, :$port, :$in, :$out, :$go-away-supply) + ?? Pipeline2.new(:$secure, :$host, :$port, :$in, :$out) !! Pipeline.new(:$secure, :$host, :$port, :$in, :$out) } } diff --git a/lib/Cro/HTTP2/GeneralParser.pm6 b/lib/Cro/HTTP2/GeneralParser.pm6 index 21d77e1..4b046aa 100644 --- a/lib/Cro/HTTP2/GeneralParser.pm6 +++ b/lib/Cro/HTTP2/GeneralParser.pm6 @@ -9,6 +9,13 @@ use HTTP::HPACK; # HTTP/2 stream enum State ; +class X::Cro::HTTP2::GoAway is Exception { + has $.code; + has $.last-processed-sid; + + method message() { "$!code" } +} + my class Stream { has Int $.sid; has State $.state is rw; @@ -22,9 +29,6 @@ my class Stream { role Cro::HTTP2::GeneralParser does Cro::ConnectionState[Cro::HTTP2::ConnectionState] { has $!pseudo-headers; has $.enable-push = False; - has Supplier $!go-away-supplier .= new; - # Emits the highest stream number that is still allowed to be processed. - has Supply $.go-away-supply = $!go-away-supplier.Supply; method transformer(Supply:D $in, Cro::HTTP2::ConnectionState :$connection-state!) { supply { @@ -170,20 +174,24 @@ role Cro::HTTP2::GeneralParser does Cro::ConnectionState[Cro::HTTP2::ConnectionS push %push-promises-for-stream{.stream-identifier}, $pp; } when Cro::HTTP2::Frame::GoAway { - $!go-away-supplier.emit: .last-sid; for %push-promises-by-promised-id.kv -> $k, $v { if $k > .last-sid { %push-promises-by-promised-id{$k}:delete; $v.cancel-response(); } } - for %streams.kv -> $k, $v { + for %streams.keys -> $k { if $k > .last-sid { - %streams{$k}:delete; - $v.cancel-response(); + with %streams{$k}:delete { + if .message { + with .body { + .quit('GoAway received'); + } + } + } } } - %push-promises-for-stream{.stream-identifier}:delete; + die X::Cro::HTTP2::GoAway.new(:code($_.error-code), :last-processed-sid(.last-sid)); } when Cro::HTTP2::Frame::WindowUpdate { $connection-state.remote-window-change.emit: Cro::HTTP2::ConnectionState::WindowAdd.new: