Skip to content

Commit 6a41127

Browse files
elliotkennedyartembilan
authored andcommitted
GH-561: Use different header for JSON key
Fixes #561 Add a test to json serialize both a key and value in a stream. Configure Jackson2JavaKeyTypeMapper in JsonDeserializer and JsonSerializer when the isKey flag is present. Add collection json serialization tests. Use isKey to set json headers. update author and copyright date Configurable type headers. - Make Jackson2JavaTypeMapper methods default - Remove isKey ctors from JsonDeserializer and JsonSerializer - Remove setters from Jackson2JavaTypeMapper contract - Refactor key header config - Fix javadoc - Remove phantom author - Refactor key header config - Fix javadoc - Fix copyright - Add JsonSerde setUseTypeMapperForKey fluid builder - Remove get and set explicit type
1 parent 60f92c8 commit 6a41127

File tree

5 files changed

+332
-8
lines changed

5 files changed

+332
-8
lines changed

spring-kafka/src/main/java/org/springframework/kafka/support/converter/AbstractJavaTypeMapper.java

Lines changed: 66 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017 the original author or authors.
2+
* Copyright 2017-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -37,6 +37,7 @@
3737
* @author Sam Nelson
3838
* @author Andreas Asplund
3939
* @author Gary Russell
40+
* @author Elliot Kennedy
4041
*
4142
* @since 2.1
4243
*/
@@ -57,22 +58,70 @@ public abstract class AbstractJavaTypeMapper implements BeanClassLoaderAware {
5758
*/
5859
public static final String DEFAULT_KEY_CLASSID_FIELD_NAME = "__KeyTypeId__";
5960

61+
/**
62+
* Default header name for key type information.
63+
*/
64+
public static final String KEY_DEFAULT_CLASSID_FIELD_NAME = "__Key_TypeId__";
65+
66+
/**
67+
* Default header name for key container object contents type information.
68+
*/
69+
public static final String KEY_DEFAULT_CONTENT_CLASSID_FIELD_NAME = "__Key_ContentTypeId__";
70+
71+
/**
72+
* Default header name for key map key type information.
73+
*/
74+
public static final String KEY_DEFAULT_KEY_CLASSID_FIELD_NAME = "__Key_KeyTypeId__";
75+
6076
private final Map<String, Class<?>> idClassMapping = new HashMap<String, Class<?>>();
6177

6278
private final Map<Class<?>, byte[]> classIdMapping = new HashMap<Class<?>, byte[]>();
6379

80+
private String classIdFieldName = DEFAULT_CLASSID_FIELD_NAME;
81+
82+
private String contentClassIdFieldName = DEFAULT_CONTENT_CLASSID_FIELD_NAME;
83+
84+
private String keyClassIdFieldName = DEFAULT_KEY_CLASSID_FIELD_NAME;
85+
6486
private ClassLoader classLoader = ClassUtils.getDefaultClassLoader();
6587

6688
public String getClassIdFieldName() {
67-
return DEFAULT_CLASSID_FIELD_NAME;
89+
return this.classIdFieldName;
90+
}
91+
92+
/**
93+
* Configure header name for type information.
94+
* @param classIdFieldName the header name.
95+
* @since 2.1.3
96+
*/
97+
public void setClassIdFieldName(String classIdFieldName) {
98+
this.classIdFieldName = classIdFieldName;
6899
}
69100

70101
public String getContentClassIdFieldName() {
71-
return DEFAULT_CONTENT_CLASSID_FIELD_NAME;
102+
return this.contentClassIdFieldName;
103+
}
104+
105+
/**
106+
* Configure header name for container object contents type information.
107+
* @param contentClassIdFieldName the header name.
108+
* @since 2.1.3
109+
*/
110+
public void setContentClassIdFieldName(String contentClassIdFieldName) {
111+
this.contentClassIdFieldName = contentClassIdFieldName;
72112
}
73113

74114
public String getKeyClassIdFieldName() {
75-
return DEFAULT_KEY_CLASSID_FIELD_NAME;
115+
return this.keyClassIdFieldName;
116+
}
117+
118+
/**
119+
* Configure header name for map key type information.
120+
* @param keyClassIdFieldName the header name.
121+
* @since 2.1.3
122+
*/
123+
public void setKeyClassIdFieldName(String keyClassIdFieldName) {
124+
this.keyClassIdFieldName = keyClassIdFieldName;
76125
}
77126

78127
public void setIdClassMapping(Map<String, Class<?>> idClassMapping) {
@@ -133,4 +182,17 @@ public Map<String, Class<?>> getIdClassMapping() {
133182
return Collections.unmodifiableMap(this.idClassMapping);
134183
}
135184

185+
/**
186+
* Configure the TypeMapper to use default key type class.
187+
* @param isKey Use key type headers if true
188+
* @since 2.1.3
189+
*/
190+
public void setUseForKey(boolean isKey) {
191+
if (isKey) {
192+
setClassIdFieldName(AbstractJavaTypeMapper.KEY_DEFAULT_CLASSID_FIELD_NAME);
193+
setContentClassIdFieldName(AbstractJavaTypeMapper.KEY_DEFAULT_CONTENT_CLASSID_FIELD_NAME);
194+
setKeyClassIdFieldName(AbstractJavaTypeMapper.KEY_DEFAULT_KEY_CLASSID_FIELD_NAME);
195+
}
196+
}
197+
136198
}

spring-kafka/src/main/java/org/springframework/kafka/support/serializer/JsonDeserializer.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2017 the original author or authors.
2+
* Copyright 2015-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -26,6 +26,7 @@
2626
import org.apache.kafka.common.serialization.ExtendedDeserializer;
2727

2828
import org.springframework.core.ResolvableType;
29+
import org.springframework.kafka.support.converter.AbstractJavaTypeMapper;
2930
import org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper;
3031
import org.springframework.kafka.support.converter.Jackson2JavaTypeMapper;
3132
import org.springframework.util.Assert;
@@ -47,6 +48,7 @@
4748
* @author Artem Bilan
4849
* @author Gary Russell
4950
* @author Yanming Zhou
51+
* @author Elliot Kennedy
5052
*/
5153
public class JsonDeserializer<T> implements ExtendedDeserializer<T> {
5254

@@ -73,6 +75,8 @@ public class JsonDeserializer<T> implements ExtendedDeserializer<T> {
7375

7476
protected Jackson2JavaTypeMapper typeMapper = new DefaultJackson2JavaTypeMapper();
7577

78+
private boolean typeMapperExplicitlySet = false;
79+
7680
public JsonDeserializer() {
7781
this((Class<T>) null);
7882
}
@@ -110,11 +114,27 @@ public Jackson2JavaTypeMapper getTypeMapper() {
110114
public void setTypeMapper(Jackson2JavaTypeMapper typeMapper) {
111115
Assert.notNull(typeMapper, "'typeMapper' cannot be null");
112116
this.typeMapper = typeMapper;
117+
this.typeMapperExplicitlySet = true;
118+
}
119+
120+
/**
121+
* Configure the default Jackson2JavaTypeMapper to use key type headers.
122+
* @param isKey Use key type headers if true
123+
* @since 2.1.3
124+
*/
125+
public void setUseTypeMapperForKey(boolean isKey) {
126+
if (!this.typeMapperExplicitlySet) {
127+
if (this.getTypeMapper() instanceof AbstractJavaTypeMapper) {
128+
AbstractJavaTypeMapper typeMapper = (AbstractJavaTypeMapper) this.getTypeMapper();
129+
typeMapper.setUseForKey(isKey);
130+
}
131+
}
113132
}
114133

115134
@SuppressWarnings("unchecked")
116135
@Override
117136
public void configure(Map<String, ?> configs, boolean isKey) {
137+
setUseTypeMapperForKey(isKey);
118138
try {
119139
if (isKey && configs.containsKey(DEFAULT_KEY_TYPE)) {
120140
if (configs.get(DEFAULT_KEY_TYPE) instanceof Class) {

spring-kafka/src/main/java/org/springframework/kafka/support/serializer/JsonSerde.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017 the original author or authors.
2+
* Copyright 2017-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -39,6 +39,7 @@
3939
* @param <T> target class for serialization/deserialization
4040
*
4141
* @author Marius Bogoevici
42+
* @author Elliot Kennedy
4243
*
4344
* @since 1.1.5
4445
*/
@@ -103,4 +104,16 @@ public Deserializer<T> deserializer() {
103104
return this.jsonDeserializer;
104105
}
105106

107+
/**
108+
* Configure the TypeMapper to use key types if the JsonSerde is used to serialize keys.
109+
* @param isKey Use key type headers if true
110+
* @return the JsonSerde
111+
* @since 2.1.3
112+
*/
113+
public JsonSerde<T> setUseTypeMapperForKey(boolean isKey) {
114+
this.jsonSerializer.setUseTypeMapperForKey(isKey);
115+
this.jsonDeserializer.setUseTypeMapperForKey(isKey);
116+
return this;
117+
}
118+
106119
}

spring-kafka/src/main/java/org/springframework/kafka/support/serializer/JsonSerializer.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2017 the original author or authors.
2+
* Copyright 2016-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -24,6 +24,7 @@
2424
import org.apache.kafka.common.serialization.ExtendedSerializer;
2525
import org.apache.kafka.common.serialization.Serializer;
2626

27+
import org.springframework.kafka.support.converter.AbstractJavaTypeMapper;
2728
import org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper;
2829
import org.springframework.kafka.support.converter.Jackson2JavaTypeMapper;
2930
import org.springframework.util.Assert;
@@ -40,6 +41,7 @@
4041
* @author Igor Stepanov
4142
* @author Artem Bilan
4243
* @author Gary Russell
44+
* @author Elliot Kennedy
4345
*/
4446
public class JsonSerializer<T> implements ExtendedSerializer<T> {
4547

@@ -54,6 +56,8 @@ public class JsonSerializer<T> implements ExtendedSerializer<T> {
5456

5557
protected Jackson2JavaTypeMapper typeMapper = new DefaultJackson2JavaTypeMapper();
5658

59+
private boolean typeMapperExplicitlySet = false;
60+
5761
public JsonSerializer() {
5862
this(new ObjectMapper());
5963
this.objectMapper.configure(MapperFeature.DEFAULT_VIEW_INCLUSION, false);
@@ -90,10 +94,26 @@ public Jackson2JavaTypeMapper getTypeMapper() {
9094
public void setTypeMapper(Jackson2JavaTypeMapper typeMapper) {
9195
Assert.notNull(typeMapper, "'typeMapper' cannot be null");
9296
this.typeMapper = typeMapper;
97+
this.typeMapperExplicitlySet = true;
98+
}
99+
100+
/**
101+
* Configure the default Jackson2JavaTypeMapper to use key type headers.
102+
* @param isKey Use key type headers if true
103+
* @since 2.1.3
104+
*/
105+
public void setUseTypeMapperForKey(boolean isKey) {
106+
if (!this.typeMapperExplicitlySet) {
107+
if (this.getTypeMapper() instanceof AbstractJavaTypeMapper) {
108+
AbstractJavaTypeMapper typeMapper = (AbstractJavaTypeMapper) this.getTypeMapper();
109+
typeMapper.setUseForKey(isKey);
110+
}
111+
}
93112
}
94113

95114
@Override
96115
public void configure(Map<String, ?> configs, boolean isKey) {
116+
setUseTypeMapperForKey(isKey);
97117
if (configs.containsKey(ADD_TYPE_INFO_HEADERS)) {
98118
Object config = configs.get(ADD_TYPE_INFO_HEADERS);
99119
if (config instanceof Boolean) {
@@ -134,5 +154,4 @@ public byte[] serialize(String topic, T data) {
134154
public void close() {
135155
// No-op
136156
}
137-
138157
}

0 commit comments

Comments
 (0)