Skip to content

Commit

Permalink
[server] Fix regression in new push status store write schema retriev…
Browse files Browse the repository at this point in the history
…al logic (#762)

When server is deployed in a standalone fashion, the cluster does not have
** venice_system_store_PUSH_STATUS_SYSTEM_SCHEMA_STORE**
So the new push status store writer initialization logic will fail.
This PR adds an exception catch logic and fall back to old fashion.
  • Loading branch information
sixpluszero authored Nov 17, 2023
1 parent a2fef82 commit 6c1d58f
Showing 1 changed file with 21 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import com.linkedin.venice.pushstatushelper.PushStatusStoreWriter;
import com.linkedin.venice.schema.SchemaEntry;
import com.linkedin.venice.schema.writecompute.DerivedSchemaEntry;
import com.linkedin.venice.schema.writecompute.WriteComputeSchemaConverter;
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.service.AbstractVeniceService;
import com.linkedin.venice.stats.HelixMessageChannelStats;
import com.linkedin.venice.status.StatusMessageHandler;
Expand All @@ -49,6 +51,7 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.avro.Schema;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
Expand Down Expand Up @@ -310,11 +313,24 @@ private void asyncStart() {
veniceServerConfig.getPubSubClientsFactory().getProducerAdapterFactory();
VeniceWriterFactory writerFactory =
new VeniceWriterFactory(veniceProperties.toProperties(), pubSubProducerAdapterFactory, null);
String dummyPushStatusStoreName = VeniceSystemStoreType.DAVINCI_PUSH_STATUS_STORE.getSystemStoreName("dummy");
SchemaEntry valueSchemaEntry =
helixReadOnlySchemaRepository.getSupersetOrLatestValueSchema(dummyPushStatusStoreName);
DerivedSchemaEntry updateSchemaEntry =
helixReadOnlySchemaRepository.getLatestDerivedSchema(dummyPushStatusStoreName, valueSchemaEntry.getId());
SchemaEntry valueSchemaEntry;
DerivedSchemaEntry updateSchemaEntry;
try {
String dummyPushStatusStoreName = VeniceSystemStoreType.DAVINCI_PUSH_STATUS_STORE.getSystemStoreName("dummy");
valueSchemaEntry = helixReadOnlySchemaRepository.getSupersetOrLatestValueSchema(dummyPushStatusStoreName);
updateSchemaEntry =
helixReadOnlySchemaRepository.getLatestDerivedSchema(dummyPushStatusStoreName, valueSchemaEntry.getId());
} catch (VeniceException e) {
LOGGER.warn(
"ZK-shared system store is not available in current environment, will fall back to last known protocol version instead.");
int valueSchemaId = AvroProtocolDefinition.PUSH_STATUS_SYSTEM_SCHEMA_STORE.getCurrentProtocolVersion();
Schema valueSchema = AvroProtocolDefinition.PUSH_STATUS_SYSTEM_SCHEMA_STORE.getCurrentProtocolVersionSchema();
valueSchemaEntry = new SchemaEntry(valueSchemaId, valueSchema);
updateSchemaEntry = new DerivedSchemaEntry(
valueSchemaId,
1,
WriteComputeSchemaConverter.getInstance().convertFromValueRecordSchema(valueSchema));
}
// We use push status store for persisting incremental push statuses
statusStoreWriter =
new PushStatusStoreWriter(writerFactory, instance.getNodeId(), valueSchemaEntry, updateSchemaEntry);
Expand Down

0 comments on commit 6c1d58f

Please sign in to comment.