Skip to content

Commit

Permalink
add response interface and http,grpc implement
Browse files Browse the repository at this point in the history
Signed-off-by: naah69 <[email protected]>
  • Loading branch information
naah69 committed Jun 27, 2022
1 parent ab8f0cc commit 82c8e27
Show file tree
Hide file tree
Showing 7 changed files with 267 additions and 1 deletion.
24 changes: 24 additions & 0 deletions sdk/src/main/java/io/dapr/client/DaprClientGrpc.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import io.dapr.client.domain.StateOptions;
import io.dapr.client.domain.SubscribeConfigurationRequest;
import io.dapr.client.domain.TransactionalStateOperation;
import io.dapr.client.domain.response.DaprResponse;
import io.dapr.client.domain.response.GrpcDaprResponse;
import io.dapr.config.Properties;
import io.dapr.exceptions.DaprException;
import io.dapr.internal.opencensus.GrpcWrapper;
Expand All @@ -62,9 +64,13 @@

import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -206,6 +212,11 @@ public <T> Mono<T> invokeMethod(InvokeMethodRequest invokeMethodRequest, TypeRef
).flatMap(
it -> {
try {
if (type.getType().getTypeName().startsWith(DaprResponse.class.getName())) {
Map<String, String> headers = new HashMap<>();
headers.put(io.dapr.client.domain.Metadata.CONTENT_TYPE,it.getContentType());
return getMono(type, it.getData().getValue().toByteArray(),headers);
}
return Mono.justOrEmpty(objectSerializer.deserialize(it.getData().getValue().toByteArray(), type));
} catch (IOException e) {
throw DaprException.propagate(e);
Expand All @@ -217,6 +228,16 @@ public <T> Mono<T> invokeMethod(InvokeMethodRequest invokeMethodRequest, TypeRef
}
}

private <T> Mono<T> getMono(TypeRef<T> type, byte[] data, Map<String,String> headers) {
if (type.getType() instanceof ParameterizedType) {
Type[] actualTypeArguments = ((ParameterizedType) type.getType()).getActualTypeArguments();
if (Objects.nonNull(actualTypeArguments) && actualTypeArguments.length > 0) {
type = TypeRef.get(actualTypeArguments[0]);
}
}
return (Mono<T>) Mono.just(new GrpcDaprResponse<T>(data, headers,objectSerializer, type));
}

/**
* {@inheritDoc}
*/
Expand Down Expand Up @@ -253,6 +274,9 @@ public <T> Mono<T> invokeBinding(InvokeBindingRequest request, TypeRef<T> type)
).flatMap(
it -> {
try {
if (type.getType().getTypeName().startsWith(DaprResponse.class.getName())) {
return getMono(type, it.getData().toByteArray(),it.getMetadataMap());
}
return Mono.justOrEmpty(objectSerializer.deserialize(it.getData().toByteArray(), type));
} catch (IOException e) {
throw DaprException.propagate(e);
Expand Down
14 changes: 14 additions & 0 deletions sdk/src/main/java/io/dapr/client/DaprClientHttp.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import io.dapr.client.domain.SubscribeConfigurationRequest;
import io.dapr.client.domain.TransactionalStateOperation;
import io.dapr.client.domain.TransactionalStateRequest;
import io.dapr.client.domain.response.DaprResponse;
import io.dapr.client.domain.response.HttpDaprResponse;
import io.dapr.config.Properties;
import io.dapr.exceptions.DaprException;
import io.dapr.serializer.DaprObjectSerializer;
Expand All @@ -46,13 +48,16 @@
import reactor.core.publisher.Mono;

import java.io.IOException;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -221,6 +226,15 @@ public <T> Mono<T> invokeMethod(InvokeMethodRequest invokeMethodRequest, TypeRef

private <T> Mono<T> getMono(TypeRef<T> type, DaprHttp.Response r) {
try {
if (type.getType().getTypeName().startsWith(DaprResponse.class.getName())) {
if (type.getType() instanceof ParameterizedType) {
Type[] actualTypeArguments = ((ParameterizedType) type.getType()).getActualTypeArguments();
if (Objects.nonNull(actualTypeArguments) && actualTypeArguments.length > 0) {
type = TypeRef.get(actualTypeArguments[0]);
}
}
return (Mono<T>) Mono.just(new HttpDaprResponse<T>(r, objectSerializer, type));
}
T object = objectSerializer.deserialize(r.getBody(), type);
if (object == null) {
return Mono.empty();
Expand Down
41 changes: 41 additions & 0 deletions sdk/src/main/java/io/dapr/client/domain/response/DaprResponse.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2021 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/

package io.dapr.client.domain.response;

import java.io.IOException;
import java.util.Map;

/**
* Response.
*/
public interface DaprResponse<T> {

/**
* get response code.
* @return response code
*/
int getCode();

/**
* get response data.
* @return response data
*/
T getData() throws IOException;

/**
* get response header.
* @return response header
*/
Map<String,String> getHeaders();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright 2021 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/

package io.dapr.client.domain.response;

import io.dapr.serializer.DaprObjectSerializer;
import io.dapr.utils.TypeRef;

import java.io.IOException;
import java.util.Map;

/**
* GrpcDaprResponse.
*/
public class GrpcDaprResponse<T> implements DaprResponse<T> {

private final DaprObjectSerializer serializer;

private final TypeRef<T> type;

private final byte[] data;

private final Map<String,String> headers;

/**
* build grpc dapr response.
* @param data grpc invoke response data
* @param headers grpc invoke headers
* @param serializer objectSerializer
* @param type type
*/
public GrpcDaprResponse(byte[] data, Map<String,String> headers, DaprObjectSerializer serializer, TypeRef<T> type) {
this.data = data;
this.headers = headers;
this.serializer = serializer;
this.type = type;
}

@Override
public int getCode() {
// InvokeResponse didn't have it.
return 200;
}

@Override
public T getData() throws IOException {
return serializer.deserialize(data, type);
}

@Override
public Map<String, String> getHeaders() {
return this.headers;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright 2021 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
limitations under the License.
*/

package io.dapr.client.domain.response;

import io.dapr.client.DaprHttp;
import io.dapr.serializer.DaprObjectSerializer;
import io.dapr.utils.TypeRef;

import java.io.IOException;
import java.util.Map;

/**
* HttpDaprResponse.
*/
public class HttpDaprResponse<T> implements DaprResponse<T> {

private final DaprHttp.Response response;

private final DaprObjectSerializer serializer;

private final TypeRef<T> type;

/**
* build http dapr response.
* @param response http invoke response
* @param serializer serializer
* @param type type
*/
public HttpDaprResponse(DaprHttp.Response response, DaprObjectSerializer serializer, TypeRef<T> type) {
this.response = response;
this.serializer = serializer;
this.type = type;
}

@Override
public int getCode() {
return response.getStatusCode();
}

@Override
public T getData() throws IOException {
byte[] data = response.getBody();
if (type.getType() == String.class) {
return (T) new String(data);
}
return serializer.deserialize(data, type);
}

@Override
public Map<String, String> getHeaders() {
return response.getHeaders();
}
}
17 changes: 17 additions & 0 deletions sdk/src/test/java/io/dapr/client/DaprClientGrpcTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.dapr.client.domain.State;
import io.dapr.client.domain.StateOptions;
import io.dapr.client.domain.TransactionalStateOperation;
import io.dapr.client.domain.response.DaprResponse;
import io.dapr.config.Properties;
import io.dapr.serializer.DaprObjectSerializer;
import io.dapr.serializer.DefaultObjectSerializer;
Expand Down Expand Up @@ -633,6 +634,22 @@ public void invokeServiceTest() {
assertEquals(expected, strOutput);
}

@Test
public void invokeServiceTestReturnResponse() throws IOException {
String expected = "Value";
doAnswer((Answer<Void>) invocation -> {
StreamObserver<CommonProtos.InvokeResponse> observer = (StreamObserver<CommonProtos.InvokeResponse>) invocation.getArguments()[1];
observer.onNext(CommonProtos.InvokeResponse.newBuilder().setData(getAny(expected)).build());
observer.onCompleted();
return null;
}).when(daprStub).invokeService(any(DaprProtos.InvokeServiceRequest.class), any());

Mono<DaprResponse<String>> result = client.invokeMethod("appId", "method", "request", HttpExtension.NONE, null, new TypeRef<DaprResponse<String>>() {});
DaprResponse<String> res = result.block();

assertEquals(expected, res.getData());
}

@Test
public void invokeServiceObjectTest() throws Exception {
MyObject object = new MyObject(1, "Value");
Expand Down
44 changes: 43 additions & 1 deletion sdk/src/test/java/io/dapr/client/DaprClientHttpTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.dapr.client.domain.State;
import io.dapr.client.domain.StateOptions;
import io.dapr.client.domain.TransactionalStateOperation;
import io.dapr.client.domain.response.DaprResponse;
import io.dapr.config.Properties;
import io.dapr.exceptions.DaprException;
import io.dapr.serializer.DaprObjectSerializer;
Expand All @@ -39,6 +40,7 @@
import okio.BufferedSink;
import org.junit.Before;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;
import org.mockito.Mockito;
import reactor.core.publisher.Mono;
import reactor.util.context.Context;
Expand Down Expand Up @@ -417,7 +419,47 @@ public void invokeServiceWithContext() {
}

@Test
public void invokeBinding() {
public void invokeServiceReturnResponse() throws IOException {
String resultString = "request success";
String resultHeaderName = "test-header";
String resultHeaderValue = "1";
mockInterceptor.addRule()
.post("http://127.0.0.1:3000/v1.0/invoke/41/method/neworder")
.respond(resultString)
.addHeader(resultHeaderName,resultHeaderValue);

InvokeMethodRequest req = new InvokeMethodRequest("41", "neworder")
.setBody("request")
.setHttpExtension(HttpExtension.POST);
Mono<DaprResponse<String>> result = daprClientHttp.invokeMethod(req, new TypeRef<DaprResponse<String>>() {});
DaprResponse<String> response = result.block();
Assertions.assertNotNull(response);
Assertions.assertEquals(200, response.getCode());
Assertions.assertEquals(resultString,response.getData());
Assertions.assertEquals(resultHeaderValue,response.getHeaders().get(resultHeaderName));
}

@Test
public void invokeBinding() throws IOException {
String resultString = "request success";
String resultHeaderName = "test-header";
String resultHeaderValue = "1";
Map<String, String> map = new HashMap<>();
mockInterceptor.addRule()
.post("http://127.0.0.1:3000/v1.0/bindings/sample-topic")
.respond(resultString)
.addHeader(resultHeaderName,resultHeaderValue);

Mono<DaprResponse<String>> mono = daprClientHttp.invokeBinding("sample-topic", "myoperation", "", new TypeRef<DaprResponse<String>>() {});
DaprResponse<String> response = mono.block();
Assertions.assertNotNull(response);
Assertions.assertEquals(200, response.getCode());
Assertions.assertEquals(resultString,response.getData());
Assertions.assertEquals(resultHeaderValue,response.getHeaders().get(resultHeaderName));
}

@Test
public void invokeBindingReturnResponse() {
Map<String, String> map = new HashMap<>();
mockInterceptor.addRule()
.post("http://127.0.0.1:3000/v1.0/bindings/sample-topic")
Expand Down

0 comments on commit 82c8e27

Please sign in to comment.