Skip to content

Commit

Permalink
Merge pull request #628 from GetStream/mark-read-throttle
Browse files Browse the repository at this point in the history
fix: throttle mark read API requests
  • Loading branch information
szuperaz authored Aug 28, 2024
2 parents 164b676 + 0a29d99 commit b91e821
Show file tree
Hide file tree
Showing 4 changed files with 160 additions and 8 deletions.
113 changes: 112 additions & 1 deletion projects/stream-chat-angular/src/lib/channel.service.spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { fakeAsync, TestBed, tick } from '@angular/core/testing';
import { fakeAsync, flush, TestBed, tick } from '@angular/core/testing';
import { Subject } from 'rxjs';
import { first, take } from 'rxjs/operators';
import {
Expand Down Expand Up @@ -208,6 +208,7 @@ describe('ChannelService', () => {
events$.next({ eventType: 'connection.recovered' } as ClientEvent);

tick();
flush();

expect(spy).toHaveBeenCalledWith(channels);
expect(activeChannelSpy).toHaveBeenCalledWith(channels[0]);
Expand Down Expand Up @@ -577,6 +578,10 @@ describe('ChannelService', () => {

it('should watch for new message events', async () => {
await init();
// wait for mark read throttle time
await new Promise((resolve) => {
setTimeout(resolve, service['markReadThrottleTime']);
});
const spy = jasmine.createSpy();
service.activeChannelMessages$.subscribe(spy);
const prevCount = (spy.calls.mostRecent().args[0] as Channel[]).length;
Expand Down Expand Up @@ -991,6 +996,7 @@ describe('ChannelService', () => {

it('should add the new channel to the top of the list, and start watching it, if user is added to a channel', fakeAsync(async () => {
await init();
flush();
const newChannel = generateMockChannels()[0];
newChannel.cid = 'newchannel';
newChannel.id = 'newchannel';
Expand Down Expand Up @@ -1030,6 +1036,7 @@ describe('ChannelService', () => {
event: { channel: channel } as any as Event<DefaultStreamChatGenerics>,
});
tick();
flush();

const channels = spy.calls.mostRecent().args[0] as Channel[];
const firstChannel = channels[0];
Expand Down Expand Up @@ -1059,6 +1066,7 @@ describe('ChannelService', () => {
event: { channel: channel } as any as Event<DefaultStreamChatGenerics>,
});
tick();
flush();

const channels = spy.calls.mostRecent().args[0] as Channel[];

Expand Down Expand Up @@ -2242,6 +2250,7 @@ describe('ChannelService', () => {

it('should relaod active channel if active channel is not present after state reconnect', fakeAsync(async () => {
await init();
flush();
let activeChannel!: Channel<DefaultStreamChatGenerics>;
service.activeChannel$.subscribe((c) => (activeChannel = c!));
let channels!: Channel<DefaultStreamChatGenerics>[];
Expand All @@ -2251,6 +2260,7 @@ describe('ChannelService', () => {
mockChatClient.queryChannels.and.resolveTo(channels);
events$.next({ eventType: 'connection.recovered' } as ClientEvent);
tick();
flush();
const spy = jasmine.createSpy();
service.activeChannel$.subscribe(spy);

Expand All @@ -2276,6 +2286,7 @@ describe('ChannelService', () => {
activeChannel.state.messages.push(newMessage);
events$.next({ eventType: 'connection.recovered' } as ClientEvent);
tick();
flush();

expect(spy).not.toHaveBeenCalled();
expect(service.deselectActiveChannel).not.toHaveBeenCalled();
Expand Down Expand Up @@ -2354,6 +2365,22 @@ describe('ChannelService', () => {
expect(service.activeChannelUnreadCount).toBe(0);
});

it(`should set last read message id to undefined if unread count is 0`, () => {
const activeChannel = generateMockChannels()[0];
activeChannel.id = 'next-active-channel';
activeChannel.state.read[user.id] = {
last_read: new Date(),
last_read_message_id: 'last-read-message-id',
unread_messages: 0,
user: user,
};

service.setAsActiveChannel(activeChannel);

expect(service.activeChannelLastReadMessageId).toBe(undefined);
expect(service.activeChannelUnreadCount).toBe(0);
});

it('should be able to select empty channel as active channel', () => {
const channel = generateMockChannels()[0];
channel.id = 'new-empty-channel';
Expand Down Expand Up @@ -2558,6 +2585,18 @@ describe('ChannelService', () => {

expect(service.activeChannelLastReadMessageId).toBe('last-read-message');
expect(service.activeChannelUnreadCount).toBe(12);

events$.next({
eventType: 'notification.mark_unread',
event: {
channel_id: service.activeChannel?.id,
unread_messages: 0,
last_read_message_id: 'last-read-message',
} as Event<DefaultStreamChatGenerics>,
});

expect(service.activeChannelLastReadMessageId).toBe(undefined);
expect(service.activeChannelUnreadCount).toBe(0);
});

it('should halt marking the channel as read if an unread call was made in that session', async () => {
Expand Down Expand Up @@ -2639,4 +2678,76 @@ describe('ChannelService', () => {
expect(customQuery).toHaveBeenCalledWith('next-page');
expect(hasMoreSpy).toHaveBeenCalledWith(false);
});

it('should throttle mark read API calls', async () => {
await init();
// wait for mark read throttle time
await new Promise((resolve) => {
setTimeout(resolve, service['markReadThrottleTime']);
});

const activeChannel = service.activeChannel!;
spyOn(activeChannel, 'markRead');

(activeChannel as MockChannel).handleEvent('message.new', mockMessage());

expect(activeChannel.markRead).toHaveBeenCalledTimes(1);

(activeChannel as MockChannel).handleEvent('message.new', mockMessage());

expect(activeChannel.markRead).toHaveBeenCalledTimes(1);

// wait for mark read throttle time
await new Promise((resolve) => {
setTimeout(resolve, service['markReadThrottleTime']);
});

expect(activeChannel.markRead).toHaveBeenCalledTimes(2);
});

it('should throttle mark read API calls - channel change', async () => {
await init();
// wait for mark read throttle time
await new Promise((resolve) => {
setTimeout(resolve, service['markReadThrottleTime']);
});

const activeChannel = service.activeChannel!;
spyOn(activeChannel, 'markRead');

(activeChannel as MockChannel).handleEvent('message.new', mockMessage());

expect(activeChannel.markRead).toHaveBeenCalledTimes(1);

(activeChannel as MockChannel).handleEvent('message.new', mockMessage());

expect(activeChannel.markRead).toHaveBeenCalledTimes(1);

service.setAsActiveChannel(service.channels[1]);

expect(activeChannel.markRead).toHaveBeenCalledTimes(2);
});

it('should throttle mark read API calls - reset', async () => {
await init();
// wait for mark read throttle time
await new Promise((resolve) => {
setTimeout(resolve, service['markReadThrottleTime']);
});

const activeChannel = service.activeChannel!;
spyOn(activeChannel, 'markRead');

(activeChannel as MockChannel).handleEvent('message.new', mockMessage());

expect(activeChannel.markRead).toHaveBeenCalledTimes(1);

(activeChannel as MockChannel).handleEvent('message.new', mockMessage());

expect(activeChannel.markRead).toHaveBeenCalledTimes(1);

service.reset();

expect(activeChannel.markRead).toHaveBeenCalledTimes(2);
});
});
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { fakeAsync, TestBed, tick } from '@angular/core/testing';
import { fakeAsync, flush, TestBed, tick } from '@angular/core/testing';
import { Subject } from 'rxjs';
import { first } from 'rxjs/operators';
import {
Expand Down Expand Up @@ -235,6 +235,7 @@ describe('ChannelService - threads', () => {
spy.calls.reset();
events$.next({ eventType: 'connection.recovered' } as ClientEvent);
tick();
flush();

expect(spy).toHaveBeenCalledWith(undefined);
}));
Expand Down Expand Up @@ -314,6 +315,10 @@ describe('ChannelService - threads', () => {

it('should watch for new message events', async () => {
await init();
// wait for mark read throttle time
await new Promise((resolve) => {
setTimeout(resolve, service['markReadThrottleTime']);
});
const spy = jasmine.createSpy();
const parentMessage = mockMessage();
await service.setAsActiveParentMessage(parentMessage);
Expand Down
46 changes: 41 additions & 5 deletions projects/stream-chat-angular/src/lib/channel.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,9 @@ export class ChannelService<
};
private dismissErrorNotification?: () => void;
private areReadEventsPaused = false;
private markReadThrottleTime = 1050;
private markReadTimeout?: ReturnType<typeof setTimeout>;
private scheduledMarkReadRequest?: () => void;

constructor(
private chatClientService: ChatClientService<T>,
Expand Down Expand Up @@ -563,17 +566,19 @@ export class ChannelService<
return;
}
this.stopWatchForActiveChannelEvents(prevActiveChannel);
this.flushMarkReadQueue();
this.areReadEventsPaused = false;
const readState =
channel.state.read[this.chatClientService.chatClient.user?.id || ''];
this.activeChannelLastReadMessageId = readState?.last_read_message_id;
this.activeChannelUnreadCount = readState?.unread_messages || 0;
if (
channel.state.latestMessages[channel.state.latestMessages.length - 1]
?.id === this.activeChannelLastReadMessageId
?.id === this.activeChannelLastReadMessageId ||
this.activeChannelUnreadCount === 0
) {
this.activeChannelLastReadMessageId = undefined;
}
this.activeChannelUnreadCount = readState?.unread_messages || 0;
this.watchForActiveChannelEvents(channel);
this.addChannel(channel);
this.activeChannelSubject.next(channel);
Expand All @@ -595,6 +600,7 @@ export class ChannelService<
return;
}
this.stopWatchForActiveChannelEvents(activeChannel);
this.flushMarkReadQueue();
this.activeChannelMessagesSubject.next([]);
this.activeChannelSubject.next(undefined);
this.activeParentMessageIdSubject.next(undefined);
Expand Down Expand Up @@ -1567,6 +1573,9 @@ export class ChannelService<
this.ngZone.run(() => {
this.activeChannelLastReadMessageId = e.last_read_message_id;
this.activeChannelUnreadCount = e.unread_messages;
if (this.activeChannelUnreadCount === 0) {
this.activeChannelLastReadMessageId = undefined;
}
this.activeChannelSubject.next(this.activeChannel);
});
})
Expand Down Expand Up @@ -2220,16 +2229,43 @@ export class ChannelService<
this.usersTypingInThreadSubject.next([]);
}

private markRead(channel: Channel<T>) {
private markRead(channel: Channel<T>, isThrottled = true) {
if (
this.canSendReadEvents &&
this.shouldMarkActiveChannelAsRead &&
!this.areReadEventsPaused
!this.areReadEventsPaused &&
channel.countUnread() > 0
) {
void channel.markRead();
if (isThrottled) {
this.markReadThrottled(channel);
} else {
void channel.markRead();
}
}
}

private markReadThrottled(channel: Channel<T>) {
if (!this.markReadTimeout) {
this.markRead(channel, false);
this.markReadTimeout = setTimeout(() => {
this.flushMarkReadQueue();
}, this.markReadThrottleTime);
} else {
clearTimeout(this.markReadTimeout);
this.scheduledMarkReadRequest = () => this.markRead(channel, false);
this.markReadTimeout = setTimeout(() => {
this.flushMarkReadQueue();
}, this.markReadThrottleTime);
}
}

private flushMarkReadQueue() {
this.scheduledMarkReadRequest?.();
this.scheduledMarkReadRequest = undefined;
clearTimeout(this.markReadTimeout);
this.markReadTimeout = undefined;
}

private async _init(settings: {
shouldSetActiveChannel: boolean;
messagePageSize: number;
Expand Down
2 changes: 1 addition & 1 deletion projects/stream-chat-angular/src/lib/mocks/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ export const generateMockChannels = (length = 25) => {
sendAction: () => {},
deleteImage: () => {},
deleteFile: () => {},
countUnread: () => {},
countUnread: () => 3,
markRead: () => {},
getReplies: () => {},
keystroke: () => {},
Expand Down

0 comments on commit b91e821

Please sign in to comment.