Skip to content
This repository has been archived by the owner on Jun 8, 2020. It is now read-only.

Resubscribe channels only when reconnecting #548

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -224,18 +224,18 @@ protected void initChannel(SocketChannel ch) {
}
reconnFailEmitters.forEach(emitter -> emitter.onNext(t));
})
.doOnComplete(() -> {
LOG.warn("Resubscribing channels");
resubscribeChannels();

connectionSuccessEmitters.forEach(emitter -> emitter.onNext(new Object()));
});
.doOnComplete(() -> connectionSuccessEmitters.forEach(emitter -> emitter.onNext(new Object())));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you check that this still works for exchanges such as Binance where channel subscriptions are made on connection?

}

private void scheduleReconnect() {
LOG.info("Scheduling reconnection");
webSocketChannel.eventLoop().schedule(
() -> connect().subscribe(),
() -> connect().subscribe(
() -> {
LOG.warn("Resubscribing channels");
resubscribeChannels();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This also needs to call the connectionSuccessEmitters.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we move over to PublishSubject for reconnFailEmitters and connectionSuccessEmitters? I know its a little out of scope, but would be nice. Maybe emit the exchange too.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Funnily enough, I just suggested that on another review! Yes, that seems sensible - it means that subscriptions to those observables could persist between manual reconnects.

}
),
retryDuration.toMillis(),
TimeUnit.MILLISECONDS);
}
Expand Down