[server] ConsumptionTask Refactor #1318
Overall, I'm supportive of reducing the size of the SIT classes and subclasses.
I left a few minor comments and questions. Please take a look and LMK what you think.
...client/src/main/java/com/linkedin/davinci/kafka/consumer/ActiveActiveStoreIngestionTask.java
...nci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StorePartitionDataReceiver.java
...ts/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java
protected Lazy<VeniceWriter<byte[], byte[], byte[]>> getVeniceWriter(
    PartitionConsumptionState partitionConsumptionState) {
  return partitionConsumptionState.getVeniceWriterLazyRef();
}

protected void setRealTimeVeniceWriterRef(PartitionConsumptionState partitionConsumptionState) {
  partitionConsumptionState.setVeniceWriterLazyRef(veniceWriterForRealTime);
}
Do we need functions for this? Why not just call partitionConsumptionState.getVeniceWriterLazyRef() directly wherever we need to get the VW? And same for the setter?
I think it's because the writer on the PCS is not the real-time writer, and it's being swapped out for veniceWriterForRealTime, which is a member of LeaderFollowerStoreIngestionTask.
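To make the swap in this thread concrete, here is a minimal sketch of why a setter on the task can be useful: the task holds the real-time writer reference and installs it on the per-partition state. Every type below (LazyRef, Writer, PartitionState, LeaderFollowerTask) and the "partition-default" label are hypothetical stand-ins invented for illustration; they are not the actual Venice classes or their signatures.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for a lazily initialized reference (not Venice's Lazy).
final class LazyRef<T> implements Supplier<T> {
  private final Supplier<T> factory;
  private T value;

  LazyRef(Supplier<T> factory) {
    this.factory = factory;
  }

  @Override
  public synchronized T get() {
    // Create the value on first access only.
    if (value == null) {
      value = factory.get();
    }
    return value;
  }
}

// Hypothetical stand-in for a writer; only carries a label for the demo.
final class Writer {
  final String name;

  Writer(String name) {
    this.name = name;
  }
}

// Hypothetical stand-in for the per-partition state that holds a writer reference.
final class PartitionState {
  private LazyRef<Writer> writerRef = new LazyRef<>(() -> new Writer("partition-default"));

  LazyRef<Writer> getWriterRef() {
    return writerRef;
  }

  void setWriterRef(LazyRef<Writer> ref) {
    this.writerRef = ref;
  }
}

// Hypothetical task that owns the real-time writer and swaps it onto the state.
final class LeaderFollowerTask {
  private final LazyRef<Writer> writerForRealTime = new LazyRef<>(() -> new Writer("real-time"));

  LazyRef<Writer> getWriter(PartitionState state) {
    return state.getWriterRef();
  }

  void setRealTimeWriterRef(PartitionState state) {
    // Callers outside the task cannot see writerForRealTime, so the task does the swap.
    state.setWriterRef(writerForRealTime);
  }
}
```

In this sketch the getter is a pure pass-through and could be inlined, while the setter hides a field that only the task can see. The sketch is single-threaded and ignores any visibility concerns a real shared state object would have.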
PartitionConsumptionState partitionConsumptionState =
    storeIngestionTask.getPartitionConsumptionState(topicPartition.getPartitionNumber());
I think it'd be great if the PCS became a class property of the StorePartitionDataReceiver, rather than being held in a map inside the SIT. I suspect this is easier said than done, and there's no need to do it just yet... A large refactoring like this is ideally kept free of functional changes, and that one may be too risky. Better to do it as a smaller follow-up change. But I just wanted to share this crazy dream 😂
Ooo, that's an interesting idea. I've never thought of that before. I'd definitely be interested in attempting something like that someday.
The way I saw it was that the PCS map is a "global" variable that is shared among the ConsumptionTask, Drainer, and SIT threads.
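The two ownership shapes discussed in this thread can be sketched side by side. This is only an illustration under stated assumptions: Pcs, Task, LookupReceiver, and OwningReceiver are hypothetical names, the recordsSeen counter is invented for the demo, and none of this mirrors the real PartitionConsumptionState or StorePartitionDataReceiver APIs.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, heavily simplified stand-in for a per-partition state object.
final class Pcs {
  final int partition;
  long recordsSeen; // invented counter, used only to observe writes in this demo

  Pcs(int partition) {
    this.partition = partition;
  }
}

// Shape 1: the task owns a partition -> state map that other components query.
final class Task {
  private final Map<Integer, Pcs> states = new ConcurrentHashMap<>();

  void subscribe(int partition) {
    states.put(partition, new Pcs(partition));
  }

  Pcs getPartitionConsumptionState(int partition) {
    return states.get(partition);
  }
}

// Receiver that looks the state up in the task's map on every call.
final class LookupReceiver {
  private final Task task;
  private final int partition;

  LookupReceiver(Task task, int partition) {
    this.task = task;
    this.partition = partition;
  }

  void write(int recordCount) {
    Pcs pcs = task.getPartitionConsumptionState(partition);
    if (pcs != null) { // the partition may not be subscribed
      pcs.recordsSeen += recordCount;
    }
  }
}

// Shape 2: the receiver holds its partition's state directly as a field.
final class OwningReceiver {
  private final Pcs pcs;

  OwningReceiver(Pcs pcs) {
    this.pcs = pcs;
  }

  void write(int recordCount) {
    pcs.recordsSeen += recordCount; // no map lookup, no null check
  }

  Pcs state() {
    return pcs;
  }
}
```

The second shape removes the lookup on the receiver's path, but any other thread that reads the state would then need a route to it through the receiver, which is presumably part of why this is easier said than done.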
...ts/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java
I have a general comment: from my understanding, the SIT thread also interacts with the SIT functions and PCS states. Even though I don't think this part of the change is in scope for the current PR, the diagram created above does not seem to include the SIT thread. Do you mind adding a minor update to include it as well? This could go into a future diagram that shows how the overall refactor works before vs. after. (By the way, I like all your diagrams, as they are clear and easy to understand. Thank you!)
…nTask` into `StorePartitionDataReceiver`. 🧹
…ngestionTask` into `StorePartitionDataReceiver`. 🧹
…ght before `waitUntilValueSchemaAvailable()` from `StoreIngestionTask` into `StorePartitionDataReceiver`. 🧹
…orePartitionDataReceiver`. 🧹
…rom `StoreIngestionTask` / `LeaderFollowerStoreIngestionTask` into `StorePartitionDataReceiver`. 🧹
…) from `LeaderFollowerStoreIngestionTask` / `ActiveActiveStoreIngestionTask` into `StorePartitionDataReceiver`. 🧹
…FollowerStoreIngestionTask` into `StorePartitionDataReceiver`. 🧹
…rStoreIngestionTask` into `StorePartitionDataReceiver`. 🧹
… from `LeaderFollowerStoreIngestionTask` into `StorePartitionDataReceiver`. 🧹
…werStoreIngestionTask` / `ActiveActiveStoreIngestionTask` into `StorePartitionDataReceiver`. 🧹
…tionTask` into `StorePartitionDataReceiver`. 🧹
…tionTask` into `StorePartitionDataReceiver`. 🧹
…Task` / `ActiveActiveStoreIngestionTask` into `StorePartitionDataReceiver`. 🧹
…ionTask` / `ActiveActiveStoreIngestionTask` into `StorePartitionDataReceiver`. 🧹
…k` / `ActiveActiveStoreIngestionTask` into `StorePartitionDataReceiver`. 🧹
… into `StorePartitionDataReceiver`. 🧹
…stionTask` into `StorePartitionDataReceiver`. 🧹
…veStoreIngestionTask` into `StorePartitionDataReceiver`. 🧹
…ecord()` from `ActiveActiveStoreIngestionTask` into `StorePartitionDataReceiver`. 🧹
…sk` into `StorePartitionDataReceiver`. 🧹
…onTask` into `StorePartitionDataReceiver`. 🧹
…toreIngestionTask` into `StorePartitionDataReceiver`. 🧹
…torePartitionDataReceiver`. 🧹
…rFollowerStoreIngestionTask` into `StorePartitionDataReceiver`. 🧹
…SchemaByteBufferFromStorage()` from `ActiveActiveStoreIngestionTask` into `StorePartitionDataReceiver`. 🧹
…Manager` from `ActiveActiveStoreIngestionTask` into `StorePartitionDataReceiver`. 🧹
…ActiveActiveStoreIngestionTask` into `StorePartitionDataReceiver`. 🧹
…Task`, because it's just `versionedIngestionStats`? 🧹
…to `StorePartitionDataReceiver`. 🧹
…eiver`. ✍️" This reverts commit a5f550f.
Summary
StoreIngestionTask.java is getting too large and should be refactored. There are three threads, ConsumptionTask, StoreBufferDrainer, and StoreIngestionTask, that all share the methods in StoreIngestionTask.java, and it is difficult to know which part of the ingestion pipeline a StoreIngestionTask method belongs to just by looking at it.
This PR moves ConsumptionTask-only functions and member variables out of StoreIngestionTask into a class such as StorePartitionDataReceiver.
I'm also considering renaming StorePartitionDataReceiver to ConsumedPartitionReceiver to better indicate that it's part of the ConsumptionTask thread, but I'll leave it out of the diff to help with the review. Feel free to provide additional suggestions.
How was this PR tested?
GHCI
Does this PR introduce any user-facing changes?