diff --git a/packages/server/src/adapters/NotificationQueueImpl.ts b/packages/server/src/adapters/NotificationQueueImpl.ts
index f5569b0..948300d 100644
--- a/packages/server/src/adapters/NotificationQueueImpl.ts
+++ b/packages/server/src/adapters/NotificationQueueImpl.ts
@@ -2,7 +2,8 @@ import { NotificationQueue } from "../ports/NotificationQueue";
import webpush from "web-push";
import { PushSubscriptionsRepo } from "../ports/PushSubscriptionsRepo";
import { Client as FaktoryClient } from "faktory-worker";
-import * as SendPushNotificationJob from "../jobs/SendPushNotification";
+import { SendPushNotificationJob } from "../jobs/SendPushNotificationJob";
+import { createJob } from "../jobs/JobType";
export class NotificationQueueImpl implements NotificationQueue {
constructor(
@@ -30,14 +31,13 @@ export class NotificationQueueImpl implements NotificationQueue {
await this.faktory.pushBulk(
payloads.flatMap(({ userId, payload }) => {
const pushSubscriptions = pushSubscriptionsByUserId.get(userId) ?? [];
- return pushSubscriptions.map((pushSubscription) => {
- const arg: SendPushNotificationJob.Arg = {
+ return pushSubscriptions.map((pushSubscription) =>
+ createJob(this.faktory, SendPushNotificationJob, {
pushSubscription,
payload,
userId,
- };
- return this.faktory.job(SendPushNotificationJob.JobType, arg);
- });
+ })
+ );
})
);
}
diff --git a/packages/server/src/cron.ts b/packages/server/src/cron.ts
new file mode 100644
index 0000000..1821919
--- /dev/null
+++ b/packages/server/src/cron.ts
@@ -0,0 +1,46 @@
+import { CronJob } from "cron";
+import { ResWaitCountKey } from "./entities";
+import { CheckDeadTopicJob } from "./jobs/CheckDeadTopicJob";
+import { faktoryClient } from "./faktoryClient";
+import { createJob } from "./jobs/JobType";
+import { UserPointResetJob } from "./jobs/UserPointResetJob";
+import { UserCountResetJob } from "./jobs/UserCountResetJob";
+
+export function startCron() {
+ const startUserCountResetCron = (cronTime: string, key: ResWaitCountKey) => {
+ new CronJob({
+ cronTime,
+ onTick: () => {
+ void faktoryClient.push(
+ createJob(faktoryClient, UserCountResetJob, { key })
+ );
+ },
+ start: false,
+ timeZone: "Asia/Tokyo",
+ }).start();
+ };
+ startUserCountResetCron("00 00,10,20,30,40,50 * * * *", "m10");
+ startUserCountResetCron("00 00,30 * * * *", "m30");
+ startUserCountResetCron("00 00 * * * *", "h1");
+ startUserCountResetCron("00 00 00,06,12,18 * * *", "h6");
+ startUserCountResetCron("00 00 00,12 * * *", "h12");
+ startUserCountResetCron("00 00 00 * * *", "d1");
+
+ new CronJob({
+ cronTime: "00 00 00 * * *",
+ onTick: () => {
+ void faktoryClient.push(createJob(faktoryClient, UserPointResetJob, {}));
+ },
+ start: false,
+ timeZone: "Asia/Tokyo",
+ }).start();
+
+ new CronJob({
+ cronTime: "00 00 * * * *",
+ onTick: () => {
+ void faktoryClient.push(createJob(faktoryClient, CheckDeadTopicJob, {}));
+ },
+ start: false,
+ timeZone: "Asia/Tokyo",
+ }).start();
+}
diff --git a/packages/server/src/jobs/CheckDeadTopicJob.ts b/packages/server/src/jobs/CheckDeadTopicJob.ts
new file mode 100644
index 0000000..7fe7975
--- /dev/null
+++ b/packages/server/src/jobs/CheckDeadTopicJob.ts
@@ -0,0 +1,7 @@
+import { z } from "zod";
+import { JobType } from "./JobType";
+
+export const CheckDeadTopicJob = JobType({
+ type: "CheckDeadTopic",
+ arg: z.object({}),
+});
diff --git a/packages/server/src/jobs/JobType.ts b/packages/server/src/jobs/JobType.ts
new file mode 100644
index 0000000..341dc0c
--- /dev/null
+++ b/packages/server/src/jobs/JobType.ts
@@ -0,0 +1,27 @@
+import { z } from "zod";
+import { Job, Client, Worker } from "faktory-worker";
+
+export interface JobType {
+ type: string;
+ arg: z.ZodType;
+}
+
+// 型推論のためのヘルパー関数
+export function JobType(value: JobType): JobType {
+ return value;
+}
+
+export function createJob(client: Client, type: JobType, arg: A): Job {
+ return client.job(type.type, arg);
+}
+
+export function registerWorker(
+ worker: Worker,
+ type: JobType,
+ handler: (arg: A) => Promise
+): void {
+ worker.register(type.type, async (raw) => {
+ const arg = type.arg.parse(raw);
+ await handler(arg);
+ });
+}
diff --git a/packages/server/src/jobs/SendPushNotification.ts b/packages/server/src/jobs/SendPushNotification.ts
deleted file mode 100644
index fdd8c72..0000000
--- a/packages/server/src/jobs/SendPushNotification.ts
+++ /dev/null
@@ -1,15 +0,0 @@
-import { z } from "zod";
-
-export const JobType = "SendPushNotification";
-export const Arg = z.object({
- pushSubscription: z.object({
- endpoint: z.string(),
- keys: z.object({
- p256dh: z.string(),
- auth: z.string(),
- }),
- }),
- payload: z.string(),
- userId: z.string(),
-});
-export type Arg = z.infer;
diff --git a/packages/server/src/jobs/SendPushNotificationJob.ts b/packages/server/src/jobs/SendPushNotificationJob.ts
new file mode 100644
index 0000000..3ceb098
--- /dev/null
+++ b/packages/server/src/jobs/SendPushNotificationJob.ts
@@ -0,0 +1,17 @@
+import { z } from "zod";
+import { JobType } from "./JobType";
+
+export const SendPushNotificationJob = JobType({
+ type: "SendPushNotification",
+ arg: z.object({
+ pushSubscription: z.object({
+ endpoint: z.string(),
+ keys: z.object({
+ p256dh: z.string(),
+ auth: z.string(),
+ }),
+ }),
+ payload: z.string(),
+ userId: z.string(),
+ }),
+});
diff --git a/packages/server/src/jobs/UserCountResetJob.ts b/packages/server/src/jobs/UserCountResetJob.ts
new file mode 100644
index 0000000..8ce307c
--- /dev/null
+++ b/packages/server/src/jobs/UserCountResetJob.ts
@@ -0,0 +1,16 @@
+import { JobType } from "./JobType";
+import { z } from "zod";
+
+export const UserCountResetJob = JobType({
+ type: "UserCountReset",
+ arg: z.object({
+ key: z.union([
+ z.literal("m10"),
+ z.literal("m30"),
+ z.literal("h1"),
+ z.literal("h6"),
+ z.literal("h12"),
+ z.literal("d1"),
+ ]),
+ }),
+});
diff --git a/packages/server/src/jobs/UserPointResetJob.ts b/packages/server/src/jobs/UserPointResetJob.ts
new file mode 100644
index 0000000..41d2c43
--- /dev/null
+++ b/packages/server/src/jobs/UserPointResetJob.ts
@@ -0,0 +1,7 @@
+import { z } from "zod";
+import { JobType } from "./JobType";
+
+export const UserPointResetJob = JobType({
+ type: "UserPointReset",
+ arg: z.object({}),
+});
diff --git a/packages/server/src/server/server.ts b/packages/server/src/server/server.ts
index af97b09..399b57a 100644
--- a/packages/server/src/server/server.ts
+++ b/packages/server/src/server/server.ts
@@ -6,7 +6,7 @@ import { Config } from "../config";
import { resolvers } from "../schema/resolvers.generated";
import { resolveTypes } from "../schema/resolveTypes";
import { typeDefs } from "../schema/typeDefs.generated";
-import { runWorker } from "../worker";
+import { startWorker } from "../worker";
import { AppContext, createContext } from "./context";
import Router from "@koa/router";
import { ApolloServer } from "@apollo/server";
@@ -21,6 +21,7 @@ import { koaMiddleware } from "@as-integrations/koa";
import { makeExecutableSchema } from "@graphql-tools/schema";
import bodyParser from "koa-bodyparser";
import { GraphQLError } from "graphql";
+import { startCron } from "../cron";
export async function serverRun() {
const app = new Koa();
@@ -86,7 +87,8 @@ export async function serverRun() {
});
await server.start();
- await runWorker();
+ startCron();
+ await startWorker();
router.get("/ping", (ctx, _next) => (ctx.body = "OK"));
router.get("/livez", (ctx, _next) => (ctx.body = "OK"));
diff --git a/packages/server/src/worker.ts b/packages/server/src/worker.ts
index c017384..b78d07c 100644
--- a/packages/server/src/worker.ts
+++ b/packages/server/src/worker.ts
@@ -1,20 +1,19 @@
-import { CronJob } from "cron";
-import { Logger, TopicRepo, UserRepo } from "./adapters";
-import { ResWaitCountKey } from "./entities";
-import { prisma } from "./prisma-client";
import faktory from "faktory-worker";
import { Config } from "./config";
-import * as SendPushNotificationJob from "./jobs/SendPushNotification";
+import { SendPushNotificationJob } from "./jobs/SendPushNotificationJob";
+import { CheckDeadTopicJob } from "./jobs/CheckDeadTopicJob";
import { createPorts, PortsConfig } from "./createPorts";
+import { registerWorker } from "./jobs/JobType";
+import { UserPointResetJob } from "./jobs/UserPointResetJob";
+import { UserCountResetJob } from "./jobs/UserCountResetJob";
-export async function runWorker(): Promise {
+export async function startWorker(): Promise {
const worker = await faktory.work({
url: Config.faktory.url,
});
- worker.register(SendPushNotificationJob.JobType, async (raw) => {
+ registerWorker(worker, SendPushNotificationJob, async (arg) => {
const ports = createPorts(PortsConfig);
- const arg = SendPushNotificationJob.Arg.parse(raw);
await ports.notificationSender.sendNotification(
arg.userId,
arg.pushSubscription,
@@ -22,61 +21,18 @@ export async function runWorker(): Promise {
);
});
- runTopicWorker();
- runUserWorker();
-}
-
-function runTopicWorker() {
- // 毎時間トピ落ちチェック
- new CronJob({
- cronTime: "00 00 * * * *",
- onTick: () => {
- void (async () => {
- const logger = new Logger();
- const topicRepo = new TopicRepo(prisma);
-
- logger.info("TopicCron");
- await topicRepo.cronTopicCheck(new Date());
- })();
- },
- start: false,
- timeZone: "Asia/Tokyo",
- }).start();
-}
-
-function runUserWorker() {
- const start = (cronTime: string, key: ResWaitCountKey) => {
- new CronJob({
- cronTime,
- onTick: () => {
- void (async () => {
- const logger = new Logger();
- const userRepo = new UserRepo(prisma);
+ registerWorker(worker, CheckDeadTopicJob, async (_arg) => {
+ const ports = createPorts(PortsConfig);
+ await ports.topicRepo.cronTopicCheck(ports.clock.now());
+ });
- logger.info(`UserCron ${key}`);
- await userRepo.cronCountReset(key);
- })();
- },
- start: false,
- timeZone: "Asia/Tokyo",
- }).start();
- };
+ registerWorker(worker, UserPointResetJob, async (_arg) => {
+ const ports = createPorts(PortsConfig);
+ await ports.userRepo.cronPointReset();
+ });
- start("00 00,10,20,30,40,50 * * * *", "m10");
- start("00 00,30 * * * *", "m30");
- start("00 00 * * * *", "h1");
- start("00 00 00,06,12,18 * * *", "h6");
- start("00 00 00,12 * * *", "h12");
- start("00 00 00 * * *", "d1");
- new CronJob({
- cronTime: "00 00 00 * * *",
- onTick: () => {
- void (async () => {
- const userRepo = new UserRepo(prisma);
- await userRepo.cronPointReset();
- })();
- },
- start: false,
- timeZone: "Asia/Tokyo",
- }).start();
+ registerWorker(worker, UserCountResetJob, async (arg) => {
+ const ports = createPorts(PortsConfig);
+ await ports.userRepo.cronCountReset(arg.key);
+ });
}