@@ -75,30 +75,24 @@ def consume(self, channel, **options): # skipcq
75
75
)
76
76
schema = schema .connection (channel )
77
77
while True :
78
- jobs = schema .table ("queue_jobs" ).where ("ran_at" , None ).get ()
78
+ builder = schema .table ("queue_jobs" )
79
+ jobs = builder .where_null ("ran_at" ).where (schema .table ("queue_jobs" ).where_null ('wait_until' ).or_where ('wait_until' , '<=' , pendulum .now ().to_datetime_string ())).limit (1 ).get ()
80
+
79
81
if not jobs .count ():
80
82
time .sleep (5 )
81
83
82
84
for job in jobs :
85
+ builder .where ("id" , job ["id" ]).update (
86
+ {
87
+ "ran_at" : pendulum .now ().to_datetime_string (),
88
+ }
89
+ )
83
90
unserialized = pickle .loads (job .serialized )
84
91
obj = unserialized ["obj" ]
85
92
args = unserialized ["args" ]
86
93
callback = unserialized ["callback" ]
87
94
ran = job .attempts
88
95
89
- wait_time = job ["wait_until" ]
90
-
91
- if not job ["wait_until" ]:
92
- wait_time = pendulum .now ()
93
- else :
94
- if isinstance (wait_time , str ):
95
- wait_time = pendulum .parse (job ["wait_until" ])
96
- else :
97
- wait_time = pendulum .instance (job ["wait_until" ])
98
-
99
- # print(job['wait_until'], wait_time.is_future())
100
- if job ["wait_until" ] and wait_time .is_future ():
101
- continue
102
96
try :
103
97
try :
104
98
if inspect .isclass (obj ):
@@ -111,7 +105,7 @@ def consume(self, channel, **options): # skipcq
111
105
112
106
try :
113
107
# attempts = 1
114
- schema . table ( "queue_jobs" ) .where ("id" , job ["id" ]).update (
108
+ builder .where ("id" , job ["id" ]).update (
115
109
{
116
110
"ran_at" : pendulum .now ().to_datetime_string (),
117
111
"attempts" : job ["attempts" ] + 1 ,
@@ -125,7 +119,7 @@ def consume(self, channel, **options): # skipcq
125
119
126
120
if not obj .run_again_on_fail :
127
121
# ch.basic_ack(delivery_tag=method.delivery_tag)
128
- schema . table ( "queue_jobs" ) .where ("id" , job ["id" ]).update (
122
+ builder .where ("id" , job ["id" ]).update (
129
123
{
130
124
"ran_at" : pendulum .now ().to_datetime_string (),
131
125
"failed" : 1 ,
@@ -135,12 +129,12 @@ def consume(self, channel, **options): # skipcq
135
129
136
130
if ran < obj .run_times and isinstance (obj , Queueable ):
137
131
time .sleep (1 )
138
- schema . table ( "queue_jobs" ) .where ("id" , job ["id" ]).update (
139
- {"attempts" : job ["attempts" ] + 1 , }
132
+ builder .where ("id" , job ["id" ]).update (
133
+ {"attempts" : job ["attempts" ] + 1 }
140
134
)
141
135
continue
142
136
else :
143
- schema . table ( "queue_jobs" ) .where ("id" , job ["id" ]).update (
137
+ builder .where ("id" , job ["id" ]).update (
144
138
{
145
139
"attempts" : job ["attempts" ] + 1 ,
146
140
"ran_at" : pendulum .now ().to_datetime_string (),
0 commit comments