Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions src/collections/AIExtractions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,9 @@ export const AIExtractions: CollectionConfig = {
fr: "URL CheckMedia",
},
type: "text",
admin: {
width: "50%",
},
},
{
name: "Status",
Expand All @@ -128,6 +131,24 @@ export const AIExtractions: CollectionConfig = {
en: "Status",
fr: "Statut",
},
admin: {
width: "50%",
},
},
{
name: "uploadError",
type: "text",
admin: {
readOnly: true,
description: {
en: "Set when the upload to Meedan fails permanently. Extractions with this field set will not be retried.",
fr: "Défini lorsque le téléchargement vers Meedan échoue définitivement. Les extractions avec ce champ ne seront pas réessayées.",
},
},
label: {
en: "Upload Error",
fr: "Erreur d'upload",
},
},
],
},
Expand Down
5 changes: 5 additions & 0 deletions src/payload-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,10 @@ export interface AiExtraction {
checkMediaId?: string | null;
checkMediaURL?: string | null;
Status?: (string | null) | PromiseStatus;
/**
* Set when the upload to Meedan fails permanently. Extractions with this field set will not be retried.
*/
uploadError?: string | null;
id?: string | null;
}[]
| null;
Expand Down Expand Up @@ -1226,6 +1230,7 @@ export interface AiExtractionsSelect<T extends boolean = true> {
checkMediaId?: T;
checkMediaURL?: T;
Status?: T;
uploadError?: T;
id?: T;
};
updatedAt?: T;
Expand Down
69 changes: 51 additions & 18 deletions src/tasks/cleanupFailedJobs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,39 +25,72 @@ export const CleanupFailedJobs: TaskConfig<"cleanupFailedJobs"> = {

const retentionMs = getRetentionMs();
const cutoffDate = new Date(Date.now() - retentionMs);
const now = new Date().toISOString();

const where: Where = {
// Failed jobs that have been around longer than the retention window
const failedWhere: Where = {
and: [
{ hasError: { equals: true } },
{ completedAt: { less_than: cutoffDate.toISOString() } },
],
};

const { totalDocs } = await payload.count({
collection: "payload-jobs",
where,
});
// Orphaned pending jobs: never attempted, not scheduled for the future, older
// than the retention window. These accumulate when jobs are queued into a queue
// that has no worker (e.g. the "default" queue).
const orphanedWhere: Where = {
and: [
{ processing: { equals: false } },
{ hasError: { equals: false } },
{ totalTried: { equals: 0 } },
{ createdAt: { less_than: cutoffDate.toISOString() } },
{
or: [
{ waitUntil: { exists: false } },
{ waitUntil: { less_than: now } },
],
},
],
};

const [{ totalDocs: failedCount }, { totalDocs: orphanedCount }] =
await Promise.all([
payload.count({ collection: "payload-jobs", where: failedWhere }),
payload.count({ collection: "payload-jobs", where: orphanedWhere }),
]);

if (!totalDocs) {
const retentionHours = retentionMs / (60 * 60 * 1000);

if (!failedCount && !orphanedCount) {
logger.info({
msg: "cleanupFailedJobs:: No failed jobs to delete",
retentionHours: retentionMs / (60 * 60 * 1000),
msg: "cleanupFailedJobs:: No jobs to delete",
retentionHours,
});
return { output: { deleted: 0 } };
return { output: { deletedFailed: 0, deletedOrphaned: 0 } };
}

await payload.delete({
collection: "payload-jobs",
where,
overrideAccess: true,
});
const deleteOps = [];
if (failedCount) {
deleteOps.push(
payload.delete({ collection: "payload-jobs", where: failedWhere, overrideAccess: true }),
);
}
if (orphanedCount) {
deleteOps.push(
payload.delete({ collection: "payload-jobs", where: orphanedWhere, overrideAccess: true }),
);
}
await Promise.all(deleteOps);

logger.info({
msg: "cleanupFailedJobs:: Deleted failed jobs",
retentionHours: retentionMs / (60 * 60 * 1000),
deleted: totalDocs,
msg: "cleanupFailedJobs:: Deleted jobs",
retentionHours,
deletedFailed: failedCount,
deletedOrphaned: orphanedCount,
});

return { output: { deleted: totalDocs } };
return {
output: { deletedFailed: failedCount, deletedOrphaned: orphanedCount },
};
}),
};
95 changes: 70 additions & 25 deletions src/tasks/uploadToMeedan.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ import { getTaskLogger, withTaskTracing, type TaskInput } from "./utils";
const getExtractionsToUpload = (doc: AiExtractionDoc) => {
return (
doc.extractions
?.filter((extraction) => !extraction.checkMediaId)
?.filter(
(extraction) => !extraction.checkMediaId && !extraction.uploadError,
)
.map(({ category, summary, source, uniqueId }) => ({
category,
summary,
Expand Down Expand Up @@ -148,14 +150,20 @@ const resolveCheckMediaSourceUrl = ({
};

const hasPendingExtractions = (doc: AiExtractionDoc): boolean =>
(doc.extractions ?? []).some((extraction) => !extraction?.checkMediaId);
(doc.extractions ?? []).some(
(extraction) => !extraction?.checkMediaId && !extraction?.uploadError,
);

const hasUploadedExtractions = (doc: AiExtractionDoc): boolean =>
(doc.extractions ?? []).some((extraction) => Boolean(extraction?.checkMediaId));
const hasHandledExtractions = (doc: AiExtractionDoc): boolean =>
(doc.extractions ?? []).some(
(extraction) =>
Boolean(extraction?.checkMediaId) || Boolean(extraction?.uploadError),
);

const checkAndMarkDocumentComplete = async (
doc: Document,
airtableAPIKey?: string | null,
status = "Uploaded to Meedan",
): Promise<void> => {
if (!doc.airtableID || !airtableAPIKey) {
return;
Expand All @@ -164,7 +172,7 @@ const checkAndMarkDocumentComplete = async (
await markDocumentAsProcessed({
airtableAPIKey,
airtableID: doc.airtableID,
status: "Uploaded to Meedan",
status,
});
};

Expand Down Expand Up @@ -244,23 +252,24 @@ export const UploadToMeedan: TaskConfig<"uploadToMeedan"> = {
return false;
}

let hasAnyUploadedPromise = false;
let hasAnyHandledExtraction = false;

for (const extractionDoc of extractionDocs as AiExtractionDoc[]) {
if (hasPendingExtractions(extractionDoc)) {
return false;
}

if (hasUploadedExtractions(extractionDoc)) {
hasAnyUploadedPromise = true;
if (hasHandledExtractions(extractionDoc)) {
hasAnyHandledExtraction = true;
}
}

return hasAnyUploadedPromise;
return hasAnyHandledExtraction;
};

const markDocumentAsProcessedWhenReady = async (
document: Document,
status?: string,
): Promise<boolean> => {
if (!document.id || !document.airtableID || !airtableAPIKey) {
return false;
Expand All @@ -271,7 +280,7 @@ export const UploadToMeedan: TaskConfig<"uploadToMeedan"> = {
return false;
}

await checkAndMarkDocumentComplete(document, airtableAPIKey);
await checkAndMarkDocumentComplete(document, airtableAPIKey, status);
return true;
};

Expand Down Expand Up @@ -330,8 +339,19 @@ export const UploadToMeedan: TaskConfig<"uploadToMeedan"> = {
const extractionsToUpload = getExtractionsToUpload(doc);

if (extractionsToUpload.length === 0) {
const markedAsProcessed =
await markDocumentAsProcessedWhenReady(document);
const totalExtractions = doc.extractions?.length ?? 0;
const alreadyFailedCount = (doc.extractions ?? []).filter(
(e) => e.uploadError,
).length;
const alreadyFailedStatus =
alreadyFailedCount > 0
? `Uploaded to Meedan (${alreadyFailedCount} out of ${totalExtractions} failed)`
: undefined;

const markedAsProcessed = await markDocumentAsProcessedWhenReady(
document,
alreadyFailedStatus,
);

logger.info({
message:
Expand Down Expand Up @@ -525,8 +545,33 @@ export const UploadToMeedan: TaskConfig<"uploadToMeedan"> = {
} catch (extractionError) {
failedExtractions += 1;
docFailedExtractions += 1;
const uploadErrorMessage =
extractionError instanceof Error
? extractionError.message
: String(extractionError);

// Re-fetch before writing so we don't clobber checkMediaId or
// uploadError values written by earlier iterations in this loop.
const freshDoc = await payload.findByID({
collection: "ai-extractions",
id: doc.id,
select: { extractions: true },
});
const updatedExtractions = freshDoc.extractions?.map((ext) => {
if (ext.uniqueId === extraction.uniqueId) {
return { ...ext, uploadError: uploadErrorMessage };
}
return ext;
});
await payload.update({
collection: "ai-extractions",
id: doc.id,
data: { extractions: updatedExtractions },
});

logger.warn({
message: "uploadToMeedan:: Failed uploading extraction",
message:
"uploadToMeedan:: Failed uploading extraction — marked as permanently failed",
extractionDocId: doc.id,
extractionDocTitle: doc.title,
extractionUniqueId: extraction.uniqueId,
Expand All @@ -539,21 +584,14 @@ export const UploadToMeedan: TaskConfig<"uploadToMeedan"> = {
politicalEntityAirtableID: entity?.airtableID,
tenantName: tenant?.name,
tenantCountry: tenant?.country,
error:
extractionError instanceof Error
? extractionError.message
: String(extractionError),
error: uploadErrorMessage,
});
}
}

if (docFailedExtractions > 0) {
await setDocumentFailedStatus(
document.airtableID,
`${docFailedExtractions} of ${extractionsToUpload.length} extraction(s) failed during upload`,
);
logger.warn({
message: `uploadToMeedan:: Document skipped for Airtable completion mark — ${docFailedExtractions}/${extractionsToUpload.length} extraction(s) failed to upload`,
message: `uploadToMeedan:: ${docFailedExtractions} out of ${doc.extractions?.length ?? 0} extraction(s) failed — marked permanently, will not be retried`,
extractionDocId: doc.id,
extractionDocTitle: doc.title,
documentId,
Expand All @@ -563,12 +601,19 @@ export const UploadToMeedan: TaskConfig<"uploadToMeedan"> = {
failedExtractions: docFailedExtractions,
totalExtractionsToUpload: extractionsToUpload.length,
});
continue;
}

const totalExtractions = doc.extractions?.length ?? 0;
const completionStatus =
docFailedExtractions > 0
? `Uploaded to Meedan (${docFailedExtractions} out of ${totalExtractions} failed)`
: undefined;

try {
const markedAsProcessed =
await markDocumentAsProcessedWhenReady(document);
const markedAsProcessed = await markDocumentAsProcessedWhenReady(
document,
completionStatus,
);

if (!markedAsProcessed) {
logger.info({
Expand Down