Skip to content

[FLINK-32695] [Tests] Replace NonSerializableTupleSource with DataGeneratorSource V2 API #26818

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,32 @@

package org.apache.flink.test.streaming.api;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.connector.datagen.source.GeneratorFunction;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
import org.apache.flink.test.util.AbstractTestBaseJUnit4;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MathUtils;

import org.junit.Assert;
import org.junit.Test;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand All @@ -60,8 +68,22 @@ public void testAsyncWaitOperator() throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Create type information for Tuple2<Integer, NonSerializable>
TypeInformation<Tuple2<Integer, NonSerializable>> tupleTypeInfo =
new TupleTypeInfo<>(Types.INT, TypeInformation.of(NonSerializable.class));

// Create generator function for NonSerializable tuples
GeneratorFunction<Long, Tuple2<Integer, NonSerializable>> generateNonSerializableTuple =
index -> new Tuple2<>(index.intValue(), new NonSerializable(index.intValue()));

// Create DataGeneratorSource with the generator function
DataGeneratorSource<Tuple2<Integer, NonSerializable>> source =
new DataGeneratorSource<>(generateNonSerializableTuple, numElements, tupleTypeInfo);

// Create data stream using Source V2 API
DataStream<Tuple2<Integer, NonSerializable>> input =
env.addSource(new NonSerializableTupleSource(numElements));
env.fromSource(source, WatermarkStrategy.noWatermarks(), "NonSerializable Source")
.setParallelism(1);

AsyncFunction<Tuple2<Integer, NonSerializable>, Integer> function =
new RichAsyncFunction<Tuple2<Integer, NonSerializable>, Integer>() {
Expand Down Expand Up @@ -106,7 +128,7 @@ public void run() {
final List<Integer> actualResult1 = new ArrayList<>(numElements);
MemorySinkFunction.registerCollection(0, actualResult1);

orderedResult.addSink(sinkFunction1).setParallelism(1);
orderedResult.sinkTo(sinkFunction1).setParallelism(1);

DataStream<Integer> unorderedResult =
AsyncDataStream.unorderedWait(input, function, timeout, TimeUnit.MILLISECONDS, 2);
Expand All @@ -116,7 +138,7 @@ public void run() {
final List<Integer> actualResult2 = new ArrayList<>(numElements);
MemorySinkFunction.registerCollection(1, actualResult2);

unorderedResult.addSink(sinkFunction2);
unorderedResult.sinkTo(sinkFunction2);

Collection<Integer> expected = new ArrayList<>(10);

Expand Down Expand Up @@ -145,77 +167,50 @@ public NonSerializable(int value) {
}
}

private static class NonSerializableTupleSource
implements SourceFunction<Tuple2<Integer, NonSerializable>> {
private static final long serialVersionUID = 3949171986015451520L;
private final int numElements;
private static class MemorySinkFunction implements Sink<Integer> {

public NonSerializableTupleSource(int numElements) {
this.numElements = numElements;
}
private final int key;
private static final Map<Integer, Collection<Integer>> collections =
new ConcurrentHashMap<>();

@Override
public void run(SourceContext<Tuple2<Integer, NonSerializable>> ctx) throws Exception {
for (int i = 0; i < numElements; i++) {
ctx.collect(new Tuple2<>(i, new NonSerializable(i)));
}
public MemorySinkFunction(int key) {
this.key = key;
}

@Override
public void cancel() {}
}

private static class TupleSource implements SourceFunction<Tuple2<Integer, Integer>> {

private static final long serialVersionUID = -8110466235852024821L;
private final int numElements;
private final int numKeys;

public TupleSource(int numElements, int numKeys) {
this.numElements = numElements;
this.numKeys = numKeys;
public static void registerCollection(int key, Collection<Integer> collection) {
collections.put(key, collection);
}

@Override
public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
for (int i = 0; i < numElements; i++) {
// keys '1' and '2' hash to different buckets
Tuple2<Integer, Integer> result =
new Tuple2<>(1 + (MathUtils.murmurHash(i) % numKeys), i);
ctx.collect(result);
}
public static void clear() {
collections.clear();
}

@Override
public void cancel() {}
}

private static class MemorySinkFunction implements SinkFunction<Integer> {
private static Map<Integer, Collection<Integer>> collections = new ConcurrentHashMap<>();

private static final long serialVersionUID = -8815570195074103860L;

private final int key;

public MemorySinkFunction(int key) {
this.key = key;
public SinkWriter<Integer> createWriter(WriterInitContext context) throws IOException {
return new MemorySinkWriter(key);
}

@Override
public void invoke(Integer value) throws Exception {
Collection<Integer> collection = collections.get(key);
private static class MemorySinkWriter implements SinkWriter<Integer>, Serializable {
private final int key;
private final Collection<Integer> collection;

synchronized (collection) {
collection.add(value);
public MemorySinkWriter(int key) {
this.key = key;
this.collection = collections.get(key);
}
}

public static void registerCollection(int key, Collection<Integer> collection) {
collections.put(key, collection);
}
@Override
public void write(Integer element, Context context) {
synchronized (collection) {
collection.add(element);
}
}

public static void clear() {
collections.clear();
@Override
public void flush(boolean endOfInput) {}

@Override
public void close() {}
}
}

Expand Down