Skip to content

Commit

Permalink
feat(stream): implement MemoryMetadataManager (#854)
Browse files Browse the repository at this point in the history
Signed-off-by: SSpirits <[email protected]>
  • Loading branch information
ShadowySpirits authored Dec 26, 2023
1 parent c591925 commit 59716a7
Show file tree
Hide file tree
Showing 4 changed files with 539 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,58 +22,87 @@
import com.automq.stream.s3.metadata.S3ObjectType;
import com.automq.stream.s3.metadata.StreamMetadata;
import com.automq.stream.s3.metadata.StreamOffsetRange;
import com.automq.stream.s3.metadata.StreamState;
import com.automq.stream.s3.objects.CommitStreamSetObjectRequest;
import com.automq.stream.s3.objects.CommitStreamSetObjectResponse;
import com.automq.stream.s3.objects.CompactStreamObjectRequest;
import com.automq.stream.s3.objects.ObjectManager;
import com.automq.stream.s3.objects.ObjectStreamRange;
import com.automq.stream.s3.objects.StreamObject;
import com.automq.stream.s3.streams.StreamManager;
import com.automq.stream.utils.FutureUtil;

import java.util.Collections;
import java.util.HashMap;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

import org.apache.commons.lang3.tuple.Pair;

public class MemoryMetadataManager implements StreamManager, ObjectManager {
private final static AtomicLong NODE_ID_ALLOC = new AtomicLong();

// Data structure of stream metadata
private final AtomicLong streamIdAlloc = new AtomicLong();
private final ConcurrentMap<Long, StreamMetadata> streams = new ConcurrentHashMap<>();

// Data structure of object metadata
private final AtomicLong objectIdAlloc = new AtomicLong();
private final Map<Long, List<S3ObjectMetadata>> streamObjects = new HashMap<>();
private final Map<Long, S3ObjectMetadata> streamSetObjects = new HashMap<>();
private final ConcurrentMap<Long, List<S3ObjectMetadata>> streamObjects = new ConcurrentHashMap<>();
private final ConcurrentMap<Long, Pair<Long, S3ObjectMetadata>> streamSetObjects = new ConcurrentHashMap<>();

public static void advanceNodeId() {
NODE_ID_ALLOC.getAndIncrement();
}

@Override
public synchronized CompletableFuture<Long> prepareObject(int count, long ttl) {
return CompletableFuture.completedFuture(objectIdAlloc.getAndIncrement());
return CompletableFuture.completedFuture(objectIdAlloc.getAndAdd(count));
}

private static StreamOffsetRange to(ObjectStreamRange s) {
return new StreamOffsetRange(s.getStreamId(), s.getStartOffset(), s.getEndOffset());
}

@Override
public synchronized CompletableFuture<CommitStreamSetObjectResponse> commitStreamSetObject(CommitStreamSetObjectRequest request) {
public synchronized CompletableFuture<CommitStreamSetObjectResponse> commitStreamSetObject(
CommitStreamSetObjectRequest request) {
long dataTimeInMs = System.currentTimeMillis();
if (!request.getCompactedObjectIds().isEmpty()) {
for (long id : request.getCompactedObjectIds()) {
dataTimeInMs = Math.min(streamSetObjects.get(id).dataTimeInMs(), dataTimeInMs);
dataTimeInMs = Math.min(streamSetObjects.get(id).getRight().dataTimeInMs(), dataTimeInMs);
streamSetObjects.remove(id);
}
}
long now = System.currentTimeMillis();
if (request.getObjectId() != ObjectUtils.NOOP_OBJECT_ID) {
for (ObjectStreamRange range : request.getStreamRanges()) {
StreamMetadata stream = streams.get(range.getStreamId());
assert stream != null;
stream.setEndOffset(range.getEndOffset());
}

S3ObjectMetadata object = new S3ObjectMetadata(
request.getObjectId(), S3ObjectType.STREAM_SET, request.getStreamRanges().stream().map(MemoryMetadataManager::to).collect(Collectors.toList()),
dataTimeInMs, now, request.getObjectSize(), request.getOrderId());
streamSetObjects.put(request.getObjectId(), object);
}
for (StreamObject r : request.getStreamObjects()) {
List<S3ObjectMetadata> objects = streamObjects.computeIfAbsent(r.getStreamId(), id -> new LinkedList<>());
objects.add(
new S3ObjectMetadata(
r.getObjectId(), S3ObjectType.STREAM, List.of(new StreamOffsetRange(r.getStreamId(), r.getStartOffset(), r.getEndOffset())),
dataTimeInMs, now, r.getObjectSize(), 0
)
request.getObjectId(), S3ObjectType.STREAM_SET, request.getStreamRanges().stream().map(MemoryMetadataManager::to).collect(Collectors.toList()),
dataTimeInMs, now, request.getObjectSize(), request.getOrderId());
streamSetObjects.put(request.getObjectId(), Pair.of(NODE_ID_ALLOC.get(), object));
}

for (StreamObject streamObject : request.getStreamObjects()) {
long streamId = streamObject.getStreamId();
StreamMetadata stream = streams.get(streamId);
assert stream != null;
stream.setEndOffset(streamObject.getEndOffset());

List<S3ObjectMetadata> metadataList = streamObjects.computeIfAbsent(streamId, id -> new LinkedList<>());
metadataList.add(
new S3ObjectMetadata(
streamObject.getObjectId(), S3ObjectType.STREAM, List.of(new StreamOffsetRange(streamId, streamObject.getStartOffset(), streamObject.getEndOffset())),
dataTimeInMs, now, streamObject.getObjectSize(), 0
)
);
}
request.getCompactedObjectIds().forEach(streamSetObjects::remove);
Expand All @@ -82,60 +111,155 @@ public synchronized CompletableFuture<CommitStreamSetObjectResponse> commitStrea

@Override
public synchronized CompletableFuture<Void> compactStreamObject(CompactStreamObjectRequest request) {
return FutureUtil.failedFuture(new UnsupportedOperationException());
long streamId = request.getStreamId();
StreamObject streamObject = new StreamObject();
streamObject.setStreamId(streamId);
streamObject.setStartOffset(request.getStartOffset());
streamObject.setEndOffset(request.getEndOffset());
streamObject.setObjectId(request.getObjectId());
streamObject.setObjectSize(request.getObjectSize());

streamObjects.computeIfAbsent(streamId, id -> new LinkedList<>())
.add(new S3ObjectMetadata(
request.getObjectId(), S3ObjectType.STREAM, List.of(new StreamOffsetRange(streamId, request.getStartOffset(), request.getEndOffset())),
System.currentTimeMillis(), System.currentTimeMillis(), request.getObjectSize(), 0
));

HashSet<Long> idSet = new HashSet<>(request.getSourceObjectIds());
streamObjects.get(streamId).removeIf(metadata -> idSet.contains(metadata.objectId()));
return CompletableFuture.completedFuture(null);
}

@Override
public synchronized CompletableFuture<List<S3ObjectMetadata>> getObjects(long streamId, long startOffset, long endOffset, int limit) {
return FutureUtil.failedFuture(new UnsupportedOperationException());
public synchronized CompletableFuture<List<S3ObjectMetadata>> getObjects(long streamId, long startOffset,
long endOffset, int limit) {
List<S3ObjectMetadata> streamSetObjectList = streamSetObjects.values()
.stream()
.map(Pair::getRight)
.filter(o -> o.getOffsetRanges().stream().anyMatch(r -> r.getStreamId() == streamId && r.getEndOffset() > startOffset && (r.getStartOffset() <= endOffset || endOffset == -1)))
.toList();
List<S3ObjectMetadata> streamObjectList = streamObjects.computeIfAbsent(streamId, id -> new LinkedList<>())
.stream()
.filter(o -> o.getOffsetRanges().stream().anyMatch(r -> r.getStreamId() == streamId && r.getEndOffset() > startOffset && (r.getStartOffset() <= endOffset || endOffset == -1)))
.toList();

List<S3ObjectMetadata> result = new ArrayList<>();
result.addAll(streamSetObjectList);
result.addAll(streamObjectList);
result.sort((o1, o2) -> {
long startOffset1 = o1.getOffsetRanges().stream().filter(r -> r.getStreamId() == streamId).findFirst().get().getStartOffset();
long startOffset2 = o2.getOffsetRanges().stream().filter(r -> r.getStreamId() == streamId).findFirst().get().getStartOffset();
return Long.compare(startOffset1, startOffset2);
});

return CompletableFuture.completedFuture(result.stream().limit(limit).toList());
}

@Override
public synchronized CompletableFuture<List<S3ObjectMetadata>> getServerObjects() {
return CompletableFuture.completedFuture(new LinkedList<>(streamSetObjects.values()));
List<S3ObjectMetadata> result = streamSetObjects.values()
.stream()
.filter(pair -> pair.getLeft() == NODE_ID_ALLOC.get())
.map(Pair::getRight).toList();
return CompletableFuture.completedFuture(result);
}

@Override
public synchronized CompletableFuture<List<S3ObjectMetadata>> getStreamObjects(long streamId, long startOffset, long endOffset, int limit) {
return FutureUtil.failedFuture(new UnsupportedOperationException());
public synchronized CompletableFuture<List<S3ObjectMetadata>> getStreamObjects(long streamId, long startOffset,
long endOffset, int limit) {
List<S3ObjectMetadata> streamObjectList = streamObjects.computeIfAbsent(streamId, id -> new LinkedList<>())
.stream()
.filter(o -> o.getOffsetRanges().stream().anyMatch(r -> r.getStreamId() == streamId && r.getEndOffset() > startOffset && (r.getStartOffset() <= endOffset || endOffset == -1)))
.limit(limit)
.toList();
return CompletableFuture.completedFuture(streamObjectList);
}

@Override
public synchronized CompletableFuture<List<StreamMetadata>> getOpeningStreams() {
return CompletableFuture.completedFuture(Collections.emptyList());
return CompletableFuture.completedFuture(streams.values().stream().toList());
}

@Override
public CompletableFuture<List<StreamMetadata>> getStreams(List<Long> streamIds) {
return FutureUtil.failedFuture(new UnsupportedOperationException());
return CompletableFuture.completedFuture(streamIds.stream().map(streams::get).filter(Objects::nonNull).toList());
}

@Override
public synchronized CompletableFuture<Long> createStream() {
return FutureUtil.failedFuture(new UnsupportedOperationException());
long streamId = streamIdAlloc.getAndIncrement();
streams.put(streamId, new StreamMetadata(streamId, -1, 0, 0, StreamState.CLOSED));
return CompletableFuture.completedFuture(streamId);
}

@Override
public synchronized CompletableFuture<StreamMetadata> openStream(long streamId, long epoch) {
return FutureUtil.failedFuture(new UnsupportedOperationException());
StreamMetadata stream = streams.get(streamId);
if (stream == null) {
return CompletableFuture.failedFuture(new IllegalArgumentException("stream " + streamId + " not found"));
}
if (stream.getState() == StreamState.OPENED) {
return CompletableFuture.failedFuture(new IllegalArgumentException("stream " + streamId + " has been opened"));
}
if (stream.getEpoch() >= epoch) {
return CompletableFuture.failedFuture(new IllegalArgumentException("stream " + streamId + " epoch " + epoch + " is not newer than current epoch " + stream.getEpoch()));
}
stream.setEpoch(epoch);
stream.setState(StreamState.OPENED);
return CompletableFuture.completedFuture(stream);
}

@Override
public synchronized CompletableFuture<Void> trimStream(long streamId, long epoch, long newStartOffset) {
return FutureUtil.failedFuture(new UnsupportedOperationException());
StreamMetadata stream = streams.get(streamId);
if (stream == null) {
return CompletableFuture.failedFuture(new IllegalArgumentException("stream " + streamId + " not found"));
}
if (stream.getState() != StreamState.OPENED) {
return CompletableFuture.failedFuture(new IllegalArgumentException("stream " + streamId + " is not opened"));
}
if (stream.getEpoch() != epoch) {
return CompletableFuture.failedFuture(new IllegalArgumentException("stream " + streamId + " epoch " + epoch + " is not equal to current epoch " + stream.getEpoch()));
}
if (newStartOffset < stream.getStartOffset()) {
return CompletableFuture.failedFuture(new IllegalArgumentException("stream " + streamId + " new start offset " + newStartOffset + " is less than current start offset " + stream.getStartOffset()));
}
if (newStartOffset > stream.getEndOffset()) {
return CompletableFuture.failedFuture(new IllegalArgumentException("stream " + streamId + " new start offset " + newStartOffset + " is greater than current end offset " + stream.getEndOffset()));
}
stream.setStartOffset(newStartOffset);
return CompletableFuture.completedFuture(null);
}

@Override
public synchronized CompletableFuture<Void> closeStream(long streamId, long epoch) {
return FutureUtil.failedFuture(new UnsupportedOperationException());
StreamMetadata stream = streams.get(streamId);
if (stream == null) {
return CompletableFuture.failedFuture(new IllegalArgumentException("stream " + streamId + " not found"));
}
if (stream.getState() != StreamState.OPENED) {
return CompletableFuture.failedFuture(new IllegalArgumentException("stream " + streamId + " is not opened"));
}
if (stream.getEpoch() != epoch) {
return CompletableFuture.failedFuture(new IllegalArgumentException("stream " + streamId + " epoch " + epoch + " is not equal to current epoch " + stream.getEpoch()));
}
stream.setState(StreamState.CLOSED);
return CompletableFuture.completedFuture(null);
}

@Override
public synchronized CompletableFuture<Void> deleteStream(long streamId, long epoch) {
return FutureUtil.failedFuture(new UnsupportedOperationException());
}

private static StreamOffsetRange to(ObjectStreamRange s) {
return new StreamOffsetRange(s.getStreamId(), s.getStartOffset(), s.getEndOffset());
StreamMetadata stream = streams.get(streamId);
if (stream == null) {
return CompletableFuture.failedFuture(new IllegalArgumentException("stream " + streamId + " not found"));
}
if (stream.getState() != StreamState.CLOSED) {
return CompletableFuture.failedFuture(new IllegalArgumentException("stream " + streamId + " is not closed"));
}
if (stream.getEpoch() != epoch) {
return CompletableFuture.failedFuture(new IllegalArgumentException("stream " + streamId + " epoch " + epoch + " is not equal to current epoch " + stream.getEpoch()));
}
streams.remove(streamId);
return CompletableFuture.completedFuture(null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public int search(T target) {
int low = 0;
int high = size() - 1;
while (low <= high) {
int mid = (low + high) >>> 1;
int mid = low + ((high - low) >>> 1);
ComparableItem<T> midVal = get(mid);
if (midVal.isLessThan(target)) {
low = mid + 1;
Expand Down
Loading

0 comments on commit 59716a7

Please sign in to comment.