1
1
/*
2
2
*
3
- * Copyright 2020 IBM Corporation
3
+ * Copyright 2020, 2023 IBM Corporation
4
4
*
5
5
* Licensed under the Apache License, Version 2.0 (the "License");
6
6
* you may not use this file except in compliance with the License.
18
18
19
19
package com .ibm .eventstreams .connect .jdbcsink ;
20
20
21
- import com .ibm .eventstreams .connect .jdbcsink .database .DatabaseFactory ;
22
- import com .ibm .eventstreams .connect .jdbcsink .database .IDatabase ;
21
+ import java .sql .SQLException ;
22
+ import java .time .Duration ;
23
+ import java .time .Instant ;
24
+ import java .util .Collection ;
25
+ import java .util .Map ;
26
+
23
27
import org .apache .kafka .clients .consumer .OffsetAndMetadata ;
24
28
import org .apache .kafka .common .TopicPartition ;
25
29
import org .apache .kafka .connect .connector .Connector ;
30
+ import org .apache .kafka .connect .errors .ConnectException ;
26
31
import org .apache .kafka .connect .sink .SinkRecord ;
27
32
import org .apache .kafka .connect .sink .SinkTask ;
28
33
import org .apache .kafka .connect .sink .SinkTaskContext ;
29
34
import org .slf4j .Logger ;
30
35
import org .slf4j .LoggerFactory ;
31
36
32
- import java .sql .SQLException ;
33
- import java .time .Duration ;
34
- import java .time .Instant ;
35
- import java .util .Collection ;
36
- import java .util .Map ;
37
+ import com .ibm .eventstreams .connect .jdbcsink .database .DatabaseFactory ;
38
+ import com .ibm .eventstreams .connect .jdbcsink .database .IDatabase ;
37
39
38
40
public class JDBCSinkTask extends SinkTask {
39
41
private static final Logger logger = LoggerFactory .getLogger (JDBCSinkTask .class );
40
42
private static final String classname = JDBCSinkTask .class .getName ();
41
43
42
44
// TODO: needs to be generic and incorporate other database types
43
- // needs an interface
45
+ // needs an interface
44
46
private JDBCSinkConfig config ;
45
47
46
48
public IDatabase database ;
47
49
48
50
int remainingRetries ; // init max retries via config.maxRetries ...
49
51
50
52
/**
51
- * Start the Task. This should handle any configuration parsing and one-time setup of the task.
53
+ * Start the Task. This should handle any configuration parsing and one-time
54
+ * setup of the task.
55
+ *
52
56
* @param props initial configuration
53
57
*/
54
- @ Override public void start (Map <String , String > props ) {
58
+ @ Override
59
+ public void start (Map <String , String > props ) {
55
60
logger .trace ("[{}] Entry {}.start, props={}" , Thread .currentThread ().getId (), classname , props );
56
61
this .config = new JDBCSinkConfig (props );
57
62
58
- DatabaseFactory databaseFactory = new DatabaseFactory ();
63
+ DatabaseFactory databaseFactory = getDatabaseFactory ();
59
64
try {
60
65
this .database = databaseFactory .makeDatabase (this .config );
61
66
} catch (Exception e ) {
62
67
logger .error ("Failed to build the database {} " , e );
63
- e .printStackTrace ();
64
- throw e ;
68
+ throw new ConnectException (e );
65
69
}
66
70
67
71
logger .trace ("[{}] Exit {}.start" , Thread .currentThread ().getId (), classname );
68
72
}
69
73
74
+ protected DatabaseFactory getDatabaseFactory () {
75
+ DatabaseFactory databaseFactory = new DatabaseFactory ();
76
+ return databaseFactory ;
77
+ }
78
+
70
79
/**
71
80
* Put the records in the sink.
72
81
*
73
- * If this operation fails, the SinkTask may throw a {@link org.apache.kafka.connect.errors.RetriableException} to
74
- * indicate that the framework should attempt to retry the same call again. Other exceptions will cause the task to
75
- * be stopped immediately. {@link SinkTaskContext#timeout(long)} can be used to set the maximum time before the
76
- * batch will be retried.
82
+ * If this operation fails, the SinkTask may throw a
83
+ * {@link org.apache.kafka.connect.errors.RetriableException} to indicate that
84
+ * the framework should attempt to retry the same call again. Other exceptions
85
+ * will cause the task to be stopped immediately.
86
+ * {@link SinkTaskContext#timeout(long)} can be used to set the maximum time
87
+ * before the batch will be retried.
77
88
*
78
89
* @param records the set of records to send
79
90
*/
80
- @ Override public void put (Collection <SinkRecord > records ) {
91
+ @ Override
92
+ public void put (Collection <SinkRecord > records ) {
93
+ logger .trace ("[{}] Entry {}.put" , Thread .currentThread ().getId (), classname );
81
94
if (records .isEmpty ()) {
82
95
return ;
83
96
}
84
97
85
98
final SinkRecord first = records .iterator ().next ();
86
99
final int recordsCount = records .size ();
87
100
logger .info ("Received {} records. First record kafka coordinates:({}-{}-{}). Writing them to the database..." ,
88
- recordsCount , first .topic (), first .kafkaPartition (), first .kafkaOffset ()
89
- );
101
+ recordsCount , first .topic (), first .kafkaPartition (), first .kafkaOffset ());
90
102
91
103
final String tableName = config .getString (JDBCSinkConfig .CONFIG_NAME_TABLE_NAME_FORMAT );
92
104
@@ -96,28 +108,37 @@ public class JDBCSinkTask extends SinkTask {
96
108
this .database .getWriter ().insert (tableName , records );
97
109
logger .info (String .format ("%d RECORDS PROCESSED" , records .size ()));
98
110
Instant finish = Instant .now ();
99
- long timeElapsed = Duration .between (start , finish ).toMillis (); // in millis
100
- logger .info (String .format ("Processed '%d' records" , records .size () ));
111
+ long timeElapsed = Duration .between (start , finish ).toMillis (); // in millis
112
+ logger .info (String .format ("Processed '%d' records" , records .size ()));
101
113
logger .info (String .format ("Total Execution time: %d" , timeElapsed ));
102
114
} catch (SQLException error ) {
103
115
logger .error ("Write of {} records failed, remainingRetries={}" , recordsCount , remainingRetries , error );
104
- // TODO: throw exception to cancel execution or retry?
116
+ throw new ConnectException (error );
117
+ } catch (final RuntimeException e ) {
118
+ logger .error ("Unexpected runtime exception: " , e );
119
+ throw e ;
105
120
}
121
+
122
+ logger .trace ("[{}] Exit {}.put" , Thread .currentThread ().getId (), classname );
106
123
}
107
124
108
- @ Override public void stop () {
125
+ @ Override
126
+ public void stop () {
109
127
}
110
128
111
- @ Override public void flush (Map <TopicPartition , OffsetAndMetadata > map ) {
129
+ @ Override
130
+ public void flush (Map <TopicPartition , OffsetAndMetadata > map ) {
112
131
// Not necessary
113
132
}
114
133
115
134
/**
116
- * Get the version of this task. Usually this should be the same as the corresponding {@link Connector} class's version.
135
+ * Get the version of this task. Usually this should be the same as the
136
+ * corresponding {@link Connector} class's version.
117
137
*
118
138
* @return the version, formatted as a String
119
139
*/
120
- @ Override public String version () {
140
+ @ Override
141
+ public String version () {
121
142
return getClass ().getPackage ().getImplementationVersion ();
122
143
}
123
144
}
0 commit comments