Skip to content

Commit

Permalink
🚧 workflows
Browse files Browse the repository at this point in the history
  • Loading branch information
thibaultleouay committed Jan 13, 2025
1 parent cae2930 commit fdf3096
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 3 deletions.
1 change: 1 addition & 0 deletions apps/workflows/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
"@openstatus/db": "workspace:*",
"@openstatus/emails": "workspace:*",
"@openstatus/utils": "workspace:*",
"@openstatus/upstash": "workspace:*",
"hono": "4.5.3",
"zod": "3.23.8"
},
Expand Down
25 changes: 24 additions & 1 deletion apps/workflows/src/cron/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { Hono } from "hono";
import { env } from "../env";
import { sendCheckerTasks } from "./checker";
import { sendFollowUpEmails } from "./emails";
import { LaunchMonitorWorkflow } from "./monitor";
import { LaunchMonitorWorkflow, workflowStepSchema } from "./monitor";

const app = new Hono({ strict: false });

Expand Down Expand Up @@ -50,6 +50,29 @@ app.post("/monitors", async (c) => {
});

app.post("/monitors/:step", async (c) => {
const step = c.req.param("step");
const schema = workflowStepSchema.safeParse(step);

if (!schema.success) {
return c.json({ error: schema.error.issues?.[0].message }, 400);
}

switch (schema.data) {
case "14days":
console.log("14 days");
break;
case "7days":
console.log("7days");
break;
case "1day":
console.log("1day");
break;
case "paused":
console.log("paused");
break;
default:
throw new Error("Invalid step");
}
// Swith on step
// and do the right action
//
Expand Down
113 changes: 111 additions & 2 deletions apps/workflows/src/cron/monitor.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,22 @@
import { google } from "@google-cloud/tasks/build/protos/protos";
import { and, db, eq, isNull, lte, max, ne, or, schema } from "@openstatus/db";
import { user } from "@openstatus/db/src/schema";

import { Redis } from "@openstatus/upstash";
import { env } from "../env";
import { CloudTasksClient } from "@google-cloud/tasks";
import { z } from "zod";

const redis = Redis.fromEnv();

const client = new CloudTasksClient({
projectId: env().GCP_PROJECT_ID,
credentials: {
client_email: env().GCP_CLIENT_EMAIL,
private_key: env().GCP_PRIVATE_KEY.replaceAll("\\n", "\n"),
},
});

export async function LaunchMonitorWorkflow() {
const threeMonthAgo = new Date().setMonth(new Date().getMonth() - 3);

Expand Down Expand Up @@ -30,16 +46,109 @@ export async function LaunchMonitorWorkflow() {
or(isNull(schema.workspace.plan), ne(schema.workspace.plan, "free")),
),
);

const parent = client.queuePath(
env().GCP_PROJECT_ID,
env().GCP_LOCATION,
"workflow",
);
// iterate over users
for (const user of users) {
// check if user has some running monitors
//
const nbRunningMonitor = await db.$count(
schema.monitor,
and(
eq(schema.monitor.workspaceId, user.workspaceId),
eq(schema.monitor.active, true),
isNull(schema.monitor.deletedAt),
),
);
if (nbRunningMonitor > 0) {
continue;
}
const isMember = await redis.sismember("workflow:users", user.userId);

if (isMember) {
continue;
}

await CreateTask({
parent,
client: client,
"step": "14days",
});
// if they have check if the user is in the workflow
//
// If user not in workflow
//
// Start workflow -> create task with monitors/start
// add users to workflow Redis
console.log(`user worflow started for ${user.userId}`);
}
}

export async function Step14Days() {
// Send email saying we are going to pause the monitors
}

export async function Step7Days() {
// check if user has connected
}

export async function Step1Day() {
// also check if user has connected
}

export async function StepPaused() {
// Send Email
// pause monitors
}

async function hasUserLoggedIn(
{ userId, date }: { userId: number; date: Date },
) {
const userResult = await db.select({ lastConnection: schema.user.updatedAt })
.from(
schema.user,
).where(eq(schema.user.id, userId));

if (userResult.length === 0) {
console.error("Something strange no user found", userId);
}
const user = userResult[0];
if (user.lastConnection === null) {
return false;
}
return user.lastConnection > date;
}

function CreateTask(
{ parent, client, step }: {
parent: string;
client: CloudTasksClient;
step: z.infer<typeof workflowStepSchema>;
},
) {
const url = "";
const timestamp = Date.now();
const payload = {}; // Should we send some data to the task or only in the url/
const newTask: google.cloud.tasks.v2beta3.ITask = {
httpRequest: {
headers: {
"Content-Type": "application/json", // Set content type to ensure compatibility your application's request parsing
Authorization: `Basic ${env().CRON_SECRET}`,
},
httpMethod: "POST",
url,
body: Buffer.from(JSON.stringify(payload)).toString("base64"),
},
scheduleTime: {
seconds: timestamp / 1000,
},
};

const request = { parent: parent, task: newTask };
return client.createTask(request);
}

export const workflowStep = ["14days", "7days", "1day", "paused"] as const;
export const workflowStepSchema = z.enum(workflowStep);
3 changes: 3 additions & 0 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit fdf3096

Please sign in to comment.