Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/tasks/cleanupFailedJobs.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { TaskConfig, type Where } from "payload";
import { getTaskLogger, withTaskTracing } from "./utils";
import { createOnFail, getTaskLogger, withTaskTracing } from "./utils";

const DEFAULT_RETENTION_HOURS = 24;

Expand All @@ -13,6 +13,7 @@ export const CleanupFailedJobs: TaskConfig<"cleanupFailedJobs"> = {
slug: "cleanupFailedJobs",
label: "Cleanup Failed Jobs",
retries: 1,
onFail: createOnFail("cleanupFailedJobs"),
schedule: [
{
cron: "0 * * * *",
Expand Down
47 changes: 34 additions & 13 deletions src/tasks/createPoliticalEntity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@ import {
getMediaIdFromLookup,
normalizeMediaSourceUrl,
} from "@/lib/mediaUrl";
import { getTaskLogger, withTaskTracing } from "./utils";
import { createOnFail, getTaskLogger, withTaskTracing } from "./utils";

export const CreatePoliticalEntity: TaskConfig = {
slug: "createPoliticalEntity",
label: "Create Political Entity",
onFail: createOnFail("createPoliticalEntity"),
handler: withTaskTracing("createPoliticalEntity", async ({ req, input }) => {
const { payload } = req;
const logger = getTaskLogger(req, "createPoliticalEntity", input);
Expand Down Expand Up @@ -330,15 +331,33 @@ export const CreatePoliticalEntity: TaskConfig = {
}

const directPatterns: Array<[RegExp, string]> = [
[/missing tenant mapping for country/i, "Missing required field: Country"],
[/missing required field:\s*country/i, "Missing required field: Country"],
[/missing required field:\s*political entity name/i, "Missing required field: Political Entity Name"],
[
/missing tenant mapping for country/i,
"Missing required field: Country",
],
[
/missing required field:\s*country/i,
"Missing required field: Country",
],
[
/missing required field:\s*political entity name/i,
"Missing required field: Political Entity Name",
],
[/missing required field:\s*image/i, "Missing required field: Image"],
[/missing required fields?:\s*period from/i, "Missing required fields: Period From and Period To"],
[
/missing required fields?:\s*period from/i,
"Missing required fields: Period From and Period To",
],
[/url is invalid|invalid image url/i, "Invalid image URL"],
[/validation/i, "Validation failed while saving entity"],
[/unauthorized|forbidden|http\s*401|http\s*403/i, "Sync failed: Unauthorized"],
[/timeout|timed out|network|fetch failed|econn|enotfound/i, "Sync failed: Network error"],
[
/unauthorized|forbidden|http\s*401|http\s*403/i,
"Sync failed: Unauthorized",
],
[
/timeout|timed out|network|fetch failed|econn|enotfound/i,
"Sync failed: Network error",
],
[/failed syncing entity:/i, "Failed to sync entity"],
];

Expand Down Expand Up @@ -444,10 +463,7 @@ export const CreatePoliticalEntity: TaskConfig = {

if (!tenantForEntity) {
failedCount += 1;
await setEntityStatus(
entity.id,
"Missing required field: Country",
);
await setEntityStatus(entity.id, "Missing required field: Country");
logger.warn({
message:
"createPoliticalEntity:: Tenant not found for entity country",
Expand Down Expand Up @@ -516,7 +532,9 @@ export const CreatePoliticalEntity: TaskConfig = {

mediaId = (mediaUpload as any)?.id ?? mediaUpload;
if (!mediaId) {
throw new Error("Failed to resolve media ID for political entity image");
throw new Error(
"Failed to resolve media ID for political entity image",
);
}

cacheMediaSourceLookup(
Expand Down Expand Up @@ -684,7 +702,10 @@ export const CreatePoliticalEntity: TaskConfig = {
entityError instanceof Error
? entityError.message
: String(entityError ?? "");
await setEntityStatus(entity.id, simplifyEntityStatusError(errorMessage));
await setEntityStatus(
entity.id,
simplifyEntityStatusError(errorMessage),
);
logger.warn({
message: `createPoliticalEntity:: Unexpected error processing entity "${entity.politicalEntityName ?? entity.id}" — skipping`,
airtableEntityId: entity.id,
Expand Down
3 changes: 2 additions & 1 deletion src/tasks/createTenant.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ import { getAirtableCountries } from "@/lib/airtable";
import { LANGUAGE_MAP } from "@/utils/locales";
import { TaskConfig } from "payload";
import { Tenant } from "@/payload-types";
import { getTaskLogger, withTaskTracing } from "./utils";
import { createOnFail, getTaskLogger, withTaskTracing } from "./utils";

type COUNTRY = NonNullable<Tenant["country"]>;

export const CreateTenantFromAirtable: TaskConfig<"createTenantFromAirtable"> =
{
slug: "createTenantFromAirtable",
label: "Create Tenant From Airtable",
onFail: createOnFail("createTenantFromAirtable"),
handler: withTaskTracing(
"createTenantFromAirtable",
async ({ req, input }) => {
Expand Down
3 changes: 2 additions & 1 deletion src/tasks/downloadDocuments.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import {
normalizeMediaSourceUrl,
} from "@/lib/mediaUrl";
import { downloadFile } from "@/utils/files";
import { getTaskLogger, withTaskTracing, type TaskInput } from "./utils";
import { createOnFail, getTaskLogger, withTaskTracing, type TaskInput } from "./utils";

type DocURL = {
url?: string | null;
Expand Down Expand Up @@ -102,6 +102,7 @@ export const DownloadDocuments: TaskConfig<"downloadDocuments"> = {
retries: 2,
slug: "downloadDocuments",
label: "Download Documents",
onFail: createOnFail("downloadDocuments"),
handler: withTaskTracing("downloadDocuments", async ({ req, input }) => {
const { payload } = req;
const logger = getTaskLogger(req, "downloadDocuments", input);
Expand Down
3 changes: 2 additions & 1 deletion src/tasks/extractDocuments.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { join } from "path";
import { updateDocumentStatus } from "@/lib/airtable";
import { Media } from "@/payload-types";
import { downloadFile } from "@/utils/files";
import { getTaskLogger, withTaskTracing, type TaskInput } from "./utils";
import { createOnFail, getTaskLogger, withTaskTracing, type TaskInput } from "./utils";

const tika = new AxApacheTika({
url: process.env.AX_APACHE_TIKA_URL ?? "http://127.0.0.1:9998/",
Expand All @@ -29,6 +29,7 @@ export async function extractTextFromDoc(filePath: string): Promise<string[]> {
export const ExtractDocuments: TaskConfig<"extractDocuments"> = {
slug: "extractDocuments",
label: "Extract Documents",
onFail: createOnFail("extractDocuments"),
handler: withTaskTracing("extractDocuments", async ({ req, input }) => {
const { payload } = req;
const logger = getTaskLogger(req, "extractDocuments", input);
Expand Down
3 changes: 2 additions & 1 deletion src/tasks/extractPromises.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import {
resolveConfiguredLanguageModel,
type AISettingsInput,
} from "@/lib/ai/providerRegistry";
import { getTaskLogger, withTaskTracing, type TaskInput } from "./utils";
import { createOnFail, getTaskLogger, withTaskTracing, type TaskInput } from "./utils";

type TaskLogger = ReturnType<typeof getTaskLogger>;

Expand Down Expand Up @@ -391,6 +391,7 @@ export const __extractPromisesTestUtils = {
export const ExtractPromises: TaskConfig<"extractPromises"> = {
slug: "extractPromises",
label: "Extract Promises",
onFail: createOnFail("extractPromises"),
handler: withTaskTracing("extractPromises", async ({ req, input }) => {
const { payload } = req;
const logger = getTaskLogger(req, "extractPromises", input);
Expand Down
3 changes: 2 additions & 1 deletion src/tasks/fetchAirtableDocuments.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { TaskConfig } from "payload";
import { getUnprocessedDocuments, updateDocumentStatus } from "@/lib/airtable";
import { LANGUAGE_MAP } from "@/utils/locales";
import { getTaskLogger, withTaskTracing } from "./utils";
import { createOnFail, getTaskLogger, withTaskTracing } from "./utils";

type AirtableDocumentCandidate = {
id: string;
Expand All @@ -25,6 +25,7 @@ export const FetchAirtableDocuments: TaskConfig<"fetchAirtableDocuments"> = {
retries: 2,
slug: "fetchAirtableDocuments",
label: "Fetch Airtable Documents",
onFail: createOnFail("fetchAirtableDocuments"),
handler: withTaskTracing("fetchAirtableDocuments", async ({ req, input }) => {
const { payload } = req;
const logger = getTaskLogger(req, "fetchAirtableDocuments", input);
Expand Down
3 changes: 2 additions & 1 deletion src/tasks/fetchMeedanPromiseStatus.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import { TaskConfig } from "payload";
import { fetchVerificationStatuses } from "@/lib/meedan";
import { getTaskLogger, withTaskTracing } from "./utils";
import { createOnFail, getTaskLogger, withTaskTracing } from "./utils";

export const FetchPromiseStatuses: TaskConfig<"fetchPromiseStatuses"> = {
slug: "fetchPromiseStatuses",
label: "Fetch Promise Statuses",
onFail: createOnFail("fetchPromiseStatuses"),
handler: withTaskTracing("fetchPromiseStatuses", async ({ req, input }) => {
const { payload } = req;
const logger = getTaskLogger(req, "fetchPromiseStatuses", input);
Expand Down
3 changes: 2 additions & 1 deletion src/tasks/syncMeedanPromises.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import { TaskConfig } from "payload";
import { fetchPublishedReports } from "@/lib/meedan";
import { syncMeedanReports } from "@/lib/syncMeedanReports";
import { getTaskLogger, withTaskTracing } from "./utils";
import { createOnFail, getTaskLogger, withTaskTracing } from "./utils";

export const SyncMeedanPromises: TaskConfig<"syncMeedanPromises"> = {
slug: "syncMeedanPromises",
label: "Sync Meedan Promises",
onFail: createOnFail("syncMeedanPromises"),
handler: withTaskTracing("syncMeedanPromises", async ({ req, input }) => {
const { payload } = req;
const logger = getTaskLogger(req, "syncMeedanPromises", input);
Expand Down
3 changes: 2 additions & 1 deletion src/tasks/updatePromiseStatus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import {
PromiseStatus,
} from "@/payload-types";
import { fetchProjectMediaStatuses } from "@/lib/meedan";
import { getTaskLogger, withTaskTracing, type TaskInput } from "./utils";
import { createOnFail, getTaskLogger, withTaskTracing, type TaskInput } from "./utils";

type ExtractionItem = NonNullable<
NonNullable<AiExtractionDoc["extractions"]>[number]
Expand All @@ -25,6 +25,7 @@ const getStatusId = (value: ExtractionItem["Status"]) => {
export const UpdatePromiseStatus: TaskConfig<"updatePromiseStatus"> = {
slug: "updatePromiseStatus",
label: "Update Promise Status",
onFail: createOnFail("updatePromiseStatus"),
handler: withTaskTracing("updatePromiseStatus", async ({ req, input }) => {
const { payload } = req;
const logger = getTaskLogger(req, "updatePromiseStatus", input);
Expand Down
3 changes: 2 additions & 1 deletion src/tasks/uploadToMeedan.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import {
PoliticalEntity,
Tenant,
} from "@/payload-types";
import { getTaskLogger, withTaskTracing, type TaskInput } from "./utils";
import { createOnFail, getTaskLogger, withTaskTracing, type TaskInput } from "./utils";

const getExtractionsToUpload = (doc: AiExtractionDoc) => {
return (
Expand Down Expand Up @@ -172,6 +172,7 @@ export const UploadToMeedan: TaskConfig<"uploadToMeedan"> = {
slug: "uploadToMeedan",
label: "Upload To Meedan",
retries: 0,
onFail: createOnFail("uploadToMeedan"),
handler: withTaskTracing("uploadToMeedan", async ({ req, input }) => {
const { payload } = req;
const logger = getTaskLogger(req, "uploadToMeedan", input);
Expand Down
26 changes: 25 additions & 1 deletion src/tasks/utils.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import * as Sentry from "@sentry/nextjs";
import type { TaskConfig, PayloadRequest } from "payload";
import type { TaskConfig, PayloadRequest, Job } from "payload";

export type RunContext = {
workflowSlug?: string;
Expand Down Expand Up @@ -120,6 +120,30 @@ export const getTaskLogger = (
return createTaskLogger(payloadLogger, taskSlug, runContext);
};

export const createOnFail: (
taskSlug: string,
) => NonNullable<TaskConfig["onFail"]> =
(taskSlug) =>
async ({ job, taskStatus }: { job: Job; taskStatus: unknown }) => {
const error =
taskStatus && typeof taskStatus === "object" && "error" in taskStatus
? (taskStatus as { error: unknown }).error
: undefined;

Sentry.captureMessage(`${taskSlug}: task failed`, {
level: "error",
tags: {
task: taskSlug,
},
extra: {
jobId: job.id,
taskStatus,
error,
input: (job as { input?: unknown }).input,
},
});
};

export const withTaskTracing = (
taskSlug: string,
handler: NonNullable<TaskConfig["handler"]>,
Expand Down