Hello, I don't think I am facing a package issue but rather a skill issue. I don't understand why .poll() seems to be blocking my program. When I run the code below, nothing appears in the output; however, when I add the sys.stdout.flush() calls, it starts printing.

from confluent_kafka import Consumer, KafkaError
from confluent_kafka.admin import AdminClient, NewTopic
import sys

bootstrap_servers = 'broker-1:19092,broker-2:19092,broker-3:19092'
conf = {'bootstrap.servers': bootstrap_servers,
        'group.id': 'availability-tool',
        'enable.auto.commit': 'true',
        'auto.offset.reset': 'earliest'}

consumer = Consumer(conf)
TOPICS = ['asset_state']
consumer.subscribe(TOPICS)

admin = AdminClient({'bootstrap.servers': bootstrap_servers})

print("List of topics in Kafka")
for topic in admin.list_topics().topics:
    print(f"{topic}")

print(f"List of topics for {consumer}")
for topic in consumer.list_topics().topics:
    print(f"{topic}")
# sys.stdout.flush()

while True:
    msg = consumer.poll(2.0)
    if msg is None:
        print("...")
        continue
    if msg.error():
        print("Consumer error: {}".format(msg.error()))
        continue
    print('Received message: {}'.format(msg.value().decode('utf-8')))
    # sys.stdout.flush()

consumer.close()

I tested my program against the official Kafka console binaries to check that messages were being sent to the consumer, and they were. I restarted my program multiple times, but even after a clean start it isn't working. Thanks in advance.
Replies: 1 comment 1 reply
Also, the most reliable way I have found to get the output to show up promptly is to flush stdout at the top of every loop iteration:

while True:
    sys.stdout.flush()  # flush stdout HERE, before each poll
    msg = consumer.poll(2.0)
    if msg is None:
        print("...")
        continue
    if msg.error():
        print("Consumer error: {}".format(msg.error()))
        continue
    print('Received message: {}'.format(msg.value().decode('utf-8')))

consumer.close()

If anyone has some time to help me figure this out, that would be nice!
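For what it's worth, a one-time alternative to flushing in every iteration may be to switch the stream to line buffering. The sketch below assumes the output is going to a pipe or file rather than a terminal (which is when Python block-buffers stdout) and uses an in-memory stream as a stand-in for that piped stdout. The names `enable_line_buffering`, `raw` and `piped` are made up for the illustration.

```python
import io


def enable_line_buffering(stream):
    """Make a text stream flush on every newline (hypothetical helper)."""
    # io.TextIOWrapper.reconfigure() is available since Python 3.7.
    stream.reconfigure(line_buffering=True)
    return stream


# Stand-in for a piped/redirected stdout: text layer over a block buffer.
raw = io.BytesIO()
piped = io.TextIOWrapper(io.BufferedWriter(raw), encoding="utf-8", newline="\n")

print("...", file=piped)
before = raw.getvalue()  # nothing has reached the destination yet

enable_line_buffering(piped)
print("Received message: hello", file=piped)
after = raw.getvalue()   # the newline triggered a flush of everything pending

# In the real script this would presumably be a single call near the top:
#     sys.stdout.reconfigure(line_buffering=True)
```

Running the script with `python -u` or setting `PYTHONUNBUFFERED=1` should have a similar effect without touching the code.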
The print() output is what gets stuck (it sits in the stdout buffer until it is flushed); using a logger fixes it. So poll() is not at fault.
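A small sketch of why a logger helps, with no Kafka involved: `logging.StreamHandler` flushes its stream after every record, while a plain `print()` to a block-buffered stream does not. The in-memory stream below is only a stand-in for a stdout that is piped or redirected, and the logger name is reused from the `group.id` in the question purely for illustration.

```python
import io
import logging

# Stand-in for a block-buffered stdout (piped or redirected output).
raw = io.BytesIO()
buffered_stdout = io.TextIOWrapper(
    io.BufferedWriter(raw), encoding="utf-8", newline="\n"
)

# 1. Plain print(): the text stays in the buffer.
print("...", file=buffered_stdout)
seen_after_print = raw.getvalue()

# 2. print(..., flush=True): the buffer is pushed out immediately.
print("...", file=buffered_stdout, flush=True)
seen_after_flush = raw.getvalue()

# 3. A logger: StreamHandler.emit() calls flush() after each record.
logger = logging.getLogger("availability-tool")
logger.setLevel(logging.INFO)
logger.propagate = False  # keep the demo output on our handler only
logger.addHandler(logging.StreamHandler(buffered_stdout))
logger.info("Received message: %s", "hello")
seen_after_log = raw.getvalue()
```

In the consumer loop, this would mean either replacing each `print(...)` with `print(..., flush=True)` or with a `logger.info(...)` call; `poll()` itself stays unchanged.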