Skip to content

Commit

Permalink
[HUDI-8070] Support Flink 1.19 (apache#11779)
Browse files Browse the repository at this point in the history
  • Loading branch information
HuangZhenQiu authored Aug 23, 2024
1 parent e1f70fd commit d550b46
Show file tree
Hide file tree
Showing 48 changed files with 4,867 additions and 32 deletions.
19 changes: 12 additions & 7 deletions .github/workflows/bot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ jobs:
include:
- scalaProfile: "scala-2.12"
sparkProfile: "spark3.5"
flinkProfile: "flink1.18"
flinkProfile: "flink1.19"

steps:
- uses: actions/checkout@v3
Expand Down Expand Up @@ -420,6 +420,7 @@ jobs:
- flinkProfile: "flink1.16"
- flinkProfile: "flink1.17"
- flinkProfile: "flink1.18"
- flinkProfile: "flink1.19"
steps:
- uses: actions/checkout@v3
- name: Set up JDK 8
Expand Down Expand Up @@ -456,15 +457,15 @@ jobs:
matrix:
include:
- scalaProfile: 'scala-2.13'
flinkProfile: 'flink1.18'
flinkProfile: 'flink1.19'
sparkProfile: 'spark3.5'
sparkRuntime: 'spark3.5.0'
- scalaProfile: 'scala-2.12'
flinkProfile: 'flink1.18'
flinkProfile: 'flink1.19'
sparkProfile: 'spark3.5'
sparkRuntime: 'spark3.5.0'
- scalaProfile: 'scala-2.12'
flinkProfile: 'flink1.18'
flinkProfile: 'flink1.19'
sparkProfile: 'spark3.4'
sparkRuntime: 'spark3.4.0'

Expand Down Expand Up @@ -493,6 +494,10 @@ jobs:
strategy:
matrix:
include:
- scalaProfile: 'scala-2.13'
flinkProfile: 'flink1.19'
sparkProfile: 'spark3.5'
sparkRuntime: 'spark3.5.1'
- scalaProfile: 'scala-2.13'
flinkProfile: 'flink1.18'
sparkProfile: 'spark3.5'
Expand Down Expand Up @@ -570,11 +575,11 @@ jobs:
matrix:
include:
- scalaProfile: 'scala-2.13'
flinkProfile: 'flink1.18'
flinkProfile: 'flink1.19'
sparkProfile: 'spark3.5'
sparkRuntime: 'spark3.5.0'
- scalaProfile: 'scala-2.12'
flinkProfile: 'flink1.18'
flinkProfile: 'flink1.19'
sparkProfile: 'spark3.5'
sparkRuntime: 'spark3.5.0'
steps:
Expand Down Expand Up @@ -713,7 +718,7 @@ jobs:
matrix:
include:
- scalaProfile: "scala-2.12"
flinkProfile: "flink1.18"
flinkProfile: "flink1.19"

steps:
- uses: actions/checkout@v3
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hudi.sink.utils;

import org.apache.hudi.adapter.CollectOutputAdapter;
import org.apache.hudi.adapter.TestStreamConfigs;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
Expand Down Expand Up @@ -79,7 +80,7 @@ public class BulkInsertFunctionWrapper<I> implements TestFunctionWrapper<I> {
private MapFunction<RowData, RowData> mapFunction;
private Map<String, String> bucketIdToFileId;
private SortOperator sortOperator;
private CollectorOutput<RowData> output;
private CollectOutputAdapter<RowData> output;

public BulkInsertFunctionWrapper(String tablePath, Configuration conf) throws Exception {
ioManager = new IOManagerAsync();
Expand Down Expand Up @@ -227,7 +228,7 @@ private void setupSortOperator() throws Exception {
SortOperatorGen sortOperatorGen = BucketBulkInsertWriterHelper.getFileIdSorterGen(rowTypeWithFileId);
this.sortOperator = (SortOperator) sortOperatorGen.createSortOperator(conf);
this.sortOperator.setProcessingTimeService(new TestProcessingTimeService());
this.output = new CollectorOutput<>();
this.output = new CollectOutputAdapter<>();
StreamConfig streamConfig = new StreamConfig(conf);
streamConfig.setOperatorID(new OperatorID());
RowDataSerializer inputSerializer = new RowDataSerializer(rowTypeWithFileId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hudi.sink.utils;

import org.apache.hudi.adapter.CollectOutputAdapter;
import org.apache.hudi.sink.clustering.ClusteringCommitEvent;
import org.apache.hudi.sink.clustering.ClusteringCommitSink;
import org.apache.hudi.sink.clustering.ClusteringOperator;
Expand Down Expand Up @@ -58,11 +59,11 @@ public class ClusteringFunctionWrapper {
/**
* Output to collect the clustering plan events.
*/
private CollectorOutput<ClusteringPlanEvent> planEventOutput;
private CollectOutputAdapter<ClusteringPlanEvent> planEventOutput;
/**
* Output to collect the clustering commit events.
*/
private CollectorOutput<ClusteringCommitEvent> commitEventOutput;
private CollectOutputAdapter<ClusteringCommitEvent> commitEventOutput;
/**
* Function that executes the clustering task.
*/
Expand All @@ -87,14 +88,14 @@ public ClusteringFunctionWrapper(Configuration conf, StreamTask<?, ?> streamTask

public void openFunction() throws Exception {
clusteringPlanOperator = new ClusteringPlanOperator(conf);
planEventOutput = new CollectorOutput<>();
planEventOutput = new CollectOutputAdapter<>();
clusteringPlanOperator.setup(streamTask, streamConfig, planEventOutput);
clusteringPlanOperator.open();

clusteringOperator = new ClusteringOperator(conf, TestConfigurations.ROW_TYPE);
// CAUTION: deprecated API used.
clusteringOperator.setProcessingTimeService(new TestProcessingTimeService());
commitEventOutput = new CollectorOutput<>();
commitEventOutput = new CollectOutputAdapter<>();
clusteringOperator.setup(streamTask, streamConfig, commitEventOutput);
clusteringOperator.open();
final NonThrownExecutor syncExecutor = new MockCoordinatorExecutor(
Expand All @@ -108,7 +109,7 @@ public void openFunction() throws Exception {

public void cluster(long checkpointID) throws Exception {
// collect the ClusteringPlanEvents.
CollectorOutput<ClusteringPlanEvent> planOutput = new CollectorOutput<>();
CollectOutputAdapter<ClusteringPlanEvent> planOutput = new CollectOutputAdapter<>();
clusteringPlanOperator.setOutput(planOutput);
clusteringPlanOperator.notifyCheckpointComplete(checkpointID);
// collect the ClusteringCommitEvents
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hudi.sink.utils;

import org.apache.hudi.adapter.CollectOutputAdapter;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.sink.compact.CompactOperator;
import org.apache.hudi.sink.compact.CompactionCommitEvent;
Expand Down Expand Up @@ -58,11 +59,11 @@ public class CompactFunctionWrapper {
/**
* Output to collect the compaction plan events.
*/
private CollectorOutput<CompactionPlanEvent> planEventOutput;
private CollectOutputAdapter<CompactionPlanEvent> planEventOutput;
/**
* Output to collect the compaction commit events.
*/
private CollectorOutput<CompactionCommitEvent> commitEventOutput;
private CollectOutputAdapter<CompactionCommitEvent> commitEventOutput;
/**
* Function that executes the compaction task.
*/
Expand All @@ -87,14 +88,14 @@ public CompactFunctionWrapper(Configuration conf, StreamTask<?, ?> streamTask, S

public void openFunction() throws Exception {
compactionPlanOperator = new CompactionPlanOperator(conf);
planEventOutput = new CollectorOutput<>();
planEventOutput = new CollectOutputAdapter<>();
compactionPlanOperator.setup(streamTask, streamConfig, planEventOutput);
compactionPlanOperator.open();

compactOperator = new CompactOperator(conf);
// CAUTION: deprecated API used.
compactOperator.setProcessingTimeService(new TestProcessingTimeService());
commitEventOutput = new CollectorOutput<>();
commitEventOutput = new CollectOutputAdapter<>();
compactOperator.setup(streamTask, streamConfig, commitEventOutput);
compactOperator.open();
final NonThrownExecutor syncExecutor = new MockCoordinatorExecutor(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hudi.sink.utils;

import org.apache.hudi.adapter.CollectOutputAdapter;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.configuration.FlinkOptions;
Expand Down Expand Up @@ -147,7 +148,7 @@ public void openFunction() throws Exception {

if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
bootstrapOperator = new BootstrapOperator<>(conf);
CollectorOutput<HoodieRecord<?>> output = new CollectorOutput<>();
CollectOutputAdapter<HoodieRecord<?>> output = new CollectOutputAdapter<>();
bootstrapOperator.setup(streamTask, streamConfig, output);
bootstrapOperator.initializeState(this.stateInitializationContext);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
Expand All @@ -16,7 +16,7 @@
* limitations under the License.
*/

package org.apache.hudi.sink.utils;
package org.apache.hudi.adapter;

import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.watermark.Watermark;
Expand All @@ -29,13 +29,13 @@
import java.util.List;

/**
* Collecting {@link Output} for {@link StreamRecord}.
* Adapter clazz for {@code Output}.
*/
public class CollectorOutput<T> implements Output<StreamRecord<T>> {
public class CollectOutputAdapter<T> implements Output<StreamRecord<T>> {

private final List<T> records;

public CollectorOutput() {
public CollectOutputAdapter() {
this.records = new ArrayList<>();
}

Expand Down Expand Up @@ -72,4 +72,4 @@ public void close() {
public void emitWatermarkStatus(WatermarkStatus watermarkStatus) {
// no operation
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.adapter;

import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.util.OutputTag;

import java.util.ArrayList;
import java.util.List;

/**
 * {@link Output} implementation that buffers the value of every emitted
 * {@link StreamRecord} in an in-memory list.
 *
 * <p>Watermarks, latency markers and watermark statuses are silently ignored,
 * and side outputs are rejected with an {@link UnsupportedOperationException}.
 *
 * @param <T> type of the record values being collected
 */
public class CollectOutputAdapter<T> implements Output<StreamRecord<T>> {

  /** Values collected so far, in the order they were emitted. */
  private final List<T> records;

  /** Creates an adapter with an empty record buffer. */
  public CollectOutputAdapter() {
    this.records = new ArrayList<>();
  }

  /**
   * Returns the collected record values.
   *
   * @return the backing list itself (not a copy); it is emptied by {@link #close()}
   */
  public List<T> getRecords() {
    return this.records;
  }

  /** Ignores the watermark. */
  @Override
  public void emitWatermark(Watermark mark) {
    // no operation
  }

  /** Ignores the latency marker. */
  @Override
  public void emitLatencyMarker(LatencyMarker latencyMarker) {
    // no operation
  }

  /**
   * Appends the value carried by the given record to the buffer.
   *
   * @param record the emitted stream record; only its value is retained
   */
  @Override
  public void collect(StreamRecord<T> record) {
    records.add(record.getValue());
  }

  /**
   * Side outputs are not supported by this adapter.
   *
   * @throws UnsupportedOperationException always
   */
  @Override
  public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
    // The message names the current class; the previous text still referred to the
    // pre-rename class "CollectorOutput".
    throw new UnsupportedOperationException("Side output not supported for CollectOutputAdapter");
  }

  /** Discards all collected record values. */
  @Override
  public void close() {
    this.records.clear();
  }

  /** Ignores the watermark status. */
  @Override
  public void emitWatermarkStatus(WatermarkStatus watermarkStatus) {
    // no operation
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.adapter;

import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.util.OutputTag;

import java.util.ArrayList;
import java.util.List;

/**
 * {@link Output} implementation that buffers the value of every emitted
 * {@link StreamRecord} in an in-memory list.
 *
 * <p>Watermarks, latency markers and watermark statuses are silently ignored,
 * and side outputs are rejected with an {@link UnsupportedOperationException}.
 *
 * @param <T> type of the record values being collected
 */
public class CollectOutputAdapter<T> implements Output<StreamRecord<T>> {

  /** Values collected so far, in the order they were emitted. */
  private final List<T> records;

  /** Creates an adapter with an empty record buffer. */
  public CollectOutputAdapter() {
    this.records = new ArrayList<>();
  }

  /**
   * Returns the collected record values.
   *
   * @return the backing list itself (not a copy); it is emptied by {@link #close()}
   */
  public List<T> getRecords() {
    return this.records;
  }

  /** Ignores the watermark. */
  @Override
  public void emitWatermark(Watermark mark) {
    // no operation
  }

  /** Ignores the latency marker. */
  @Override
  public void emitLatencyMarker(LatencyMarker latencyMarker) {
    // no operation
  }

  /**
   * Appends the value carried by the given record to the buffer.
   *
   * @param record the emitted stream record; only its value is retained
   */
  @Override
  public void collect(StreamRecord<T> record) {
    records.add(record.getValue());
  }

  /**
   * Side outputs are not supported by this adapter.
   *
   * @throws UnsupportedOperationException always
   */
  @Override
  public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) {
    // The message names the current class; the previous text still referred to the
    // pre-rename class "CollectorOutput".
    throw new UnsupportedOperationException("Side output not supported for CollectOutputAdapter");
  }

  /** Discards all collected record values. */
  @Override
  public void close() {
    this.records.clear();
  }

  /** Ignores the watermark status. */
  @Override
  public void emitWatermarkStatus(WatermarkStatus watermarkStatus) {
    // no operation
  }
}
Loading

0 comments on commit d550b46

Please sign in to comment.