[server][controller] Add MaterializedViewWriter and support view writers in L/F (#1296)

* [server][controller] Add MaterializedViewWriter and support view writers in L/F

1. View writers will now be invoked in the L/F SIT as well, instead of only in the A/A SIT. We rely on view config validation to ensure that views requiring A/A are only added to stores with A/A enabled.
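The resulting write-ordering pattern in the L/F SIT when view writers are present boils down to the sketch below. This is a simplified, non-authoritative rendition of the queueUpVersionTopicWritesWithViewWriters helper added in this PR (see the diff further down); stats recording and ingestion-exception handling are trimmed, and the surrounding class members (viewWriters, the partition consumption state) are assumed from context.

// Sketch: fan out to all view writers in parallel, then chain the version topic (VT)
// write behind (a) the previous VT write and (b) acks from every view writer.
protected void queueUpVersionTopicWritesWithViewWriters(
    PartitionConsumptionState partitionConsumptionState,
    Function<VeniceViewWriter, CompletableFuture<PubSubProduceResult>> viewWriterRecordProcessor,
    Runnable versionTopicWrite) {
  CompletableFuture<Void> currentVersionTopicWrite = new CompletableFuture<>();
  CompletableFuture[] viewWriterFutures = new CompletableFuture[viewWriters.size() + 1];
  int index = 0;
  // The first future is the previous write to the VT, so VT writes stay ordered.
  viewWriterFutures[index++] = partitionConsumptionState.getLastVTProduceCallFuture();
  for (VeniceViewWriter writer: viewWriters.values()) {
    viewWriterFutures[index++] = viewWriterRecordProcessor.apply(writer);
  }
  CompletableFuture.allOf(viewWriterFutures).whenCompleteAsync((value, exception) -> {
    if (exception == null) {
      versionTopicWrite.run(); // only now is the record produced to the VT
      currentVersionTopicWrite.complete(null);
    } else {
      currentVersionTopicWrite.completeExceptionally(new VeniceException(exception));
    }
  });
  partitionConsumptionState.setLastVTProduceCallFuture(currentVersionTopicWrite);
}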

2. This PR only includes the creation of materialized view topics and the writing of data records and control messages to those topics in the server and controller.

- Materialized view topics are created at version creation time, along with the other view topics.
- SOP is sent at view topic creation time with the same chunking and compression configs as the store version.
- EOP is sent once servers have reported EOP in every partition (a rough sketch of this gating follows the list).
- Incremental push control messages SOIP and EOIP are not propagated to the view topic for now, because the end-to-end incremental push tracking story for view topics is not clear yet. Store owners will likely just disable the requirement to wait for view consumers to fully ingest the incremental push.
- Ingestion heartbeats will not be propagated. We will broadcast the heartbeat and leader complete state in a separate PR.
- Version swap for CDC users will be implemented in a separate PR to keep this PR somewhat short for review.
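
As a rough illustration of the EOP gating mentioned above, the decision reduces to something like the sketch below. The helper names here (hasReportedEOP, viewTopicWriter) are placeholders rather than actual Venice APIs; the real wiring is on the controller side and is not part of this diff.

// Hypothetical sketch of EOP gating for a materialized view topic; names are placeholders.
void maybeBroadcastEndOfPushToViewTopic(String storeVersion, int partitionCount) {
  for (int partition = 0; partition < partitionCount; partition++) {
    if (!hasReportedEOP(storeVersion, partition)) {
      return; // at least one partition has not reported EOP yet; check again later
    }
  }
  // Every partition of the store version has reported EOP, so broadcast EOP to the view topic.
  viewTopicWriter.broadcastEndOfPush(Collections.emptyMap());
}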

3. One issue remains to be resolved: during processing of batch records in the native replication (NR) source fabric, where we consume the local VT, a leader transfer could result in missing records in the materialized view topic. This is because we do not do any global checkpointing across leaders and followers when consuming the local VT. To solve this, we will produce to the view topic from VPJ itself.
xunyin8 authored Jan 17, 2025
1 parent cf814c7 commit 5a81893
Showing 55 changed files with 2,410 additions and 536 deletions.
@@ -22,7 +22,6 @@
import com.linkedin.davinci.store.cache.backend.ObjectCacheBackend;
import com.linkedin.davinci.store.record.ByteBufferValueRecord;
import com.linkedin.davinci.store.record.ValueRecord;
import com.linkedin.davinci.store.view.VeniceViewWriter;
import com.linkedin.davinci.utils.ByteArrayKey;
import com.linkedin.venice.exceptions.PersistenceFailureException;
import com.linkedin.venice.exceptions.VeniceException;
@@ -68,7 +67,6 @@
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.function.BooleanSupplier;
@@ -641,66 +639,40 @@ protected void processMessageAndMaybeProduceToKafka(
// call in this context much less obtrusive, however, it implies that all views can only work for AA stores

// Write to views
if (this.viewWriters.size() > 0) {
Runnable produceToVersionTopic = () -> producePutOrDeleteToKafka(
mergeConflictResultWrapper,
partitionConsumptionState,
keyBytes,
consumerRecord,
partition,
kafkaUrl,
kafkaClusterId,
beforeProcessingRecordTimestampNs);
if (hasViewWriters()) {
/**
* The ordering guarantees we want is the following:
*
* 1. Write to all view topics (in parallel).
* 2. Write to the VT only after we get the ack for all views AND the previous write to VT was queued into the
* producer (but not necessarily acked).
*/
long preprocessingTime = System.currentTimeMillis();
CompletableFuture currentVersionTopicWrite = new CompletableFuture();
CompletableFuture[] viewWriterFutures = new CompletableFuture[this.viewWriters.size() + 1];
int index = 0;
// The first future is for the previous write to VT
viewWriterFutures[index++] = partitionConsumptionState.getLastVTProduceCallFuture();
ByteBuffer oldValueBB = mergeConflictResultWrapper.getOldValueByteBufferProvider().get();
int oldValueSchemaId =
oldValueBB == null ? -1 : mergeConflictResultWrapper.getOldValueProvider().get().writerSchemaId();
for (VeniceViewWriter writer: viewWriters.values()) {
viewWriterFutures[index++] = writer.processRecord(
mergeConflictResult.getNewValue(),
oldValueBB,
keyBytes,
versionNumber,
mergeConflictResult.getValueSchemaId(),
oldValueSchemaId,
mergeConflictResult.getRmdRecord());
}
CompletableFuture.allOf(viewWriterFutures).whenCompleteAsync((value, exception) -> {
hostLevelIngestionStats.recordViewProducerLatency(LatencyUtils.getElapsedTimeFromMsToMs(preprocessingTime));
if (exception == null) {
producePutOrDeleteToKafka(
mergeConflictResultWrapper,
partitionConsumptionState,
queueUpVersionTopicWritesWithViewWriters(
partitionConsumptionState,
(viewWriter) -> viewWriter.processRecord(
mergeConflictResultWrapper.getUpdatedValueBytes(),
oldValueBB,
keyBytes,
consumerRecord,
partition,
kafkaUrl,
kafkaClusterId,
beforeProcessingRecordTimestampNs);
currentVersionTopicWrite.complete(null);
} else {
VeniceException veniceException = new VeniceException(exception);
this.setIngestionException(partitionConsumptionState.getPartition(), veniceException);
currentVersionTopicWrite.completeExceptionally(veniceException);
}
});
partitionConsumptionState.setLastVTProduceCallFuture(currentVersionTopicWrite);
mergeConflictResult.getValueSchemaId(),
oldValueSchemaId,
mergeConflictResult.getRmdRecord()),
produceToVersionTopic);
} else {
// This function may modify the original record in KME and it is unsafe to use the payload from KME directly
// after
// this call.
producePutOrDeleteToKafka(
mergeConflictResultWrapper,
partitionConsumptionState,
keyBytes,
consumerRecord,
partition,
kafkaUrl,
kafkaClusterId,
beforeProcessingRecordTimestampNs);
// after this call.
produceToVersionTopic.run();
}
}
}
@@ -9,6 +9,7 @@
import static com.linkedin.davinci.kafka.consumer.LeaderFollowerStateType.STANDBY;
import static com.linkedin.venice.kafka.protocol.enums.ControlMessageType.END_OF_PUSH;
import static com.linkedin.venice.kafka.protocol.enums.ControlMessageType.START_OF_SEGMENT;
import static com.linkedin.venice.kafka.protocol.enums.MessageType.UPDATE;
import static com.linkedin.venice.pubsub.api.PubSubMessageHeaders.VENICE_LEADER_COMPLETION_STATE_HEADER;
import static com.linkedin.venice.writer.VeniceWriter.APP_DEFAULT_LOGICAL_TS;
import static com.linkedin.venice.writer.VeniceWriter.DEFAULT_LEADER_METADATA_WRAPPER;
@@ -100,6 +101,7 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -108,6 +110,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.BooleanSupplier;
import java.util.function.Function;
import java.util.function.LongPredicate;
import java.util.function.Predicate;
import java.util.function.Supplier;
@@ -2402,7 +2405,7 @@ protected DelegateConsumerRecordResult delegateConsumerRecord(
boolean produceToLocalKafka = shouldProduceToVersionTopic(partitionConsumptionState);
// UPDATE message is only expected in LEADER which must be produced to kafka.
MessageType msgType = MessageType.valueOf(kafkaValue);
if (msgType == MessageType.UPDATE && !produceToLocalKafka) {
if (msgType == UPDATE && !produceToLocalKafka) {
throw new VeniceMessageException(
ingestionTaskName + " hasProducedToKafka: Received UPDATE message in non-leader for: "
+ consumerRecord.getTopicPartition() + " Offset " + consumerRecord.getOffset());
@@ -2436,6 +2439,15 @@ protected DelegateConsumerRecordResult delegateConsumerRecord(
partitionConsumptionState.getVeniceWriterLazyRef().ifPresent(vw -> vw.flush());
partitionConsumptionState.setVeniceWriterLazyRef(veniceWriterForRealTime);
}
/**
* Materialized views need to produce the batch portion of the data to the corresponding view topic. This is
* achieved in the following ways:
* 1. Remote fabric(s) will leverage NR, where the leader replicates the VT from the NR source fabric and
* produces to the local view topic(s).
* 2. The NR source fabric's view topic will be produced by VPJ, because there is no checkpointing, and no
* easy way to add checkpointing, for leaders consuming the local VT; letting the leader produce to the view
* topic(s) in the NR source fabric would be difficult and error prone.
*/
return DelegateConsumerRecordResult.QUEUED_TO_DRAINER;
}

@@ -2464,7 +2476,7 @@ protected DelegateConsumerRecordResult delegateConsumerRecord(

if (kafkaKey.isControlMessage()) {
boolean producedFinally = true;
ControlMessage controlMessage = (ControlMessage) kafkaValue.payloadUnion;
ControlMessage controlMessage = (ControlMessage) kafkaValue.getPayloadUnion();
ControlMessageType controlMessageType = ControlMessageType.valueOf(controlMessage);
leaderProducedRecordContext = LeaderProducedRecordContext
.newControlMessageRecord(kafkaClusterId, consumerRecord.getOffset(), kafkaKey.getKey(), controlMessage);
@@ -2489,6 +2501,14 @@ protected DelegateConsumerRecordResult delegateConsumerRecord(
* consumes the first message; potential message type: SOS, EOS, SOP, EOP, data message (consider server restart).
*/
case END_OF_PUSH:
// CMs that are produced with DIV pass-through mode can break DIV without synchronization with view writers.
// This is because for data (PUT) records we queue their produceToLocalKafka behind the completion of view
// writers. The main SIT will move on to subsequent messages and for CMs that don't need to be propagated
// to view topics we are producing them directly. If we don't check the previous write before producing the
// CMs then in the VT we might get out of order messages and with pass-through DIV that's going to be an
// issue. e.g. a PUT record belonging to seg:0 can come after the EOS of seg:0 due to view writer delays.
// Since SOP and EOP are rare we can simply wait for the last VT produce future.
checkAndWaitForLastVTProduceFuture(partitionConsumptionState);
/**
* Simply produce this EOP to local VT. It will be processed in order in the drainer queue later
* after successfully producing to kafka.
@@ -2533,35 +2553,46 @@ protected DelegateConsumerRecordResult delegateConsumerRecord(
*
* There is one exception that overrules the above conditions. i.e. if the SOS is a heartbeat from the RT topic.
* In such case the heartbeat is produced to VT with updated {@link LeaderMetadataWrapper}.
*
* We want to ensure correct ordering for any SOS and EOS that we do decide to write to VT. This is done by
* coordinating with the corresponding {@link PartitionConsumptionState#getLastVTProduceCallFuture}.
* However, this coordination is only needed if there are view writers. i.e. the VT writes and CM writes
* need to be in the same mode. Either both coordinate with lastVTProduceCallFuture or neither.
*/
if (!consumerRecord.getTopicPartition().getPubSubTopic().isRealTime()) {
produceToLocalKafka(
consumerRecord,
final LeaderProducedRecordContext segmentCMLeaderProduceRecordContext = leaderProducedRecordContext;
maybeQueueCMWritesToVersionTopic(
partitionConsumptionState,
leaderProducedRecordContext,
(callback, leaderMetadataWrapper) -> partitionConsumptionState.getVeniceWriterLazyRef()
.get()
.put(
consumerRecord.getKey(),
consumerRecord.getValue(),
callback,
consumerRecord.getTopicPartition().getPartitionNumber(),
leaderMetadataWrapper),
partition,
kafkaUrl,
kafkaClusterId,
beforeProcessingPerRecordTimestampNs);
() -> produceToLocalKafka(
consumerRecord,
partitionConsumptionState,
segmentCMLeaderProduceRecordContext,
(callback, leaderMetadataWrapper) -> partitionConsumptionState.getVeniceWriterLazyRef()
.get()
.put(
consumerRecord.getKey(),
consumerRecord.getValue(),
callback,
consumerRecord.getTopicPartition().getPartitionNumber(),
leaderMetadataWrapper),
partition,
kafkaUrl,
kafkaClusterId,
beforeProcessingPerRecordTimestampNs));
} else {
if (controlMessageType == START_OF_SEGMENT
&& Arrays.equals(consumerRecord.getKey().getKey(), KafkaKey.HEART_BEAT.getKey())) {
propagateHeartbeatFromUpstreamTopicToLocalVersionTopic(
final LeaderProducedRecordContext heartbeatLeaderProducedRecordContext = leaderProducedRecordContext;
maybeQueueCMWritesToVersionTopic(
partitionConsumptionState,
consumerRecord,
leaderProducedRecordContext,
partition,
kafkaUrl,
kafkaClusterId,
beforeProcessingPerRecordTimestampNs);
() -> propagateHeartbeatFromUpstreamTopicToLocalVersionTopic(
partitionConsumptionState,
consumerRecord,
heartbeatLeaderProducedRecordContext,
partition,
kafkaUrl,
kafkaClusterId,
beforeProcessingPerRecordTimestampNs));
} else {
/**
* Based on current design handling this case (specially EOS) is tricky as we don't produce the SOS/EOS
@@ -3334,9 +3365,43 @@ protected void processMessageAndMaybeProduceToKafka(
beforeProcessingRecordTimestampNs,
beforeProcessingBatchRecordsTimestampMs).getWriteComputeResultWrapper();
}
if (msgType.equals(UPDATE) && writeComputeResultWrapper.isSkipProduce()) {
return;
}
Runnable produceToVersionTopic = () -> produceToLocalKafkaHelper(
consumerRecord,
partitionConsumptionState,
writeComputeResultWrapper,
partition,
kafkaUrl,
kafkaClusterId,
beforeProcessingRecordTimestampNs);
// Write to views
if (hasViewWriters()) {
Put newPut = writeComputeResultWrapper.getNewPut();
queueUpVersionTopicWritesWithViewWriters(
partitionConsumptionState,
(viewWriter) -> viewWriter.processRecord(newPut.putValue, keyBytes, newPut.schemaId),
produceToVersionTopic);
} else {
produceToVersionTopic.run();
}
}

Put newPut = writeComputeResultWrapper.getNewPut();
private void produceToLocalKafkaHelper(
PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> consumerRecord,
PartitionConsumptionState partitionConsumptionState,
WriteComputeResultWrapper writeComputeResultWrapper,
int partition,
String kafkaUrl,
int kafkaClusterId,
long beforeProcessingRecordTimestampNs) {
KafkaKey kafkaKey = consumerRecord.getKey();
KafkaMessageEnvelope kafkaValue = consumerRecord.getValue();
byte[] keyBytes = kafkaKey.getKey();
MessageType msgType = MessageType.valueOf(kafkaValue.messageType);
LeaderProducedRecordContext leaderProducedRecordContext;
Put newPut = writeComputeResultWrapper.getNewPut();
switch (msgType) {
case PUT:
leaderProducedRecordContext =
@@ -3390,10 +3455,6 @@ protected void processMessageAndMaybeProduceToKafka(
break;

case UPDATE:
if (writeComputeResultWrapper.isSkipProduce()) {
return;
}

leaderProducedRecordContext =
LeaderProducedRecordContext.newPutRecord(kafkaClusterId, consumerRecord.getOffset(), keyBytes, newPut);
BiConsumer<ChunkAwareCallback, LeaderMetadataWrapper> produceFunction =
@@ -3610,15 +3671,17 @@ protected long measureRTOffsetLagForSingleRegion(
}

@Override
protected void processVersionSwapMessage(
protected void processControlMessageForViews(
KafkaKey kafkaKey,
KafkaMessageEnvelope kafkaMessageEnvelope,
ControlMessage controlMessage,
int partition,
PartitionConsumptionState partitionConsumptionState) {

// Iterate through list of views for the store and process the control message.
for (VeniceViewWriter viewWriter: viewWriters.values()) {
// TODO: at some point, we should do this on more or all control messages potentially as we add more view types
viewWriter.processControlMessage(controlMessage, partition, partitionConsumptionState, this.versionNumber);
viewWriter
.processControlMessage(kafkaKey, kafkaMessageEnvelope, controlMessage, partition, partitionConsumptionState);
}
}

@@ -3879,6 +3942,35 @@ protected void resubscribeAsLeader(PartitionConsumptionState partitionConsumptio
}
}

protected void queueUpVersionTopicWritesWithViewWriters(
PartitionConsumptionState partitionConsumptionState,
Function<VeniceViewWriter, CompletableFuture<PubSubProduceResult>> viewWriterRecordProcessor,
Runnable versionTopicWrite) {
long preprocessingTime = System.currentTimeMillis();
CompletableFuture<Void> currentVersionTopicWrite = new CompletableFuture<>();
CompletableFuture[] viewWriterFutures = new CompletableFuture[this.viewWriters.size() + 1];
int index = 0;
// The first future is for the previous write to VT
viewWriterFutures[index++] = partitionConsumptionState.getLastVTProduceCallFuture();
for (VeniceViewWriter writer: viewWriters.values()) {
viewWriterFutures[index++] = viewWriterRecordProcessor.apply(writer);
}
hostLevelIngestionStats.recordViewProducerLatency(LatencyUtils.getElapsedTimeFromMsToMs(preprocessingTime));
CompletableFuture.allOf(viewWriterFutures).whenCompleteAsync((value, exception) -> {
hostLevelIngestionStats.recordViewProducerAckLatency(LatencyUtils.getElapsedTimeFromMsToMs(preprocessingTime));
if (exception == null) {
versionTopicWrite.run();
currentVersionTopicWrite.complete(null);
} else {
VeniceException veniceException = new VeniceException(exception);
this.setIngestionException(partitionConsumptionState.getPartition(), veniceException);
currentVersionTopicWrite.completeExceptionally(veniceException);
}
});

partitionConsumptionState.setLastVTProduceCallFuture(currentVersionTopicWrite);
}

/**
* Once leader is marked completed, immediately reset {@link #lastSendIngestionHeartbeatTimestamp}
* such that {@link #maybeSendIngestionHeartbeat()} will send HB SOS to the respective RT topics
@@ -3904,4 +3996,34 @@ Set<String> getKafkaUrlSetFromTopicSwitch(TopicSwitchWrapper topicSwitchWrapper)
}
return topicSwitchWrapper.getSourceServers();
}

private void checkAndWaitForLastVTProduceFuture(PartitionConsumptionState partitionConsumptionState)
throws ExecutionException, InterruptedException {
partitionConsumptionState.getLastVTProduceCallFuture().get();
}

protected boolean hasViewWriters() {
return viewWriters != null && !viewWriters.isEmpty();
}

private void maybeQueueCMWritesToVersionTopic(
PartitionConsumptionState partitionConsumptionState,
Runnable produceCall) {
if (hasViewWriters()) {
CompletableFuture<Void> propagateSegmentCMWrite = new CompletableFuture<>();
partitionConsumptionState.getLastVTProduceCallFuture().whenCompleteAsync((value, exception) -> {
if (exception == null) {
produceCall.run();
propagateSegmentCMWrite.complete(null);
} else {
VeniceException veniceException = new VeniceException(exception);
this.setIngestionException(partitionConsumptionState.getPartition(), veniceException);
propagateSegmentCMWrite.completeExceptionally(veniceException);
}
});
partitionConsumptionState.setLastVTProduceCallFuture(propagateSegmentCMWrite);
} else {
produceCall.run();
}
}
}