Pulse re-runs finished jobs after a process restart #57

Open
SergChr opened this issue Oct 8, 2024 · 7 comments
Labels
bug Something isn't working

SergChr commented Oct 8, 2024

Description

I have a Pulse job that's scheduled to run at midnight on weekdays (repeatInterval: "0 0 * * 1-5"). When it finishes, Pulse leaves lockedAt: null and sets lastRunAt to the last run date in the job document (inside the pulseJobs MongoDB collection).

Whenever I restart the Node.js process, Pulse picks this job up again and runs it immediately, regardless of the current time (the job should run only at midnight). This happens on every restart, even though the previous run completed and was not interrupted.

Code example

Pulse client configuration

const config: PulseConfig = {
    processEvery: '30 seconds',
    resumeOnRestart: true,
    defaultLockLimit: 1,
    defaultLockLifetime: 180_000,
};

export const initPulseClient = (db: Db): void => {
    pulse = new Pulse({
        mongo: db,
        ...config,
    });
};

The job configuration

pulse.define(
    'MY_PULSE_JOB',
    async (_job, done) => {
        try {
            await myPulseJob(); // could be any function
            done();
        } catch (error) {
            const msg = 'Failed during publishing signals';
            logger.error(error, msg);
            done(new Error(msg));
        }
    },
    {
        concurrency: 1,
        shouldSaveResult: true,
        lockLifetime: 180_000,
        attempts: 3,
        backoff: {
            type: 'exponential',
            delay: 3_000,
        },
    },
);

pulse.every('0 0 * * 1-5', 'MY_PULSE_JOB');

Pulse debug logs

pulse:db_init init database collection using name [pulseJobs] +0ms
  pulse:resumeOnRestart Pulse.resumeOnRestart() +0ms
  pulse:db_init attempting index creation +1ms
  pulse:define job [MY_PULSE_JOB] defined with following options: 
  pulse:define {
  pulse:define   fn: [AsyncFunction (anonymous)],
  pulse:define   concurrency: 1,
  pulse:define   lockLimit: 1,
  pulse:define   priority: 0,
  pulse:define   lockLifetime: 180000,
  pulse:define   running: 0,
  pulse:define   locked: 0,
  pulse:define   shouldSaveResult: true,
  pulse:define   attempts: 3,
  pulse:define   backoff: { type: 'exponential', delay: 3000 }
  pulse:define } +0ms
  pulse:resumeOnRestart resuming unfinished 1 jobs(2024-10-07T11:47:13.294Z) +98ms
  pulse:db_init index creation success +575ms
  pulse:start Pulse.start called, creating interval to call processJobs every [30000ms] +0ms
  pulse:every Pulse.every(0 0 * * 1-5, 'MY_PULSE_JOB', undefined) +0ms
  pulse:create Pulse.create(MY_PULSE_JOB, [Object]) +0ms
  pulse:job [MY_PULSE_JOB:undefined] computing next run via interval [0 0 * * 1-5] +0ms
  pulse:job [MY_PULSE_JOB:undefined] nextRunAt set to [2024-10-07T20:00:00.000Z] +9ms
  pulse:saveJob attempting to save a job into Pulse instance +0ms
  pulse:saveJob [job undefined] set job props: 
  pulse:saveJob {
  pulse:saveJob   name: 'MY_PULSE_JOB',
  pulse:saveJob   data: {},
  pulse:saveJob   priority: 0,
  pulse:saveJob   shouldSaveResult: true,
  pulse:saveJob   attempts: 3,
  pulse:saveJob   backoff: { type: 'exponential', delay: 3000 },
  pulse:saveJob   type: 'single',
  pulse:saveJob   nextRunAt: 2024-10-07T20:00:00.000Z,
  pulse:saveJob   repeatInterval: '0 0 * * 1-5',
  pulse:saveJob   repeatTimezone: null,
  pulse:saveJob   startDate: null,
  pulse:saveJob   endDate: null,
  pulse:saveJob   skipDays: null,
  pulse:saveJob   lastModifiedBy: undefined
  pulse:saveJob } +1ms
  pulse:saveJob current time stored as 2024-10-07T11:47:13.882Z +0ms
  pulse:saveJob job with type of "single" found +0ms
  pulse:saveJob calling findOneAndUpdate() with job name and type of "single" as query +0ms
  pulse:internal:processJobs starting to process jobs: [unknownName:unknownId] +0ms
  pulse:internal:processJobs queuing up job to process: [MY_PULSE_JOB] +0ms
  pulse:internal:processJobs jobQueueFilling: MY_PULSE_JOB isJobQueueFilling: false +0ms
  pulse:internal:processJobs job [MY_PULSE_JOB] lock status: shouldLock = true +0ms
  pulse:internal:_findAndLockNextJob _findAndLockNextJob(MY_PULSE_JOB, [Function]) +0ms
  pulse:internal:_findAndLockNextJob found a job available to lock, creating a new job on Pulse with id [6703c2b8f3d5afdf65e69436] +76ms
  pulse:internal:processJobs job [MY_PULSE_JOB] lock status: shouldLock = true +78ms
  pulse:internal:processJobs [MY_PULSE_JOB:6703c2b8f3d5afdf65e69436] job locked while filling queue +0ms
  pulse:internal:processJobs jobQueueFilling: MY_PULSE_JOB isJobQueueFilling: true +0ms
  pulse:internal:processJobs job [MY_PULSE_JOB] lock status: shouldLock = false +0ms
  pulse:internal:processJobs lock limit reached in queue filling for [MY_PULSE_JOB] +0ms
  pulse:internal:processJobs [MY_PULSE_JOB:6703c2b8f3d5afdf65e69436] about to process job +0ms
  pulse:internal:processJobs [MY_PULSE_JOB:6703c2b8f3d5afdf65e69436] nextRunAt is in the past, run the job immediately +0ms

# Custom logs to see the `job` object
# src/utils/process-jobs.ts:246
# {
#  'job.attrs.nextRunAt': 2024-10-07T11:47:13.294Z,
#  now: 2024-10-07T11:47:13.961Z
# }
# ^ at this point, Pulse has already overwritten `nextRunAt`, replacing the midnight schedule with `now`

# job.attrs
# {
#     _id: new ObjectId('6703c2b8f3d5afdf65e69436'),
#     name: 'MY_PULSE_JOB',
#     attempts: 3,
#     backoff: { type: 'exponential', delay: 3000 },
#     data: {},
#     endDate: null,
#     priority: 0,
#     repeatInterval: '0 0 * * 1-5',
#     repeatTimezone: null,
#     shouldSaveResult: true,
#     skipDays: null,
#     startDate: null,
#     finishedCount: 4,
#     lastFinishedAt: 2024-10-07T11:46:41.174Z,
#     runCount: 4,
#     lockedAt: 2024-10-07T11:47:13.884Z,
#     type: 'single',
#     nextRunAt: 2024-10-07T11:47:13.294Z
#   }
  pulse:internal:processJobs [MY_PULSE_JOB:6703c2b8f3d5afdf65e69436] processing job +2ms
  pulse:job [MY_PULSE_JOB:6703c2b8f3d5afdf65e69436] setting lastRunAt to: 2024-10-07T11:47:13.964Z +0ms
  pulse:job [MY_PULSE_JOB:6703c2b8f3d5afdf65e69436] computing next run via interval [0 0 * * 1-5] +83ms
  pulse:job [MY_PULSE_JOB:6703c2b8f3d5afdf65e69436] nextRunAt set to [2024-10-07T20:00:00.000Z] +1ms
  pulse:saveJob attempting to save a job into Pulse instance +83ms
  pulse:saveJob [job 6703c2b8f3d5afdf65e69436] set job props: 
  pulse:saveJob {
  pulse:saveJob   name: 'MY_PULSE_JOB',
  pulse:saveJob   attempts: 3,
  pulse:saveJob   backoff: { type: 'exponential', delay: 3000 },
  pulse:saveJob   data: {},
  pulse:saveJob   endDate: null,
  pulse:saveJob   priority: 0,
  pulse:saveJob   repeatInterval: '0 0 * * 1-5',
  pulse:saveJob   repeatTimezone: null,
  pulse:saveJob   shouldSaveResult: true,
  pulse:saveJob   skipDays: null,
  pulse:saveJob   startDate: null,
  pulse:saveJob   finishedCount: 4,
  pulse:saveJob   lastFinishedAt: 2024-10-07T11:46:41.174Z,
  pulse:saveJob   runCount: 4,
  pulse:saveJob   lockedAt: 2024-10-07T11:47:13.884Z,
  pulse:saveJob   type: 'single',
  pulse:saveJob   nextRunAt: 2024-10-07T20:00:00.000Z,
  pulse:saveJob   lastRunAt: 2024-10-07T11:47:13.964Z,
  pulse:saveJob   lastModifiedBy: undefined
  pulse:saveJob } +0ms
  pulse:saveJob current time stored as 2024-10-07T11:47:13.965Z +0ms
  pulse:saveJob job already has _id, calling findOneAndUpdate() using _id as query +0ms
  pulse:saveJob processDbResult() called with success, checking whether to process job immediately or not +2ms
  pulse:saveJob processDbResult() called with success, checking whether to process job immediately or not +73ms
  pulse:job [MY_PULSE_JOB:6703c2b8f3d5afdf65e69436] starting job +76ms
  pulse:job [MY_PULSE_JOB:6703c2b8f3d5afdf65e69436] process function being called +0ms
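
To make the custom log output above easier to read, here is a minimal sketch of the decision implied by the "nextRunAt is in the past, run the job immediately" line. The `isDue` helper is hypothetical and not part of Pulse; it only assumes that the scheduler compares `nextRunAt` with the current time. The timestamps are copied from the logs.

```typescript
// Hypothetical helper, not Pulse's implementation: a job is treated as due
// when it has a nextRunAt that is not in the future.
function isDue(nextRunAt: Date | null, now: Date): boolean {
    return nextRunAt !== null && nextRunAt.getTime() <= now.getTime();
}

// Timestamps copied from the debug output above.
const now = new Date('2024-10-07T11:47:13.961Z');
const overwritten = new Date('2024-10-07T11:47:13.294Z'); // nextRunAt seen in process-jobs
const scheduled = new Date('2024-10-07T20:00:00.000Z');   // nextRunAt computed from the cron rule

console.log(isDue(overwritten, now)); // true
console.log(isDue(scheduled, now));   // false
```

With the cron-derived value the job would wait until the next scheduled run; with the overwritten value it is already due when the first processing pass starts.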

Additional context

Node.js version: 20.12.2 (also happens on 22.x).
MongoDB version: 7.0.
MongoDB Node driver: 6.5.0.

SergChr added the bug (Something isn't working) label on Oct 8, 2024
SergChr changed the title from "Pulse re-run finished jobs after a process restart" to "Pulse re-runs finished jobs after a process restart" on Oct 8, 2024
@code-xhyun
Contributor

code-xhyun commented Oct 11, 2024

We tested under the same conditions as yours, but could not reproduce the issue.

@andrewdcato

For what it's worth, I'm seeing similar behavior with the following dependency versions:

Pulse version: 1.6.3
Node.js version: 18.x LTS
MongoDB version: 5.0
MongoDB Node driver: 6.10.0

@Nickk4

Nickk4 commented Nov 22, 2024

I'm experiencing the same issue. Using pulse 1.6.6, Node 20.18.0, Mongoose 8.8.2.
lockedAt remains null after successfully completing the job.

const pulse = new Pulse({
  mongo: db.connection.db,
  collection: 'jobs',
  defaultConcurrency: 1,
  maxConcurrency: 2,
  processEvery: '2 minutes',
  resumeOnRestart: true,
});

For example, I have the job below in my database. This job is complete but on every server restart, it gets executed again.

{
  "_id": {
    "$oid": "67406ee23a0dbbb08e3b9fd8"
  },
  "name": "process-upon",
  "data": {
    "documentId": {
      "$oid": "67406ea53a0dbbb08e3b9fcf"
    }
  },
  "priority": 0,
  "shouldSaveResult": false,
  "attempts": 0,
  "backoff": null,
  "type": "normal",
  "nextRunAt": null,
  "runCount": 2,
  "finishedCount": 2,
  "lastFinishedAt": {
    "$date": "2024-11-22T11:46:40.328Z"
  },
  "lockedAt": null,
  "lastModifiedBy": null,
  "lastRunAt": {
    "$date": "2024-11-22T11:46:37.106Z"
  }
}

As a temporary workaround, I've added this check at the top of my job handler:

if (job.attrs.finishedCount && job.attrs.finishedCount > 0 && !job.attrs.nextRunAt) {
  console.log('Skipping already completed job:', job.attrs.name);
  return done();
}
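
The same guard can be pulled into a small predicate so it can be reused and tested without a running scheduler. This is only a sketch: `JobAttrsLike` and `isAlreadyCompleted` are hypothetical names, and the interface lists just the fields the check reads, taken from the job document above.

```typescript
// Assumed minimal shape of the attributes the guard reads (not Pulse's own type).
interface JobAttrsLike {
    name: string;
    finishedCount?: number;
    nextRunAt?: Date | null;
}

// Hypothetical helper: true when the job has finished at least once
// and has no further run scheduled.
function isAlreadyCompleted(attrs: JobAttrsLike): boolean {
    return (attrs.finishedCount ?? 0) > 0 && !attrs.nextRunAt;
}

// One-off job shaped like the document above.
const oneOff: JobAttrsLike = { name: 'process-upon', finishedCount: 2, nextRunAt: null };
// Recurring job shaped like the one in the original report.
const recurring: JobAttrsLike = {
    name: 'MY_PULSE_JOB',
    finishedCount: 4,
    nextRunAt: new Date('2024-10-07T20:00:00.000Z'),
};

console.log(isAlreadyCompleted(oneOff));    // true
console.log(isAlreadyCompleted(recurring)); // false
```

Note that the check only covers one-off jobs: a recurring job always has a nextRunAt, so this guard would not skip it.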

@Nickk4

Nickk4 commented Dec 5, 2024

I wonder if the behavior is coming from using mongo: db.connection.db, which uses an existing database connection, instead of using db: { address: dbUri, collection: 'jobs' } as per docs. Perhaps others with this issue are also doing something similar...?

@andrewdcato

> I wonder if the behavior is coming from using mongo: db.connection.db, which uses an existing database connection, instead of using db: { address: dbUri, collection: 'jobs' } as per docs. Perhaps others with this issue are also doing something similar...?

I am also instantiating pulse with mongo: db.connection.db, so there may be something here?

I assumed this would work as it did in Agenda, given the docs' assertion that this project is a continuation of Agenda. That mostly seems to be the case, with the exception of this bug.

@simonnilsson

Having the same issue.

Pulse version: 1.6.7
Node.js version: 22.x LTS
MongoDB version: 5.0
MongoDB Node driver: 6.12.0

Pulse initialized with:

const pulse = new Pulse({
  db: {
    address: "mongodb://localhost:27017/db",
    collection: "jobs",
    options: { auth: { /* ... */ } },
  },
});

Jobs are created like this:

const job = pulse.create("job-name", { /* ... */ });
job.repeatEvery('0 12 * * *', { timezone: "..." });
job.computeNextRunAt();
await job.save();

Restarting the application makes all jobs run again, even if their last run completed successfully. The same code worked well in Agenda.

Adding resumeOnRestart: false to the initialization prevents the jobs from being run at startup, so the issue seems related to that functionality.

@simonnilsson

The code for resumeOnRestart checks whether lockedAt exists. On all my jobs, lockedAt has been set to null, so every job is returned by the query and has its nextRunAt set to now.

lockedAt: { $exists: true },
nextRunAt: { $ne: null },
$or: [{ $expr: { $eq: ['$runCount', '$finishedCount'] } }, { lastFinishedAt: { $exists: false } }],

From my understanding, lockedAt should either be a date or not set at all. But when a job runs, it gets set to null?

this.attrs.lockedAt = null;
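
To illustrate why null matters here: in MongoDB, `{ $exists: true }` matches any document that contains the field, including documents where the field's value is null. The sketch below mimics that rule in memory, so it needs no database. `JobDoc`, `existsTrue`, and `isDate` are hypothetical names, and the stricter date check is shown only for comparison, not as the fix Pulse uses.

```typescript
type JobDoc = { name: string; lockedAt?: Date | null };

// Mimics MongoDB's `{ lockedAt: { $exists: true } }`:
// matches whenever the field is present, even if its value is null.
const existsTrue = (doc: JobDoc): boolean => 'lockedAt' in doc;

// Stricter condition for comparison, similar to `{ lockedAt: { $type: 'date' } }`:
// matches only when the field holds an actual date.
const isDate = (doc: JobDoc): boolean => doc.lockedAt instanceof Date;

const docs: JobDoc[] = [
    { name: 'never-locked' },                                        // field absent
    { name: 'finished', lockedAt: null },                            // reset to null after a run
    { name: 'still-locked', lockedAt: new Date('2024-10-07T11:47:13.884Z') },
];

console.log(docs.filter(existsTrue).map((d) => d.name)); // [ 'finished', 'still-locked' ]
console.log(docs.filter(isDate).map((d) => d.name));     // [ 'still-locked' ]
```

Under the presence check, a job whose lockedAt was reset to null still matches, which is consistent with finished jobs being picked up on restart.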

5 participants