Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
* table loader should be used carefully when used with writer tasks. It could result in heavy load
* on a catalog for jobs with many writers.
*/
class CachingTableSupplier implements SerializableSupplier<Table> {
public class CachingTableSupplier implements SerializableSupplier<Table> {

private static final Logger LOG = LoggerFactory.getLogger(CachingTableSupplier.class);

Expand All @@ -43,7 +43,7 @@ class CachingTableSupplier implements SerializableSupplier<Table> {
private long lastLoadTimeMillis;
private transient Table table;

CachingTableSupplier(
public CachingTableSupplier(
SerializableTable initialTable, TableLoader tableLoader, Duration tableRefreshInterval) {
Preconditions.checkArgument(initialTable != null, "initialTable cannot be null");
Preconditions.checkArgument(tableLoader != null, "tableLoader cannot be null");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.flink.sink;

import java.io.Serializable;

/**
 * Tagging type for user-defined metadata that travels with committables in the Iceberg sink.
 *
 * <p>The interface declares no methods of its own. It only marks a type as attachable metadata
 * and, because it extends {@link Serializable}, requires every implementation to be
 * serializable. Typical payloads are watermarks or other application-specific values that are
 * handed from the writers, through the aggregator, to the committer.
 *
 * <p>Encoding and decoding of implementations is delegated to a {@link
 * CommittableMetadataSerializer} that has been registered through {@link
 * CommittableMetadataRegistry}.
 *
 * @see CommittableMetadataSerializer
 * @see CommittableMetadataRegistry
 */
public interface CommittableMetadata extends Serializable {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.flink.sink;

import javax.annotation.Nullable;

/**
 * Static holder for the single {@link CommittableMetadataSerializer} used by the Iceberg sink.
 *
 * <p>At most one serializer is held at a time; every call to {@link #register} replaces whatever
 * was registered before. The held serializer is looked up by {@link IcebergCommittableSerializer}
 * and {@link WriteResultSerializer} when they encode or decode {@link CommittableMetadata}.
 *
 * <p>The registration lives in a static field, so it is only visible to code that shares this
 * class with the caller of {@link #register}. NOTE(review): presumably every process that
 * serializes or deserializes committables has to perform its own registration - confirm how this
 * is expected to work when writers and committers run in separate JVMs.
 *
 * @see CommittableMetadata
 * @see CommittableMetadataSerializer
 */
public class CommittableMetadataRegistry {
  // volatile so that a registration made on one thread is visible to readers on other threads;
  // stays null until register() is called with a non-null serializer
  private static volatile CommittableMetadataSerializer registered;

  private CommittableMetadataRegistry() {}

  /**
   * Look up the serializer that is currently registered.
   *
   * @return the most recently registered serializer, or null when nothing is registered
   */
  @Nullable
  public static CommittableMetadataSerializer get() {
    return registered;
  }

  /**
   * Install the serializer to use for {@link CommittableMetadata}, replacing any previous one.
   *
   * <p>Callers are expected to do this before any Iceberg sinks are created.
   *
   * @param metadataSerializer the serializer to install; passing null clears the registration
   */
  public static void register(@Nullable CommittableMetadataSerializer metadataSerializer) {
    registered = metadataSerializer;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.flink.sink;

import java.io.IOException;
import java.io.Serializable;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;

/**
 * Strategy for turning {@link CommittableMetadata} into bytes and back.
 *
 * <p>The Iceberg sink does not know the concrete metadata type, so it delegates encoding and
 * decoding to an implementation of this interface. An implementation only takes effect once it
 * has been registered through {@link CommittableMetadataRegistry}.
 *
 * <p>{@link #read} and {@link #write} operate on the same stream that carries the rest of the
 * committable, so {@code read} should consume exactly the bytes that {@code write} produced for
 * the same value.
 *
 * @see CommittableMetadata
 * @see CommittableMetadataRegistry
 */
public interface CommittableMetadataSerializer extends Serializable {
  /**
   * Reconstruct a metadata instance from the given input.
   *
   * @param in the input view positioned at the start of the encoded metadata
   * @return the decoded metadata (never null)
   * @throws IOException if the metadata cannot be decoded
   */
  CommittableMetadata read(DataInputView in) throws IOException;

  /**
   * Encode the given metadata onto the given output.
   *
   * @param metadata the metadata to encode (never null)
   * @param out the output view to append the encoded form to
   * @throws IOException if the metadata cannot be encoded
   */
  void write(CommittableMetadata metadata, DataOutputView out) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.Serializable;
import java.util.Arrays;
import java.util.Objects;
import javax.annotation.Nullable;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;

/**
Expand All @@ -31,41 +32,58 @@
* <p>{@link IcebergCommittableSerializer} is used for serializing the objects between the Writer
* and the Aggregator operator and between the Aggregator and the Committer as well.
*/
class IcebergCommittable implements Serializable {
public class IcebergCommittable implements Serializable {
private final byte[] manifest;
private final String jobId;
private final String operatorId;
private final long checkpointId;
@Nullable private final CommittableMetadata metadata;

IcebergCommittable(byte[] manifest, String jobId, String operatorId, long checkpointId) {
public IcebergCommittable(byte[] manifest, String jobId, String operatorId, long checkpointId) {
this(manifest, jobId, operatorId, checkpointId, null);
}

public IcebergCommittable(
byte[] manifest,
String jobId,
String operatorId,
long checkpointId,
@Nullable CommittableMetadata metadata) {
this.manifest = manifest;
this.jobId = jobId;
this.operatorId = operatorId;
this.checkpointId = checkpointId;
this.metadata = metadata;
}

byte[] manifest() {
public byte[] manifest() {
return manifest;
}

String jobId() {
public String jobId() {
return jobId;
}

String operatorId() {
public String operatorId() {
return operatorId;
}

Long checkpointId() {
public Long checkpointId() {
return checkpointId;
}

@Nullable
public CommittableMetadata metadata() {
return metadata;
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("jobId", jobId)
.add("checkpointId", checkpointId)
.add("operatorId", operatorId)
.add("metadata", metadata)
.toString();
}

Expand All @@ -83,12 +101,13 @@ public boolean equals(Object o) {
return checkpointId == that.checkpointId
&& Arrays.equals(manifest, that.manifest)
&& Objects.equals(jobId, that.jobId)
&& Objects.equals(operatorId, that.operatorId);
&& Objects.equals(operatorId, that.operatorId)
&& Objects.equals(metadata, that.metadata);
}

@Override
public int hashCode() {
int result = Objects.hash(jobId, operatorId, checkpointId);
int result = Objects.hash(jobId, operatorId, checkpointId, metadata);
result = 31 * result + Arrays.hashCode(manifest);
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,16 @@ public byte[] serialize(IcebergCommittable committable) throws IOException {
view.writeLong(committable.checkpointId());
view.writeInt(committable.manifest().length);
view.write(committable.manifest());

boolean hasMetadata = committable.metadata() != null;
view.writeBoolean(hasMetadata);
if (hasMetadata) {
CommittableMetadataSerializer metadataSerializer = CommittableMetadataRegistry.get();
if (metadataSerializer != null) {
metadataSerializer.write(committable.metadata(), view);
}
}

return out.toByteArray();
}

Expand All @@ -61,7 +71,16 @@ public IcebergCommittable deserialize(int version, byte[] serialized) throws IOE
byte[] manifestBuf;
manifestBuf = new byte[manifestLen];
view.read(manifestBuf);
return new IcebergCommittable(manifestBuf, jobId, operatorId, checkpointId);

CommittableMetadata metadata = null;
if (view.readBoolean()) { // hasMetadata
CommittableMetadataSerializer metadataSerializer = CommittableMetadataRegistry.get();
if (metadataSerializer != null) {
metadata = metadataSerializer.read(view);
}
}

return new IcebergCommittable(manifestBuf, jobId, operatorId, checkpointId, metadata);
}
throw new IOException("Unrecognized version or corrupt state: " + version);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
* same jobId-operatorId-checkpointId triplet
* </ul>
*/
class IcebergCommitter implements Committer<IcebergCommittable> {
public class IcebergCommitter implements Committer<IcebergCommittable> {
private static final Logger LOG = LoggerFactory.getLogger(IcebergCommitter.class);
private static final byte[] EMPTY_MANIFEST_DATA = new byte[0];
public static final WriteResult EMPTY_WRITE_RESULT =
Expand All @@ -80,7 +80,7 @@ class IcebergCommitter implements Committer<IcebergCommittable> {
private int continuousEmptyCheckpoints = 0;
private boolean compactMode = false;

IcebergCommitter(
public IcebergCommitter(
TableLoader tableLoader,
String branch,
Map<String, String> snapshotProperties,
Expand Down
Loading