diff --git a/apps/workflows/package.json b/apps/workflows/package.json index 2b08ba5d40..f111202c82 100644 --- a/apps/workflows/package.json +++ b/apps/workflows/package.json @@ -10,6 +10,7 @@ "@openstatus/db": "workspace:*", "@openstatus/emails": "workspace:*", "@openstatus/utils": "workspace:*", + "@openstatus/upstash": "workspace:*", "hono": "4.5.3", "zod": "3.23.8" }, diff --git a/apps/workflows/src/cron/index.ts b/apps/workflows/src/cron/index.ts index 03b5ef46c1..b17f072c6f 100644 --- a/apps/workflows/src/cron/index.ts +++ b/apps/workflows/src/cron/index.ts @@ -3,7 +3,7 @@ import { Hono } from "hono"; import { env } from "../env"; import { sendCheckerTasks } from "./checker"; import { sendFollowUpEmails } from "./emails"; -import { LaunchMonitorWorkflow } from "./monitor"; +import { LaunchMonitorWorkflow, workflowStepSchema } from "./monitor"; const app = new Hono({ strict: false }); @@ -50,6 +50,29 @@ app.post("/monitors", async (c) => { }); app.post("/monitors/:step", async (c) => { + const step = c.req.param("step"); + const schema = workflowStepSchema.safeParse(step); + + if (!schema.success) { + return c.json({ error: schema.error.issues?.[0].message }, 400); + } + + switch (schema.data) { + case "14days": + console.log("14 days"); + break; + case "7days": + console.log("7days"); + break; + case "1day": + console.log("1day"); + break; + case "paused": + console.log("paused"); + break; + default: + throw new Error("Invalid step"); + } // Swith on step // and do the right action // diff --git a/apps/workflows/src/cron/monitor.ts b/apps/workflows/src/cron/monitor.ts index 4f836b37db..90f9bbe8ae 100644 --- a/apps/workflows/src/cron/monitor.ts +++ b/apps/workflows/src/cron/monitor.ts @@ -1,6 +1,22 @@ +import { google } from "@google-cloud/tasks/build/protos/protos"; import { and, db, eq, isNull, lte, max, ne, or, schema } from "@openstatus/db"; import { user } from "@openstatus/db/src/schema"; +import { Redis } from "@openstatus/upstash"; +import { env } from "../env"; +import { CloudTasksClient } from "@google-cloud/tasks"; +import { z } from "zod"; + +const redis = Redis.fromEnv(); + +const client = new CloudTasksClient({ + projectId: env().GCP_PROJECT_ID, + credentials: { + client_email: env().GCP_CLIENT_EMAIL, + private_key: env().GCP_PRIVATE_KEY.replaceAll("\\n", "\n"), + }, +}); + export async function LaunchMonitorWorkflow() { const threeMonthAgo = new Date().setMonth(new Date().getMonth() - 3); @@ -30,16 +46,109 @@ export async function LaunchMonitorWorkflow() { or(isNull(schema.workspace.plan), ne(schema.workspace.plan, "free")), ), ); - + const parent = client.queuePath( + env().GCP_PROJECT_ID, + env().GCP_LOCATION, + "workflow", + ); // iterate over users for (const user of users) { // check if user has some running monitors + // + const nbRunningMonitor = await db.$count( + schema.monitor, + and( + eq(schema.monitor.workspaceId, user.workspaceId), + eq(schema.monitor.active, true), + isNull(schema.monitor.deletedAt), + ), + ); + if (nbRunningMonitor > 0) { + continue; + } + const isMember = await redis.sismember("workflow:users", user.userId); + + if (isMember) { + continue; + } + await CreateTask({ + parent, + client: client, + "step": "14days", + }); // if they have check if the user is in the workflow - // // If user not in workflow + // // Start workflow -> create task with monitors/start // add users to workflow Redis console.log(`user worflow started for ${user.userId}`); } } + +export async function Step14Days() { + // Send email saying we are going to pause the monitors +} + +export async function Step7Days() { + // check if user has connected +} + +export async function Step1Day() { + // also check if user has connected +} + +export async function StepPaused() { + // Send Email + // pause monitors +} + +async function hasUserLoggedIn( + { userId, date }: { userId: number; date: Date }, +) { + const userResult = await db.select({ lastConnection: schema.user.updatedAt }) + .from( + schema.user, + ).where(eq(schema.user.id, userId)); + + if (userResult.length === 0) { + console.error("Something strange no user found", userId); + } + const user = userResult[0]; + if (user.lastConnection === null) { + return false; + } + return user.lastConnection > date; +} + +function CreateTask( + { parent, client, step }: { + parent: string; + client: CloudTasksClient; + step: z.infer; + }, +) { + const url = ""; + const timestamp = Date.now(); + const payload = {}; // Should we send some data to the task or only in the url/ + const newTask: google.cloud.tasks.v2beta3.ITask = { + httpRequest: { + headers: { + "Content-Type": "application/json", // Set content type to ensure compatibility your application's request parsing + Authorization: `Basic ${env().CRON_SECRET}`, + }, + httpMethod: "POST", + url, + body: Buffer.from(JSON.stringify(payload)).toString("base64"), + }, + scheduleTime: { + seconds: timestamp / 1000, + }, + }; + + const request = { parent: parent, task: newTask }; + return client.createTask(request); +} + +export const workflowStep = ["14days", "7days", "1day", "paused"] as const; +export const workflowStepSchema = z.enum(workflowStep); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 657bcaf3c4..9b3cac3e17 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -486,6 +486,9 @@ importers: '@openstatus/emails': specifier: workspace:* version: link:../../packages/emails + '@openstatus/upstash': + specifier: workspace:* + version: link:../../packages/upstash '@openstatus/utils': specifier: workspace:* version: link:../../packages/utils