File tree 1 file changed +20
-1
lines changed
kafka/src/main/java/com/dtstack/jlogstash/outputs
1 file changed +20
-1
lines changed Original file line number Diff line number Diff line change 33
33
import com .dtstack .jlogstash .annotation .Required ;
34
34
import com .dtstack .jlogstash .outputs .BaseOutput ;
35
35
import com .dtstack .jlogstash .render .Formatter ;
36
+ import com .google .common .collect .Maps ;
36
37
37
38
/**
38
39
*
@@ -69,7 +70,7 @@ public class Kafka extends BaseOutput {
69
70
@ Required (required =true )
70
71
private static String brokerList ;
71
72
72
- private Map <String ,String > producerSettings ;
73
+ private static Map <String ,String > producerSettings ;
73
74
74
75
@ SuppressWarnings ("rawtypes" )
75
76
public Kafka (Map config ) {
@@ -139,5 +140,23 @@ protected void emit(Map event) {
139
140
}
140
141
public static void main (String [] args ){
141
142
143
+ Kafka .topic ="oggoggogg" ;
144
+ Kafka .brokerList = "116.62.164.243:9092" ;
145
+
146
+ Kafka .producerSettings = Maps .newConcurrentMap ();
147
+
148
+ Kafka .producerSettings .put ("producer.type" , "async" );
149
+ Kafka .producerSettings .put ("key.serializer.class" , "kafka.serializer.StringEncoder" );
150
+ Kafka .producerSettings .put ("value.serializer.class" , "kafka.serializer.StringEncoder" );
151
+
152
+ Kafka kafka = new Kafka (Maps .newConcurrentMap ());
153
+ kafka .prepare ();
154
+
155
+ for (int i =0 ;i <10 ;i ++){
156
+ Map dd = Maps .newConcurrentMap ();
157
+ dd .put ("hhh" , 2123 );
158
+ kafka .emit (dd );
159
+ }
160
+
142
161
}
143
162
}
You can’t perform that action at this time.
0 commit comments