Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RIP-74] Rocketmq-flink dynamic load balancing #126

Open
wants to merge 12 commits into
base: main
Choose a base branch
from

Conversation

3424672656
Copy link

import java.util.concurrent.atomic.AtomicBoolean;

public class SpinLock {
private AtomicBoolean lock = new AtomicBoolean(true);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why default value is true?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Initially, the lock is free

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is suggested to reverse

@@ -26,6 +25,8 @@
import org.apache.flink.connector.rocketmq.source.util.UtilAll;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.StringUtils;

import com.alibaba.fastjson.JSON;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Configure IDE, avoid unnecessary changes

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Flink use specific code style, could use "mvn clean install"

// The internal states of the enumerator.
// This set is only accessed by the partition discovery callable in the callAsync() method.
// The current assignment by reader id. Only accessed by the coordinator thread.
// The discovered and initialized partition splits that are waiting for owner reader to be
// ready.
private final Set<MessageQueue> allocatedSet;
private final Map<MessageQueue, Byte> allocatedSet;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need rename allocatedSet?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This object may be manipulated by multiple threads

import java.util.concurrent.atomic.AtomicBoolean;

public class SpinLock {
private AtomicBoolean lock = new AtomicBoolean(true);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is suggested to reverse

// check
SourceCheckEvent sourceCheckEvent = new SourceCheckEvent();
sourceCheckEvent.setAssignedMq(currentOffsetTable.keySet());
sourceReaderContext.sendSourceEventToCoordinator(sourceCheckEvent);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

send event to coordinator too frequency

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done~,It has been changed to a scheduled task

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants