Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support invokeXXX return Response #755

Open
wants to merge 20 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion sdk/src/main/java/io/dapr/client/AbstractDaprClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.dapr.client.domain.InvokeBindingRequest;
import io.dapr.client.domain.InvokeMethodRequest;
import io.dapr.client.domain.PublishEventRequest;
import io.dapr.client.domain.QueryMethodResponse;
import io.dapr.client.domain.QueryStateRequest;
import io.dapr.client.domain.QueryStateResponse;
import io.dapr.client.domain.SaveStateRequest;
Expand Down Expand Up @@ -296,7 +297,7 @@ public <T> Mono<State<T>> getState(String storeName, String key, Class<T> clazz)
*/
@Override
public <T> Mono<State<T>> getState(
String storeName, String key, StateOptions options, TypeRef<T> type) {
String storeName, String key, StateOptions options, TypeRef<T> type) {
GetStateRequest request = new GetStateRequest(storeName, key)
.setStateOptions(options);
return this.getState(request, type);
Expand Down Expand Up @@ -388,6 +389,14 @@ public <T> Mono<QueryStateResponse<T>> queryState(QueryStateRequest request, Cla
return this.queryState(request, TypeRef.get(clazz));
}

/**
* {@inheritDoc}
*/
@Override
public <T> Mono<QueryMethodResponse<T>> queryMethod(InvokeMethodRequest request, Class<T> clazz) {
return this.queryMethod(request, TypeRef.get(clazz));
}

/**
* {@inheritDoc}
*/
Expand Down Expand Up @@ -542,6 +551,7 @@ public Mono<List<ConfigurationItem>> getConfiguration(
/**
* {@inheritDoc}
*/
@Override
public Flux<List<ConfigurationItem>> subscribeToConfiguration(String storeName, String... keys) {
List<String> listOfKeys = filterEmptyKeys(keys);
SubscribeConfigurationRequest request = new SubscribeConfigurationRequest(storeName, listOfKeys);
Expand All @@ -551,6 +561,7 @@ public Flux<List<ConfigurationItem>> subscribeToConfiguration(String storeName,
/**
* {@inheritDoc}
*/
@Override
public Flux<List<ConfigurationItem>> subscribeToConfiguration(
String storeName,
List<String> keys,
Expand Down
90 changes: 68 additions & 22 deletions sdk/src/main/java/io/dapr/client/DaprClientGrpc.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.dapr.client.domain.InvokeBindingRequest;
import io.dapr.client.domain.InvokeMethodRequest;
import io.dapr.client.domain.PublishEventRequest;
import io.dapr.client.domain.QueryMethodResponse;
import io.dapr.client.domain.QueryStateItem;
import io.dapr.client.domain.QueryStateRequest;
import io.dapr.client.domain.QueryStateResponse;
Expand All @@ -54,6 +55,7 @@
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.stub.StreamObserver;
import org.jetbrains.annotations.NotNull;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
Expand All @@ -62,7 +64,10 @@

import java.io.Closeable;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -186,34 +191,17 @@ public Mono<Void> publishEvent(PublishEventRequest request) {
@Override
public <T> Mono<T> invokeMethod(InvokeMethodRequest invokeMethodRequest, TypeRef<T> type) {
try {
String appId = invokeMethodRequest.getAppId();
String method = invokeMethodRequest.getMethod();
Object body = invokeMethodRequest.getBody();
HttpExtension httpExtension = invokeMethodRequest.getHttpExtension();
DaprProtos.InvokeServiceRequest envelope = buildInvokeServiceRequest(
httpExtension,
appId,
method,
body);
// Regarding missing metadata in method invocation for gRPC:
// gRPC to gRPC does not handle metadata in Dapr runtime proto.
// gRPC to HTTP does not map correctly in Dapr runtime as per https://github.com/dapr/dapr/issues/2342

return Mono.subscriberContext().flatMap(
context -> this.<CommonProtos.InvokeResponse>createMono(
it -> intercept(context, asyncStub).invokeService(envelope, it)
)
).flatMap(
return queryMethod(invokeMethodRequest).flatMap(
it -> {
try {
return Mono.justOrEmpty(objectSerializer.deserialize(it.getData().getValue().toByteArray(), type));
} catch (IOException e) {
throw DaprException.propagate(e);
} catch (IOException e) {
return DaprException.wrapMono(e);
}
}
);
} catch (Exception ex) {
return DaprException.wrapMono(ex);
} catch (Exception e) {
return DaprException.wrapMono(e);
}
}

Expand Down Expand Up @@ -717,6 +705,64 @@ public <T> Mono<QueryStateResponse<T>> queryState(QueryStateRequest request, Typ
}
}

@Override
public <T> Mono<QueryMethodResponse<T>> queryMethod(InvokeMethodRequest request, TypeRef<T> type) {
try {
return queryMethod(request).flatMap(r -> {
try {
return Mono.justOrEmpty(buildQueryMethodResponse(r, type));
} catch (Exception e) {
return DaprException.wrapMono(e);
}
});
} catch (Exception e) {
return DaprException.wrapMono(e);
}
}

@NotNull
private Mono<CommonProtos.InvokeResponse> queryMethod(InvokeMethodRequest invokeMethodRequest) throws IOException {
String appId = invokeMethodRequest.getAppId();
String method = invokeMethodRequest.getMethod();
Object body = invokeMethodRequest.getBody();
HttpExtension httpExtension = invokeMethodRequest.getHttpExtension();
DaprProtos.InvokeServiceRequest envelope = buildInvokeServiceRequest(
httpExtension,
appId,
method,
body);
// Regarding missing metadata in method invocation for gRPC:
// gRPC to gRPC does not handle metadata in Dapr runtime proto.
// gRPC to HTTP does not map correctly in Dapr runtime as per https://github.com/dapr/dapr/issues/2342

Mono<CommonProtos.InvokeResponse> invokeResponseMono = Mono.subscriberContext().flatMap(
context -> this.<CommonProtos.InvokeResponse>createMono(
it -> intercept(context, asyncStub).invokeService(envelope, it)
)
);
return invokeResponseMono;
}

private <T> QueryMethodResponse<T> buildQueryMethodResponse(CommonProtos.InvokeResponse response,
TypeRef<T> type) throws IOException {
Map<String, String> headers = new HashMap<>();
headers.put(io.dapr.client.domain.Metadata.CONTENT_TYPE,response.getContentType());
byte[] respBody = response.getData().getValue().toByteArray();
if (respBody.length > 1 && respBody[0] == 34) {
respBody = Arrays.copyOfRange(respBody, 1, respBody.length - 1);
}
Object data;
if (type.getType() == String.class) {
data = new String(respBody, StandardCharsets.UTF_8);
} else if (type.getType() == byte[].class) {
data = respBody;
} else {
data = objectSerializer.deserialize(respBody, type);
}

return new QueryMethodResponse<T>(200, headers, (T) data);
}

private <T> QueryStateItem<T> buildQueryStateKeyValue(
DaprProtos.QueryStateItem item,
TypeRef<T> type) throws IOException {
Expand Down
118 changes: 85 additions & 33 deletions sdk/src/main/java/io/dapr/client/DaprClientHttp.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.dapr.client.domain.InvokeBindingRequest;
import io.dapr.client.domain.InvokeMethodRequest;
import io.dapr.client.domain.PublishEventRequest;
import io.dapr.client.domain.QueryMethodResponse;
import io.dapr.client.domain.QueryStateItem;
import io.dapr.client.domain.QueryStateRequest;
import io.dapr.client.domain.QueryStateResponse;
Expand All @@ -46,13 +47,17 @@
import reactor.core.publisher.Mono;

import java.io.IOException;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -180,40 +185,16 @@ public Mono<Void> publishEvent(PublishEventRequest request) {
@Override
public <T> Mono<T> invokeMethod(InvokeMethodRequest invokeMethodRequest, TypeRef<T> type) {
try {
final String appId = invokeMethodRequest.getAppId();
final String method = invokeMethodRequest.getMethod();
final Object request = invokeMethodRequest.getBody();
final HttpExtension httpExtension = invokeMethodRequest.getHttpExtension();
final String contentType = invokeMethodRequest.getContentType();
if (httpExtension == null) {
throw new IllegalArgumentException("HttpExtension cannot be null. Use HttpExtension.NONE instead.");
}
// If the httpExtension is not null, then the method will not be null based on checks in constructor
final String httpMethod = httpExtension.getMethod().toString();
if (appId == null || appId.trim().isEmpty()) {
throw new IllegalArgumentException("App Id cannot be null or empty.");
}
if (method == null || method.trim().isEmpty()) {
throw new IllegalArgumentException("Method name cannot be null or empty.");
}


String[] methodSegments = method.split("/");

List<String> pathSegments = new ArrayList<>(Arrays.asList(DaprHttp.API_VERSION, "invoke", appId, "method"));
pathSegments.addAll(Arrays.asList(methodSegments));

byte[] serializedRequestBody = objectSerializer.serialize(request);
final Map<String, String> headers = new HashMap<>();
if (contentType != null && !contentType.isEmpty()) {
headers.put("content-type", contentType);
if (!(type instanceof ParameterizedType) || ((ParameterizedType)type).getRawType() != QueryMethodResponse.class) {
return queryMethod(invokeMethodRequest).flatMap(r -> getMono(type, r));
} else {
TypeRef resultType = type;
Type[] actualTypeArguments = ((ParameterizedType) type.getType()).getActualTypeArguments();
if (Objects.nonNull(actualTypeArguments) && actualTypeArguments.length > 0) {
resultType = TypeRef.get(actualTypeArguments[0]);
}
return this.queryMethod(invokeMethodRequest, resultType);
}
headers.putAll(httpExtension.getHeaders());
Mono<DaprHttp.Response> response = Mono.subscriberContext().flatMap(
context -> this.client.invokeApi(httpMethod, pathSegments.toArray(new String[0]),
httpExtension.getQueryParams(), serializedRequestBody, headers, context)
);
return response.flatMap(r -> getMono(type, r));
} catch (Exception ex) {
return DaprException.wrapMono(ex);
}
Expand Down Expand Up @@ -697,6 +678,61 @@ public <T> Mono<QueryStateResponse<T>> queryState(QueryStateRequest request, Typ
}
}

/**
* {@inheritDoc}
*/
@Override
public <T> Mono<QueryMethodResponse<T>> queryMethod(InvokeMethodRequest request, TypeRef<T> type) {
try {
return queryMethod(request).flatMap(r -> {
try {
return Mono.justOrEmpty(buildQueryMethodResponse(r, type));
} catch (Exception e) {
return DaprException.wrapMono(e);
}
});
} catch (Exception e) {
return DaprException.wrapMono(e);
}
}

private Mono<DaprHttp.Response> queryMethod(InvokeMethodRequest invokeMethodRequest) throws IOException {
final String appId = invokeMethodRequest.getAppId();
final String method = invokeMethodRequest.getMethod();
final Object request = invokeMethodRequest.getBody();
final HttpExtension httpExtension = invokeMethodRequest.getHttpExtension();
final String contentType = invokeMethodRequest.getContentType();
if (httpExtension == null) {
throw new IllegalArgumentException("HttpExtension cannot be null. Use HttpExtension.NONE instead.");
}
// If the httpExtension is not null, then the method will not be null based on checks in constructor
final String httpMethod = httpExtension.getMethod().toString();
if (appId == null || appId.trim().isEmpty()) {
throw new IllegalArgumentException("App Id cannot be null or empty.");
}
if (method == null || method.trim().isEmpty()) {
throw new IllegalArgumentException("Method name cannot be null or empty.");
}


String[] methodSegments = method.split("/");

List<String> pathSegments = new ArrayList<>(Arrays.asList(DaprHttp.API_VERSION, "invoke", appId, "method"));
pathSegments.addAll(Arrays.asList(methodSegments));

byte[] serializedRequestBody = objectSerializer.serialize(request);
final Map<String, String> headers = new HashMap<>();
if (contentType != null && !contentType.isEmpty()) {
headers.put("content-type", contentType);
}
headers.putAll(httpExtension.getHeaders());
Mono<DaprHttp.Response> response = Mono.subscriberContext().flatMap(
context -> this.client.invokeApi(httpMethod, pathSegments.toArray(new String[0]),
httpExtension.getQueryParams(), serializedRequestBody, headers, context)
);
return response;
}

/**
* {@inheritDoc}
*/
Expand Down Expand Up @@ -758,6 +794,22 @@ private <T> QueryStateResponse<T> buildQueryStateResponse(DaprHttp.Response resp
return new QueryStateResponse<>(result, token).setMetadata(metadata);
}

private <T> QueryMethodResponse<T> buildQueryMethodResponse(DaprHttp.Response response,
TypeRef<T> type) throws IOException {
byte[] respBody = response.getBody();
Object data = null;
if (Objects.nonNull(type)) {
if (type.getType() == String.class) {
data = new String(respBody, StandardCharsets.UTF_8);
} else if (type.getType() == byte[].class) {
data = respBody;
} else {
data = objectSerializer.deserialize(respBody, type);
}
}
return new QueryMethodResponse<T>(response.getStatusCode(), response.getHeaders(), (T) data);
}

/**
* {@inheritDoc}
*/
Expand Down
22 changes: 22 additions & 0 deletions sdk/src/main/java/io/dapr/client/DaprPreviewClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

import io.dapr.client.domain.ConfigurationItem;
import io.dapr.client.domain.GetConfigurationRequest;
import io.dapr.client.domain.InvokeMethodRequest;
import io.dapr.client.domain.QueryMethodResponse;
import io.dapr.client.domain.QueryStateRequest;
import io.dapr.client.domain.QueryStateResponse;
import io.dapr.client.domain.SubscribeConfigurationRequest;
Expand Down Expand Up @@ -223,4 +225,24 @@ <T> Mono<QueryStateResponse<T>> queryState(String storeName, Query query,
* @return A Mono of QueryStateResponse of type T.
*/
<T> Mono<QueryStateResponse<T>> queryState(QueryStateRequest request, TypeRef<T> type);

/**
* Query for method using a query request.
*
* @param request Query request object.
* @param clazz The type needed as return for the call.
* @param <T> The Type of the return, use byte[] to skip serialization.
* @return A Mono of QueryStateResponse of type T.
*/
<T> Mono<QueryMethodResponse<T>> queryMethod(InvokeMethodRequest request, Class<T> clazz);

/**
* Query for method using a query request.
*
* @param request Query request object.
* @param type The type needed as return for the call.
* @param <T> The Type of the return, use byte[] to skip serialization.
* @return A Mono of QueryStateResponse of type T.
*/
<T> Mono<QueryMethodResponse<T>> queryMethod(InvokeMethodRequest request, TypeRef<T> type);
}
Loading