Skip to content

Commit

Permalink
Support heartbeat to maintain machine state (#76)
Browse files Browse the repository at this point in the history
* add MachineIdDistributor#guard api

* refactor `AbstractMachineIdDistributor`

* add `MachineIdGuarder` api

* add Default MachineId Guarder implementation

* fix unit test

* impl Guard MachineId

* add TODO Adapt Guard Machine Id.(`ZookeeperMachineIdDistributor`)

* Add design drawings to JavaDoc

* add **cosid-test** module for design Spec

* Adapt `cosid-spring-boot-starter`

* - add `DistributeIdempotent` spec
- refactor `DistributorSpec`

* Adapt Guard Machine Id.

* refactor log-level for `LocalMachineStateStorage`

* refactor code-style
  • Loading branch information
Ahoo-Wang authored May 7, 2022
1 parent 60a144e commit fdc733b
Show file tree
Hide file tree
Showing 102 changed files with 2,172 additions and 752 deletions.
7 changes: 6 additions & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ val serverProjects = setOf(
project(":cosid-example")
)

val testProject = project(":cosid-test")
val publishProjects = subprojects - serverProjects
val libraryProjects = publishProjects - bomProjects

Expand All @@ -40,6 +41,7 @@ ext {
set("springfoxVersion", "3.0.0")
set("jmhVersion", "1.34")
set("junitPioneerVersion", "1.4.2")
set("hamcrestVersion", "2.2")
set("mybatisVersion", "3.5.7")
set("mybatisBootVersion", "2.1.4")
set("coskyVersion", "1.3.20")
Expand Down Expand Up @@ -133,6 +135,7 @@ configure(libraryProjects) {
add("testImplementation", "org.junit.jupiter:junit-jupiter-api")
add("testImplementation", "org.junit.jupiter:junit-jupiter-params")
add("testImplementation", "org.junit-pioneer:junit-pioneer")
add("testImplementation", "org.hamcrest:hamcrest")
add("testRuntimeOnly", "org.junit.jupiter:junit-jupiter-engine")
add("jmh", "org.openjdk.jmh:jmh-core:${rootProject.ext.get("jmhVersion")}")
add("jmh", "org.openjdk.jmh:jmh-generator-annprocess:${rootProject.ext.get("jmhVersion")}")
Expand Down Expand Up @@ -215,7 +218,9 @@ fun getPropertyOf(name: String) = project.properties[name]?.toString()
tasks.register<JacocoReport>("codeCoverageReport") {
executionData(fileTree(project.rootDir.absolutePath).include("**/build/jacoco/*.exec"))
libraryProjects.forEach {
sourceSets(it.sourceSets.main.get())
if (testProject != it) {
sourceSets(it.sourceSets.main.get())
}
}
reports {
xml.required.set(true)
Expand Down
4 changes: 4 additions & 0 deletions cosid-core/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

dependencies {
testImplementation(project(":cosid-test"))
}
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
33 changes: 17 additions & 16 deletions cosid-core/src/main/java/me/ahoo/cosid/segment/IdSegment.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,35 +17,36 @@

import javax.annotation.concurrent.ThreadSafe;


/**
* Id Segment.
*
* <p><img src="../doc-files/SegmentId.png" alt="SegmentId"></p>
*
* @author ahoo wang
*/
@ThreadSafe
public interface IdSegment extends Comparable<IdSegment> {

long SEQUENCE_OVERFLOW = -1;

long TIME_TO_LIVE_FOREVER = Long.MAX_VALUE;

/**
* ID segment fetch time.
* unit {@link java.util.concurrent.TimeUnit#SECONDS}
*
* @return fetch time
*/
long getFetchTime();

long getMaxId();

long getOffset();

long getSequence();

long getStep();

/**
* the id segment time to live.
* unit {@link java.util.concurrent.TimeUnit#SECONDS}
Expand All @@ -55,7 +56,7 @@ public interface IdSegment extends Comparable<IdSegment> {
default long getTtl() {
return TIME_TO_LIVE_FOREVER;
}

/**
* id segment has expired?.
*
Expand All @@ -71,15 +72,15 @@ default boolean isExpired() {
}
return Clock.CACHE.secondTime() - getFetchTime() > getTtl();
}

default boolean isOverflow() {
return getSequence() >= getMaxId();
}

default boolean isOverflow(long nextSeq) {
return nextSeq == SEQUENCE_OVERFLOW || nextSeq > getMaxId();
}

/**
* not expired and not overflow.
*
Expand All @@ -88,17 +89,17 @@ default boolean isOverflow(long nextSeq) {
default boolean isAvailable() {
return !isExpired() && !isOverflow();
}

long incrementAndGet();

@Override
default int compareTo(IdSegment other) {
if (getOffset() == other.getOffset()) {
return 0;
}
return getOffset() > other.getOffset() ? 1 : -1;
}

default void ensureNextIdSegment(IdSegment nextIdSegment) throws NextIdSegmentExpiredException {
if (compareTo(nextIdSegment) >= 0) {
throw new NextIdSegmentExpiredException(this, nextIdSegment);
Expand Down
57 changes: 29 additions & 28 deletions cosid-core/src/main/java/me/ahoo/cosid/segment/SegmentChainId.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,23 +26,24 @@

/**
* Segment chain algorithm ID generator.
* <p><img src="../doc-files/SegmentChainId.png" alt="SegmentChainId"></p>
*
* @author ahoo wang
*/
@Slf4j
public class SegmentChainId implements SegmentId {
public static final int DEFAULT_SAFE_DISTANCE = 10;

private final long idSegmentTtl;
private final int safeDistance;
private final IdSegmentDistributor maxIdDistributor;
private final PrefetchJob prefetchJob;
private volatile IdSegmentChain headChain = IdSegmentChain.newRoot();

public SegmentChainId(IdSegmentDistributor maxIdDistributor) {
this(TIME_TO_LIVE_FOREVER, DEFAULT_SAFE_DISTANCE, maxIdDistributor, PrefetchWorkerExecutorService.DEFAULT);
}

public SegmentChainId(long idSegmentTtl, int safeDistance, IdSegmentDistributor maxIdDistributor, PrefetchWorkerExecutorService prefetchWorkerExecutorService) {
Preconditions.checkArgument(idSegmentTtl > 0, Strings.lenientFormat("Illegal idSegmentTtl parameter:[%s].", idSegmentTtl));
Preconditions.checkArgument(safeDistance > 0, "The safety distance must be greater than 0.");
Expand All @@ -52,11 +53,11 @@ public SegmentChainId(long idSegmentTtl, int safeDistance, IdSegmentDistributor
prefetchJob = new PrefetchJob(headChain);
prefetchWorkerExecutorService.submit(prefetchJob);
}

public IdSegmentChain getHead() {
return headChain;
}

/**
* No lock, because it is not important, as long as the {@link #headChain} is trending forward.
* -----
Expand All @@ -78,11 +79,11 @@ private void forward(IdSegmentChain forwardChain) {
headChain = forwardChain;
}
}

private IdSegmentChain generateNext(IdSegmentChain previousChain, int segments) {
return maxIdDistributor.nextIdSegmentChain(previousChain, segments, idSegmentTtl);
}

@Override
public long generate() {
while (true) {
Expand All @@ -97,10 +98,10 @@ public long generate() {
}
currentChain = currentChain.getNext();
}

try {
final IdSegmentChain preIdSegmentChain = headChain;

if (preIdSegmentChain.trySetNext((preChain) -> generateNext(preChain, safeDistance))) {
IdSegmentChain nextChain = preIdSegmentChain.getNext();
forward(nextChain);
Expand All @@ -116,7 +117,7 @@ public long generate() {
this.prefetchJob.hungry();
}
}

public class PrefetchJob implements AffinityJob {
private static final int MAX_PREFETCH_DISTANCE = 100_000_000;
/**
Expand All @@ -132,44 +133,44 @@ public class PrefetchJob implements AffinityJob {
* @see java.util.concurrent.TimeUnit#SECONDS
*/
private volatile long lastHungerTime;

public PrefetchJob(IdSegmentChain tailChain) {
this.tailChain = tailChain;
}

@Override
public String getJobId() {
return maxIdDistributor.getNamespacedName();
}

@Override
public void setHungerTime(long hungerTime) {
lastHungerTime = hungerTime;
}

@Override
public PrefetchWorker getPrefetchWorker() {
return prefetchWorker;
}

@Override
public void setPrefetchWorker(PrefetchWorker prefetchWorker) {
if (this.prefetchWorker != null) {
return;
}
this.prefetchWorker = prefetchWorker;
}

@Override
public void run() {
prefetch();
}

public void prefetch() {

long wakeupTimeGap = Clock.CACHE.secondTime() - lastHungerTime;
final boolean hunger = wakeupTimeGap < hungerThreshold;

final int prePrefetchDistance = this.prefetchDistance;
if (hunger) {
this.prefetchDistance = Math.min(Math.multiplyExact(this.prefetchDistance, 2), MAX_PREFETCH_DISTANCE);
Expand All @@ -184,7 +185,7 @@ public void prefetch() {
}
}
}

IdSegmentChain availableHeadChain = SegmentChainId.this.headChain;
while (!availableHeadChain.getIdSegment().isAvailable()) {
availableHeadChain = availableHeadChain.getNext();
Expand All @@ -193,32 +194,32 @@ public void prefetch() {
break;
}
}

forward(availableHeadChain);

final int headToTailGap = availableHeadChain.gap(tailChain, maxIdDistributor.getStep());
final int safeGap = safeDistance - headToTailGap;

if (safeGap <= 0 && !hunger) {
if (log.isTraceEnabled()) {
log.trace("prefetch - [{}] - safeGap is less than or equal to 0, and is not hungry - headChain.version:[{}] - tailChain.version:[{}].", maxIdDistributor.getNamespacedName(),
availableHeadChain.getVersion(), tailChain.getVersion());
}
return;
}

final int prefetchSegments = hunger ? this.prefetchDistance : safeGap;

appendChain(availableHeadChain, prefetchSegments);
}

private void appendChain(IdSegmentChain availableHeadChain, int prefetchSegments) {

if (log.isDebugEnabled()) {
log.debug("appendChain - [{}] - headChain.version:[{}] - tailChain.version:[{}] - prefetchSegments:[{}].", maxIdDistributor.getNamespacedName(), availableHeadChain.getVersion(),
tailChain.getVersion(), prefetchSegments);
}

try {
final IdSegmentChain preTail = tailChain;
tailChain = tailChain.ensureSetNext((preChain) -> generateNext(preChain, prefetchSegments)).getNext();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class ExactCollection<E> extends AbstractCollection<E> implements RandomA
private static final ExactCollection EMPTY = new ExactCollection(0);

@SuppressWarnings("unchecked")
public static final <E> ExactCollection<E> empty() {
public static <E> ExactCollection<E> empty() {
return (ExactCollection<E>) EMPTY;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
/**
* Interval Timeline.
*
* <p><img src="../doc-files/CosIdIntervalShardingAlgorithm.png" alt="CosIdIntervalShardingAlgorithm"></p>
*
* @author ahoo wang
*/
@ThreadSafe
Expand Down
2 changes: 2 additions & 0 deletions cosid-core/src/main/java/me/ahoo/cosid/sharding/ModCycle.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
/**
* ModCycle.
*
* <p><img src="../doc-files/CosIdModShardingAlgorithm.png" alt="CosIdModShardingAlgorithm"></p>
*
* @author ahoo wang
*/
public class ModCycle<T extends Number & Comparable<T>> implements Sharding<T> {
Expand Down
8 changes: 5 additions & 3 deletions cosid-core/src/main/java/me/ahoo/cosid/sharding/Sharding.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,16 @@
/**
* Sharding algorithm interface.
*
* <p><img src="../doc-files/Sharding.png" alt="Sharding"></p>
*
* @author ahoo wang
*/
@ThreadSafe
public interface Sharding<T extends Comparable<?>> {

String sharding(T shardingValue);

Collection<String> sharding(Range<T> shardingValue);

Collection<String> getEffectiveNodes();
}
Loading

0 comments on commit fdc733b

Please sign in to comment.