Skip to content

Commit

Permalink
IWF-122: add set search attributes and set data attributes (#262)
Browse files Browse the repository at this point in the history
Co-authored-by: Katie Atrops <[email protected]>
  • Loading branch information
ktrops and Katie Atrops authored Nov 14, 2024
1 parent 2412de7 commit 055ceaf
Show file tree
Hide file tree
Showing 11 changed files with 448 additions and 4 deletions.
1 change: 1 addition & 0 deletions script/.env
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ ELASTICSEARCH_VERSION=7.16.2
MYSQL_VERSION=8
POSTGRESQL_VERSION=13
TEMPORAL_VERSION=1.25
TEMPORAL_ADMIN_TOOLS_VERSION=1.25.2-tctl-1.18.1-cli-1.1.1
TEMPORAL_UI_VERSION=2.31.2
4 changes: 3 additions & 1 deletion script/docker-compose-init.sh
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@ for run in {1..120}; do
sleep 0.1
temporal operator search-attribute create --name CustomStringField --type Text
sleep 0.1
temporal operator search-attribute create --name CustomKeywordArrayField --type KeywordList
sleep 0.1

if checkExists "IwfWorkflowType" && checkExists "IwfGlobalWorkflowVersion" && checkExists "IwfExecutingStateIds" && checkExists "CustomKeywordField" && checkExists "CustomIntField" && checkExists "CustomBoolField" && checkExists "CustomDoubleField" && checkExists "CustomDatetimeField" && checkExists "CustomStringField"; then
if checkExists "IwfWorkflowType" && checkExists "IwfGlobalWorkflowVersion" && checkExists "IwfExecutingStateIds" && checkExists "CustomKeywordField" && checkExists "CustomIntField" && checkExists "CustomBoolField" && checkExists "CustomDoubleField" && checkExists "CustomDatetimeField" && checkExists "CustomStringField" && checkExists "CustomKeywordArrayField"; then
echo "All search attributes are registered"
break
fi
Expand Down
2 changes: 1 addition & 1 deletion script/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ services:
environment:
- TEMPORAL_ADDRESS=temporal:7233
- TEMPORAL_CLI_ADDRESS=temporal:7233
image: temporalio/admin-tools:${TEMPORAL_VERSION}
image: temporalio/admin-tools:${TEMPORAL_ADMIN_TOOLS_VERSION}
networks:
- temporal-network
stdin_open: true
Expand Down
113 changes: 113 additions & 0 deletions src/main/java/io/iworkflow/core/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,68 @@ private Map<String, Object> doGetWorkflowDataAttributes(
return result;
}

/**
* Set the value for data attributes aka objects for a workflow
*
* @param workflowClass required
* @param workflowId required
* @param workflowRunId optional, can be empty
* @param dataAttributes required
* */
public void setWorkflowDataAttributes(
final Class<? extends ObjectWorkflow> workflowClass,
final String workflowId,
final String workflowRunId,
final Map<String, Object> dataAttributes) {
doSetWorkflowDataAttributes(workflowClass, workflowId, workflowRunId, dataAttributes);
}

/**
* Set the value for data attributes aka objects for a workflow
*
* @param workflowClass required
* @param workflowId required
* @param dataAttributes required
* */
public void setWorkflowDataAttributes(
final Class<? extends ObjectWorkflow> workflowClass,
final String workflowId,
final Map<String, Object> dataAttributes) {
doSetWorkflowDataAttributes(workflowClass, workflowId, "", dataAttributes);
}

private void doSetWorkflowDataAttributes(
final Class<? extends ObjectWorkflow> workflowClass,
final String workflowId,
final String workflowRunId,
final Map<String, Object> dataAttributes
) {
//check if workflow type exists
final String wfType = workflowClass.getSimpleName();
checkWorkflowTypeExists(wfType);

for (final Map.Entry<String, Object> entry : dataAttributes.entrySet()) {
final String key = entry.getKey();
//check that key exists in the store
if (!registry.getDataAttributeTypeStore(wfType).isValidNameOrPrefix(key)) {
throw new IllegalArgumentException(String.format("data attribute %s is not registered", key));
}

final Class<?> registeredType = registry.getDataAttributeTypeStore(wfType).getType(key);
final Class<?> requestedType = entry.getValue().getClass();
//check that type is registered in schema
if (!requestedType.isAssignableFrom(registeredType)) {
throw new IllegalArgumentException(
String.format(
"registered type %s is not assignable from %s",
registeredType.getName(),
requestedType.getName()));
}
}

unregisteredClient.setAnyWorkflowDataObjects(workflowId, workflowRunId, dataAttributes);
}

/**
* This is a simplified API to search without pagination, use the other searchWorkflow API for pagination feature
*
Expand Down Expand Up @@ -834,6 +896,37 @@ public Map<String, Object> getAllSearchAttributes(
return doGetWorkflowSearchAttributes(workflowClass, workflowId, workflowRunId, null);
}


/**
* Set the value of search attributes for a workflow
*
* @param workflowClass required
* @param workflowId required
* @param searchAttributes required
* */
public void setWorkflowSearchAttributes(
final Class<? extends ObjectWorkflow> workflowClass,
final String workflowId,
final List<SearchAttribute> searchAttributes) {
doSetWorkflowSearchAttributes(workflowClass, workflowId, "", searchAttributes);
}

/**
* Set the value of search attributes for a workflow
*
* @param workflowClass required
* @param workflowId required
* @param workflowRunId optional, can be empty
* @param searchAttributes required
* */
public void setWorkflowSearchAttributes(
final Class<? extends ObjectWorkflow> workflowClass,
final String workflowId,
final String workflowRunId,
final List<SearchAttribute> searchAttributes) {
doSetWorkflowSearchAttributes(workflowClass, workflowId, workflowRunId, searchAttributes);
}

/**
* Get all the search attributes of a workflow
*
Expand Down Expand Up @@ -934,6 +1027,26 @@ private Map<String, Object> doGetWorkflowSearchAttributes(
return result;
}

private void doSetWorkflowSearchAttributes(
final Class<? extends ObjectWorkflow> workflowClass,
final String workflowId,
final String workflowRunId,
final List<SearchAttribute> searchAttributes
) {
final String wfType = workflowClass.getSimpleName();
checkWorkflowTypeExists(wfType);

final Map<String, SearchAttributeValueType> searchAttributeKeyToTypeMap = registry.getSearchAttributeKeyToTypeMap(wfType);
//Check that the requested sa type is registered to the key
searchAttributes.forEach(sa -> {
final SearchAttributeValueType registeredValueType = searchAttributeKeyToTypeMap.get(sa.getKey());
if (sa.getValueType() != null && registeredValueType != null && !registeredValueType.equals(sa.getValueType())) {
throw new IllegalArgumentException(String.format("Search attribute key, %s is registered to type %s, but tried to add search attribute type %s", sa.getKey(), registeredValueType.getValue(), sa.getValueType().getValue()));
}
});
unregisteredClient.setAnyWorkflowSearchAttributes(workflowId, workflowRunId, searchAttributes);
}

static Object getSearchAttributeValue(final SearchAttributeValueType saType, final SearchAttribute searchAttribute) {
switch (saType) {
case INT:
Expand Down
45 changes: 44 additions & 1 deletion src/main/java/io/iworkflow/core/UnregisteredClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@
import io.iworkflow.gen.api.ApiClient;
import io.iworkflow.gen.api.DefaultApi;
import io.iworkflow.gen.models.EncodedObject;
import io.iworkflow.gen.models.KeyValue;
import io.iworkflow.gen.models.PersistenceLoadingPolicy;
import io.iworkflow.gen.models.SearchAttribute;
import io.iworkflow.gen.models.SearchAttributeKeyAndType;
import io.iworkflow.gen.models.StateCompletionOutput;
import io.iworkflow.gen.models.WorkflowAlreadyStartedOptions;
import io.iworkflow.gen.models.WorkflowGetDataObjectsRequest;
import io.iworkflow.gen.models.WorkflowGetDataObjectsResponse;
import io.iworkflow.gen.models.WorkflowGetRequest;
Expand All @@ -25,6 +26,8 @@
import io.iworkflow.gen.models.WorkflowRpcResponse;
import io.iworkflow.gen.models.WorkflowSearchRequest;
import io.iworkflow.gen.models.WorkflowSearchResponse;
import io.iworkflow.gen.models.WorkflowSetDataObjectsRequest;
import io.iworkflow.gen.models.WorkflowSetSearchAttributesRequest;
import io.iworkflow.gen.models.WorkflowSignalRequest;
import io.iworkflow.gen.models.WorkflowSkipTimerRequest;
import io.iworkflow.gen.models.WorkflowStartOptions;
Expand All @@ -35,6 +38,7 @@
import io.iworkflow.gen.models.WorkflowWaitForStateCompletionRequest;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -613,6 +617,28 @@ public WorkflowGetDataObjectsResponse getAnyWorkflowDataObjects(
}
}

public void setAnyWorkflowDataObjects(
final String workflowId,
final String workflowRunId,
final Map<String, Object> dataObjects
) {
final List<KeyValue> encodedObjects = dataObjects.entrySet().stream().map(entry -> new KeyValue()
.key(entry.getKey())
.value(clientOptions.getObjectEncoder().encode(entry.getValue())))
.collect(Collectors.toList());

try {
defaultApi.apiV1WorkflowDataobjectsSetPost(
new WorkflowSetDataObjectsRequest()
.workflowId(workflowId)
.workflowRunId(workflowRunId)
.objects(encodedObjects)
);
} catch (final FeignException.FeignClientException exp) {
throw IwfHttpException.fromFeignException(clientOptions.getObjectEncoder(), exp);
}
}

public WorkflowSearchResponse searchWorkflow(final String query, final int pageSize) {

try {
Expand Down Expand Up @@ -650,6 +676,23 @@ public WorkflowGetSearchAttributesResponse getAnyWorkflowSearchAttributes(
}
}

public void setAnyWorkflowSearchAttributes(
final String workflowId,
final String workflowRunId,
final List<SearchAttribute> searchAttributes
) {
try {
defaultApi.apiV1WorkflowSearchattributesSetPost(
new WorkflowSetSearchAttributesRequest()
.workflowId(workflowId)
.workflowRunId(workflowRunId)
.searchAttributes(searchAttributes)
);
} catch (final FeignException.FeignClientException exp) {
throw IwfHttpException.fromFeignException(clientOptions.getObjectEncoder(), exp);
}
}

public <T> T invokeRpc(
Class<T> valueClass,
final Object input,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,16 @@ public SearchAttributeRWImpl(final Map<String, SearchAttributeValueType> keyToTy
case INT:
int64AttributeMap.put(sa.getKey(), sa.getIntegerValue());
break;
case DOUBLE:
doubleAttributeMap.put(sa.getKey(), sa.getDoubleValue());
case BOOL:
boolAttributeMap.put(sa.getKey(), sa.getBoolValue());
break;
case KEYWORD_ARRAY:
stringArrayAttributeMap.put(sa.getKey(), sa.getStringArrayValue());
break;
default:
throw new IllegalStateException("empty search attribute value type shouldn't exist");
throw new IllegalStateException(String.format("empty or not supported search attribute value type, %s", type));
}
});
}
Expand Down
Loading

0 comments on commit 055ceaf

Please sign in to comment.