From afe1f84abfd9735403a65d020f366757df42a391 Mon Sep 17 00:00:00 2001 From: Thibault Le Ouay Date: Sun, 19 Jan 2025 10:50:52 +0100 Subject: [PATCH] =?UTF-8?q?=F0=9F=94=A5=20workflow?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/workflows/src/cron/index.ts | 31 +++++-- apps/workflows/src/cron/monitor.ts | 126 +++++++++++++++++++++++------ 2 files changed, 126 insertions(+), 31 deletions(-) diff --git a/apps/workflows/src/cron/index.ts b/apps/workflows/src/cron/index.ts index b17f072c6f..46c66023c7 100644 --- a/apps/workflows/src/cron/index.ts +++ b/apps/workflows/src/cron/index.ts @@ -3,7 +3,13 @@ import { Hono } from "hono"; import { env } from "../env"; import { sendCheckerTasks } from "./checker"; import { sendFollowUpEmails } from "./emails"; -import { LaunchMonitorWorkflow, workflowStepSchema } from "./monitor"; +import { + LaunchMonitorWorkflow, + Step14Days, + Step3Days, + StepPaused, + workflowStepSchema, +} from "./monitor"; const app = new Hono({ strict: false }); @@ -53,22 +59,31 @@ app.post("/monitors/:step", async (c) => { const step = c.req.param("step"); const schema = workflowStepSchema.safeParse(step); + const userId = c.req.query("userId"); + const initialRun = c.req.query("initialRun"); if (!schema.success) { return c.json({ error: schema.error.issues?.[0].message }, 400); } + if (!userId) { + return c.json({ error: "userId is required" }, 400); + } + if (!initialRun) { + return c.json({ error: "initialRun is required" }, 400); + } + switch (schema.data) { case "14days": - console.log("14 days"); - break; - case "7days": - console.log("7days"); + // We send the first email + await Step14Days(Number(userId)); break; - case "1day": - console.log("1day"); + case "3days": + await Step3Days(Number(userId), Number(initialRun)); + // 3 days before we send the second email break; case "paused": - console.log("paused"); + // Let's pause the monitor + await StepPaused(Number(userId), Number(initialRun)); break; default: throw new Error("Invalid step"); diff --git a/apps/workflows/src/cron/monitor.ts b/apps/workflows/src/cron/monitor.ts index d83f546713..7903a74ffd 100644 --- a/apps/workflows/src/cron/monitor.ts +++ b/apps/workflows/src/cron/monitor.ts @@ -17,6 +17,12 @@ const client = new CloudTasksClient({ }, }); +const parent = client.queuePath( + env().GCP_PROJECT_ID, + env().GCP_LOCATION, + "workflow", +); + export async function LaunchMonitorWorkflow() { const threeMonthAgo = new Date().setMonth(new Date().getMonth() - 3); @@ -46,11 +52,6 @@ export async function LaunchMonitorWorkflow() { or(isNull(schema.workspace.plan), ne(schema.workspace.plan, "free")), ), ); - const parent = client.queuePath( - env().GCP_PROJECT_ID, - env().GCP_LOCATION, - "workflow", - ); // iterate over users for (const user of users) { // check if user has some running monitors @@ -76,37 +77,96 @@ export async function LaunchMonitorWorkflow() { parent, client: client, step: "14days", + userId: user.userId, + initialRun: new Date().getTime(), }); - // if they have check if the user is in the workflow - // If user not in workflow - // - // Start workflow -> create task with monitors/start - // add users to workflow Redis + // Add our user to the list of users that have started the workflow + + await redis.sadd("workflow:users", user.userId); console.log(`user worflow started for ${user.userId}`); } } -export async function Step14Days() { +export async function Step14Days(userId: number) { // Send email saying we are going to pause the monitors + // The task has just been created we don't double check if the user has logged in :scary: + // send First email } -export async function Step7Days() { +export async function Step3Days(userId: number, workFlowRunTimestamp: number) { // check if user has connected -} + const hasConnected = await hasUserLoggedIn({ + userId, + date: new Date(workFlowRunTimestamp), + }); -export async function Step1Day() { - // also check if user has connected + if (hasConnected) { + // + await redis.srem("workflow:users", userId); + return; + } + // Send second email + // Let's schedule the next task + await CreateTask({ + client, + parent, + step: "paused", + userId, + initialRun: workFlowRunTimestamp, + }); } -export async function StepPaused() { - // Send Email - // pause monitors +export async function StepPaused(userId: number, workFlowRunTimestamp: number) { + const hasConnected = await hasUserLoggedIn({ + userId, + date: new Date(workFlowRunTimestamp), + }); + if (!hasConnected) { + // sendSecond pause email + const users = await db + .select({ + userId: schema.user.id, + email: schema.user.email, + lastConnection: schema.user.updatedAt, + workspaceId: schema.workspace.id, + }) + .from(user) + .innerJoin( + schema.usersToWorkspaces, + eq(schema.user.id, schema.usersToWorkspaces.userId), + ) + .innerJoin( + schema.workspace, + eq(schema.usersToWorkspaces.workspaceId, schema.workspace.id), + ) + .where( + and( + or(isNull(schema.workspace.plan), ne(schema.workspace.plan, "free")), + eq(schema.user.id, userId), + ), + ); + // We should only have one user :) + if (users.length !== 1) { + throw new Error("Too much users found"); + } + const workspaceId = users[0].workspaceId; + await db + .update(schema.monitor) + .set({ active: false }) + .where(eq(schema.monitor.workspaceId, workspaceId)); + // Send last email with pause monitor + } + // Remove user for workflow + await redis.srem("workflow:users", userId); } async function hasUserLoggedIn({ userId, date, -}: { userId: number; date: Date }) { +}: { + userId: number; + date: Date; +}) { const userResult = await db .select({ lastConnection: schema.user.updatedAt }) .from(schema.user) @@ -126,13 +186,17 @@ function CreateTask({ parent, client, step, + userId, + initialRun, }: { parent: string; client: CloudTasksClient; step: z.infer; + userId: number; + initialRun: number; }) { - const url = ""; - const timestamp = Date.now(); + const url = `https://openstatus-workflows.fly.dev/cron/monitors/${step}?userId=${userId}&initialRun=${initialRun}`; + const timestamp = getScheduledTime(step); const payload = {}; // Should we send some data to the task or only in the url/ const newTask: google.cloud.tasks.v2beta3.ITask = { httpRequest: { @@ -145,7 +209,7 @@ function CreateTask({ body: Buffer.from(JSON.stringify(payload)).toString("base64"), }, scheduleTime: { - seconds: timestamp / 1000, + seconds: timestamp, }, }; @@ -153,5 +217,21 @@ function CreateTask({ return client.createTask(request); } -export const workflowStep = ["14days", "7days", "1day", "paused"] as const; +function getScheduledTime(step: z.infer) { + switch (step) { + case "14days": + // let's triger it now + return new Date().getTime() / 1000; + case "3days": + // it's 11 days after the 14 days + return new Date().setDate(new Date().getDate() + 11) / 1000; + case "paused": + // it's 3 days after the 3 days step + return new Date().setDate(new Date().getDate() + 3) / 1000; + default: + throw new Error("Invalid step"); + } +} + +export const workflowStep = ["14days", "3days", "paused"] as const; export const workflowStepSchema = z.enum(workflowStep);