Skip to content
This repository was archived by the owner on Jun 5, 2023. It is now read-only.

UTC datetime #70

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -3,4 +3,7 @@ build/
.coverage
*.egg-info
logs/
dist/
dist/

# PyCharm
/.idea
26 changes: 17 additions & 9 deletions pyres/__init__.py
Original file line number Diff line number Diff line change
@@ -3,7 +3,8 @@
from redis import Redis
import pyres.json_parser as json

import time, datetime
from datetime import datetime
import calendar
import sys
import logging

@@ -238,9 +239,12 @@ def close(self):
self.redis.connection.disconnect()

def enqueue_at(self, datetime, klass, *args, **kwargs):
"""
datetime: if it is a naive datetime, it's regarded in UTC; otherwise, it will be converted to a naive datetime in UTC
"""
datetime = self._to_naive_utc_datetime(datetime)
class_name = '%s.%s' % (klass.__module__, klass.__name__)
logging.info("enqueued '%s' job for execution at %s" % (class_name,
datetime))
logging.info("enqueued '%s' job for execution at %s" % (class_name, datetime))
if args:
logging.debug("job arguments are: %s" % str(args))
payload = {'class':class_name, 'queue': klass.queue, 'args':args}
@@ -249,7 +253,7 @@ def enqueue_at(self, datetime, klass, *args, **kwargs):
self.delayed_push(datetime, payload)

def delayed_push(self, datetime, item):
key = int(time.mktime(datetime.timetuple()))
key = int(calendar.timegm(datetime.utctimetuple()))
self.redis.rpush('resque:delayed:%s' % key, ResQ.encode(item))
self.redis.zadd('resque:delayed_queue_schedule', key, key)

@@ -268,11 +272,10 @@ def delayed_queue_schedule_size(self):
return size

def delayed_timestamp_size(self, timestamp):
#key = int(time.mktime(timestamp.timetuple()))
return self.redis.llen("resque:delayed:%s" % timestamp)

def next_delayed_timestamp(self):
key = int(time.mktime(ResQ._current_time().timetuple()))
key = int(calendar.timegm(ResQ._utcnow().utctimetuple()))
array = self.redis.zrangebyscore('resque:delayed_queue_schedule',
'-inf', key)
timestamp = None
@@ -281,7 +284,6 @@ def next_delayed_timestamp(self):
return timestamp

def next_item_for_timestamp(self, timestamp):
#key = int(time.mktime(timestamp.timetuple()))
key = "resque:delayed:%s" % timestamp
ret = self.redis.lpop(key)
item = None
@@ -312,8 +314,14 @@ def _enqueue(cls, klass, *args):
_self.push(queue, {'class':class_name,'args':args})

@staticmethod
def _current_time():
return datetime.datetime.now()
def _utcnow():
return datetime.utcnow()

@staticmethod
def _to_naive_utc_datetime(datetime):
# naive datetime is regarded as UTC datetime
timestamp = calendar.timegm(datetime.utctimetuple())
return datetime.utcfromtimestamp(timestamp)


class Stat(object):
6 changes: 3 additions & 3 deletions pyres/extensions.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import os
import datetime
from datetime import datetime
import time
import signal

@@ -28,11 +28,11 @@ def work(self, interval=5):

if self.child:
print 'Forked %s at %s' % (self.child,
datetime.datetime.now())
datetime.utcnow())
os.waitpid(self.child, 0)
else:
print 'Processing %s since %s' % (job._queue,
datetime.datetime.now())
datetime.utcnow())
self.process(job)
os._exit(0)
self.child = None
4 changes: 2 additions & 2 deletions pyres/failure/redis.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import datetime, time
from datetime import datetime
from base64 import b64encode

from base import BaseBackend
@@ -12,7 +12,7 @@ def save(self, resq=None):
if not resq:
resq = ResQ()
data = {
'failed_at' : datetime.datetime.now().strftime('%Y/%m/%d %H:%M:%S'),
'failed_at' : datetime.utcnow().strftime('%Y/%m/%d %H:%M:%S'),
'payload' : self._payload,
'exception' : self._exception.__class__.__name__,
'error' : self._parse_message(self._exception),
8 changes: 4 additions & 4 deletions pyres/horde.py
Original file line number Diff line number Diff line change
@@ -5,7 +5,7 @@
sys.exit("multiprocessing was not available")

import time, os, signal
import datetime
from datetime import datetime
import logging
import logging.handlers
from pyres import ResQ, Stat
@@ -67,7 +67,7 @@ def register_signal_handlers(self):

def register_minion(self):
self.resq.redis.sadd('resque:minions',str(self))
self.started = datetime.datetime.now()
self.started = datetime.utcnow()

def startup(self):
self.register_signal_handlers()
@@ -107,7 +107,7 @@ def working_on(self, job):
self.logger.debug('marking as working on')
data = {
'queue': job._queue,
'run_at': int(time.mktime(datetime.datetime.now().timetuple())),
'run_at': int(time.time()),
'payload': job._payload
}
data = json.dumps(data)
@@ -238,7 +238,7 @@ def register_khan(self):
if not hasattr(self, 'resq'):
self.setup_resq()
self.resq.redis.sadd('resque:khans',str(self))
self.started = datetime.datetime.now()
self.started = datetime.utcnow()

def _check_commands(self):
if not self._shutdown:
2 changes: 1 addition & 1 deletion pyres/job.py
Original file line number Diff line number Diff line change
@@ -72,7 +72,7 @@ def retry(self, payload_class, args):
retry_timeout = getattr(payload_class, 'retry_timeout', 0)

if retry_every:
now = ResQ._current_time()
now = ResQ._utcnow()
first_attempt = self._payload.get("first_attempt", now)
retry_until = first_attempt + timedelta(seconds=retry_timeout)
retry_at = now + timedelta(seconds=retry_every)
25 changes: 11 additions & 14 deletions pyres/worker.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import logging
import signal
import datetime, time
from datetime import datetime
import calendar
import time
import os, sys
import json_parser as json
import commands
@@ -46,22 +48,17 @@ def validate_queues(self):

def register_worker(self):
self.resq.redis.sadd('resque:workers', str(self))
#self.resq._redis.add("worker:#{self}:started", Time.now.to_s)
self.started = datetime.datetime.now()
self.started = datetime.utcnow()

def _set_started(self, dt):
if dt:
key = int(time.mktime(dt.timetuple()))
key = int(calendar.timegm(dt.utctimetuple()))
self.resq.redis.set("resque:worker:%s:started" % self, key)
else:
self.resq.redis.delete("resque:worker:%s:started" % self)

def _get_started(self):
datestring = self.resq.redis.get("resque:worker:%s:started" % self)
#ds = None
#if datestring:
# ds = datetime.datetime.strptime(datestring, '%Y-%m-%d %H:%M:%S')
return datestring
return self.resq.redis.get("resque:worker:%s:started" % self)

started = property(_get_started, _set_started)

@@ -143,9 +140,9 @@ def work(self, interval=5):
setproctitle("pyres_worker%s: Forked %s at %s" %
(__version__,
self.child,
datetime.datetime.now()))
datetime.utcnow()))
logger.info('Forked %s at %s' % (self.child,
datetime.datetime.now()))
datetime.utcnow()))

try:
os.waitpid(self.child, 0)
@@ -159,9 +156,9 @@ def work(self, interval=5):
else:
setproctitle("pyres_worker-%s: Processing %s since %s" %
(__version__, job._queue,
datetime.datetime.now()))
datetime.utcnow()))
logger.info('Processing %s since %s' %
(job._queue, datetime.datetime.now()))
(job._queue, datetime.utcnow()))
self.after_fork(job)
self.process(job)
os._exit(0)
@@ -222,7 +219,7 @@ def working_on(self, job):
logger.debug('marking as working on')
data = {
'queue': job._queue,
'run_at': str(int(time.mktime(datetime.datetime.now().timetuple()))),
'run_at': str(int(time.time())),
'payload': job._payload
}
data = json.dumps(data)
12 changes: 6 additions & 6 deletions resweb/views.py
Original file line number Diff line number Diff line change
@@ -4,7 +4,7 @@
from pyres.worker import Worker as Wrkr
from pyres import failure
import os
import datetime
from datetime import datetime

TEMPLATE_PATH = os.path.join(os.path.dirname(__file__), 'templates')
class ResWeb(pystache.View):
@@ -105,7 +105,7 @@ def workers(self):
if 'queue' in data:
item['data'] = True
item['code'] = data['payload']['class']
item['runat'] = str(datetime.datetime.fromtimestamp(float(data['run_at'])))
item['runat'] = str(datetime.utcfromtimestamp(float(data['run_at'])))
else:
item['data'] = False
item['nodata'] = not item['data']
@@ -155,7 +155,7 @@ def workers(self):
if 'queue' in data:
item['data'] = True
item['code'] = data['payload']['class']
item['runat'] = str(datetime.datetime.fromtimestamp(float(data['run_at'])))
item['runat'] = str(datetime.utcfromtimestamp(float(data['run_at'])))
else:
item['data'] = False
item['nodata'] = not item['data']
@@ -401,7 +401,7 @@ def code(self):
def runat(self):
data = self._worker.processing()
if self.data():
return str(datetime.datetime.fromtimestamp(float(data['run_at'])))
return str(datetime.utcfromtimestamp(float(data['run_at'])))
return ''

"""
@@ -445,7 +445,7 @@ def size(self):
def jobs(self):
jobs = []
for timestamp in self.resq.delayed_queue_peek(self.start(), self.end()):
t = datetime.datetime.fromtimestamp(float(timestamp))
t = datetime.utcfromtimestamp(float(timestamp))
item = dict(timestamp=str(timestamp))
item['size'] = str(self.resq.delayed_timestamp_size(timestamp))

@@ -467,7 +467,7 @@ def __init__(self, host, timestamp, start=0):
super(DelayedTimestamp, self).__init__(host)

def formated_timestamp(self):
return str(datetime.datetime.fromtimestamp(float(self._timestamp)))
return str(datetime.utcfromtimestamp(float(self._timestamp)))

def start(self):
return str(self._start)
3 changes: 1 addition & 2 deletions tests/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import unittest
import os
from pyres import ResQ, str_to_class

class Basic(object):
@@ -32,7 +31,7 @@ class RetryOnExceptionJob(object):

@staticmethod
def perform(fail_until):
if ResQ._current_time() < fail_until:
if ResQ._utcnow() < fail_until:
raise Exception("Don't blame me! I'm supposed to fail!")
else:
return True
2 changes: 1 addition & 1 deletion tests/test_horde.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from tests import PyResTests, Basic, TestProcess
from tests import PyResTests
from pyres import horde
import os

2 changes: 1 addition & 1 deletion tests/test_jobs.py
Original file line number Diff line number Diff line change
@@ -25,7 +25,7 @@ def test_fail(self):
assert self.redis.llen('resque:failed') == 1

def test_date_arg_type(self):
dt = datetime.now().replace(microsecond=0)
dt = datetime.utcnow().replace(microsecond=0)
self.resq.enqueue(ReturnAllArgsJob, dt)
job = Job.reserve('basic',self.resq)
result = job.perform()
4 changes: 2 additions & 2 deletions tests/test_json.py
Original file line number Diff line number Diff line change
@@ -10,13 +10,13 @@ def test_encode_decode_date(self):
assert decoded['dt'] == dt

def test_dates_in_lists(self):
dates = [datetime.now() for i in range(50)]
dates = [datetime.utcnow() for i in range(50)]
decoded = json.loads(json.dumps(dates))
for value in dates:
assert isinstance(value, datetime)

def test_dates_in_dict(self):
dates = dict((i, datetime.now()) for i in range(50))
dates = dict((i, datetime.utcnow()) for i in range(50))
decoded = json.loads(json.dumps(dates))
for i, value in dates.items():
assert isinstance(i, int)
Loading