Skip to content

Commit f8e97d4

Browse files
committed
[FLINK-3154][API] Upgrade from Kryo 2.x to Kryo 5.x. Removed twitter chill companion library which is abandoned.
1 parent 9878e56 commit f8e97d4

File tree

46 files changed

+1955
-409
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+1955
-409
lines changed

flink-architecture-tests/flink-architecture-tests-production/archunit-violations/7602816f-5c01-4b7a-9e3e-235dfedec245

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,8 +106,6 @@ org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer does not satisfy
106106
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshot does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
107107
org.apache.flink.api.java.typeutils.runtime.kryo.Serializers$DummyAvroKryoSerializerClass does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
108108
org.apache.flink.api.java.typeutils.runtime.kryo.Serializers$DummyAvroRegisteredClass does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
109-
org.apache.flink.api.java.typeutils.runtime.kryo.Serializers$SpecificInstanceCollectionSerializer does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
110-
org.apache.flink.api.java.typeutils.runtime.kryo.Serializers$SpecificInstanceCollectionSerializerForArrayList does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
111109
org.apache.flink.runtime.io.network.api.CancelCheckpointMarker does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
112110
org.apache.flink.runtime.io.network.api.CheckpointBarrier does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
113111
org.apache.flink.runtime.io.network.api.EndOfData does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated

flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.flink.util.InstantiationUtil;
2828

2929
import com.esotericsoftware.kryo.Kryo;
30+
import com.esotericsoftware.kryo.util.DefaultInstantiatorStrategy;
3031
import org.apache.hadoop.io.Writable;
3132
import org.objenesis.strategy.StdInstantiatorStrategy;
3233

@@ -174,12 +175,10 @@ private void checkKryoInitialized() {
174175
if (this.kryo == null) {
175176
this.kryo = new Kryo();
176177

177-
Kryo.DefaultInstantiatorStrategy instantiatorStrategy =
178-
new Kryo.DefaultInstantiatorStrategy();
178+
DefaultInstantiatorStrategy instantiatorStrategy = new DefaultInstantiatorStrategy();
179179
instantiatorStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
180180
kryo.setInstantiatorStrategy(instantiatorStrategy);
181181

182-
this.kryo.setAsmEnabled(true);
183182
this.kryo.register(type);
184183
}
185184
}

flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.flink.util.InstantiationUtil;
2828

2929
import com.esotericsoftware.kryo.Kryo;
30+
import com.esotericsoftware.kryo.util.DefaultInstantiatorStrategy;
3031
import org.apache.hadoop.io.NullWritable;
3132
import org.apache.hadoop.io.Writable;
3233
import org.objenesis.strategy.StdInstantiatorStrategy;
@@ -126,12 +127,10 @@ private void checkKryoInitialized() {
126127
if (this.kryo == null) {
127128
this.kryo = new Kryo();
128129

129-
Kryo.DefaultInstantiatorStrategy instantiatorStrategy =
130-
new Kryo.DefaultInstantiatorStrategy();
130+
DefaultInstantiatorStrategy instantiatorStrategy = new DefaultInstantiatorStrategy();
131131
instantiatorStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
132132
kryo.setInstantiatorStrategy(instantiatorStrategy);
133133

134-
this.kryo.setAsmEnabled(true);
135134
this.kryo.register(typeClass);
136135
}
137136
}

flink-connectors/flink-sql-connector-hive-2.3.10/src/main/resources/META-INF/NOTICE

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ the Apache Software License 2.0 (http://www.apache.org/licenses/LICENSE-2.0.txt)
4949
- org.apache.parquet:parquet-hadoop-bundle:1.8.1
5050
- org.apache.thrift:libthrift:0.14.1
5151
- org.apache.thrift:libfb303:0.9.3
52-
- org.objenesis:objenesis:2.1
52+
- org.objenesis:objenesis:3.4
5353

5454
The bundled Apache Hive org.apache.hive:hive-exec dependency bundles the following dependencies under the BSD license.
5555
See bundled license files for details.

flink-connectors/flink-sql-connector-hive-3.1.3/src/main/resources/META-INF/NOTICE

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ the Apache Software License 2.0 (http://www.apache.org/licenses/LICENSE-2.0.txt)
4646
- org.apache.parquet:parquet-hadoop-bundle:1.10.0
4747
- org.codehaus.jackson:jackson-core-asl:1.9.13
4848
- org.codehaus.jackson:jackson-mapper-asl:1.9.13
49-
- org.objenesis:objenesis:2.1
49+
- org.objenesis:objenesis:3.4
5050

5151
The bundled Apache Hive org.apache.hive:hive-exec dependency bundles the following dependencies under the BSD license.
5252
See bundled license files for details.

flink-core/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ under the License.
112112

113113
<!-- for the fallback generic serializer -->
114114
<dependency>
115-
<groupId>com.esotericsoftware.kryo</groupId>
115+
<groupId>com.esotericsoftware</groupId>
116116
<artifactId>kryo</artifactId>
117117
<!-- managed version -->
118118
</dependency>

flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoRegistration.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424

2525
import com.esotericsoftware.kryo.Kryo;
2626
import com.esotericsoftware.kryo.Serializer;
27-
import com.esotericsoftware.kryo.factories.ReflectionSerializerFactory;
27+
import com.esotericsoftware.kryo.SerializerFactory.ReflectionSerializerFactory;
2828

2929
import javax.annotation.Nullable;
3030

@@ -118,7 +118,7 @@ public Serializer<?> getSerializer(Kryo kryo) {
118118
case UNSPECIFIED:
119119
return null;
120120
case CLASS:
121-
return ReflectionSerializerFactory.makeSerializer(
121+
return ReflectionSerializerFactory.newSerializer(
122122
kryo, serializerClass, registeredClass);
123123
case INSTANCE:
124124
return serializableSerializerInstance.getSerializer();

flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NoFetchingInput.java

Lines changed: 38 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import com.esotericsoftware.kryo.KryoException;
2424
import com.esotericsoftware.kryo.io.Input;
25+
import com.esotericsoftware.kryo.io.KryoBufferUnderflowException;
2526

2627
import java.io.EOFException;
2728
import java.io.IOException;
@@ -33,11 +34,6 @@ public NoFetchingInput(InputStream inputStream) {
3334
super(inputStream, 8);
3435
}
3536

36-
@Override
37-
public boolean eof() {
38-
throw new UnsupportedOperationException("NoFetchingInput does not support EOF.");
39-
}
40-
4137
@Override
4238
public int read() throws KryoException {
4339
require(1);
@@ -60,33 +56,56 @@ public boolean canReadLong() throws KryoException {
6056
* bytes. Thus, it will only load the data which is required and never prefetch data.
6157
*
6258
* @param required the number of bytes being available in the buffer
63-
* @return the number of bytes remaining, which is equal to required
59+
* @return The number of bytes remaining in the buffer, which will be at least <code>required
60+
* </code> bytes.
6461
* @throws KryoException
6562
*/
6663
@Override
6764
protected int require(int required) throws KryoException {
65+
// The main change between this and Kryo 5 Input.require is this will never read more bytes
66+
// than required.
67+
// There are also formatting changes to be compliant with the Flink project styling rules.
68+
int remaining = limit - position;
69+
if (remaining >= required) {
70+
return remaining;
71+
}
6872
if (required > capacity) {
6973
throw new KryoException(
70-
"Buffer too small: capacity: " + capacity + ", " + "required: " + required);
74+
"Buffer too small: capacity: " + capacity + ", required: " + required);
7175
}
7276

73-
position = 0;
74-
int bytesRead = 0;
7577
int count;
76-
while (true) {
77-
count = fill(buffer, bytesRead, required - bytesRead);
78-
78+
// Try to fill the buffer.
79+
if (remaining > 0) {
80+
// Logical change 1 (from Kryo Input.require): "capacity - limit" -> "required - limit"
81+
count = fill(buffer, limit, required - limit);
7982
if (count == -1) {
80-
throw new KryoException(new EOFException("No more bytes left."));
83+
throw new KryoBufferUnderflowException("Buffer underflow.");
8184
}
82-
83-
bytesRead += count;
84-
if (bytesRead == required) {
85-
break;
85+
remaining += count;
86+
if (remaining >= required) {
87+
limit += count;
88+
return remaining;
8689
}
8790
}
88-
limit = required;
89-
return required;
91+
92+
// Was not enough, compact and try again.
93+
System.arraycopy(buffer, position, buffer, 0, remaining);
94+
total += position;
95+
position = 0;
96+
97+
do {
98+
// Logical change 2 (from Kryo Input.require): "capacity - remaining" -> "required -
99+
// remaining"
100+
count = fill(buffer, remaining, required - remaining);
101+
if (count == -1) {
102+
throw new KryoBufferUnderflowException("Buffer underflow.");
103+
}
104+
remaining += count;
105+
} while (remaining < required);
106+
107+
limit = remaining;
108+
return remaining;
90109
}
91110

92111
@Override

flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueComparator.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.flink.util.InstantiationUtil;
2929

3030
import com.esotericsoftware.kryo.Kryo;
31+
import com.esotericsoftware.kryo.util.DefaultInstantiatorStrategy;
3132
import org.objenesis.strategy.StdInstantiatorStrategy;
3233

3334
import java.io.IOException;
@@ -142,12 +143,11 @@ private void checkKryoInitialized() {
142143
if (this.kryo == null) {
143144
this.kryo = new Kryo();
144145

145-
Kryo.DefaultInstantiatorStrategy instantiatorStrategy =
146-
new Kryo.DefaultInstantiatorStrategy();
146+
DefaultInstantiatorStrategy instantiatorStrategy = new DefaultInstantiatorStrategy();
147147
instantiatorStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
148148
kryo.setInstantiatorStrategy(instantiatorStrategy);
149149

150-
this.kryo.setAsmEnabled(true);
150+
// this.kryo.setAsmEnabled(true);
151151
this.kryo.register(type);
152152
}
153153
}

flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.flink.util.InstantiationUtil;
3030

3131
import com.esotericsoftware.kryo.Kryo;
32+
import com.esotericsoftware.kryo.util.DefaultInstantiatorStrategy;
3233
import org.objenesis.strategy.StdInstantiatorStrategy;
3334

3435
import java.io.IOException;
@@ -138,12 +139,11 @@ private void checkKryoInitialized() {
138139
if (this.kryo == null) {
139140
this.kryo = new Kryo();
140141

141-
Kryo.DefaultInstantiatorStrategy instantiatorStrategy =
142-
new Kryo.DefaultInstantiatorStrategy();
143-
instantiatorStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
144-
kryo.setInstantiatorStrategy(instantiatorStrategy);
142+
DefaultInstantiatorStrategy initStrategy = new DefaultInstantiatorStrategy();
143+
initStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
144+
kryo.setInstantiatorStrategy(initStrategy);
145145

146-
this.kryo.setAsmEnabled(true);
146+
// this.kryo.setAsmEnabled(true);
147147

148148
KryoUtils.applyRegistrations(
149149
this.kryo, kryoRegistrations.values(), this.kryo.getNextRegistrationId());

0 commit comments

Comments
 (0)