Skip to content

Commit 0f70844

Browse files
authoredMar 20, 2025··
Transactional stream binder (#188)
* Stream Binder transactions Signed-off-by: Anders Swanson <[email protected]>
1 parent 2ae791e commit 0f70844

File tree

33 files changed

+476
-113
lines changed

33 files changed

+476
-113
lines changed
 

‎database/spring-cloud-stream-binder-oracle-txeventq/spring-cloud-stream-binder-txeventq-sample/src/test/java/com/oracle/database/spring/cloud/stream/binder/sample/TxEventQSampleAppTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
@Testcontainers
2121
public class TxEventQSampleAppTest {
2222
@Container
23-
static OracleContainer oracleContainer = new OracleContainer("gvenzl/oracle-free:23.6-slim-faststart")
23+
static OracleContainer oracleContainer = new OracleContainer("gvenzl/oracle-free:23.7-slim-faststart")
2424
.withStartupTimeout(Duration.ofMinutes(2))
2525
.withUsername("testuser")
2626
.withPassword("testpwd");

‎database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/JMSMessageChannelBinder.java

-3
Original file line numberDiff line numberDiff line change
@@ -30,16 +30,13 @@
3030
import com.oracle.database.spring.cloud.stream.binder.config.JmsProducerProperties;
3131
import com.oracle.database.spring.cloud.stream.binder.provisioning.JmsConsumerDestination;
3232
import com.oracle.database.spring.cloud.stream.binder.provisioning.JmsProducerDestination;
33-
34-
3533
import com.oracle.database.spring.cloud.stream.binder.utils.DestinationNameResolver;
3634
import com.oracle.database.spring.cloud.stream.binder.utils.JmsMessageDrivenChannelAdapterFactory;
3735
import com.oracle.database.spring.cloud.stream.binder.utils.JmsSendingMessageHandlerFactory;
3836
import jakarta.jms.Connection;
3937
import jakarta.jms.ConnectionFactory;
4038
import jakarta.jms.Session;
4139
import jakarta.jms.Topic;
42-
4340
import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
4441
import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
4542
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;

‎database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/TxEventQQueueProvisioner.java

+5-6
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
/*
22
** TxEventQ Support for Spring Cloud Stream
3-
** Copyright (c) 2023, 2024 Oracle and/or its affiliates.
3+
** Copyright (c) 2023, 2025 Oracle and/or its affiliates.
44
**
55
** This file has been modified by Oracle Corporation.
66
*/
@@ -24,6 +24,8 @@
2424

2525
package com.oracle.database.spring.cloud.stream.binder;
2626

27+
import java.sql.SQLException;
28+
2729
import com.oracle.database.spring.cloud.stream.binder.config.JmsConsumerProperties;
2830
import com.oracle.database.spring.cloud.stream.binder.config.JmsProducerProperties;
2931
import com.oracle.database.spring.cloud.stream.binder.plsql.OracleDBUtils;
@@ -34,9 +36,6 @@
3436
import jakarta.jms.JMSException;
3537
import jakarta.jms.Session;
3638
import jakarta.jms.Topic;
37-
38-
import java.sql.SQLException;
39-
4039
import oracle.jakarta.jms.AQjmsException;
4140
import org.slf4j.Logger;
4241
import org.slf4j.LoggerFactory;
@@ -137,7 +136,7 @@ private Topic provisionProducerTopic(String topicName,
137136
ExtendedProducerProperties<JmsProducerProperties> properties) {
138137
Connection aQConnection = null;
139138
Session session = null;
140-
Topic topic = null;
139+
Topic topic;
141140
try {
142141
aQConnection = connectionFactory.createConnection();
143142
session = aQConnection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
@@ -177,7 +176,7 @@ private Topic provisionConsumerTopic(String topicName,
177176
ExtendedConsumerProperties<JmsConsumerProperties> properties) {
178177
Connection aQConnection = null;
179178
Session session = null;
180-
Topic topic = null;
179+
Topic topic;
181180
try {
182181
aQConnection = connectionFactory.createConnection();
183182
session = aQConnection.createSession(true, Session.CLIENT_ACKNOWLEDGE);

‎database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/config/JmsBinderGlobalConfiguration.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@
4646
@Configuration
4747
public class JmsBinderGlobalConfiguration {
4848

49-
private final ConnectionFactory connectionFactory;
49+
private ConnectionFactory connectionFactory;
5050

5151
public JmsBinderGlobalConfiguration(ConnectionFactory connectionFactory) {
5252
this.connectionFactory = connectionFactory;

‎database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/config/JmsExtendedBindingProperties.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@
2727
import java.util.Map;
2828

2929
import org.springframework.boot.context.properties.ConfigurationProperties;
30-
import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
3130
import org.springframework.cloud.stream.binder.AbstractExtendedBindingProperties;
31+
import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
3232

3333
@ConfigurationProperties("spring.cloud.stream.txeventq")
3434
public class JmsExtendedBindingProperties

‎database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/config/TxEventQJmsConfiguration.java

+2-4
Original file line numberDiff line numberDiff line change
@@ -24,17 +24,15 @@
2424

2525
package com.oracle.database.spring.cloud.stream.binder.config;
2626

27+
import java.sql.SQLException;
28+
2729
import com.oracle.database.spring.cloud.stream.binder.TxEventQQueueProvisioner;
2830
import com.oracle.database.spring.cloud.stream.binder.plsql.OracleDBUtils;
29-
3031
import jakarta.jms.ConnectionFactory;
3132
import jakarta.jms.JMSException;
3233
import oracle.jakarta.jms.AQjmsConnectionFactory;
3334
import oracle.jakarta.jms.AQjmsFactory;
3435
import oracle.ucp.jdbc.PoolDataSource;
35-
36-
import java.sql.SQLException;
37-
3836
import org.slf4j.Logger;
3937
import org.slf4j.LoggerFactory;
4038
import org.springframework.boot.autoconfigure.AutoConfigureAfter;

‎database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/plsql/OracleDBUtils.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -29,15 +29,14 @@
2929
import java.sql.SQLException;
3030
import java.sql.Types;
3131

32+
import oracle.ucp.jdbc.PoolDataSource;
3233
import org.slf4j.Logger;
3334
import org.slf4j.LoggerFactory;
3435

35-
import oracle.ucp.jdbc.PoolDataSource;
36-
3736
public class OracleDBUtils {
3837

3938
private PoolDataSource pds = null;
40-
private final int dbversion;
39+
private int dbversion;
4140
private final Logger logger = LoggerFactory.getLogger(getClass());
4241

4342
private static final String CREATE_KB2_TEQ =

‎database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/provisioning/JmsProducerDestination.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public class JmsProducerDestination implements ProducerDestination {
3434

3535
private final Topic topic;
3636
private final int partitionCount;
37-
private final int dbversion;
37+
private int dbversion;
3838

3939
public JmsProducerDestination(Topic topic, int pCount, int dbversion) {
4040
this.topic = topic;

‎database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/serialize/CustomSerializationMessageConverter.java

+6-7
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
/*
22
** TxEventQ Support for Spring Cloud Stream
3-
** Copyright (c) 2023, 2024 Oracle and/or its affiliates.
3+
** Copyright (c) 2023, 2025 Oracle and/or its affiliates.
44
**
55
** This file has been modified by Oracle Corporation.
66
*/
@@ -24,13 +24,12 @@
2424

2525
package com.oracle.database.spring.cloud.stream.binder.serialize;
2626

27+
import jakarta.jms.JMSException;
28+
import jakarta.jms.Message;
2729
import org.slf4j.Logger;
2830
import org.slf4j.LoggerFactory;
2931
import org.springframework.jms.support.converter.SimpleMessageConverter;
3032

31-
import jakarta.jms.JMSException;
32-
import jakarta.jms.Message;
33-
3433
public class CustomSerializationMessageConverter extends SimpleMessageConverter {
3534
public String deserializer = null;
3635

@@ -66,8 +65,8 @@ public Object fromMessage(Message jmsMessage) throws JMSException {
6665
}
6766

6867
if (!isInstanceOfDeserializer) {
69-
logger.debug("The configured deserializer class is not an instance of 'com.oracle.database.spring.cloud.stream.binder.serialize.DeSerializer'");
70-
throw new IllegalArgumentException("The configured serializer class is not an instance of 'com.oracle.database.spring.cloud.stream.binder.serialize.DeSerializer'");
68+
logger.debug("The configured deserializer class is not an instance of 'com.oracle.cstream.serialize.DeSerializer'");
69+
throw new IllegalArgumentException("The configured serializer class is not an instance of 'com.oracle.cstream.serialize.DeSerializer'");
7170
}
7271

7372
Deserializer<?> s = null;
@@ -79,7 +78,7 @@ public Object fromMessage(Message jmsMessage) throws JMSException {
7978
throw new IllegalArgumentException("Serializer object could not be initiated.");
8079
}
8180

82-
result = s.deserialize((byte[]) result);
81+
result = (Object) (s.deserialize((byte[]) result));
8382

8483
return result;
8584
}

‎database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/serialize/Deserializer.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,5 +25,5 @@
2525
package com.oracle.database.spring.cloud.stream.binder.serialize;
2626

2727
public interface Deserializer<T> {
28-
T deserialize(byte[] bytes);
28+
public T deserialize(byte[] bytes);
2929
}

‎database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/serialize/Serializer.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,5 +25,5 @@
2525
package com.oracle.database.spring.cloud.stream.binder.serialize;
2626

2727
public interface Serializer {
28-
byte[] serialize(Object data);
28+
public byte[] serialize(Object data);
2929
}

‎database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/utils/DestinationNameResolver.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
import org.springframework.util.StringUtils;
2828

2929
public class DestinationNameResolver {
30-
private final AnonymousNamingStrategy namingStrategy;
30+
private AnonymousNamingStrategy namingStrategy;
3131

3232
public DestinationNameResolver(AnonymousNamingStrategy namingStrategy) {
3333
this.namingStrategy = namingStrategy;

‎database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/utils/JmsMessageDrivenChannelAdapterFactory.java

+8-4
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,12 @@
2626

2727
import com.oracle.database.spring.cloud.stream.binder.config.JmsConsumerProperties;
2828
import com.oracle.database.spring.cloud.stream.binder.serialize.CustomSerializationMessageConverter;
29-
3029
import jakarta.jms.BytesMessage;
3130
import jakarta.jms.Destination;
3231
import jakarta.jms.JMSException;
3332
import jakarta.jms.Message;
3433
import jakarta.jms.Session;
35-
34+
import oracle.jakarta.jms.AQjmsSession;
3635
import org.slf4j.Logger;
3736
import org.slf4j.LoggerFactory;
3837
import org.springframework.beans.BeansException;
@@ -60,7 +59,7 @@ public class JmsMessageDrivenChannelAdapterFactory
6059

6160
private ApplicationContext applicationContext;
6261

63-
private final Logger logger = LoggerFactory.getLogger(getClass());
62+
private Logger logger = LoggerFactory.getLogger(getClass());
6463

6564
public JmsMessageDrivenChannelAdapterFactory(
6665
ListenerContainerFactory listenerContainerFactory,
@@ -145,10 +144,12 @@ private static class RetryingChannelPublishingJmsMessageListener
145144

146145
private final RetryTemplate retryTemplate;
147146

148-
private final RecoveryCallback<Object> recoverer;
147+
private RecoveryCallback<Object> recoverer;
149148

150149
private String deSerializerClassName = null;
151150

151+
SpecCompliantJmsHeaderMapper headerMapper = new SpecCompliantJmsHeaderMapper();
152+
152153
RetryingChannelPublishingJmsMessageListener(
153154
ConsumerProperties properties,
154155
MessageRecoverer messageRecoverer,
@@ -191,6 +192,9 @@ public Object doWithRetry(RetryContext retryContext)
191192
RETRY_CONTEXT_MESSAGE_ATTRIBUTE,
192193
jmsMessage
193194
);
195+
196+
headerMapper.setConnection(((AQjmsSession) session).getDBConnection());
197+
RetryingChannelPublishingJmsMessageListener.super.setHeaderMapper(headerMapper);
194198
RetryingChannelPublishingJmsMessageListener.super.onMessage(
195199
jmsMessage,
196200
session

‎database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/utils/JmsSendingMessageHandlerFactory.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
package com.oracle.database.spring.cloud.stream.binder.utils;
2626

27+
import jakarta.jms.Destination;
2728
import org.springframework.beans.BeansException;
2829
import org.springframework.beans.factory.BeanFactory;
2930
import org.springframework.beans.factory.BeanFactoryAware;
@@ -33,8 +34,6 @@
3334
import org.springframework.jms.core.JmsTemplate;
3435
import org.springframework.messaging.MessageChannel;
3536

36-
import jakarta.jms.Destination;
37-
3837
public class JmsSendingMessageHandlerFactory implements ApplicationContextAware, BeanFactoryAware {
3938

4039
private final JmsTemplate template;

‎database/spring-cloud-stream-binder-oracle-txeventq/src/main/java/com/oracle/database/spring/cloud/stream/binder/utils/ListenerContainerFactory.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,14 @@
2626

2727
import jakarta.jms.ConnectionFactory;
2828
import jakarta.jms.Destination;
29-
3029
import org.slf4j.Logger;
3130
import org.slf4j.LoggerFactory;
3231
import org.springframework.jms.listener.AbstractMessageListenerContainer;
3332
import org.springframework.jms.listener.DefaultMessageListenerContainer;
3433

3534
public class ListenerContainerFactory {
36-
private final ConnectionFactory factory;
35+
36+
private ConnectionFactory factory;
3737

3838
private static final Logger logger = LoggerFactory.getLogger(ListenerContainerFactory.class);
3939

0 commit comments

Comments
 (0)
Please sign in to comment.