Skip to content

Commit 359450a

Browse files
committed
es 8.12.0 support
1 parent 351e6c6 commit 359450a

29 files changed

+1871
-1403
lines changed

.travis.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ jdk:
55

66
before_install:
77
- sudo rm -rf /var/lib/elasticsearch
8-
- curl https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-8.11.4-amd64.deb -o elasticsearch.deb && sudo dpkg -i --force-confnew elasticsearch.deb
8+
- curl https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-8.12.0-amd64.deb -o elasticsearch.deb && sudo dpkg -i --force-confnew elasticsearch.deb
99
- sudo cp ./src/test/resources/elasticsearch.yml /etc/elasticsearch/elasticsearch.yml
1010
- sudo cat /etc/elasticsearch/elasticsearch.yml
1111
- sudo java -version

pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
<modelVersion>4.0.0</modelVersion>
44
<groupId>org.nlpcn</groupId>
55
<artifactId>elasticsearch-sql</artifactId>
6-
<version>8.11.4.0</version>
6+
<version>8.12.0.0</version>
77
<packaging>jar</packaging>
88
<description>Query elasticsearch using SQL</description>
99
<name>elasticsearch-sql</name>
@@ -44,7 +44,7 @@
4444
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
4545
<runSuite>**/MainTestSuite.class</runSuite>
4646
<elasticsearch.plugin.name>sql</elasticsearch.plugin.name>
47-
<elasticsearch.version>8.11.4</elasticsearch.version>
47+
<elasticsearch.version>8.12.0</elasticsearch.version>
4848
<elasticsearch.plugin.classname>org.elasticsearch.plugin.nlpcn.SqlPlug</elasticsearch.plugin.classname>
4949
<druid.version>1.2.15</druid.version>
5050
<guava.version>32.0.0-jre</guava.version>

src/main/java/com/alibaba/druid/pool/ElasticSearchDruidDataSource.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
import com.alibaba.druid.util.Utils;
3232
import com.alibaba.druid.wall.WallFilter;
3333
import com.alibaba.druid.wall.WallProviderStatValue;
34-
import org.elasticsearch.client.internal.Client;
34+
import org.elasticsearch.plugin.nlpcn.client.ElasticsearchRestClient;
3535

3636
import javax.management.JMException;
3737
import javax.management.MBeanServer;
@@ -137,13 +137,13 @@ public class ElasticSearchDruidDataSource extends DruidDataSource {
137137
private boolean loadSpifilterSkip = false;
138138

139139
// elasticsearch client
140-
private final Client client;
140+
private final ElasticsearchRestClient client;
141141

142-
public ElasticSearchDruidDataSource(Client client) {
142+
public ElasticSearchDruidDataSource(ElasticsearchRestClient client) {
143143
this(false, client);
144144
}
145145

146-
public ElasticSearchDruidDataSource(boolean fairLock, Client client) {
146+
public ElasticSearchDruidDataSource(boolean fairLock, ElasticsearchRestClient client) {
147147
super(fairLock);
148148

149149
this.client = client;

src/main/java/com/alibaba/druid/pool/ElasticSearchDruidDataSourceFactory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.alibaba.druid.pool;
22

33
import org.elasticsearch.client.internal.Client;
4+
import org.elasticsearch.plugin.nlpcn.client.ElasticsearchRestClient;
45

56
import javax.sql.DataSource;
67
import java.util.Properties;
@@ -15,7 +16,7 @@ protected DataSource createDataSourceInternal(Properties properties) throws Exce
1516
throw new UnsupportedOperationException();
1617
}
1718

18-
public static DataSource createDataSource(Client client) {
19+
public static DataSource createDataSource(ElasticsearchRestClient client) {
1920
return new ElasticSearchDruidDataSource(client);
2021
}
2122
}

src/main/java/org/elasticsearch/join/aggregations/ChildrenAggregationBuilder.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import org.elasticsearch.search.aggregations.support.ValuesSourceAggregationBuilder;
2323
import org.elasticsearch.search.aggregations.support.ValuesSourceAggregatorFactory;
2424
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
25-
import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
2625
import org.elasticsearch.search.aggregations.support.ValuesSourceType;
2726
import org.elasticsearch.xcontent.ParseField;
2827
import org.elasticsearch.xcontent.XContentBuilder;
@@ -162,11 +161,6 @@ public String getType() {
162161
return NAME;
163162
}
164163

165-
@Override
166-
protected ValuesSourceRegistry.RegistryKey<?> getRegistryKey() {
167-
return ValuesSourceRegistry.UNREGISTERED_KEY;
168-
}
169-
170164
@Override
171165
public TransportVersion getMinimalSupportedVersion() {
172166
return TransportVersions.ZERO;

src/main/java/org/elasticsearch/plugin/nlpcn/SqlPlug.java

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,15 @@
11
package org.elasticsearch.plugin.nlpcn;
22

3-
import org.elasticsearch.client.internal.Client;
43
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
54
import org.elasticsearch.cluster.node.DiscoveryNodes;
6-
import org.elasticsearch.cluster.routing.allocation.AllocationService;
7-
import org.elasticsearch.cluster.service.ClusterService;
8-
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
95
import org.elasticsearch.common.settings.ClusterSettings;
106
import org.elasticsearch.common.settings.IndexScopedSettings;
117
import org.elasticsearch.common.settings.Settings;
128
import org.elasticsearch.common.settings.SettingsFilter;
13-
import org.elasticsearch.env.Environment;
14-
import org.elasticsearch.env.NodeEnvironment;
15-
import org.elasticsearch.indices.IndicesService;
169
import org.elasticsearch.plugins.ActionPlugin;
1710
import org.elasticsearch.plugins.Plugin;
18-
import org.elasticsearch.repositories.RepositoriesService;
1911
import org.elasticsearch.rest.RestController;
2012
import org.elasticsearch.rest.RestHandler;
21-
import org.elasticsearch.script.ScriptService;
22-
import org.elasticsearch.telemetry.TelemetryProvider;
23-
import org.elasticsearch.threadpool.ThreadPool;
24-
import org.elasticsearch.watcher.ResourceWatcherService;
25-
import org.elasticsearch.xcontent.NamedXContentRegistry;
2613

2714
import java.util.Collection;
2815
import java.util.Collections;
@@ -43,8 +30,8 @@ public String description() {
4330
}
4431

4532
@Override
46-
public Collection<Object> createComponents(Client client, ClusterService clusterService, ThreadPool threadPool, ResourceWatcherService resourceWatcherService, ScriptService scriptService, NamedXContentRegistry xContentRegistry, Environment environment, NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry, IndexNameExpressionResolver indexNameExpressionResolver, Supplier<RepositoriesService> repositoriesServiceSupplier, TelemetryProvider telemetryProvider, AllocationService allocationService, IndicesService indicesService) {
47-
return Collections.singletonList(new NamedXContentRegistryHolder(xContentRegistry));
33+
public Collection<?> createComponents(PluginServices services) {
34+
return Collections.singletonList(new NamedXContentRegistryHolder(services.xContentRegistry()));
4835
}
4936

5037
@Override
Lines changed: 42 additions & 105 deletions
Original file line numberDiff line numberDiff line change
@@ -1,51 +1,30 @@
11
package org.elasticsearch.plugin.nlpcn.client;
22

33
import co.elastic.clients.elasticsearch.ElasticsearchClient;
4-
import co.elastic.clients.json.JsonpMapper;
5-
import co.elastic.clients.json.JsonpSerializable;
4+
import com.google.common.collect.Maps;
65
import org.elasticsearch.action.ActionListener;
76
import org.elasticsearch.action.ActionRequest;
87
import org.elasticsearch.action.ActionResponse;
98
import org.elasticsearch.action.ActionType;
10-
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoAction;
11-
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
12-
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsAction;
13-
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
14-
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
15-
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
16-
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
17-
import org.elasticsearch.action.admin.indices.create.CreateIndexAction;
18-
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
19-
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
20-
import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction;
21-
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
22-
import org.elasticsearch.action.admin.indices.get.GetIndexAction;
23-
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
24-
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingAction;
25-
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
26-
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
27-
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
28-
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
29-
import org.elasticsearch.action.bulk.BulkAction;
30-
import org.elasticsearch.action.bulk.BulkRequest;
31-
import org.elasticsearch.action.bulk.BulkResponse;
32-
import org.elasticsearch.action.search.MultiSearchAction;
33-
import org.elasticsearch.action.search.MultiSearchRequest;
34-
import org.elasticsearch.action.search.MultiSearchResponse;
35-
import org.elasticsearch.action.search.SearchAction;
36-
import org.elasticsearch.action.search.SearchRequest;
37-
import org.elasticsearch.action.search.SearchResponse;
38-
import org.elasticsearch.action.search.SearchScrollAction;
39-
import org.elasticsearch.action.search.SearchScrollRequest;
40-
import org.elasticsearch.action.support.master.AcknowledgedResponse;
419
import org.elasticsearch.client.internal.support.AbstractClient;
42-
import org.elasticsearch.core.CheckedFunction;
43-
import org.elasticsearch.index.reindex.BulkByScrollResponse;
44-
import org.elasticsearch.index.reindex.DeleteByQueryAction;
45-
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
10+
import org.elasticsearch.plugin.nlpcn.client.handler.ActionHandler;
11+
import org.elasticsearch.plugin.nlpcn.client.handler.BulkActionHandler;
12+
import org.elasticsearch.plugin.nlpcn.client.handler.ClusterStateActionHandler;
13+
import org.elasticsearch.plugin.nlpcn.client.handler.ClusterUpdateSettingsActionHandler;
14+
import org.elasticsearch.plugin.nlpcn.client.handler.CreateIndexActionHandler;
15+
import org.elasticsearch.plugin.nlpcn.client.handler.DeleteByQueryActionHandler;
16+
import org.elasticsearch.plugin.nlpcn.client.handler.DeleteIndexActionHandler;
17+
import org.elasticsearch.plugin.nlpcn.client.handler.GetIndexActionHandler;
18+
import org.elasticsearch.plugin.nlpcn.client.handler.MultiSearchActionHandler;
19+
import org.elasticsearch.plugin.nlpcn.client.handler.NodesInfoActionHandler;
20+
import org.elasticsearch.plugin.nlpcn.client.handler.PutMappingActionHandler;
21+
import org.elasticsearch.plugin.nlpcn.client.handler.RefreshActionHandler;
22+
import org.elasticsearch.plugin.nlpcn.client.handler.SearchActionHandler;
23+
import org.elasticsearch.plugin.nlpcn.client.handler.SearchScrollActionHandler;
4624

4725
import java.io.IOException;
4826
import java.io.UncheckedIOException;
27+
import java.util.Map;
4928
import java.util.Objects;
5029

5130
/**
@@ -55,85 +34,30 @@
5534
* @version V1.0
5635
* @since 2022-12-19 21:16
5736
*/
58-
public class ElasticsearchRestClient extends AbstractClient {
37+
public class ElasticsearchRestClient extends AbstractClient implements AutoCloseable {
5938

6039
private final ElasticsearchClient client;
61-
private final RequestConverter requestConverter;
62-
private final ResponseConverter responseConverter;
40+
private final Map<String, ActionHandler<ActionRequest, ?, ?, ActionResponse>> handlers = Maps.newHashMap();
6341

6442
public ElasticsearchRestClient(ElasticsearchClient client) {
6543
super(null, null);
6644

6745
this.client = client;
68-
JsonpMapper jsonpMapper = client._jsonpMapper();
69-
this.requestConverter = new RequestConverter(jsonpMapper);
70-
this.responseConverter = new ResponseConverter(jsonpMapper);
46+
registerHandler(client);
7147
}
7248

7349
@SuppressWarnings("unchecked")
7450
@Override
7551
protected <Request extends ActionRequest, Response extends ActionResponse> void doExecute(ActionType<Response> action, Request request, ActionListener<Response> listener) {
7652
try {
7753
String name = action.name();
78-
ActionResponse response;
79-
switch (name) {
80-
case ClusterUpdateSettingsAction.NAME:
81-
response = doExecute(client -> client.cluster().putSettings(requestConverter.putClusterSettingsRequest((ClusterUpdateSettingsRequest) request)),
82-
r -> responseConverter.parseJson(r, ClusterUpdateSettingsResponse::fromXContent));
83-
break;
84-
case ClusterStateAction.NAME:
85-
response = doExecute(client -> client.cluster().state(requestConverter.stateRequest((ClusterStateRequest) request)),
86-
responseConverter::clusterStateResponse);
87-
break;
88-
case NodesInfoAction.NAME:
89-
response = doExecute(client -> client.nodes().info(requestConverter.nodesInfoRequest((NodesInfoRequest) request)),
90-
responseConverter::nodesInfoResponse);
91-
break;
92-
case DeleteIndexAction.NAME:
93-
response = doExecute(client -> client.indices().delete(requestConverter.deleteIndexRequest((DeleteIndexRequest) request)),
94-
r -> responseConverter.parseJson(r, AcknowledgedResponse::fromXContent));
95-
break;
96-
case PutMappingAction.NAME:
97-
response = doExecute(client -> client.indices().putMapping(requestConverter.putMappingRequest((PutMappingRequest) request)),
98-
r -> responseConverter.parseJson(r, AcknowledgedResponse::fromXContent));
99-
break;
100-
case GetIndexAction.NAME:
101-
response = doExecute(client -> client.indices().get(requestConverter.getIndexRequest((GetIndexRequest) request)),
102-
responseConverter::getIndexResponse);
103-
break;
104-
case CreateIndexAction.NAME:
105-
response = doExecute(client -> client.indices().create(requestConverter.createIndexRequest((CreateIndexRequest) request)),
106-
r -> responseConverter.parseJson(r, CreateIndexResponse::fromXContent));
107-
break;
108-
case RefreshAction.NAME:
109-
response = doExecute(client -> client.indices().refresh(requestConverter.refreshRequest((RefreshRequest) request)),
110-
r -> responseConverter.parseJson(r, RefreshResponse::fromXContent));
111-
break;
112-
case BulkAction.NAME:
113-
response = doExecute(client -> client.bulk(requestConverter.bulkRequest((BulkRequest) request)),
114-
r -> responseConverter.parseJson(r, BulkResponse::fromXContent));
115-
break;
116-
case SearchAction.NAME:
117-
response = doExecute(client -> client.search(requestConverter.searchRequest((SearchRequest) request), Object.class),
118-
r -> responseConverter.parseJson(r, SearchResponse::fromXContent));
119-
break;
120-
case SearchScrollAction.NAME:
121-
response = doExecute(client -> client.scroll(requestConverter.scrollRequest((SearchScrollRequest) request), Object.class),
122-
r -> responseConverter.parseJson(r, SearchResponse::fromXContent));
123-
break;
124-
case MultiSearchAction.NAME:
125-
response = doExecute(client -> client.msearch(requestConverter.msearchRequest((MultiSearchRequest) request), Object.class),
126-
r -> responseConverter.parseJson(r, MultiSearchResponse::fromXContext));
127-
break;
128-
case DeleteByQueryAction.NAME:
129-
response = doExecute(client -> client.deleteByQuery(requestConverter.deleteByQueryRequest((DeleteByQueryRequest) request)),
130-
r -> responseConverter.parseJson(r, BulkByScrollResponse::fromXContent));
131-
break;
132-
default:
133-
listener.onFailure(new UnsupportedOperationException("elasticsearch rest client doesn't support action[" + name + "]"));
134-
return;
54+
ActionHandler<ActionRequest, ?, ?, ActionResponse> handler = handlers.get(name);
55+
if (Objects.isNull(handler)) {
56+
listener.onFailure(new UnsupportedOperationException("elasticsearch rest client doesn't support action[" + name + "]"));
57+
return;
13558
}
13659

60+
ActionResponse response = handler.handle(request);
13761
listener.onResponse((Response) response);
13862
} catch (Exception e) {
13963
listener.onFailure(e);
@@ -149,11 +73,24 @@ public void close() {
14973
}
15074
}
15175

152-
public <T extends JsonpSerializable, R extends ActionResponse> R doExecute(CheckedFunction<ElasticsearchClient, T, IOException> clientCallback, CheckedFunction<T, R, IOException> responseCallback) throws IOException {
153-
Objects.requireNonNull(clientCallback, "clientCallback must not be null");
154-
Objects.requireNonNull(responseCallback, "responseCallback must not be null");
76+
protected void registerHandler(ElasticsearchClient client) {
77+
doRegisterHandler(new BulkActionHandler(client));
78+
doRegisterHandler(new ClusterStateActionHandler(client));
79+
doRegisterHandler(new ClusterUpdateSettingsActionHandler(client));
80+
doRegisterHandler(new CreateIndexActionHandler(client));
81+
doRegisterHandler(new DeleteByQueryActionHandler(client));
82+
doRegisterHandler(new DeleteIndexActionHandler(client));
83+
doRegisterHandler(new GetIndexActionHandler(client));
84+
doRegisterHandler(new MultiSearchActionHandler(client));
85+
doRegisterHandler(new NodesInfoActionHandler(client));
86+
doRegisterHandler(new PutMappingActionHandler(client));
87+
doRegisterHandler(new RefreshActionHandler(client));
88+
doRegisterHandler(new SearchActionHandler(client));
89+
doRegisterHandler(new SearchScrollActionHandler(client));
90+
}
15591

156-
T response = clientCallback.apply(client);
157-
return responseCallback.apply(response);
92+
@SuppressWarnings({"rawtypes", "unchecked"})
93+
private void doRegisterHandler(ActionHandler actionHandler) {
94+
handlers.put(actionHandler.getName(), actionHandler);
15895
}
15996
}

0 commit comments

Comments
 (0)