Building Apache Flink Applications in Java - Cheatsheet
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment
        .getExecutionEnvironment();
    env.fromElements(1, 2, 3, 4, 5).print();
    env.execute();
}
Flink CLI Commands
$ flink run -c mypackage.MyClass $JAR_FILE               # run, naming the entry class
$ flink run --detached $JAR_FILE                         # run without blocking the terminal
$ flink stop --savepointPath $SAVEPOINT_FOLDER $JOB_ID   # stop, taking a savepoint
$ flink run --fromSavepoint $SAVEPOINT_FILE $JAR_FILE    # resume from a savepoint
Setting a Restart Strategy
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
    3, // number of restart attempts
    Time.of(10, TimeUnit.SECONDS) // delay between attempts
));
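If failures tend to come in bursts, a backoff-based strategy can be a better fit than a fixed delay. A minimal sketch using the exponential-delay variant, where every value is an illustrative assumption (check that your Flink version provides this API):
env.setRestartStrategy(RestartStrategies.exponentialDelayRestart(
    Time.of(1, TimeUnit.SECONDS),   // initial backoff (assumed)
    Time.of(60, TimeUnit.SECONDS),  // maximum backoff (assumed)
    2.0,                            // backoff multiplier (assumed)
    Time.of(10, TimeUnit.MINUTES),  // reset backoff after this failure-free period (assumed)
    0.1                             // jitter factor (assumed)
));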
StreamExecutionEnvironment.fromElements
DataStream<Integer> stream = env.fromElements(1, 2, 3, 4, 5);
DataGeneratorSource
DataGeneratorSource<String> source =
    new DataGeneratorSource<>(
        index -> "String" + index,        // generator function
        numRecords,                       // total number of records to emit
        RateLimiterStrategy.perSecond(1), // emission rate
        Types.STRING
    );
FileSource
FileSource<String> source =
    FileSource.forRecordStreamFormat(
        new TextLineInputFormat(),
        new Path("<PATH_TO_FILE>")
    ).build();
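As built above, the source reads the given path once and finishes. If the directory should instead be watched for new files, the builder can poll it on an interval; a minimal sketch (the 10-second interval is an assumption):
FileSource<String> source =
    FileSource.forRecordStreamFormat(
        new TextLineInputFormat(),
        new Path("<PATH_TO_FILE>")
    )
    .monitorContinuously(Duration.ofSeconds(10)) // keep checking for new files
    .build();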
KafkaSource
KafkaSource<String> source = KafkaSource.<String>builder()
    .setProperties(config)
    .setTopics("topic1", "topic2")
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();
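The config passed to the builder is a standard java.util.Properties holding Kafka consumer settings; a minimal sketch (broker address and group id are assumptions):
Properties config = new Properties();
config.setProperty("bootstrap.servers", "localhost:9092"); // assumed broker address
config.setProperty("group.id", "my-consumer-group");       // assumed consumer group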
DataStream<String> stream = env
    .fromSource(
        source,
        WatermarkStrategy.noWatermarks(),
        "my_source"
    );
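Because the savepoint commands above match state to operators by ID, it can help to pin a stable uid on the source as well; a small sketch (the uid string is an assumption):
DataStream<String> stream = env
    .fromSource(source, WatermarkStrategy.noWatermarks(), "my_source")
    .uid("my-source-uid"); // stable operator ID for savepoint compatibility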
Serializers & Deserializers
public class Person {
    public String name;    // public fields are serialized directly
    private String email;  // private fields need a getter and setter
    public Person() {}     // POJOs need a public no-arg constructor
    public String getEmail() { return email; }
    public void setEmail(String email) { this.email = email; }
}
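Because Person satisfies Flink's POJO rules, it can flow through a stream without any extra serializer configuration; a minimal sketch (the sample values are assumptions):
Person person = new Person();
person.name = "Alice";                  // assumed sample data
person.setEmail("alice@example.com");   // assumed sample data

DataStream<Person> people = env.fromElements(person);
people.keyBy(p -> p.name);              // POJO fields work directly in keyBy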
Registering Kryo Serializers
env.getConfig().registerKryoType(MyCustomType.class);
Disabling Kryo Serialization
env.getConfig().disableGenericTypes();
JsonSerializationSchema
JsonSerializationSchema<MyClass> serializer =
    new JsonSerializationSchema<>();
JsonDeserializationSchema
JsonDeserializationSchema<MyClass> deserializer =
    new JsonDeserializationSchema<>(MyClass.class);
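Since JsonDeserializationSchema implements DeserializationSchema, it plugs straight into the KafkaSource builder shown earlier; a minimal sketch (the topic name is an assumption):
KafkaSource<MyClass> source = KafkaSource.<MyClass>builder()
    .setProperties(config)
    .setTopics("my_topic")                  // assumed topic name
    .setValueOnlyDeserializer(deserializer)
    .build();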
JsonSerializationSchema - Custom ObjectMapper
JsonSerializationSchema<MyClass> serializer =
    new JsonSerializationSchema<>(() ->
        new ObjectMapper()
            .registerModule(new JavaTimeModule()) // support java.time types
    );
Transforming Data in Flink
ProcessFunction - Mapping Elements
public class MyProcessFunction
    extends ProcessFunction<Input, Output> {
    @Override
    public void processElement(
        Input input,
        ProcessFunction<Input, Output>.Context ctx,
        Collector<Output> collector
    ) {
        collector.collect(new Output(input));
    }
}
ProcessFunction - Flattening Mapped Elements
public class MyProcessFunction
    extends ProcessFunction<Input[], Output> {
    @Override
    public void processElement(
        Input[] collection,
        ProcessFunction<Input[], Output>.Context ctx,
        Collector<Output> collector
    ) {
        for (Input input : collection) {
            collector.collect(new Output(input));
        }
    }
}
ProcessFunction - Filtering Elements
public class MyProcessFunction
    extends ProcessFunction<Input, Input> {
    @Override
    public void processElement(
        Input input,
        ProcessFunction<Input, Input>.Context ctx,
        Collector<Input> collector
    ) {
        if (condition) { // some boolean test on input
            collector.collect(input);
        }
    }
}
stream.process(new MyProcessFunction());
DataStream.map
stream.map(input -> new Output(input));
DataStream<Double> doubles = integers.map(
    input -> Double.valueOf(input) / 2
);
DataStream.flatMap
stream.flatMap((collection, collector) -> {
    for (Input input : collection) {
        collector.collect(new Output(input));
    }
}); // lambdas with a Collector may need .returns(...) to supply the erased output type
DataStream<Integer> letterCount = sentences
    .map(input -> input.split(" "))
    .flatMap((String[] words, Collector<Integer> collector) -> {
        for (String word : words) {
            collector.collect(word.length());
        }
    })
    .returns(Types.INT); // the Collector's type is erased from the lambda, so declare it
DataStream.filter
stream.filter(input -> condition);
DataStream<Integer> evenIntegers = integers
    .filter(input -> input % 2 == 0);
DataStream.keyBy
stream.keyBy(
    input -> input.getKey()
);
class MyKeyedProcessFunction
    extends KeyedProcessFunction<String, Input, Output> {
    @Override
    public void processElement(
        Input input,
        KeyedProcessFunction<String, Input, Output>.Context ctx,
        Collector<Output> collector) {
        String key = ctx.getCurrentKey();
        ...
    }
}
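A KeyedProcessFunction only runs on a keyed stream, so it is attached after keyBy; a minimal sketch reusing the class above:
stream
    .keyBy(input -> input.key)
    .process(new MyKeyedProcessFunction());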
KeyedStream.reduce
stream
    .keyBy(input -> input.key)
    .reduce((s1, s2) -> s1.merge(s2));
DataStream<Tuple2<String, Integer>> wordCountsByFirstLetter =
    itemIdsAndCounts
        .keyBy(tuple -> tuple.f0)
        .reduce((l1, l2) -> new Tuple2<>(l1.f0, l1.f1 + l2.f1));
KafkaRecordSerializationSchema
KafkaRecordSerializationSchema<MyClass> serializer =
    KafkaRecordSerializationSchema.<MyClass>builder()
        .setTopic("topic_name")
        .setValueSerializationSchema(
            new JsonSerializationSchema<>()
        )
        .build();
KafkaSink
KafkaSink<MyClass> sink = KafkaSink.<MyClass>builder()
    .setKafkaProducerConfig(config)
    .setRecordSerializer(serializer)
    .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
    .build();
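Depending on the Flink version, EXACTLY_ONCE delivery also requires a transactional ID prefix on the builder; a sketch (the prefix value is an assumption and should be unique per application):
KafkaSink<MyClass> sink = KafkaSink.<MyClass>builder()
    .setKafkaProducerConfig(config)
    .setRecordSerializer(serializer)
    .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
    .setTransactionalIdPrefix("my-app") // assumed prefix
    .build();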
stream
    .sinkTo(sink)
    .name("sink_name");
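Putting the pieces together, a job wires one of the sources above through transformations into a sink; a minimal end-to-end sketch, assuming the source and sink carry the same element type:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.fromSource(source, WatermarkStrategy.noWatermarks(), "my_source")
    .filter(input -> input != null)
    .sinkTo(sink)
    .name("sink_name");

env.execute("my_pipeline"); // assumed job name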