diff --git a/kafka/src/kafka/views.py b/kafka/src/kafka/views.py index 435584e..4c67d1d 100644 --- a/kafka/src/kafka/views.py +++ b/kafka/src/kafka/views.py @@ -28,17 +28,20 @@ def my_listener(state): if state == KazooState.LOST: # Register somewhere that the session was lost + print("I'm Lost") elif state == KazooState.SUSPENDED: # Handle being disconnected from Zookeeper + print("I'm Suspended") else: # Handle being connected/reconnected to Zookeeper + print("I'm Connected/reconnected") def _get_topology(): topology = CLUSTERS.get() clusters = [] for cluster in topology: zk = KazooClient(hosts=CLUSTERS[cluster].ZK_HOST_PORTS.get()) - zk.add_listener(my_listener) + #zk.add_listener(my_listener) zk.start() brokers = _get_brokers(zk,cluster) consumer_groups = _get_consumer_groups(zk,cluster) @@ -58,7 +61,7 @@ def _get_topology(): def _get_cluster_topology(cluster): zk = KazooClient(hosts=cluster['zk_host_ports']) - zk.add_listener(my_listener) + #zk.add_listener(my_listener) zk.start() brokers = _get_brokers(zk,cluster['id']) consumer_groups = _get_consumer_groups(zk,cluster['id']) @@ -93,7 +96,7 @@ def _get_consumer_groups(zk, cluster): def _get_topics(cluster): zk = KazooClient(hosts=cluster['zk_host_ports']) - zk.add_listener(my_listener) + #zk.add_listener(my_listener) zk.start() topic_list = [] try: @@ -130,7 +133,7 @@ def _get_topics(cluster): def _get_consumers(cluster): zk = KazooClient(hosts=cluster['zk_host_ports']) - zk.add_listener(my_listener) + #zk.add_listener(my_listener) zk.start() groups = _get_consumer_groups(zk,cluster['id']) consumer_groups = [] @@ -220,7 +223,7 @@ def consumer_groups(request, cluster_id): def consumer_group(request, cluster_id, group_id): cluster = get_cluster_or_404(id=cluster_id) zk = KazooClient(hosts=cluster['zk_host_ports']) - zk.add_listener(my_listener) + #zk.add_listener(my_listener) zk.start() consumer_group = _get_consumer_group(zk=zk,cluster=cluster,group_id=group_id) zk.stop()