Skip to content

Replace kafka-python patch __next__ with poll #3682

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

junhao69535
Copy link

Description

Previously, only patch __next__ was used, which caused poll to fail. The __next__ method actually calls the poll method, so the patch poll method is used instead.

    def poll(self, timeout_ms=0, max_records=None, update_offsets=True):
        assert timeout_ms >= 0, 'Timeout must not be negative'
        if max_records is None:
            max_records = self.config['max_poll_records']
        assert isinstance(max_records, int), 'max_records must be an integer'
        assert max_records > 0, 'max_records must be positive'
        assert not self._closed, 'KafkaConsumer is closed'

        timer = Timer(timeout_ms)
        while not self._closed:
            records = self._poll_once(timer, max_records,
                                      update_offsets=update_offsets)
            if records:
                return records
            elif timer.expired:
                break
        return {}
    
    def _message_generator_v2(self):
        timeout_ms = 1000 * max(0, self._consumer_timeout - time.time())
        record_map = self.poll(timeout_ms=timeout_ms, update_offsets=False)
        for tp, records in six.iteritems(record_map):
            for record in records:
                if not self._subscription.is_fetchable(tp):
                    log.debug("Not returning fetched records for partition %s"
                              " since it is no longer fetchable", tp)
                    break
                self._subscription.assignment[tp].position = OffsetAndMetadata(record.offset + 1, '', -1)
                yield record

    def __next__(self):
        if self._closed:
            raise StopIteration('KafkaConsumer closed')
        self._set_consumer_timeout()
        while time.time() < self._consumer_timeout:
            if not self._iterator:
                self._iterator = self._message_generator_v2()
            try:
                return next(self._iterator)
            except StopIteration:
                self._iterator = None
        raise StopIteration()

Type of change

Please delete options that are not relevant.

  • Breaking change (fix or feature that would cause existing functionality to not work as expected)

How Has This Been Tested?

Add a new test at opentelemetry.instrumentation.kafka.tests.test_utils.test_wrap_poll.

  • opentelemetry.instrumentation.kafka.tests.test_utils.test_wrap_poll

Does This PR Require a Core Repo Change?

  • Yes. - Link to PR:
  • No.

Checklist:

See contributing.md for styleguide, changelog guidelines, and more.

  • Followed the style guidelines of this project
  • Changelogs have been updated
  • Unit tests have been added
  • Documentation has been updated

Copy link

linux-foundation-easycla bot commented Aug 6, 2025

CLA Not Signed

@junhao69535 junhao69535 force-pushed the feature/replace_kafka_python_patch branch from 5bfdf96 to 9da04aa Compare August 6, 2025 12:09
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant