-
Notifications
You must be signed in to change notification settings - Fork 113
/
Copy pathContextPropagator.java
169 lines (158 loc) · 6.34 KB
/
ContextPropagator.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
/*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Modifications copyright (C) 2017 Uber Technologies, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License is
* located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.uber.cadence.context;
import java.util.Map;
/**
* Context Propagators are used to propagate information from workflow to activity, workflow to
* child workflow, and workflow to child thread (using {@link com.uber.cadence.workflow.Async}).
*
* <p>It is important to note that all threads share one ContextPropagator instance, so your
* implementation <b>must</b> be thread-safe and store any state in ThreadLocal variables.
*
* <p>A sample <code>ContextPropagator</code> that copies all {@link org.slf4j.MDC} entries starting
* with a given prefix along the code path looks like this:
*
* <pre>{@code
* public class MDCContextPropagator implements ContextPropagator {
*
* public String getName() {
* return this.getClass().getName();
* }
*
* public Object getCurrentContext() {
* Map<String, String> context = new HashMap<>();
* for (Map.Entry<String, String> entry : MDC.getCopyOfContextMap().entrySet()) {
* if (entry.getKey().startsWith("X-")) {
* context.put(entry.getKey(), entry.getValue());
* }
* }
* return context;
* }
*
* public void setCurrentContext(Object context) {
* Map<String, String> contextMap = (Map<String, String>)context;
* for (Map.Entry<String, String> entry : contextMap.entrySet()) {
* MDC.put(entry.getKey(), entry.getValue());
* }
* }
*
* public Map<String, byte[]> serializeContext(Object context) {
* Map<String, String> contextMap = (Map<String, String>)context;
* Map<String, byte[]> serializedContext = new HashMap<>();
* for (Map.Entry<String, String> entry : contextMap.entrySet()) {
* serializedContext.put(entry.getKey(), entry.getValue().getBytes(StandardCharsets.UTF_8));
* }
* return serializedContext;
* }
*
* public Object deserializeContext(Map<String, byte[]> context) {
* Map<String, String> contextMap = new HashMap<>();
* for (Map.Entry<String, byte[]> entry : context.entrySet()) {
* contextMap.put(entry.getKey(), new String(entry.getValue(), StandardCharsets.UTF_8));
* }
* return contextMap;
* }
* }
* }</pre>
*
* To set up the context propagators, you must configure them when: <br>
* Setting up your workflow/activity workers:
*
* <pre>{@code
* Worker.FactoryOptions factoryOptions = new Worker.FactoryOptions.Builder()
* .setContextPropagators(Collections.singletonList(new MDCContextPropagator()))
* .build();
* Worker.Factory factory = new Worker.Factory("cadence", 7933,"test-domain", factoryOptions);
* }</pre>
*
* <br>
* Creating your {@link com.uber.cadence.client.WorkflowClient}:
*
* <pre>{@code
* WorkflowOptions options = new WorkflowOptions.Builder()
* .setExecutionStartToCloseTimeout(Duration.ofSeconds(5))
* .setTaskList("myTaskList")
* .setContextPropagators(Collections.singletonList(new MDCContextPropagator()))
* .build();
*
* WorkflowClient workflowClient = WorkflowClient.newInstance("cadence", 7933,"test-domain");
* }</pre>
*
* <br>
* If you want to override the {@code ContextPropagator} instances for your activities, you can
* specify them at the {@link com.uber.cadence.activity.ActivityOptions} level like so:
*
* <pre>{@code
* activities = Workflow.newActivityStub(Activity.class,
* new ActivityOptions.Builder()
* .setScheduleToCloseTimeout(Duration.ofSeconds(60))
* .setContextPropagators(Collections.singletonList(new MDCContextPropagator()))
* .build());
* }</pre>
*
* <br>
* And similarly, if you wish to override them for child workflows, you can do so when creating a
* {@link com.uber.cadence.workflow.ChildWorkflowStub}:
*
* <pre>{@code
* ChildWorkflow childWorkflow = Workflow.newChildWorkflowStub(ChildWorkflow.class,
* new ChildWorkflowOptions.Builder()
* .setContextPropagators(Collections.singletonList(new MDCContextPropagator()))
* .build());
* }</pre>
*/
public interface ContextPropagator {

  // ---------------------------------------------------------------------------------------------
  // Identity
  // ---------------------------------------------------------------------------------------------

  // NOTE(review): the original documentation for getName() ended with the unfinished sentence
  // "ContextPropagators will only be given context information". Presumably it was meant to say
  // that a propagator only receives the context entries associated with its own name -- confirm
  // against the code that dispatches headers before documenting this as fact.
  /**
   * Identifies this context propagator. The name is used during serialization and transfer of the
   * context.
   *
   * @return the name of this propagator
   */
  String getName();

  // ---------------------------------------------------------------------------------------------
  // Access to the context of the current thread
  // ---------------------------------------------------------------------------------------------

  /**
   * Captures the current context.
   *
   * @return the current context, in object form
   */
  Object getCurrentContext();

  /**
   * Installs the supplied object as the current context.
   *
   * @param context the context to make current
   */
  void setCurrentContext(Object context);

  // ---------------------------------------------------------------------------------------------
  // Wire format
  // ---------------------------------------------------------------------------------------------

  /**
   * Serializes context data so that it can be transmitted in the Cadence header.
   *
   * @param context the context data to serialize
   * @return the serialized entries, keyed by name, each with a byte-array payload
   */
  Map<String, byte[]> serializeContext(Object context);

  /**
   * Converts serialized header data back into context object(s).
   *
   * @param context the serialized header entries
   * @return the reconstructed context
   */
  Object deserializeContext(Map<String, byte[]> context);

  // ---------------------------------------------------------------------------------------------
  // Optional lifecycle hooks (every default implementation does nothing)
  // ---------------------------------------------------------------------------------------------

  /**
   * Lifecycle hook invoked once the context has been propagated to the workflow/activity thread,
   * but before the workflow/activity itself has started.
   *
   * <p>The default implementation is empty.
   */
  default void setUp() {}

  /**
   * Lifecycle hook invoked when the workflow/activity finishes by throwing an unhandled exception.
   * A call to {@link #finish(boolean)} follows this method.
   *
   * <p>The default implementation is empty.
   *
   * @param t the unhandled exception that caused the workflow/activity to terminate
   */
  default void onError(Throwable t) {}

  /**
   * Lifecycle hook invoked after the workflow/activity has completed.
   *
   * <p>The default implementation is empty.
   *
   * @param successful {@code true} if the method finished without exception; {@code false}
   *     otherwise, in which case {@link #onError(Throwable)} has already been called
   */
  default void finish(boolean successful) {}
}