From 82c8e27dbade9f27ee2b19de0bee665aeda43859 Mon Sep 17 00:00:00 2001 From: naah69 Date: Mon, 27 Jun 2022 14:36:55 +0800 Subject: [PATCH] add response interface and http,grpc implement Signed-off-by: naah69 --- .../java/io/dapr/client/DaprClientGrpc.java | 24 +++++++ .../java/io/dapr/client/DaprClientHttp.java | 14 ++++ .../client/domain/response/DaprResponse.java | 41 ++++++++++++ .../domain/response/GrpcDaprResponse.java | 64 +++++++++++++++++++ .../domain/response/HttpDaprResponse.java | 64 +++++++++++++++++++ .../io/dapr/client/DaprClientGrpcTest.java | 17 +++++ .../io/dapr/client/DaprClientHttpTest.java | 44 ++++++++++++- 7 files changed, 267 insertions(+), 1 deletion(-) create mode 100644 sdk/src/main/java/io/dapr/client/domain/response/DaprResponse.java create mode 100644 sdk/src/main/java/io/dapr/client/domain/response/GrpcDaprResponse.java create mode 100644 sdk/src/main/java/io/dapr/client/domain/response/HttpDaprResponse.java diff --git a/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java b/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java index 39f0d227b4..2434866a78 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java +++ b/sdk/src/main/java/io/dapr/client/DaprClientGrpc.java @@ -37,6 +37,8 @@ import io.dapr.client.domain.StateOptions; import io.dapr.client.domain.SubscribeConfigurationRequest; import io.dapr.client.domain.TransactionalStateOperation; +import io.dapr.client.domain.response.DaprResponse; +import io.dapr.client.domain.response.GrpcDaprResponse; import io.dapr.config.Properties; import io.dapr.exceptions.DaprException; import io.dapr.internal.opencensus.GrpcWrapper; @@ -62,9 +64,13 @@ import java.io.Closeable; import java.io.IOException; +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.ExecutionException; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -206,6 +212,11 @@ public Mono invokeMethod(InvokeMethodRequest invokeMethodRequest, TypeRef ).flatMap( it -> { try { + if (type.getType().getTypeName().startsWith(DaprResponse.class.getName())) { + Map headers = new HashMap<>(); + headers.put(io.dapr.client.domain.Metadata.CONTENT_TYPE,it.getContentType()); + return getMono(type, it.getData().getValue().toByteArray(),headers); + } return Mono.justOrEmpty(objectSerializer.deserialize(it.getData().getValue().toByteArray(), type)); } catch (IOException e) { throw DaprException.propagate(e); @@ -217,6 +228,16 @@ public Mono invokeMethod(InvokeMethodRequest invokeMethodRequest, TypeRef } } + private Mono getMono(TypeRef type, byte[] data, Map headers) { + if (type.getType() instanceof ParameterizedType) { + Type[] actualTypeArguments = ((ParameterizedType) type.getType()).getActualTypeArguments(); + if (Objects.nonNull(actualTypeArguments) && actualTypeArguments.length > 0) { + type = TypeRef.get(actualTypeArguments[0]); + } + } + return (Mono) Mono.just(new GrpcDaprResponse(data, headers,objectSerializer, type)); + } + /** * {@inheritDoc} */ @@ -253,6 +274,9 @@ public Mono invokeBinding(InvokeBindingRequest request, TypeRef type) ).flatMap( it -> { try { + if (type.getType().getTypeName().startsWith(DaprResponse.class.getName())) { + return getMono(type, it.getData().toByteArray(),it.getMetadataMap()); + } return Mono.justOrEmpty(objectSerializer.deserialize(it.getData().toByteArray(), type)); } catch (IOException e) { throw DaprException.propagate(e); diff --git a/sdk/src/main/java/io/dapr/client/DaprClientHttp.java b/sdk/src/main/java/io/dapr/client/DaprClientHttp.java index f325c0e037..6346fa8f58 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClientHttp.java +++ b/sdk/src/main/java/io/dapr/client/DaprClientHttp.java @@ -36,6 +36,8 @@ import io.dapr.client.domain.SubscribeConfigurationRequest; import io.dapr.client.domain.TransactionalStateOperation; import io.dapr.client.domain.TransactionalStateRequest; +import io.dapr.client.domain.response.DaprResponse; +import io.dapr.client.domain.response.HttpDaprResponse; import io.dapr.config.Properties; import io.dapr.exceptions.DaprException; import io.dapr.serializer.DaprObjectSerializer; @@ -46,6 +48,8 @@ import reactor.core.publisher.Mono; import java.io.IOException; +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -53,6 +57,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.stream.Collectors; @@ -221,6 +226,15 @@ public Mono invokeMethod(InvokeMethodRequest invokeMethodRequest, TypeRef private Mono getMono(TypeRef type, DaprHttp.Response r) { try { + if (type.getType().getTypeName().startsWith(DaprResponse.class.getName())) { + if (type.getType() instanceof ParameterizedType) { + Type[] actualTypeArguments = ((ParameterizedType) type.getType()).getActualTypeArguments(); + if (Objects.nonNull(actualTypeArguments) && actualTypeArguments.length > 0) { + type = TypeRef.get(actualTypeArguments[0]); + } + } + return (Mono) Mono.just(new HttpDaprResponse(r, objectSerializer, type)); + } T object = objectSerializer.deserialize(r.getBody(), type); if (object == null) { return Mono.empty(); diff --git a/sdk/src/main/java/io/dapr/client/domain/response/DaprResponse.java b/sdk/src/main/java/io/dapr/client/domain/response/DaprResponse.java new file mode 100644 index 0000000000..61a65c9ff6 --- /dev/null +++ b/sdk/src/main/java/io/dapr/client/domain/response/DaprResponse.java @@ -0,0 +1,41 @@ +/* + * Copyright 2021 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.client.domain.response; + +import java.io.IOException; +import java.util.Map; + +/** + * Response. + */ +public interface DaprResponse { + + /** + * get response code. + * @return response code + */ + int getCode(); + + /** + * get response data. + * @return response data + */ + T getData() throws IOException; + + /** + * get response header. + * @return response header + */ + Map getHeaders(); +} diff --git a/sdk/src/main/java/io/dapr/client/domain/response/GrpcDaprResponse.java b/sdk/src/main/java/io/dapr/client/domain/response/GrpcDaprResponse.java new file mode 100644 index 0000000000..62a41b1619 --- /dev/null +++ b/sdk/src/main/java/io/dapr/client/domain/response/GrpcDaprResponse.java @@ -0,0 +1,64 @@ +/* + * Copyright 2021 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.client.domain.response; + +import io.dapr.serializer.DaprObjectSerializer; +import io.dapr.utils.TypeRef; + +import java.io.IOException; +import java.util.Map; + +/** + * GrpcDaprResponse. + */ +public class GrpcDaprResponse implements DaprResponse { + + private final DaprObjectSerializer serializer; + + private final TypeRef type; + + private final byte[] data; + + private final Map headers; + + /** + * build grpc dapr response. + * @param data grpc invoke response data + * @param headers grpc invoke headers + * @param serializer objectSerializer + * @param type type + */ + public GrpcDaprResponse(byte[] data, Map headers, DaprObjectSerializer serializer, TypeRef type) { + this.data = data; + this.headers = headers; + this.serializer = serializer; + this.type = type; + } + + @Override + public int getCode() { + // InvokeResponse didn't have it. + return 200; + } + + @Override + public T getData() throws IOException { + return serializer.deserialize(data, type); + } + + @Override + public Map getHeaders() { + return this.headers; + } +} diff --git a/sdk/src/main/java/io/dapr/client/domain/response/HttpDaprResponse.java b/sdk/src/main/java/io/dapr/client/domain/response/HttpDaprResponse.java new file mode 100644 index 0000000000..ffe37e8f14 --- /dev/null +++ b/sdk/src/main/java/io/dapr/client/domain/response/HttpDaprResponse.java @@ -0,0 +1,64 @@ +/* + * Copyright 2021 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.client.domain.response; + +import io.dapr.client.DaprHttp; +import io.dapr.serializer.DaprObjectSerializer; +import io.dapr.utils.TypeRef; + +import java.io.IOException; +import java.util.Map; + +/** + * HttpDaprResponse. + */ +public class HttpDaprResponse implements DaprResponse { + + private final DaprHttp.Response response; + + private final DaprObjectSerializer serializer; + + private final TypeRef type; + + /** + * build http dapr response. + * @param response http invoke response + * @param serializer serializer + * @param type type + */ + public HttpDaprResponse(DaprHttp.Response response, DaprObjectSerializer serializer, TypeRef type) { + this.response = response; + this.serializer = serializer; + this.type = type; + } + + @Override + public int getCode() { + return response.getStatusCode(); + } + + @Override + public T getData() throws IOException { + byte[] data = response.getBody(); + if (type.getType() == String.class) { + return (T) new String(data); + } + return serializer.deserialize(data, type); + } + + @Override + public Map getHeaders() { + return response.getHeaders(); + } +} diff --git a/sdk/src/test/java/io/dapr/client/DaprClientGrpcTest.java b/sdk/src/test/java/io/dapr/client/DaprClientGrpcTest.java index 09c7f159b9..8630b2baf5 100644 --- a/sdk/src/test/java/io/dapr/client/DaprClientGrpcTest.java +++ b/sdk/src/test/java/io/dapr/client/DaprClientGrpcTest.java @@ -25,6 +25,7 @@ import io.dapr.client.domain.State; import io.dapr.client.domain.StateOptions; import io.dapr.client.domain.TransactionalStateOperation; +import io.dapr.client.domain.response.DaprResponse; import io.dapr.config.Properties; import io.dapr.serializer.DaprObjectSerializer; import io.dapr.serializer.DefaultObjectSerializer; @@ -633,6 +634,22 @@ public void invokeServiceTest() { assertEquals(expected, strOutput); } + @Test + public void invokeServiceTestReturnResponse() throws IOException { + String expected = "Value"; + doAnswer((Answer) invocation -> { + StreamObserver observer = (StreamObserver) invocation.getArguments()[1]; + observer.onNext(CommonProtos.InvokeResponse.newBuilder().setData(getAny(expected)).build()); + observer.onCompleted(); + return null; + }).when(daprStub).invokeService(any(DaprProtos.InvokeServiceRequest.class), any()); + + Mono> result = client.invokeMethod("appId", "method", "request", HttpExtension.NONE, null, new TypeRef>() {}); + DaprResponse res = result.block(); + + assertEquals(expected, res.getData()); + } + @Test public void invokeServiceObjectTest() throws Exception { MyObject object = new MyObject(1, "Value"); diff --git a/sdk/src/test/java/io/dapr/client/DaprClientHttpTest.java b/sdk/src/test/java/io/dapr/client/DaprClientHttpTest.java index 0cdf2a82ea..8e895bba2a 100644 --- a/sdk/src/test/java/io/dapr/client/DaprClientHttpTest.java +++ b/sdk/src/test/java/io/dapr/client/DaprClientHttpTest.java @@ -24,6 +24,7 @@ import io.dapr.client.domain.State; import io.dapr.client.domain.StateOptions; import io.dapr.client.domain.TransactionalStateOperation; +import io.dapr.client.domain.response.DaprResponse; import io.dapr.config.Properties; import io.dapr.exceptions.DaprException; import io.dapr.serializer.DaprObjectSerializer; @@ -39,6 +40,7 @@ import okio.BufferedSink; import org.junit.Before; import org.junit.Test; +import org.junit.jupiter.api.Assertions; import org.mockito.Mockito; import reactor.core.publisher.Mono; import reactor.util.context.Context; @@ -417,7 +419,47 @@ public void invokeServiceWithContext() { } @Test - public void invokeBinding() { + public void invokeServiceReturnResponse() throws IOException { + String resultString = "request success"; + String resultHeaderName = "test-header"; + String resultHeaderValue = "1"; + mockInterceptor.addRule() + .post("http://127.0.0.1:3000/v1.0/invoke/41/method/neworder") + .respond(resultString) + .addHeader(resultHeaderName,resultHeaderValue); + + InvokeMethodRequest req = new InvokeMethodRequest("41", "neworder") + .setBody("request") + .setHttpExtension(HttpExtension.POST); + Mono> result = daprClientHttp.invokeMethod(req, new TypeRef>() {}); + DaprResponse response = result.block(); + Assertions.assertNotNull(response); + Assertions.assertEquals(200, response.getCode()); + Assertions.assertEquals(resultString,response.getData()); + Assertions.assertEquals(resultHeaderValue,response.getHeaders().get(resultHeaderName)); + } + + @Test + public void invokeBinding() throws IOException { + String resultString = "request success"; + String resultHeaderName = "test-header"; + String resultHeaderValue = "1"; + Map map = new HashMap<>(); + mockInterceptor.addRule() + .post("http://127.0.0.1:3000/v1.0/bindings/sample-topic") + .respond(resultString) + .addHeader(resultHeaderName,resultHeaderValue); + + Mono> mono = daprClientHttp.invokeBinding("sample-topic", "myoperation", "", new TypeRef>() {}); + DaprResponse response = mono.block(); + Assertions.assertNotNull(response); + Assertions.assertEquals(200, response.getCode()); + Assertions.assertEquals(resultString,response.getData()); + Assertions.assertEquals(resultHeaderValue,response.getHeaders().get(resultHeaderName)); + } + + @Test + public void invokeBindingReturnResponse() { Map map = new HashMap<>(); mockInterceptor.addRule() .post("http://127.0.0.1:3000/v1.0/bindings/sample-topic")