KAFKA-19478 [3/N]: Use heaps to discover the least loaded process #20172

Open - wants to merge 3 commits into trunk from jmh_benchmarks_third_opt

Conversation

lucasbru (Member) commented Jul 15, 2025

The original implementation uses a linear search to find the least
loaded process in O(n); we can replace this with look-ups in a heap in
O(log(n)), as described below.

Active tasks: For active tasks, we can do exactly the same assignment as
in the original algorithm by first building a heap (by load) of all
processes. When we assign a task, we pick the head off the heap, assign
the task to it, update the load, and re-insert it into the heap in
O(log(n)).
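
For illustration, here is a minimal sketch of that loop (ProcessState, memberWithLeastLoad, addTask, and TaskAssignorException appear elsewhere in this PR; activeTasks and the exact signatures are assumptions):

    PriorityQueue<ProcessState> processByLoad =
        new PriorityQueue<>(Comparator.comparingDouble(ProcessState::load));
    processByLoad.addAll(localState.processIdToState.values());

    for (final TaskId task : activeTasks) {
        // O(log(n)): take the least loaded process off the head of the heap
        final ProcessState leastLoaded = processByLoad.poll();
        if (leastLoaded == null) {
            throw new TaskAssignorException("No process available to assign active task " + task + ".");
        }
        // pick a member within that process, assign, and update the load
        final String member = memberWithLeastLoad(leastLoaded);
        leastLoaded.addTask(member, task, true);
        // O(log(n)): re-insert so the heap reflects the new load
        processByLoad.add(leastLoaded);
    }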

Standby tasks: For standby tasks, we cannot do this optimization
directly, because of the order in which we assign tasks:

  1. We first try to assign task A to a process that previously owned A.
  2. If we did not find such a process, we assign A to the least loaded
    node.
  3. We now try to assign task B to a process that previously owned B.
  4. If we did not find such a process, we assign B to the least loaded
    node.
    ...

The problem is that we cannot efficiently keep a heap (by load)
throughout this process, because finding and removing the process that
previously owned A (and B and ...) in the heap is O(n). We therefore need
to change the order of evaluation to be able to use a heap:

  1. Try to assign all tasks A, B, ... to a process that previously owned
    the task.
  2. Build a heap (by load) of all processes.
  3. Assign all remaining tasks to the least-loaded process that does not
    yet own the task (sketched below). Since at most NumStandbyReplicas
    processes already own the task, we can do this by removing up to
    NumStandbyReplicas processes from the top of the heap, each removal
    in O(log(n)), so we get O(log(NumProcesses)*NumStandbyReplicas) per task.
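
A minimal sketch of step 3, assuming the heap from step 2 (hasTask and memberWithLeastLoad are assumed helpers; the PR's actual code may differ):

    // Assign one remaining standby task to the least loaded process that
    // does not yet own it. At most numStandbyReplicas processes own the
    // task, so the loop pops at most numStandbyReplicas + 1 entries.
    final List<ProcessState> owners = new ArrayList<>();
    ProcessState target = null;
    while (!processByLoad.isEmpty()) {
        final ProcessState candidate = processByLoad.poll();
        if (candidate.hasTask(task)) {
            owners.add(candidate); // already owns the task; try the next one
        } else {
            target = candidate;
            break;
        }
    }
    if (target != null) {
        // assign the standby task and re-insert with the updated load
        target.addTask(memberWithLeastLoad(target), task, false);
        processByLoad.add(target);
    }
    processByLoad.addAll(owners); // restore the owners we popped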

Note that the change in order changes the resulting standby assignments
(although this difference does not show up in the existing unit tests).
I would argue that the new order of assignment will actually yield
better assignments, since the assignment will be more sticky, which has
the potential to reduce the amount of state we have to restore from the
changelog topic after assignments.

In our worst-performing benchmark, this improves the runtime by ~107x.

lucasbru added 2 commits July 21, 2025 14:42
lucasbru force-pushed the jmh_benchmarks_third_opt branch from 94707f5 to 50b029d on July 21, 2025 12:42
final double load = process.load();
boolean isLeastLoadedProcess = localState.processIdToState.values().stream()
.allMatch(p -> p.load() >= load);
return process.hasCapacity() || isLeastLoadedProcess;
}

private void updateHelpers(final Member member, final boolean isActive) {

lucasbru (Member Author):

I just inlined this function

@@ -301,21 +348,13 @@ private String errorMessage(final int numStandbyReplicas, final int i, final Tas
" of " + numStandbyReplicas + " standby tasks for task [" + task + "].";
}

-private boolean isLoadBalanced(final String processId) {
-final ProcessState process = localState.processIdToState.get(processId);
+private boolean isLoadBalanced(final ProcessState process) {

lucasbru (Member Author):

minor: passing the process in here saves us from looking it up in the hashmap again.

-updateHelpers(prevMember, true);
+ProcessState processState = localState.processIdToState.get(prevMember.processId);
+processState.addTask(prevMember.memberId, task, true);
+maybeUpdateTasksPerMember(processState.activeTaskCount());

lucasbru (Member Author):

Just inlining updateHelpers

@@ -115,7 +117,7 @@ private void initialize(final GroupSpec groupSpec, final TopologyDescriber topol
Set<Integer> partitionNoSet = entry.getValue();
for (int partitionNo : partitionNoSet) {
TaskId taskId = new TaskId(entry.getKey(), partitionNo);
-localState.standbyTaskToPrevMember.putIfAbsent(taskId, new HashSet<>());
+localState.standbyTaskToPrevMember.putIfAbsent(taskId, new ArrayList<>());

lucasbru (Member Author):

minor: no need to deduplicate here, so I'd rather just use arrays

it.remove();
}
}

// 3. assign any remaining unassigned tasks
PriorityQueue<ProcessState> processByLoad = new PriorityQueue<>(Comparator.comparingDouble(ProcessState::load));
processByLoad.addAll(localState.processIdToState.values());

lucasbru (Member Author):

Initial build of the priority queue by load

}

private boolean hasUnfulfilledQuota(final Member member) {
return localState.processIdToState.get(member.processId).memberToTaskCounts().get(member.memberId) < localState.tasksPerMember;
}

private void assignStandby(final Set<TaskId> standbyTasks, final int numStandbyReplicas) {
ArrayList<StandbyToAssign> toLeastLoaded = new ArrayList<>(standbyTasks.size() * numStandbyReplicas);

lucasbru (Member Author):

This ArrayList stores all standby tasks that we couldn't assign to a member that previously owned the task; these still need to be assigned to the "least loaded" node.
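
The diff does not show the fields of StandbyToAssign, so the following shape is only a guess at what such a deferred entry needs to carry:

    // Hypothetical shape of a deferred standby assignment (an assumption,
    // not the PR's actual code).
    private static class StandbyToAssign {
        final TaskId taskId;
        final int remainingReplicaCount; // replicas of this task still without a home
        StandbyToAssign(final TaskId taskId, final int remainingReplicaCount) {
            this.taskId = taskId;
            this.remainingReplicaCount = remainingReplicaCount;
        }
    }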

*
* @return Previous member with the least load that does not have the task, or null if no such member exists.
*/
private Member findPrevMemberWithLeastLoad(final ArrayList<Member> members, final TaskId taskId) {

lucasbru (Member Author):

findPrevMemberWithLeastLoad works very similarly to the old findMemberWithLeastLoad - that is, it does a linear search among a collection of candidates.

However, we no longer use it to find the least loaded node among all members - we use a priority queue for that.

It is only used to select the least loaded node among the members that previously owned the task. I replaced the Java Streams based iteration with a loop, since it's more efficient.
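
For comparison, a loop-based search of this kind might look as follows (only the signature appears in the diff above; the body and the hasTask helper are assumptions):

    private Member findPrevMemberWithLeastLoad(final ArrayList<Member> members, final TaskId taskId) {
        Member leastLoaded = null;
        double minLoad = Double.MAX_VALUE;
        for (final Member member : members) {
            final ProcessState process = localState.processIdToState.get(member.processId);
            // skip candidates whose process already hosts the task
            if (!process.hasTask(taskId) && process.load() < minLoad) {
                leastLoaded = member;
                minLoad = process.load();
            }
        }
        return leastLoaded;
    }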

if (streamsProtocolEnabled) {
props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault()));
// decrease the session timeout so that we can trigger the rebalance soon after old client left closed
CLUSTER.setGroupSessionTimeout(appId, 10000);
CLUSTER.setGroupHeartbeatTimeout(appId, 1000);

lucasbru (Member Author):

The integration test previously set the session timeout and the heartbeat interval incorrectly for the new protocol. We need to set them on the group level.

This sometimes made the test flaky with the new assignment algorithm, since we iteratively cycle out the "oldest" member and tend to assign its tasks to the next-oldest member. Due to the high session timeout and heartbeat timeout, it could sometimes take too long for the new member to get the new tasks assigned before being cycled out as well.

I don't see a problem in the assignment logic here - actually, it seems useful to assign tasks to "old" members, since they are stable. We are just cycling out the tasks too quickly in this integration test.

@@ -401,6 +401,8 @@ public KafkaConsumer<byte[], byte[]> createConsumerAndSubscribeTo(final Map<Stri

private void addDefaultBrokerPropsIfAbsent(final Properties brokerConfig) {
brokerConfig.putIfAbsent(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, 2 * 1024 * 1024L);
brokerConfig.putIfAbsent(GroupCoordinatorConfig.STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, "100");

lucasbru (Member Author):

For integration tests, we want to use rather extreme values so that they run quickly.

@@ -99,7 +99,7 @@ private Properties streamsConfiguration(final boolean streamsProtocolEnabled) {
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class);
if (streamsProtocolEnabled) {
streamsConfiguration.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault()));
-CLUSTER.setStandbyReplicas("app-" + safeTestName, 1);
+CLUSTER.setGroupStandbyReplicas("app-" + safeTestName, 1);

lucasbru (Member Author):

Just renamed

lucasbru requested review from bbejeck, Copilot and mjsax on July 21, 2025 13:07

Copilot AI (Contributor) left a comment:

Pull Request Overview

This PR implements a heap-based optimization for the Kafka Streams task assignor to improve performance by replacing O(n) linear searches with O(log n) heap operations when finding the least loaded process. The optimization uses priority queues to efficiently manage process load ordering during both active and standby task assignment.

Key changes:

  • Replace linear search with heap-based lookups for finding least loaded processes
  • Refactor standby task assignment order to enable heap optimization
  • Add utility methods and data structures to support the heap-based approach

Reviewed Changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 4 comments.

  • StickyTaskAssignor.java - Core algorithmic changes implementing heap-based task assignment with new data structures and methods
  • EmbeddedKafkaCluster.java - Add utility methods for configuring group timeouts and rename an existing method for consistency
  • StandbyTaskCreationIntegrationTest.java - Update method call to use the renamed setGroupStandbyReplicas method
  • SmokeTestDriverIntegrationTest.java - Refactor timeout configuration to use the new group-specific methods when the streams protocol is enabled

Comment on lines +203 to 207
throw new TaskAssignorException("No process available to assign active task {}." + task);
}
String member = memberWithLeastLoad(processWithLeastLoad);
if (member == null) {
log.error("Unable to assign active task {} to any member.", task);
throw new TaskAssignorException("No member available to assign active task {}." + task);

Copilot AI, Jul 21, 2025:

The error message format is incorrect. The placeholder '{}' is not being used properly with string concatenation. Should be either 'No process available to assign active task ' + task + '.' or use proper string formatting.
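
Applied to the first throw statement above, the suggested fix would read:

    throw new TaskAssignorException("No process available to assign active task " + task + ".");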


Copilot AI, Jul 21, 2025 (on the same lines):

The error message format is incorrect. The placeholder '{}' is not being used properly with string concatenation. Should be either 'No member available to assign active task ' + task + '.' or use proper string formatting.

…roup/streams/assignor/StickyTaskAssignor.java

Co-authored-by: Copilot <[email protected]>