Skip to content

mq.receive.max.poll.time.ms, to limit the maximum time spent polling messages in a Kafka Connect task cycle #153

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,7 @@ The configuration options for the Kafka Connect source connector for IBM MQ are
| `mq.message.receive.timeout` | The timeout (in milliseconds) for receiving messages from the queue manager before returning to Kafka Connect. | long | 2000 | 1 or greater |
| `mq.reconnect.delay.min.ms` | The minimum delay (in milliseconds) for reconnecting to the queue manager after a connection error. | long | 64 | 1 or greater |
| `mq.reconnect.delay.max.ms` | The maximum delay (in milliseconds) for reconnecting to the queue manager after a connection error. | long | 8192 | 1 or greater |
| `mq.receive.max.poll.time.ms` | The maximum time (in milliseconds) to spend polling messages in a single Kafka Connect task cycle. If set to 0, polling continues until the batch size is reached or a receive call returns no message. | long | 0 | 0 or greater |
| `errors.deadletterqueue.topic.name` | The name of the Kafka topic to use as the dead letter queue (DLQ) for poison messages that fail during processing within the record builder component of the connector. | string | | If left blank (default), failed messages will not be written to a DLQ. |
| `errors.deadletterqueue.context.headers.enable` | Whether to add error context headers to messages written to the DLQ. | boolean | false | When enabled, additional headers describing the error will be included with each DLQ record. |

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1299,4 +1299,152 @@ public void verifyLoggingErrorsWithMessageHavingDefaultRecordBuilder() throws Ex
assertThat(processedRecords.get(0).topic()).isEqualTo("__dlq.mq.source");
assertThat(processedRecords.get(1).topic()).isEqualTo("mytopic");
}

/**
 * Verifies that a poll bounded by mq.receive.max.poll.time.ms still drains and
 * returns every available message when the queue holds fewer messages than the
 * batch size (30 messages, batch size 5000).
 *
 * Fix: the @Test annotation was missing, so this test was silently never run
 * by JUnit even though it compiled cleanly.
 */
@Test
public void testMaxPollTimeTerminates() throws Exception {
    connectTask = getSourceTaskWithEmptyKafkaOffset();

    final Map<String, String> connectorConfigProps = createExactlyOnceConnectorProperties();
    connectorConfigProps.put("mq.message.body.jms", "true");
    connectorConfigProps.put("mq.record.builder",
            "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
    connectorConfigProps.put("mq.message.receive.timeout", "150");
    connectorConfigProps.put("mq.receive.subsequent.timeout.ms", "10");
    connectorConfigProps.put("mq.receive.max.poll.time.ms", "200");
    connectorConfigProps.put("mq.batch.size", "5000");

    final JMSWorker shared = new JMSWorker();
    shared.configure(getPropertiesConfig(connectorConfigProps));
    final JMSWorker dedicated = new JMSWorker();
    dedicated.configure(getPropertiesConfig(connectorConfigProps));
    final SequenceStateClient sequenceStateClient = new SequenceStateClient(DEFAULT_STATE_QUEUE, shared, dedicated);

    connectTask.start(connectorConfigProps, shared, dedicated, sequenceStateClient);

    // 3 x 10 messages on the source queue.
    final List<Message> messages = createAListOfMessages(getJmsContext(), 10, "msg ");
    putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, messages);
    putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, messages);
    putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, messages);

    final List<SourceRecord> kafkaMessages = connectTask.poll();

    // Exactly-once state written, source queue fully drained, all 30 returned.
    final List<Message> stateMsgs1 = browseAllMessagesFromQueue(DEFAULT_STATE_QUEUE);
    assertThat(stateMsgs1.size()).isEqualTo(1);
    final List<Message> sourceMsgs = getAllMessagesFromQueue(DEFAULT_SOURCE_QUEUE);
    assertThat(sourceMsgs.size()).isEqualTo(0);
    assertEquals(30, kafkaMessages.size());
}

/**
 * Verifies that mq.receive.max.poll.time.ms cuts a poll short even though the
 * configured batch size (5000) has not been reached: the poll must end close
 * to the 200ms budget and return only the messages gathered so far.
 */
@Test
public void testMaxPollTimeTerminatesBatchEarly() throws Exception {
    connectTask = getSourceTaskWithEmptyKafkaOffset();

    // Large batch with a short poll budget so the time limit, not the
    // batch size, is what terminates the poll.
    final Map<String, String> props = createExactlyOnceConnectorProperties();
    props.put("mq.message.body.jms", "true");
    props.put("mq.record.builder",
            "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
    props.put("mq.message.receive.timeout", "100");
    props.put("mq.receive.subsequent.timeout.ms", "10");
    props.put("mq.receive.max.poll.time.ms", "200"); // stop after 200ms
    props.put("mq.batch.size", "5000");

    final JMSWorker sharedWorker = new JMSWorker();
    sharedWorker.configure(getPropertiesConfig(props));
    final JMSWorker dedicatedWorker = new JMSWorker();
    dedicatedWorker.configure(getPropertiesConfig(props));
    final SequenceStateClient stateClient =
            new SequenceStateClient(DEFAULT_STATE_QUEUE, sharedWorker, dedicatedWorker);

    connectTask.start(props, sharedWorker, dedicatedWorker, stateClient);

    putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, createAListOfMessages(getJmsContext(), 10, "msg "));

    final long pollStart = System.nanoTime();
    final List<SourceRecord> polled = connectTask.poll();
    final long elapsedMs = (System.nanoTime() - pollStart) / 1_000_000;

    // Poll should end close to 200ms.
    // NOTE(review): only 10ms of slack — may be flaky on loaded CI hosts.
    assertThat(elapsedMs).isLessThanOrEqualTo(210L);

    final List<Message> stateMessages = browseAllMessagesFromQueue(DEFAULT_STATE_QUEUE);
    assertThat(stateMessages.size()).isEqualTo(1);
    final List<Message> remaining = getAllMessagesFromQueue(DEFAULT_SOURCE_QUEUE);
    assertThat(remaining.size()).isEqualTo(0);
    assertEquals(10, polled.size());
}

/**
 * Verifies that reaching the batch size (10) ends the poll well before the
 * 1000ms max-poll-time budget expires: batch size and time limit are
 * independent termination conditions.
 */
@Test
public void testPollEndsWhenBatchSizeReached() throws Exception {
    connectTask = getSourceTaskWithEmptyKafkaOffset();

    final Map<String, String> props = createExactlyOnceConnectorProperties();
    props.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
    props.put("mq.message.receive.timeout", "100");
    props.put("mq.receive.subsequent.timeout.ms", "10");
    props.put("mq.receive.max.poll.time.ms", "1000");
    props.put("mq.batch.size", "10");

    final JMSWorker sharedWorker = new JMSWorker();
    sharedWorker.configure(getPropertiesConfig(props));
    final JMSWorker dedicatedWorker = new JMSWorker();
    dedicatedWorker.configure(getPropertiesConfig(props));
    final SequenceStateClient stateClient =
            new SequenceStateClient(DEFAULT_STATE_QUEUE, sharedWorker, dedicatedWorker);
    connectTask.start(props, sharedWorker, dedicatedWorker, stateClient);

    // More messages than the batch size, so the batch fills immediately.
    putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, createAListOfMessages(getJmsContext(), 12, "msg "));

    final long pollStart = System.nanoTime();
    connectTask.poll();
    final long elapsedMs = (System.nanoTime() - pollStart) / 1_000_000;

    assertThat(elapsedMs).isLessThan(1000L);
}

/**
 * Verifies that mq.receive.max.poll.time.ms=0 disables the time limit and the
 * task falls back to the default behaviour: with an empty queue, poll() blocks
 * for roughly mq.message.receive.timeout (400ms) and returns no records.
 */
@Test
public void testPollWithMaxPollTimeZeroBehavesAsDefault() throws Exception {
    connectTask = getSourceTaskWithEmptyKafkaOffset();
    final Map<String, String> props = createExactlyOnceConnectorProperties();
    props.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
    props.put("mq.message.receive.timeout", "400");
    props.put("mq.receive.max.poll.time.ms", "0");
    props.put("mq.batch.size", "100");

    final JMSWorker sharedWorker = new JMSWorker();
    sharedWorker.configure(getPropertiesConfig(props));
    final JMSWorker dedicatedWorker = new JMSWorker();
    dedicatedWorker.configure(getPropertiesConfig(props));
    final SequenceStateClient stateClient =
            new SequenceStateClient(DEFAULT_STATE_QUEUE, sharedWorker, dedicatedWorker);
    connectTask.start(props, sharedWorker, dedicatedWorker, stateClient);

    // The source queue is intentionally left empty.

    final long pollStart = System.nanoTime();
    final List<SourceRecord> records = connectTask.poll();
    final long elapsedMs = (System.nanoTime() - pollStart) / 1_000_000;

    // NOTE(review): 50ms window around the receive timeout — may be flaky on
    // loaded CI hosts.
    assertThat(elapsedMs).isBetween(400L, 450L);
    assertEquals(0, records.size());
}

/**
 * Verifies that a very short max poll time (50ms) prevents the task from
 * collecting the full backlog of 100 messages in one poll.
 *
 * Fix: the final assertion was {@code assertThat(records.size() < 100);} —
 * that builds an AssertJ assertion object but never evaluates it (no
 * {@code .isTrue()}/comparison call), so the test could never fail. It now
 * actually asserts that fewer than 100 records were returned.
 */
@Test
public void testPollWithShortMaxPollTime() throws Exception {
    connectTask = getSourceTaskWithEmptyKafkaOffset();
    final Map<String, String> config = createExactlyOnceConnectorProperties();
    config.put("mq.record.builder", "com.ibm.eventstreams.connect.mqsource.builders.DefaultRecordBuilder");
    config.put("mq.receive.max.poll.time.ms", "50");
    config.put("mq.message.receive.timeout", "1");
    config.put("mq.receive.subsequent.timeout.ms", "0");
    config.put("mq.batch.size", "5000");

    final JMSWorker shared = new JMSWorker();
    shared.configure(getPropertiesConfig(config));
    final JMSWorker dedicated = new JMSWorker();
    dedicated.configure(getPropertiesConfig(config));
    final SequenceStateClient sequenceStateClient = new SequenceStateClient(DEFAULT_STATE_QUEUE, shared, dedicated);
    connectTask.start(config, shared, dedicated, sequenceStateClient);

    putAllMessagesToQueue(DEFAULT_SOURCE_QUEUE, createAListOfMessages(getJmsContext(), 100, "msg "));

    final List<SourceRecord> records = connectTask.poll();

    // The 50ms budget must expire before all 100 messages are received.
    assertThat(records.size()).isLessThan(100);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,15 @@ public class MQSourceConnector extends SourceConnector {
public static final long CONFIG_RECONNECT_DELAY_MAX_DEFAULT = 8192L;
public static final long CONFIG_RECONNECT_DELAY_MAX_MINIMUM = 10L;

// Upper bound (in milliseconds) on the time a task spends accumulating a batch
// within a single poll() cycle; the default of 0 disables the limit.
public static final String CONFIG_MAX_POLL_TIME = "mq.receive.max.poll.time.ms";
public static final String CONFIG_DOCUMENTATION_MAX_POLL_TIME = "Maximum time (in milliseconds) to poll for messages during a single Kafka Connect poll cycle. "
        + "Acts as a hard upper bound on how long the task will try to accumulate a batch. "
        + "If set to 0 or not defined, polling continues until either a message receive returns null or the batch size is met. "
        + "Note: It is recommended to keep this value less than or equal to both 'mq.message.receive.timeout' "
        + "and 'mq.receive.subsequent.timeout.ms' to avoid unexpected delays due to long blocking receive calls.";
public static final String CONFIG_DISPLAY_MAX_POLL_TIME = "Max poll time (ms)";
public static final long CONFIG_MAX_POLL_TIME_DEFAULT = 0L;

public static final String DLQ_PREFIX = "errors.deadletterqueue.";

public static final String DLQ_TOPIC_NAME_CONFIG = DLQ_PREFIX + "topic.name";
Expand Down Expand Up @@ -609,7 +618,7 @@ null, new ReadableFile(),
CONFIG_DISPLAY_MAX_RECEIVE_TIMEOUT);
CONFIGDEF.define(CONFIG_SUBSEQUENT_RECEIVE_TIMEOUT,
ConfigDef.Type.LONG,
CONFIG_SUBSEQUENT_RECEIVE_TIMEOUT_DEFAULT,
CONFIG_SUBSEQUENT_RECEIVE_TIMEOUT_DEFAULT, ConfigDef.Range.atLeast(CONFIG_SUBSEQUENT_RECEIVE_TIMEOUT_DEFAULT),
ConfigDef.Importance.LOW,
CONFIG_DOCUMENTATION_SUBSEQUENT_RECEIVE_TIMEOUT,
CONFIG_GROUP_MQ,
Expand Down Expand Up @@ -648,6 +657,15 @@ null, new ReadableFile(),
CONFIG_GROUP_MQ, 31,
Width.MEDIUM,
DLQ_CONTEXT_HEADERS_ENABLE_DISPLAY);
// Register mq.receive.max.poll.time.ms: LONG, must be >= 0 (0 = no limit).
// Consistency fix: neighbouring defines use the imported Width enum directly
// rather than the qualified ConfigDef.Width form.
CONFIGDEF.define(CONFIG_MAX_POLL_TIME,
        ConfigDef.Type.LONG,
        CONFIG_MAX_POLL_TIME_DEFAULT,
        ConfigDef.Range.atLeast(CONFIG_MAX_POLL_TIME_DEFAULT),
        ConfigDef.Importance.LOW,
        CONFIG_DOCUMENTATION_MAX_POLL_TIME,
        CONFIG_GROUP_MQ,
        32,
        Width.MEDIUM,
        CONFIG_DISPLAY_MAX_POLL_TIME);

CONFIGDEF.define(CONFIG_NAME_TOPIC,
Type.STRING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,18 @@
*/
package com.ibm.eventstreams.connect.mqsource;

import com.ibm.eventstreams.connect.mqsource.builders.RecordBuilderException;
import com.ibm.eventstreams.connect.mqsource.sequencestate.SequenceState;
import com.ibm.eventstreams.connect.mqsource.sequencestate.SequenceStateClient;
import com.ibm.eventstreams.connect.mqsource.sequencestate.SequenceStateException;
import com.ibm.eventstreams.connect.mqsource.util.LogMessages;
import com.ibm.eventstreams.connect.mqsource.util.ExceptionProcessor;
import com.ibm.eventstreams.connect.mqsource.util.QueueConfig;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.jms.JMSException;
import javax.jms.JMSRuntimeException;
import javax.jms.Message;
import static com.ibm.eventstreams.connect.mqsource.MQSourceConnector.CONFIG_MAX_POLL_BLOCKED_TIME_MS;
import static com.ibm.eventstreams.connect.mqsource.MQSourceConnector.CONFIG_MAX_POLL_TIME;
import static com.ibm.eventstreams.connect.mqsource.MQSourceConnector.CONFIG_NAME_MQ_BATCH_SIZE;
import static com.ibm.eventstreams.connect.mqsource.MQSourceConnector.CONFIG_NAME_MQ_EXACTLY_ONCE_STATE_QUEUE;
import static com.ibm.eventstreams.connect.mqsource.MQSourceConnector.CONFIG_NAME_MQ_QUEUE;
import static com.ibm.eventstreams.connect.mqsource.MQSourceConnector.CONFIG_NAME_MQ_QUEUE_MANAGER;
import static com.ibm.eventstreams.connect.mqsource.MQSourceConnector.CONFIG_VALUE_MQ_BATCH_SIZE_DEFAULT;
import static com.ibm.eventstreams.connect.mqsource.MQSourceTaskStartUpAction.NORMAL_OPERATION;
import static com.ibm.eventstreams.connect.mqsource.MQSourceTaskStartUpAction.REDELIVER_UNSENT_BATCH;
import static com.ibm.eventstreams.connect.mqsource.MQSourceTaskStartUpAction.REMOVE_DELIVERED_MESSAGES_FROM_SOURCE_QUEUE;
import static com.ibm.eventstreams.connect.mqsource.sequencestate.SequenceState.LastKnownState.DELIVERED;
import static com.ibm.eventstreams.connect.mqsource.sequencestate.SequenceState.LastKnownState.IN_FLIGHT;

import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -47,24 +42,32 @@
import java.util.function.Consumer;
import java.util.stream.Collectors;

import static com.ibm.eventstreams.connect.mqsource.MQSourceConnector.CONFIG_MAX_POLL_BLOCKED_TIME_MS;
import static com.ibm.eventstreams.connect.mqsource.MQSourceConnector.CONFIG_NAME_MQ_BATCH_SIZE;
import static com.ibm.eventstreams.connect.mqsource.MQSourceConnector.CONFIG_NAME_MQ_EXACTLY_ONCE_STATE_QUEUE;
import static com.ibm.eventstreams.connect.mqsource.MQSourceConnector.CONFIG_NAME_MQ_QUEUE;
import static com.ibm.eventstreams.connect.mqsource.MQSourceConnector.CONFIG_NAME_MQ_QUEUE_MANAGER;
import static com.ibm.eventstreams.connect.mqsource.MQSourceConnector.CONFIG_VALUE_MQ_BATCH_SIZE_DEFAULT;
import javax.jms.JMSException;
import javax.jms.JMSRuntimeException;
import javax.jms.Message;

import static com.ibm.eventstreams.connect.mqsource.MQSourceTaskStartUpAction.REMOVE_DELIVERED_MESSAGES_FROM_SOURCE_QUEUE;
import static com.ibm.eventstreams.connect.mqsource.MQSourceTaskStartUpAction.NORMAL_OPERATION;
import static com.ibm.eventstreams.connect.mqsource.MQSourceTaskStartUpAction.REDELIVER_UNSENT_BATCH;
import static com.ibm.eventstreams.connect.mqsource.sequencestate.SequenceState.LastKnownState.DELIVERED;
import static com.ibm.eventstreams.connect.mqsource.sequencestate.SequenceState.LastKnownState.IN_FLIGHT;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.ibm.eventstreams.connect.mqsource.builders.RecordBuilderException;
import com.ibm.eventstreams.connect.mqsource.sequencestate.SequenceState;
import com.ibm.eventstreams.connect.mqsource.sequencestate.SequenceStateClient;
import com.ibm.eventstreams.connect.mqsource.sequencestate.SequenceStateException;
import com.ibm.eventstreams.connect.mqsource.util.ExceptionProcessor;
import com.ibm.eventstreams.connect.mqsource.util.LogMessages;
import com.ibm.eventstreams.connect.mqsource.util.QueueConfig;

public class MQSourceTask extends SourceTask {
private static final Logger log = LoggerFactory.getLogger(MQSourceTask.class);

// The maximum number of records returned per call to poll()
private int batchSize = CONFIG_VALUE_MQ_BATCH_SIZE_DEFAULT;
// The maximum time (ms) to spend polling messages before returning a batch;
// 0 means no time limit. Fix: this was initialised from the batch-size
// default (a copy-paste slip), which is a non-zero record count with entirely
// different semantics; use the max-poll-time default instead.
private long maxPollTime = MQSourceConnector.CONFIG_MAX_POLL_TIME_DEFAULT;

// Used to signal completion of a batch
// After returning a batch of messages to Connect, the SourceTask waits
Expand Down Expand Up @@ -174,6 +177,7 @@ protected void start(final Map<String, String> props, final JMSWorker reader, fi
startUpAction = NORMAL_OPERATION;

batchSize = config.getInt(CONFIG_NAME_MQ_BATCH_SIZE);
maxPollTime = config.getLong(CONFIG_MAX_POLL_TIME);
try {
reader.configure(config);
reader.connect();
Expand Down Expand Up @@ -415,22 +419,31 @@ private void initOrResetBatchCompleteSignal(final boolean predicate, final List<

private List<Message> pollSourceQueue(final int numberOfMessagesToBePolled) throws JMSException {
final List<Message> localList = new ArrayList<>();

if (!stopNow.get()) {
log.debug("Polling for records");
Message message;
do {
message = reader.receive(sourceQueue, sourceQueueConfig, localList.size() == 0);
if (message != null) {
localList.add(message);
}
} while (message != null && localList.size() < numberOfMessagesToBePolled && !stopNow.get());
} else {
if (stopNow.get()) {
log.info("Stopping polling for records");
return localList;
}

log.debug("Polling for records");
final long startTime = System.currentTimeMillis();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
final long startTime = System.currentTimeMillis();
final long pollEndTime = System.currentTimeMillis() + maxPollTime;


Message message;
do {
message = reader.receive(sourceQueue, sourceQueueConfig, localList.isEmpty());
if (message != null) {
localList.add(message);
}
} while (
message != null &&
localList.size() < numberOfMessagesToBePolled &&
!stopNow.get() &&
(maxPollTime <= 0 || (System.currentTimeMillis() - startTime) < maxPollTime)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
(maxPollTime <= 0 || (System.currentTimeMillis() - startTime) < maxPollTime)
(maxPollTime <= 0 || (System.currentTimeMillis() < pollEndTime)

);

return localList;
}


private boolean isFirstMsgOnSourceQueueARequiredMsg(final List<String> msgIds) throws JMSException {
final Message message = reader.browse(sourceQueue).get();
return msgIds.contains(message.getJMSMessageID());
Expand Down