Skip to content

Commit 9f8e33e

Browse files
committed
Remove numberic offset usage
1 parent b18ab05 commit 9f8e33e

File tree

3 files changed

+1
-22
lines changed

3 files changed

+1
-22
lines changed

services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminMetadata.java

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -178,33 +178,17 @@ private PubSubPosition getPubSubPosition(PubSubPosition position, Long offset) {
178178
} else {
179179
return PubSubSymbolicPosition.EARLIEST;
180180
}
181-
} else if (offset != null && offset > position.getNumericOffset()) {
182-
LOGGER.warn(
183-
"Offset {} is greater than position {}. Resetting position to offset.",
184-
offset,
185-
position.getNumericOffset());
186-
return ApacheKafkaOffsetPosition.of(offset);
187181
} else {
188182
return position;
189183
}
190184
}
191185

192186
public void setPubSubPosition(PubSubPosition pubSubPosition) {
193187
this.position = pubSubPosition;
194-
if (pubSubPosition != null) {
195-
this.offset = pubSubPosition.getNumericOffset();
196-
} else {
197-
this.offset = UNDEFINED_VALUE;
198-
}
199188
}
200189

201190
public void setUpstreamPubSubPosition(PubSubPosition upstreamPubPosition) {
202191
this.upstreamPosition = upstreamPubPosition;
203-
if (upstreamPubPosition != null) {
204-
this.upstreamOffset = upstreamPubPosition.getNumericOffset();
205-
} else {
206-
this.upstreamOffset = UNDEFINED_VALUE;
207-
}
208192
}
209193

210194
@Override

services/venice-controller/src/main/java/com/linkedin/venice/controller/stats/AdminConsumptionStats.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -74,12 +74,6 @@ public AdminConsumptionStats(MetricsRepository metricsRepository, String name) {
7474
adminConsumeFailCountSensor = registerSensor("failed_admin_messages", new Count());
7575
adminConsumeFailRetriableMessageCountSensor = registerSensor("failed_retriable_admin_messages", new Count());
7676
adminTopicDIVErrorReportCountSensor = registerSensor("admin_message_div_error_report_count", new Count());
77-
registerSensor(
78-
new AsyncGauge(
79-
(ignored, ignored2) -> adminConsumptionFailedPosition == null
80-
? 0L
81-
: adminConsumptionFailedPosition.getNumericOffset(),
82-
"failed_admin_message_offset"));
8377
adminConsumptionCycleDurationMsSensor =
8478
registerSensor("admin_consumption_cycle_duration_ms", new Avg(), new Min(), new Max());
8579
registerSensor(

services/venice-controller/src/test/java/com/linkedin/venice/controller/kafka/consumer/TestAdminMetadata.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ public void testSerializeAndDeserializeAdminMetadata() throws IOException {
5050

5151
// Verify JSON is human-readable
5252
String jsonString = new String(jsonBytes);
53+
System.out.println("Serialized AdminMetadata JSON: " + jsonString);
5354
assertTrue(jsonString.contains("\"executionId\" : 123"));
5455
assertTrue(jsonString.contains("\"offset\" : 12345"));
5556
assertTrue(jsonString.contains("\"upstreamOffset\" : 67890"));

0 commit comments

Comments
 (0)