Skip to content

Commit

Permalink
Throttle event emits (#55)
Browse files Browse the repository at this point in the history
  • Loading branch information
Zoe authored Jul 15, 2024
1 parent 9b8b6d7 commit 2f282d4
Show file tree
Hide file tree
Showing 19 changed files with 255 additions and 26 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"name": "@digital-alchemy/hass",
"repository": "https://github.com/Digital-Alchemy-TS/hass",
"homepage": "https://docs.digital-alchemy.app",
"version": "24.7.4",
"version": "24.7.5",
"scripts": {
"build": "rm -rf dist/; tsc",
"lint": "eslint src",
Expand Down
6 changes: 3 additions & 3 deletions src/e2e/area.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import {
} from "@digital-alchemy/core";

import { TAreaId } from "../dynamic";
import { AREA_REGISTRY_UPDATED } from "../helpers";
import { AREA_REGISTRY_UPDATED, AreaDetails } from "../helpers";
import { CreateTestingApplication, SILENT_BOOT } from "../mock_assistant";
import { BASE_URL, TOKEN } from "./utils";

Expand All @@ -34,7 +34,7 @@ describe("Area E2E", () => {
let hit = false;
event.on(AREA_REGISTRY_UPDATED, () => (hit = true));
await hass.socket.fireEvent("area_registry_updated");
await sleep(50);
await sleep(100);
expect(hit).toBe(true);
await application.teardown();
});
Expand Down Expand Up @@ -68,7 +68,7 @@ describe("Area E2E", () => {
application = CreateTestingApplication({
Test({ lifecycle, hass }: TServiceParams) {
lifecycle.onReady(async () => {
const item = hass.area.current.find(i => i.area_id === testArea);
const item = hass.area.current.find(i => i.area_id === testArea) as AreaDetails;
await hass.area.update({
...item,
name: "extra test",
Expand Down
14 changes: 7 additions & 7 deletions src/e2e/entity.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ describe("Entity E2E", () => {
await hass.call.switch.toggle({
entity_id: "switch.porch_light",
});
await sleep(50);
await sleep(100);
}
expect(counter).toBe(expected);
await application.teardown();
Expand All @@ -103,7 +103,7 @@ describe("Entity E2E", () => {
await hass.call.switch.toggle({
entity_id: "switch.porch_light",
});
await sleep(50);
await sleep(100);
}
expect(counter).toBe(1);
await application.teardown();
Expand All @@ -128,7 +128,7 @@ describe("Entity E2E", () => {
await hass.call.switch.toggle({
entity_id: "switch.porch_light",
});
await sleep(50);
await sleep(100);
}
expect(counter).toBe(1);
await application.teardown();
Expand All @@ -153,7 +153,7 @@ describe("Entity E2E", () => {
await hass.call.switch.toggle({
entity_id: "switch.porch_light",
});
await sleep(50);
await sleep(100);
}
expect(counter).toBe(1);
await application.teardown();
Expand All @@ -175,7 +175,7 @@ describe("Entity E2E", () => {
await hass.call.switch.toggle({
entity_id: "switch.porch_light",
});
await sleep(50);
await sleep(100);
}
expect(counter).toBe(1);
await application.teardown();
Expand All @@ -196,7 +196,7 @@ describe("Entity E2E", () => {
expect(next.state).toBe(porch.state);
});
await hass.call.switch.toggle({ entity_id: "switch.porch_light" });
await sleep(50);
await sleep(100);
await application.teardown();
});
},
Expand All @@ -215,7 +215,7 @@ describe("Entity E2E", () => {
expect(next.state).toBe(porch.state);
});
await hass.call.switch.toggle({ entity_id: "switch.porch_light" });
await sleep(50);
await sleep(100);
await application.teardown();
});
},
Expand Down
2 changes: 1 addition & 1 deletion src/e2e/floor.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ describe("Floor E2E", () => {
let hit = false;
event.on(FLOOR_REGISTRY_UPDATED, () => (hit = true));
await hass.socket.fireEvent("floor_registry_updated");
await sleep(50);
await sleep(100);
expect(hit).toBe(true);
await application.teardown();
});
Expand Down
2 changes: 1 addition & 1 deletion src/e2e/label.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ describe("Label E2E", () => {
let hit = false;
event.on(LABEL_REGISTRY_UPDATED, () => (hit = true));
await hass.socket.fireEvent("label_registry_updated");
await sleep(50);
await sleep(100);
expect(hit).toBe(true);
await application.teardown();
});
Expand Down
2 changes: 2 additions & 0 deletions src/extensions/area.extension.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import {
eachSeries,
InternalError,
throttle,
TServiceParams,
} from "@digital-alchemy/core";

Expand Down Expand Up @@ -38,6 +39,7 @@ export function Area({
context,
event_type: "area_registry_updated",
async exec() {
await throttle(AREA_REGISTRY_UPDATED, config.hass.EVENT_THROTTLE_MS);
hass.area.current = await hass.area.list();
logger.debug(`area registry updated`);
event.emit(AREA_REGISTRY_UPDATED);
Expand Down
3 changes: 2 additions & 1 deletion src/extensions/device.extension.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { TServiceParams } from "@digital-alchemy/core";
import { throttle, TServiceParams } from "@digital-alchemy/core";

import {
DEVICE_REGISTRY_UPDATED,
Expand Down Expand Up @@ -31,6 +31,7 @@ export function Device({
context,
event_type: "device_registry_updated",
async exec() {
await throttle(DEVICE_REGISTRY_UPDATED, config.hass.EVENT_THROTTLE_MS);
hass.device.current = await hass.device.list();
logger.debug(`device registry updated`);
event.emit(DEVICE_REGISTRY_UPDATED);
Expand Down
20 changes: 11 additions & 9 deletions src/extensions/entity.extension.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
SECOND,
sleep,
START,
throttle,
TServiceParams,
} from "@digital-alchemy/core";
import dayjs, { Dayjs } from "dayjs";
Expand Down Expand Up @@ -296,22 +297,23 @@ export function EntityManager({
type: "config/entity_registry/get",
});
}
hass.socket.subscribe({
context,
event_type: "entity_registry_updated",
async exec() {
await throttle(ENTITY_REGISTRY_UPDATED, config.hass.EVENT_THROTTLE_MS);
logger.debug("entity registry updated");
hass.entity.registry.current = await hass.entity.registry.list();
event.emit(ENTITY_REGISTRY_UPDATED);
},
});

// #MARK: onConnect
hass.socket.onConnect(async () => {
if (!config.hass.AUTO_CONNECT_SOCKET || !config.hass.MANAGE_REGISTRY) {
return;
}
hass.entity.registry.current = await hass.entity.registry.list();
hass.socket.subscribe({
context,
event_type: "entity_registry_updated",
async exec() {
logger.debug("entity registry updated");
hass.entity.registry.current = await hass.entity.registry.list();
event.emit(ENTITY_REGISTRY_UPDATED);
},
});
});

// #MARK: RemoveEntity
Expand Down
3 changes: 2 additions & 1 deletion src/extensions/floor.extension.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { TServiceParams } from "@digital-alchemy/core";
import { throttle, TServiceParams } from "@digital-alchemy/core";

import { TFloorId } from "../dynamic";
import {
Expand Down Expand Up @@ -32,6 +32,7 @@ export function Floor({
context,
event_type: "floor_registry_updated",
async exec() {
await throttle(FLOOR_REGISTRY_UPDATED, config.hass.EVENT_THROTTLE_MS);
hass.floor.current = await hass.floor.list();
logger.debug(`floor registry updated`);
event.emit(FLOOR_REGISTRY_UPDATED);
Expand Down
3 changes: 2 additions & 1 deletion src/extensions/label.extension.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { TServiceParams } from "@digital-alchemy/core";
import { throttle, TServiceParams } from "@digital-alchemy/core";

import { TLabelId } from "../dynamic";
import {
Expand Down Expand Up @@ -32,6 +32,7 @@ export function Label({
context,
event_type: "label_registry_updated",
async exec() {
await throttle(LABEL_REGISTRY_UPDATED, config.hass.EVENT_THROTTLE_MS);
hass.label.current = await hass.label.list();
logger.debug(`label registry updated`);
event.emit(LABEL_REGISTRY_UPDATED);
Expand Down
5 changes: 5 additions & 0 deletions src/extensions/websocket-api.extension.ts
Original file line number Diff line number Diff line change
Expand Up @@ -675,6 +675,11 @@ export function WebsocketAPI({
*/
setConnectionState,

/**
* internal
*/
socketEvents,

/**
* Subscribe to hass core registry updates.
*
Expand Down
3 changes: 2 additions & 1 deletion src/extensions/zone.extension.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { TServiceParams } from "@digital-alchemy/core";
import { throttle, TServiceParams } from "@digital-alchemy/core";

import {
EARLY_ON_READY,
Expand Down Expand Up @@ -32,6 +32,7 @@ export function Zone({
context,
event_type: "zone_registry_updated",
async exec() {
await throttle(ZONE_REGISTRY_UPDATED, config.hass.EVENT_THROTTLE_MS);
hass.zone.current = await hass.zone.list();
logger.debug(`zone registry updated`);
event.emit(ZONE_REGISTRY_UPDATED);
Expand Down
5 changes: 5 additions & 0 deletions src/hass.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ export const LIB_HASS = CreateLibrary({
enum: ["prefer", "forbid", "allow"],
type: "string",
} as StringConfig<AllowRestOptions>,
EVENT_THROTTLE_MS: {
default: 50,
description: "Throttle reactions to registry changes",
type: "number",
},
EXPECT_RESPONSE_AFTER: {
default: 5,
description:
Expand Down
35 changes: 35 additions & 0 deletions src/testing/area.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import {
ApplicationDefinition,
OptionalModuleConfiguration,
ServiceMap,
sleep,
TServiceParams,
} from "@digital-alchemy/core";

Expand Down Expand Up @@ -113,6 +114,40 @@ describe("Area", () => {
);
});

it("should throttle updates properly", async () => {
expect.assertions(1);
application = CreateTestingApplication({
Test({ lifecycle, hass }: TServiceParams) {
jest
.spyOn(hass.socket, "sendMessage")
.mockImplementation(async () => undefined);
let counter = 0;
hass.events.onAreaRegistryUpdate(() => counter++);
lifecycle.onReady(async () => {
setImmediate(async () => {
hass.socket.socketEvents.emit("area_registry_updated");
await sleep(5);
hass.socket.socketEvents.emit("area_registry_updated");
await sleep(5);
hass.socket.socketEvents.emit("area_registry_updated");
await sleep(75);
hass.socket.socketEvents.emit("area_registry_updated");
});
await sleep(200);
expect(counter).toBe(2);
});
},
});
await application.bootstrap(
SILENT_BOOT({
hass: {
AUTO_CONNECT_SOCKET: false,
AUTO_SCAN_CALL_PROXY: false,
},
}),
);
});

it("should call delete properly", async () => {
expect.assertions(1);
application = CreateTestingApplication({
Expand Down
35 changes: 35 additions & 0 deletions src/testing/device.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import {
ApplicationDefinition,
OptionalModuleConfiguration,
ServiceMap,
sleep,
TServiceParams,
} from "@digital-alchemy/core";

Expand Down Expand Up @@ -71,6 +72,40 @@ describe("Device", () => {
});
});

it("should throttle updates properly", async () => {
expect.assertions(1);
application = CreateTestingApplication({
Test({ lifecycle, hass }: TServiceParams) {
jest
.spyOn(hass.socket, "sendMessage")
.mockImplementation(async () => undefined);
let counter = 0;
hass.events.onDeviceRegistryUpdate(() => counter++);
lifecycle.onReady(async () => {
setImmediate(async () => {
hass.socket.socketEvents.emit("device_registry_updated");
await sleep(5);
hass.socket.socketEvents.emit("device_registry_updated");
await sleep(5);
hass.socket.socketEvents.emit("device_registry_updated");
await sleep(75);
hass.socket.socketEvents.emit("device_registry_updated");
});
await sleep(200);
expect(counter).toBe(2);
});
},
});
await application.bootstrap(
SILENT_BOOT({
hass: {
AUTO_CONNECT_SOCKET: false,
AUTO_SCAN_CALL_PROXY: false,
},
}),
);
});

describe("API", () => {
describe("Formatting", () => {
it("should call list properly", async () => {
Expand Down
Loading

0 comments on commit 2f282d4

Please sign in to comment.