Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.CoGroupedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.watermark.Watermark;
Expand Down Expand Up @@ -62,57 +63,29 @@ public void testCoGroup() throws Exception {
env.setParallelism(1);

DataStream<Tuple2<String, Integer>> source1 =
env.addSource(
new SourceFunction<Tuple2<String, Integer>>() {
private static final long serialVersionUID = 1L;

@Override
public void run(SourceContext<Tuple2<String, Integer>> ctx)
throws Exception {
ctx.collect(Tuple2.of("a", 0));
ctx.collect(Tuple2.of("a", 1));
ctx.collect(Tuple2.of("a", 2));

ctx.collect(Tuple2.of("b", 3));
ctx.collect(Tuple2.of("b", 4));
ctx.collect(Tuple2.of("b", 5));

ctx.collect(Tuple2.of("a", 6));
ctx.collect(Tuple2.of("a", 7));
ctx.collect(Tuple2.of("a", 8));

// source is finite, so it will have an implicit MAX
// watermark when it finishes
}

@Override
public void cancel() {}
})
env.fromData(
Tuple2.of("a", 0),
Tuple2.of("a", 1),
Tuple2.of("a", 2),
Tuple2.of("b", 3),
Tuple2.of("b", 4),
Tuple2.of("b", 5),
Tuple2.of("a", 6),
Tuple2.of("a", 7),
Tuple2.of("a", 8))

// source is finite, so it will have an implicit MAX
// watermark when it finishes
.assignTimestampsAndWatermarks(new Tuple2TimestampExtractor());

DataStream<Tuple2<String, Integer>> source2 =
env.addSource(
new SourceFunction<Tuple2<String, Integer>>() {

@Override
public void run(SourceContext<Tuple2<String, Integer>> ctx)
throws Exception {
ctx.collect(Tuple2.of("a", 0));
ctx.collect(Tuple2.of("a", 1));

ctx.collect(Tuple2.of("b", 3));

ctx.collect(Tuple2.of("c", 6));
ctx.collect(Tuple2.of("c", 7));
ctx.collect(Tuple2.of("c", 8));

// source is finite, so it will have an implicit MAX
// watermark when it finishes
}

@Override
public void cancel() {}
})
env.fromData(
Tuple2.of("a", 0),
Tuple2.of("a", 1),
Tuple2.of("b", 3),
Tuple2.of("c", 6),
Tuple2.of("c", 7),
Tuple2.of("c", 8))
.assignTimestampsAndWatermarks(new Tuple2TimestampExtractor());

source1.coGroup(source2)
Expand Down Expand Up @@ -140,13 +113,7 @@ public void coGroup(
out.collect(result.toString());
}
})
.addSink(
new SinkFunction<String>() {
@Override
public void invoke(String value) throws Exception {
testResults.add(value);
}
});
.sinkTo(new TestSink());

env.execute("CoGroup Test");

Expand All @@ -172,57 +139,25 @@ public void testJoin() throws Exception {
env.setParallelism(1);

DataStream<Tuple3<String, String, Integer>> source1 =
env.addSource(
new SourceFunction<Tuple3<String, String, Integer>>() {

@Override
public void run(
SourceContext<Tuple3<String, String, Integer>> ctx)
throws Exception {
ctx.collect(Tuple3.of("a", "x", 0));
ctx.collect(Tuple3.of("a", "y", 1));
ctx.collect(Tuple3.of("a", "z", 2));

ctx.collect(Tuple3.of("b", "u", 3));
ctx.collect(Tuple3.of("b", "w", 5));

ctx.collect(Tuple3.of("a", "i", 6));
ctx.collect(Tuple3.of("a", "j", 7));
ctx.collect(Tuple3.of("a", "k", 8));

// source is finite, so it will have an implicit MAX
// watermark when it finishes
}

@Override
public void cancel() {}
})
env.fromData(
Tuple3.of("a", "x", 0),
Tuple3.of("a", "y", 1),
Tuple3.of("a", "z", 2),
Tuple3.of("b", "u", 3),
Tuple3.of("b", "w", 5),
Tuple3.of("a", "i", 6),
Tuple3.of("a", "j", 7),
Tuple3.of("a", "k", 8))
.assignTimestampsAndWatermarks(new Tuple3TimestampExtractor());

DataStream<Tuple3<String, String, Integer>> source2 =
env.addSource(
new SourceFunction<Tuple3<String, String, Integer>>() {

@Override
public void run(
SourceContext<Tuple3<String, String, Integer>> ctx)
throws Exception {
ctx.collect(Tuple3.of("a", "u", 0));
ctx.collect(Tuple3.of("a", "w", 1));

ctx.collect(Tuple3.of("b", "i", 3));
ctx.collect(Tuple3.of("b", "k", 5));

ctx.collect(Tuple3.of("a", "x", 6));
ctx.collect(Tuple3.of("a", "z", 8));

// source is finite, so it will have an implicit MAX
// watermark when it finishes
}

@Override
public void cancel() {}
})
env.fromData(
Tuple3.of("a", "u", 0),
Tuple3.of("a", "w", 1),
Tuple3.of("b", "i", 3),
Tuple3.of("b", "k", 5),
Tuple3.of("a", "x", 6),
Tuple3.of("a", "z", 8))
.assignTimestampsAndWatermarks(new Tuple3TimestampExtractor());

source1.join(source2)
Expand All @@ -242,13 +177,7 @@ public String join(
return first + ":" + second;
}
})
.addSink(
new SinkFunction<String>() {
@Override
public void invoke(String value) throws Exception {
testResults.add(value);
}
});
.sinkTo(new TestSink());

env.execute("Join Test");

Expand Down Expand Up @@ -286,32 +215,15 @@ public void testSelfJoin() throws Exception {
env.setParallelism(1);

DataStream<Tuple3<String, String, Integer>> source1 =
env.addSource(
new SourceFunction<Tuple3<String, String, Integer>>() {
private static final long serialVersionUID = 1L;

@Override
public void run(
SourceContext<Tuple3<String, String, Integer>> ctx)
throws Exception {
ctx.collect(Tuple3.of("a", "x", 0));
ctx.collect(Tuple3.of("a", "y", 1));
ctx.collect(Tuple3.of("a", "z", 2));

ctx.collect(Tuple3.of("b", "u", 3));
ctx.collect(Tuple3.of("b", "w", 5));

ctx.collect(Tuple3.of("a", "i", 6));
ctx.collect(Tuple3.of("a", "j", 7));
ctx.collect(Tuple3.of("a", "k", 8));

// source is finite, so it will have an implicit MAX
// watermark when it finishes
}

@Override
public void cancel() {}
})
env.fromData(
Tuple3.of("a", "x", 0),
Tuple3.of("a", "y", 1),
Tuple3.of("a", "z", 2),
Tuple3.of("b", "u", 3),
Tuple3.of("b", "w", 5),
Tuple3.of("a", "i", 6),
Tuple3.of("a", "j", 7),
Tuple3.of("a", "k", 8))
.assignTimestampsAndWatermarks(new Tuple3TimestampExtractor());

source1.join(source1)
Expand All @@ -331,13 +243,7 @@ public String join(
return first + ":" + second;
}
})
.addSink(
new SinkFunction<String>() {
@Override
public void invoke(String value) throws Exception {
testResults.add(value);
}
});
.sinkTo(new TestSink());

env.execute("Self-Join Test");

Expand Down Expand Up @@ -472,4 +378,23 @@ public String getKey(Tuple3<String, String, Integer> value) throws Exception {
return value.f0;
}
}

private static class TestSink implements Sink<String> {

@Override
public SinkWriter<String> createWriter(WriterInitContext context) {
return new SinkWriter<>() {
@Override
public void write(String element, Context context) {
testResults.add(element);
}

@Override
public void flush(boolean endOfInput) {}

@Override
public void close() {}
};
}
}
}