Skip to content

Commit

Permalink
🔥 workflow
Browse files Browse the repository at this point in the history
  • Loading branch information
thibaultleouay committed Jan 19, 2025
1 parent e2a30d5 commit afe1f84
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 31 deletions.
31 changes: 23 additions & 8 deletions apps/workflows/src/cron/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,13 @@ import { Hono } from "hono";
import { env } from "../env";
import { sendCheckerTasks } from "./checker";
import { sendFollowUpEmails } from "./emails";
import { LaunchMonitorWorkflow, workflowStepSchema } from "./monitor";
import {
LaunchMonitorWorkflow,
Step14Days,
Step3Days,
StepPaused,
workflowStepSchema,
} from "./monitor";

const app = new Hono({ strict: false });

Expand Down Expand Up @@ -53,22 +59,31 @@ app.post("/monitors/:step", async (c) => {
const step = c.req.param("step");
const schema = workflowStepSchema.safeParse(step);

const userId = c.req.query("userId");
const initialRun = c.req.query("initialRun");
if (!schema.success) {
return c.json({ error: schema.error.issues?.[0].message }, 400);
}

if (!userId) {
return c.json({ error: "userId is required" }, 400);
}
if (!initialRun) {
return c.json({ error: "initialRun is required" }, 400);
}

switch (schema.data) {
case "14days":
console.log("14 days");
break;
case "7days":
console.log("7days");
// We send the first email
await Step14Days(Number(userId));
break;
case "1day":
console.log("1day");
case "3days":
await Step3Days(Number(userId), Number(initialRun));
// 3 days before we send the second email
break;
case "paused":
console.log("paused");
// Let's pause the monitor
await StepPaused(Number(userId), Number(initialRun));
break;
default:
throw new Error("Invalid step");
Expand Down
126 changes: 103 additions & 23 deletions apps/workflows/src/cron/monitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ const client = new CloudTasksClient({
},
});

const parent = client.queuePath(
env().GCP_PROJECT_ID,
env().GCP_LOCATION,
"workflow",
);

export async function LaunchMonitorWorkflow() {
const threeMonthAgo = new Date().setMonth(new Date().getMonth() - 3);

Expand Down Expand Up @@ -46,11 +52,6 @@ export async function LaunchMonitorWorkflow() {
or(isNull(schema.workspace.plan), ne(schema.workspace.plan, "free")),
),
);
const parent = client.queuePath(
env().GCP_PROJECT_ID,
env().GCP_LOCATION,
"workflow",
);
// iterate over users
for (const user of users) {
// check if user has some running monitors
Expand All @@ -76,37 +77,96 @@ export async function LaunchMonitorWorkflow() {
parent,
client: client,
step: "14days",
userId: user.userId,
initialRun: new Date().getTime(),
});
// if they have check if the user is in the workflow
// If user not in workflow
//
// Start workflow -> create task with monitors/start
// add users to workflow Redis
// Add our user to the list of users that have started the workflow

await redis.sadd("workflow:users", user.userId);
console.log(`user worflow started for ${user.userId}`);
}
}

export async function Step14Days() {
export async function Step14Days(userId: number) {
// Send email saying we are going to pause the monitors
// The task has just been created we don't double check if the user has logged in :scary:
// send First email
}

export async function Step7Days() {
export async function Step3Days(userId: number, workFlowRunTimestamp: number) {
// check if user has connected
}
const hasConnected = await hasUserLoggedIn({
userId,
date: new Date(workFlowRunTimestamp),
});

export async function Step1Day() {
// also check if user has connected
if (hasConnected) {
//
await redis.srem("workflow:users", userId);
return;
}
// Send second email
// Let's schedule the next task
await CreateTask({
client,
parent,
step: "paused",
userId,
initialRun: workFlowRunTimestamp,
});
}

export async function StepPaused() {
// Send Email
// pause monitors
export async function StepPaused(userId: number, workFlowRunTimestamp: number) {
const hasConnected = await hasUserLoggedIn({
userId,
date: new Date(workFlowRunTimestamp),
});
if (!hasConnected) {
// sendSecond pause email
const users = await db
.select({
userId: schema.user.id,
email: schema.user.email,
lastConnection: schema.user.updatedAt,
workspaceId: schema.workspace.id,
})
.from(user)
.innerJoin(
schema.usersToWorkspaces,
eq(schema.user.id, schema.usersToWorkspaces.userId),
)
.innerJoin(
schema.workspace,
eq(schema.usersToWorkspaces.workspaceId, schema.workspace.id),
)
.where(
and(
or(isNull(schema.workspace.plan), ne(schema.workspace.plan, "free")),
eq(schema.user.id, userId),
),
);
// We should only have one user :)
if (users.length !== 1) {
throw new Error("Too much users found");
}
const workspaceId = users[0].workspaceId;
await db
.update(schema.monitor)
.set({ active: false })
.where(eq(schema.monitor.workspaceId, workspaceId));
// Send last email with pause monitor
}
// Remove user for workflow
await redis.srem("workflow:users", userId);
}

async function hasUserLoggedIn({
userId,
date,
}: { userId: number; date: Date }) {
}: {
userId: number;
date: Date;
}) {
const userResult = await db
.select({ lastConnection: schema.user.updatedAt })
.from(schema.user)
Expand All @@ -126,13 +186,17 @@ function CreateTask({
parent,
client,
step,
userId,
initialRun,
}: {
parent: string;
client: CloudTasksClient;
step: z.infer<typeof workflowStepSchema>;
userId: number;
initialRun: number;
}) {
const url = "";
const timestamp = Date.now();
const url = `https://openstatus-workflows.fly.dev/cron/monitors/${step}?userId=${userId}&initialRun=${initialRun}`;
const timestamp = getScheduledTime(step);
const payload = {}; // Should we send some data to the task or only in the url/
const newTask: google.cloud.tasks.v2beta3.ITask = {
httpRequest: {
Expand All @@ -145,13 +209,29 @@ function CreateTask({
body: Buffer.from(JSON.stringify(payload)).toString("base64"),
},
scheduleTime: {
seconds: timestamp / 1000,
seconds: timestamp,
},
};

const request = { parent: parent, task: newTask };
return client.createTask(request);
}

export const workflowStep = ["14days", "7days", "1day", "paused"] as const;
function getScheduledTime(step: z.infer<typeof workflowStepSchema>) {
switch (step) {
case "14days":
// let's triger it now
return new Date().getTime() / 1000;
case "3days":
// it's 11 days after the 14 days
return new Date().setDate(new Date().getDate() + 11) / 1000;
case "paused":
// it's 3 days after the 3 days step
return new Date().setDate(new Date().getDate() + 3) / 1000;
default:
throw new Error("Invalid step");
}
}

export const workflowStep = ["14days", "3days", "paused"] as const;
export const workflowStepSchema = z.enum(workflowStep);

0 comments on commit afe1f84

Please sign in to comment.