Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 77 additions & 3 deletions packages/payload/src/queues/localAPI.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,18 +128,41 @@ export const getJobsLocalAPI = (payload: Payload) => ({
})
},

runByID: async (args: {
runByID: async <T extends Record<string, any> = Record<string, any>>(args: {
id: number | string
overrideAccess?: boolean
req?: PayloadRequest
}): Promise<ReturnType<typeof runJobs>> => {
returnTaskOutput?: boolean
}): Promise<{ taskOutput?: T } & ReturnType<typeof runJobs>> => {
const newReq: PayloadRequest = args.req ?? (await createLocalReq({}, payload))

return await runJobs({
const result = await runJobs({
id: args.id,
overrideAccess: args.overrideAccess !== false,
req: newReq,
})

if (args.returnTaskOutput) {
const job = await payload.findByID({
id: args.id,
collection: jobsCollectionSlug,
depth: 0,
req: newReq,
})

const taskOutput = {} as T
if (job?.log?.length) {
job.log.forEach((logEntry) => {
if (logEntry.state === 'succeeded' && logEntry.taskSlug && logEntry.output) {
taskOutput[logEntry.taskSlug as keyof T] = logEntry.output
}
})
}

return Object.assign({}, result, { taskOutput })
}

return result
},

cancel: async (args: {
Expand Down Expand Up @@ -220,4 +243,55 @@ export const getJobsLocalAPI = (payload: Payload) => ({
returning: false,
})
},

queueAndRun: async <
// eslint-disable-next-line @typescript-eslint/no-duplicate-type-constituents
TTaskOrWorkflowSlug extends keyof TypedJobs['tasks'] | keyof TypedJobs['workflows'],
>(
args:
| {
input: TypedJobs['tasks'][TTaskOrWorkflowSlug]['input']
queue?: string
req?: PayloadRequest
// TTaskOrWorkflowlug with keyof TypedJobs['workflows'] removed:
task: TTaskOrWorkflowSlug extends keyof TypedJobs['tasks'] ? TTaskOrWorkflowSlug : never
waitUntil?: Date
workflow?: never
}
| {
input: TypedJobs['workflows'][TTaskOrWorkflowSlug]['input']
queue?: string
req?: PayloadRequest
task?: never
waitUntil?: Date
workflow: TTaskOrWorkflowSlug extends keyof TypedJobs['workflows']
? TTaskOrWorkflowSlug
: never
},
): Promise<{
job: TTaskOrWorkflowSlug extends keyof TypedJobs['workflows']
? RunningJob<TTaskOrWorkflowSlug>
: RunningJobFromTask<TTaskOrWorkflowSlug>
result: {
jobStatus?: Record<string, { status: 'error' | 'error-reached-max-retries' | 'success' }>
noJobsRemaining?: boolean
remainingJobsFromQueried: number
taskOutput?: Record<string, any>
}
}> => {
const job = await payload.jobs.queue(args)

const newReq: PayloadRequest = args.req ?? (await createLocalReq({}, payload))
const result = await payload.jobs.runByID({
id: job.id,
overrideAccess: true,
req: newReq,
returnTaskOutput: true,
})

return {
job,
result,
}
},
})