Skip to content

Commit edd0466

Browse files
ysq
1 parent ab652a2 commit edd0466

File tree

22 files changed

+1067
-2
lines changed

22 files changed

+1067
-2
lines changed

README.md

+1-2
Original file line numberDiff line numberDiff line change
@@ -1,2 +1 @@
1-
# jlogstash-output-plugin
2-
java 版本 logstash output 插件
1+
output

elasticsearch/.gitignore

+6
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
/target/
2+
target/*
3+
.project
4+
.settings
5+
.classpath
6+
.metadata/

elasticsearch/assembly.xml

+18
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
<assembly
2+
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
5+
<formats>
6+
<format>jar</format>
7+
</formats>
8+
<id>with-dependencies</id>
9+
<includeBaseDirectory>false</includeBaseDirectory>
10+
<dependencySets>
11+
<dependencySet>
12+
<outputDirectory>/</outputDirectory>
13+
<useProjectArtifact>true</useProjectArtifact>
14+
<unpack>true</unpack>
15+
<scope>runtime</scope>
16+
</dependencySet>
17+
</dependencySets>
18+
</assembly>

elasticsearch/pom.xml

+64
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
2+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
3+
<modelVersion>4.0.0</modelVersion>
4+
5+
<groupId>com.dtstack.logstash.output</groupId>
6+
<artifactId>elasticsearch</artifactId>
7+
<version>0.0.1-SNAPSHOT</version>
8+
<packaging>jar</packaging>
9+
10+
<name>elasticsearch</name>
11+
<url>http://maven.apache.org</url>
12+
13+
<properties>
14+
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
15+
</properties>
16+
17+
<build>
18+
<resources>
19+
<resource>
20+
<directory>src/main/java/</directory>
21+
</resource>
22+
<resource>
23+
<directory>src/main/resources/</directory>
24+
</resource>
25+
</resources>
26+
<plugins>
27+
<plugin>
28+
<groupId>org.apache.maven.plugins</groupId>
29+
<artifactId>maven-assembly-plugin</artifactId>
30+
<configuration>
31+
<descriptors>
32+
<descriptor>assembly.xml</descriptor>
33+
</descriptors>
34+
</configuration>
35+
<executions>
36+
<execution>
37+
<phase>package</phase>
38+
<goals>
39+
<goal>single</goal>
40+
</goals>
41+
</execution>
42+
</executions>
43+
</plugin>
44+
<plugin>
45+
<groupId>org.apache.maven.plugins</groupId>
46+
<artifactId>maven-compiler-plugin</artifactId>
47+
<version>2.3.2</version>
48+
<configuration>
49+
<source>1.7</source>
50+
<target>1.7</target>
51+
<compilerVersion>1.7</compilerVersion>
52+
</configuration>
53+
</plugin>
54+
</plugins>
55+
</build>
56+
57+
<dependencies>
58+
<dependency>
59+
<groupId>org.elasticsearch</groupId>
60+
<artifactId>elasticsearch</artifactId>
61+
<version>2.3.4</version>
62+
</dependency>
63+
</dependencies>
64+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,225 @@
1+
package com.dtstack.logstash.outputs;
2+
3+
import java.net.InetAddress;
4+
import java.net.UnknownHostException;
5+
import java.util.HashMap;
6+
import java.util.List;
7+
import java.util.Map;
8+
9+
import org.apache.commons.lang3.StringUtils;
10+
import org.elasticsearch.client.transport.TransportClient;
11+
import org.elasticsearch.common.settings.Settings;
12+
import org.elasticsearch.common.transport.InetSocketTransportAddress;
13+
import org.elasticsearch.common.settings.Settings.Builder;
14+
15+
import com.dtstack.logstash.annotation.Required;
16+
import com.dtstack.logstash.render.Formatter;
17+
import com.dtstack.logstash.render.FreeMarkerRender;
18+
import com.dtstack.logstash.render.TemplateRender;
19+
import com.google.common.collect.Lists;
20+
import com.google.common.collect.Maps;
21+
22+
import org.elasticsearch.action.ActionRequest;
23+
import org.elasticsearch.action.bulk.BulkItemResponse;
24+
import org.elasticsearch.action.bulk.BulkProcessor;
25+
import org.elasticsearch.action.bulk.BulkRequest;
26+
import org.elasticsearch.action.bulk.BulkResponse;
27+
import org.elasticsearch.action.index.IndexRequest;
28+
import org.elasticsearch.common.unit.ByteSizeUnit;
29+
import org.elasticsearch.common.unit.ByteSizeValue;
30+
import org.elasticsearch.common.unit.TimeValue;
31+
import org.slf4j.Logger;
32+
import org.slf4j.LoggerFactory;
33+
34+
35+
/**
36+
*
37+
* Reason: TODO ADD REASON(可选)
38+
* Date: 2016年8月31日 下午1:35:21
39+
* Company: www.dtstack.com
40+
* @author sishu.yss
41+
*
42+
*/
43+
public class Elasticsearch extends BaseOutput {
44+
private static final Logger logger = LoggerFactory.getLogger(Elasticsearch.class);
45+
46+
@Required(required=true)
47+
private static String index;
48+
49+
private static String indexTimezone = "UTC";
50+
51+
private static String documentId;
52+
53+
private static String documentType="logs";
54+
55+
private static String cluster;
56+
57+
@Required(required=true)
58+
private static List<String> hosts;
59+
60+
private static boolean sniff=true;
61+
62+
private static int bulkActions = 20000;
63+
64+
private static int bulkSize = 15;
65+
66+
private static int flushInterval = 1;
67+
68+
private static int concurrentRequests = 1;
69+
70+
private BulkProcessor bulkProcessor;
71+
72+
private TransportClient esclient;
73+
74+
private TemplateRender indexTypeRender =null;
75+
76+
private TemplateRender idRender =null;
77+
78+
public Elasticsearch(Map config) {
79+
super(config);
80+
}
81+
82+
public void prepare() {
83+
try {
84+
if (StringUtils.isNotBlank(documentId)) {
85+
idRender = new FreeMarkerRender(documentId,documentId);
86+
}
87+
indexTypeRender = new FreeMarkerRender(documentType,documentType);
88+
this.initESClient();
89+
} catch (Exception e) {
90+
logger.error(e.getMessage());
91+
System.exit(1);
92+
}
93+
}
94+
95+
96+
@SuppressWarnings("unchecked")
97+
private void initESClient() throws NumberFormatException,
98+
UnknownHostException {
99+
Builder builder = Settings.settingsBuilder().put("client.transport.sniff", sniff);
100+
if(StringUtils.isNotBlank(cluster)){
101+
builder.put("cluster.name", cluster);
102+
}
103+
Settings settings = builder.build();
104+
esclient = TransportClient.builder().settings(settings).build();
105+
for (String host : hosts) {
106+
String[] hp = host.split(":");
107+
String h = null, p = null;
108+
if (hp.length == 2) {
109+
h = hp[0];
110+
p = hp[1];
111+
} else if (hp.length == 1) {
112+
h = hp[0];
113+
p = "9300";
114+
}
115+
esclient.addTransportAddress(new InetSocketTransportAddress(
116+
InetAddress.getByName(h), Integer.parseInt(p)));
117+
}
118+
119+
bulkProcessor = BulkProcessor
120+
.builder(esclient, new BulkProcessor.Listener() {
121+
122+
@Override
123+
public void afterBulk(long arg0, BulkRequest arg1,
124+
BulkResponse arg2) {
125+
126+
ato.getAndSet(1);
127+
// logger.info("bulk done with executionId: " + arg0);
128+
List<ActionRequest> requests = arg1.requests();
129+
int toberetry = 0;
130+
int totalFailed = 0;
131+
for (BulkItemResponse item : arg2.getItems()) {
132+
if (item.isFailed()) {
133+
switch (item.getFailure().getStatus()) {
134+
case TOO_MANY_REQUESTS:
135+
case SERVICE_UNAVAILABLE:
136+
if (toberetry == 0) {
137+
logger.error("bulk has failed item which NEED to retry");
138+
logger.error(item.getFailureMessage());
139+
}
140+
toberetry++;
141+
bulkProcessor.add(requests.get(item
142+
.getItemId()));
143+
break;
144+
default:
145+
if (totalFailed == 0) {
146+
logger.error("bulk has failed item which do NOT need to retry");
147+
logger.error(item.getFailureMessage());
148+
}
149+
break;
150+
}
151+
152+
totalFailed++;
153+
}
154+
}
155+
156+
if (totalFailed > 0) {
157+
logger.info(totalFailed + " doc failed, "
158+
+ toberetry + " need to retry");
159+
} else {
160+
logger.debug("no failed docs");
161+
}
162+
163+
if (toberetry > 0) {
164+
try {
165+
logger.info("sleep " + toberetry / 2
166+
+ "millseconds after bulk failure");
167+
Thread.sleep(toberetry / 2);
168+
} catch (InterruptedException e) {
169+
// TODO Auto-generated catch block
170+
e.printStackTrace();
171+
}
172+
} else {
173+
logger.debug("no docs need to retry");
174+
}
175+
176+
}
177+
178+
@Override
179+
public void afterBulk(long arg0, BulkRequest arg1,
180+
Throwable arg2) {
181+
ato.getAndSet(2);
182+
logger.error("bulk got exception");
183+
String message = arg2.getMessage();
184+
logger.error(message);
185+
}
186+
187+
@Override
188+
public void beforeBulk(long arg0, BulkRequest arg1) {
189+
logger.info("executionId: " + arg0);
190+
logger.info("numberOfActions: "
191+
+ arg1.numberOfActions());
192+
}
193+
})
194+
.setBulkActions(bulkActions)
195+
.setBulkSize(new ByteSizeValue(bulkSize, ByteSizeUnit.MB))
196+
.setFlushInterval(TimeValue.timeValueSeconds(flushInterval))
197+
.setConcurrentRequests(concurrentRequests).build();
198+
}
199+
200+
protected void emit(Map event) {
201+
String _index = Formatter.format(event, index, indexTimezone);
202+
String _indexType = indexTypeRender.render(event);
203+
IndexRequest indexRequest;
204+
if (idRender == null) {
205+
indexRequest = new IndexRequest(_index, _indexType).source(event);
206+
} else {
207+
String _id = idRender.render(event);
208+
indexRequest = new IndexRequest(_index, _indexType, _id)
209+
.source(event);
210+
}
211+
this.bulkProcessor.add(indexRequest);
212+
}
213+
214+
215+
public static void main(String[] args) throws InterruptedException{
216+
Map<String,Object> event = Maps.newConcurrentMap();
217+
event.put("tenant_id",4);
218+
event.put("@timestamp","2016-07-04T01:40:37.54Z");
219+
index ="dtlog-%{tenant_id}-%{+YYYY.MM.dd}";
220+
hosts = Lists.newArrayList("127.0.0.1:9300");
221+
Elasticsearch elasticsearch = new Elasticsearch(new HashMap<String,Object>());
222+
elasticsearch.prepare();
223+
elasticsearch.emit(event);
224+
}
225+
}

file/.gitignore

+6
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
/target/
2+
target/*
3+
.project
4+
.settings
5+
.classpath
6+
.metadata/

file/assembly.xml

+18
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
<assembly
2+
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
5+
<formats>
6+
<format>jar</format>
7+
</formats>
8+
<id>with-dependencies</id>
9+
<includeBaseDirectory>false</includeBaseDirectory>
10+
<dependencySets>
11+
<dependencySet>
12+
<outputDirectory>/</outputDirectory>
13+
<useProjectArtifact>true</useProjectArtifact>
14+
<unpack>true</unpack>
15+
<scope>runtime</scope>
16+
</dependencySet>
17+
</dependencySets>
18+
</assembly>

0 commit comments

Comments
 (0)