27
27
import org .apache .flink .api .common .typeutils .TypeSerializerUpgradeTestBase ;
28
28
import org .apache .flink .api .java .typeutils .TypeExtractor ;
29
29
30
- import org .hamcrest . Matcher ;
30
+ import org .assertj . core . api . Condition ;
31
31
32
- import static org .hamcrest .Matchers .is ;
33
- import static org .junit .Assert .assertSame ;
32
+ import static org .assertj .core .api .Assertions .assertThat ;
34
33
35
34
/** A {@link TypeSerializerUpgradeTestBase} for the {@link PojoSerializer}. */
36
35
class PojoRecordSerializerUpgradeTestSpecifications {
@@ -58,7 +57,7 @@ public TypeSerializer<PojoBeforeUpgrade> createPriorSerializer() {
58
57
TypeSerializer <PojoBeforeUpgrade > serializer =
59
58
TypeExtractor .createTypeInfo (PojoBeforeUpgrade .class )
60
59
.createSerializer (new SerializerConfigImpl ());
61
- assertSame ( PojoSerializer . class , serializer .getClass ());
60
+ assertThat ( serializer .getClass ()). isSameAs ( PojoSerializer . class );
62
61
return serializer ;
63
62
}
64
63
@@ -81,18 +80,19 @@ public TypeSerializer<PojoAfterUpgrade> createUpgradedSerializer() {
81
80
TypeSerializer <PojoAfterUpgrade > serializer =
82
81
TypeExtractor .createTypeInfo (PojoAfterUpgrade .class )
83
82
.createSerializer (new SerializerConfigImpl ());
84
- assertSame ( PojoSerializer . class , serializer .getClass ());
83
+ assertThat ( serializer .getClass ()). isSameAs ( PojoSerializer . class );
85
84
return serializer ;
86
85
}
87
86
88
87
@ Override
89
- public Matcher <PojoAfterUpgrade > testDataMatcher () {
90
- return is (new PojoAfterUpgrade (911108 , "Gordon" ));
88
+ public Condition <PojoAfterUpgrade > testDataCondition () {
89
+ return new Condition <>(
90
+ new PojoAfterUpgrade (911108 , "Gordon" )::equals , "value is (911108, Gordon)" );
91
91
}
92
92
93
93
@ Override
94
- public Matcher <TypeSerializerSchemaCompatibility <PojoAfterUpgrade >>
95
- schemaCompatibilityMatcher (FlinkVersion version ) {
94
+ public Condition <TypeSerializerSchemaCompatibility <PojoAfterUpgrade >>
95
+ schemaCompatibilityCondition (FlinkVersion version ) {
96
96
return TypeSerializerConditions .isCompatibleAsIs ();
97
97
}
98
98
}
@@ -110,7 +110,7 @@ public TypeSerializer<RecordBeforeMigration> createPriorSerializer() {
110
110
TypeSerializer <RecordBeforeMigration > serializer =
111
111
TypeExtractor .createTypeInfo (RecordBeforeMigration .class )
112
112
.createSerializer (new SerializerConfigImpl ());
113
- assertSame ( PojoSerializer . class , serializer .getClass ());
113
+ assertThat ( serializer .getClass ()). isSameAs ( PojoSerializer . class );
114
114
return serializer ;
115
115
}
116
116
@@ -133,18 +133,20 @@ public TypeSerializer<RecordAfterSchemaUpgrade> createUpgradedSerializer() {
133
133
TypeSerializer <RecordAfterSchemaUpgrade > serializer =
134
134
TypeExtractor .createTypeInfo (RecordAfterSchemaUpgrade .class )
135
135
.createSerializer (new SerializerConfigImpl ());
136
- assertSame ( PojoSerializer . class , serializer .getClass ());
136
+ assertThat ( serializer .getClass ()). isSameAs ( PojoSerializer . class );
137
137
return serializer ;
138
138
}
139
139
140
140
@ Override
141
- public Matcher <RecordAfterSchemaUpgrade > testDataMatcher () {
142
- return is (new RecordAfterSchemaUpgrade ("Gordon" , 0 , null ));
141
+ public Condition <RecordAfterSchemaUpgrade > testDataCondition () {
142
+ return new Condition <>(
143
+ new RecordAfterSchemaUpgrade ("Gordon" , 0 , null )::equals ,
144
+ "value is (Gordon, 0 ,null)" );
143
145
}
144
146
145
147
@ Override
146
- public Matcher <TypeSerializerSchemaCompatibility <RecordAfterSchemaUpgrade >>
147
- schemaCompatibilityMatcher (FlinkVersion version ) {
148
+ public Condition <TypeSerializerSchemaCompatibility <RecordAfterSchemaUpgrade >>
149
+ schemaCompatibilityCondition (FlinkVersion version ) {
148
150
return TypeSerializerConditions .isCompatibleAfterMigration ();
149
151
}
150
152
}
0 commit comments