Skip to content

Fixed the issue of the consumer becoming unresponsive due to inactivity #2

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
### 1.4.0
(18 May 2018)
- fixed the issue of the consumer becoming unresponsive due to inactivity

### 0.5.2:
(June 14, 2017)

Expand Down
9 changes: 5 additions & 4 deletions easyjoblite/consumers/base_rmq_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from kombu import Consumer
from kombu import Producer

from easyjoblite.utils import enqueue, is_main_thread
from easyjoblite.utils import enqueue, is_main_thread, get_default_heartbeat_interval, get_default_prefetch_count


class BaseRMQConsumer(object):
Expand Down Expand Up @@ -49,7 +49,7 @@ def get_config(self):
def should_run_loop(self):
return self._should_block

def start_connection(self):
def start_connection(self, heartbeat_interval=get_default_heartbeat_interval()):
"""
reset the connection to rabbit mq
:return:
Expand All @@ -59,13 +59,13 @@ def start_connection(self):

# todo: do we need to make confirm_publish configurable?
self._conn = Connection(self.get_config().rabbitmq_url,
transport_options={'confirm_publish': True})
transport_options={'confirm_publish': True}, heartbeat=heartbeat_interval)

# setup producer to push to error and dlqs
self._producer = Producer(channel=self._conn.channel(),
exchange=self._orchestrator.get_exchange())

def start_rmq_consume(self):
def start_rmq_consume(self, prefetch_count=get_default_prefetch_count()):
"""
start consuming from rmq
:return:
Expand All @@ -79,6 +79,7 @@ def start_rmq_consume(self):
self._queue_consumer = Consumer(channel=channel,
queues=[self._from_queue],
callbacks=[self.process_message])
self._queue_consumer.qos(prefetch_count=prefetch_count)
self._queue_consumer.consume()

def rmq_reset(self):
Expand Down
4 changes: 3 additions & 1 deletion easyjoblite/consumers/retry_queue_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from easyjoblite import constants
from easyjoblite.consumers.base_rmq_consumer import BaseRMQConsumer
from easyjoblite.job import EasyJob
from easyjoblite.utils import get_default_prefetch_count


class RetryQueueConsumer(BaseRMQConsumer):
Expand Down Expand Up @@ -96,7 +97,7 @@ def _shoveller(self, body, message):

logger.debug("shoveller: moved {t}:'{d}' work-item to buffer queue".format(t=job.tag, d=body))

def _shovel_to_buffer(self, from_queue):
def _shovel_to_buffer(self, from_queue, prefetch_count=get_default_prefetch_count()):
"""
poor man's alternative to the shovel plugin

Expand All @@ -111,6 +112,7 @@ def _shovel_to_buffer(self, from_queue):
queue_consumer = Consumer(channel=channel,
queues=[from_queue],
callbacks=[self._shoveller])
queue_consumer.qos(prefetch_count=prefetch_count)
queue_consumer.consume()

# finally drain all the work items from error-queue into shoveller
Expand Down
6 changes: 3 additions & 3 deletions easyjoblite/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from easyjoblite.consumers.work_queue_consumer import WorkQueueConsumer
from easyjoblite.exception import EasyJobServiceNotStarted
from easyjoblite.job import EasyJob
from easyjoblite.utils import enqueue, is_main_thread, stop_all_workers
from easyjoblite.utils import enqueue, is_main_thread, stop_all_workers, get_default_heartbeat_interval


class Orchestrator(object):
Expand Down Expand Up @@ -106,7 +106,7 @@ def create_all_consumers(self, is_detached=False):
self.create_consumer(constants.DEAD_LETTER_QUEUE, is_detached)
time.sleep(2)

def setup_entities(self):
def setup_entities(self, heartbeat_interval=get_default_heartbeat_interval()):
"""
declare all required entities
no advanced error handling yet (like error on declaration with altered properties etc)
Expand Down Expand Up @@ -147,7 +147,7 @@ def setup_entities(self):

# todo: do we need to make confirm_publish configurable?
self._conn = Connection(self.get_config().rabbitmq_url,
transport_options={'confirm_publish': True})
transport_options={'confirm_publish': True}, heartbeat=heartbeat_interval)

# declare all the exchanges and queues needed (declare, not overwrite existing)
for entity in [self._booking_exchange, self.work_queue, self.retry_queue,
Expand Down
34 changes: 33 additions & 1 deletion easyjoblite/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,37 @@ def update_import_paths(import_paths):
sys.path = import_paths.split(':') + sys.path


def enqueue(producer, queue_type, job, body):
def get_default_retry_policy():
    """
    Gets the default retry policy of the producer
    :return: the default retry policy (a fresh dict on every call)
    """
    # First retry immediately, then back off by 2s per attempt,
    # never waiting more than 30s between attempts; give up after 3 tries.
    return {
        'interval_start': 0,
        'interval_step': 2,
        'interval_max': 30,
        'max_retries': 3,
    }


def get_default_heartbeat_interval():
    """
    Gets the default interval, in seconds, at which connections should send
    AMQP heartbeats to keep the broker from considering them dead.
    :return: the default heartbeat interval in seconds
    """
    return 5


def get_default_prefetch_count():
    """
    Gets the default number of messages a consumer prefetches from the queue.
    :return: the default prefetch count
    """
    default_prefetch = 50
    return default_prefetch


def enqueue(producer, queue_type, job, body, retry=True, retry_policy=get_default_retry_policy()):
"""
enque a job in the given queue
:param producer: the producer to be used
Expand All @@ -188,4 +218,6 @@ def enqueue(producer, queue_type, job, body):
producer.publish(body=body,
headers=headers,
routing_key=routing_key,
retry=retry,
retry_policy=retry_policy,
delivery_mode=PERSISTENT_DELIVERY_MODE)
2 changes: 1 addition & 1 deletion easyjoblite/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@
from __future__ import (absolute_import, division, print_function,
unicode_literals)

VERSION = '0.7.1'
VERSION = '1.4.0'