Skip to content

Periodic JobArgs lost on subsequent invocations #187

@stevecalvert

Description

@stevecalvert

Hi River devs, I'm trying out the periodic job queue. I was hoping to be able to insert a periodic job and have it run at regular intervals, using the PeriodicJobArgs until it's no longer needed. I see the first run of the job with the correct JobArgs values, but subsequent runs no longer have access to the JobArgs. Apologies if this is an obvious omission on my part. I suppose I could get around this by using a single-shot job and re-inserting a new job before exiting the Work() function, but that defeats the object of the periodic job!

Following taken from the periodic job example, with updates to get around no access to internal imports, and adding a transaction to insert a job. Any suggestions welcomed 😄

package main

import (
	"context"
	"fmt"
	"os"
	"time"

	"github.com/jackc/pgx/v5/pgxpool"

	"github.com/riverqueue/river"
	//"github.com/riverqueue/river/internal/riverinternaltest"
	//"github.com/riverqueue/river/internal/util/slogutil"
	"github.com/riverqueue/river/riverdriver/riverpgxv5"
)

type PeriodicJobArgs struct {
	JobVal string
}

// Kind is the unique string name for this job.
func (PeriodicJobArgs) Kind() string { return "periodic" }

// PeriodicJobWorker is a job worker for sorting strings.
type PeriodicJobWorker struct {
	river.WorkerDefaults[PeriodicJobArgs]
}

func (w *PeriodicJobWorker) Work(ctx context.Context, job *river.Job[PeriodicJobArgs]) error {
	fmt.Printf("This job will run once immediately with JobVal:%s then approximately once every 30 secs\n", job.Args.JobVal)
	return nil
}

var job1Args PeriodicJobArgs = PeriodicJobArgs{
	JobVal: "11112222",
}

// Example_periodicJob demonstrates the use of a periodic job.
func main() {
	ctx := context.Background()

	dbPool, err := pgxpool.New(ctx, os.Getenv("DATABASE_URL"))
	if err != nil {
		panic(err)
	}
	defer dbPool.Close()

	// // Required for the purpose of this test, but not necessary in real usage.
	// if err := riverinternaltest.TruncateRiverTables(ctx, dbPool); err != nil {
	// 	panic(err)
	// }

	workers := river.NewWorkers()
	river.AddWorker(workers, &PeriodicJobWorker{})

	riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
		//Logger: slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn}),
		PeriodicJobs: []*river.PeriodicJob{
			river.NewPeriodicJob(
				river.PeriodicInterval(30*time.Second),
				func() (river.JobArgs, *river.InsertOpts) {
					return PeriodicJobArgs{}, nil
				},
				&river.PeriodicJobOpts{RunOnStart: true},
			),
		},
		Queues: map[string]river.QueueConfig{
			river.QueueDefault: {MaxWorkers: 2},
		},
		Workers: workers,
	})
	if err != nil {
		panic(err)
	}

	tx, err := dbPool.Begin(ctx)
	if err != nil {
		panic(err)
	}
	defer tx.Rollback(ctx)

	// Insert a transaction to run periodically
	_, err = riverClient.InsertTx(ctx, tx, job1Args, nil)
	if err != nil {
		// handle error
		fmt.Printf("Error inserting transaction for jobArg:%s, error:%s\n", job1Args.JobVal, err.Error())
	}
	tx.Commit(ctx)

	// Out of example scope, but used to wait until a job is worked.
	subscribeChan, subscribeCancel := riverClient.Subscribe(river.EventKindJobCompleted)
	defer subscribeCancel()

	// There's no need to explicitly insert a periodic job. One will be inserted
	// (and worked soon after) as the client starts up.
	if err := riverClient.Start(ctx); err != nil {
		panic(err)
	}

	waitForNJobs(subscribeChan, 10)

	if err := riverClient.Stop(ctx); err != nil {
		panic(err)
	}

}

func waitForNJobs(subscribeChan <-chan *river.Event, numJobs int) {
	var (
		timeout  = 10 * time.Hour
		deadline = time.Now().Add(timeout)
		events   = make([]*river.Event, 0, numJobs)
	)

	for {
		select {
		case event := <-subscribeChan:
			events = append(events, event)

			if len(events) >= numJobs {
				fmt.Printf("events:%d numJobs:%d\n", len(events), numJobs)

				return
			}

		case <-time.After(time.Until(deadline)):
			panic(fmt.Sprintf("WaitOrTimeout timed out after waiting %s (received %d job(s), wanted %d)",
				timeout, len(events), numJobs))
		}
	}
}

Output is:

This job will run once immediately with JobVal:11112222 then approximately once every 30 secs
This job will run once immediately with JobVal: then approximately once every 30 secs
This job will run once immediately with JobVal: then approximately once every 30 secs
This job will run once immediately with JobVal: then approximately once every 30 secs
This job will run once immediately with JobVal: then approximately once every 30 secs

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions