-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathNotifyHelper.ecl
349 lines (304 loc) · 12.1 KB
/
NotifyHelper.ecl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
/**
* This module provides an easy way to use ECL's NOTIFY() function to pass
* data between a running job and a waiting job. For more information on
* creating jobs that wait for events, see NOTIFY(), EVENT(), WHEN() and WAIT()
* within the ECL language reference manual.
*
* This module focuses more on the parameter-passing aspect of NOTIFY() than
* anything else, with the goal of being able to pass much richer data between
* jobs than usual. It is possible, with functions defined here, to pass
* arbitrary datasets and sets between jobs as well as simple string values.
*
* There are three submodules within this module, each encompassing separate
* aspects of encoding values to pass, decoding them, and actually
* triggering the NOTIFY():
*
* Encode
* AnyDataset()
* SetOfString()
* SimpleString()
* Decode
* AsAnyDataset()
* AsSetOfString()
* AsSimpleString()
* NotifyWith
* AnyDataset()
* SetOfString()
* SimpleString()
*
* The submodules have been designed to work together. Specifically, the values
* are coded and decoded with tags that the module knows about. Everything
* works best if you pair the writer and reader using the functions here
* (for instance, if you use NotifyWith.SetOfString() to send off the
* notification, the recipient should use Decode.AsSetOfString() to read the
* data).
*
* Documentation on the specific functions can be found inline. Example
* BWRs (one that would wait for a notification, and one that triggers it) can
* be found at the end of the file in comment blocks.
*
* Origin: https://github.com/hpccsystems-solutions-lab/Useful_ECL
*/
IMPORT Std;
EXPORT NotifyHelper := MODULE
/**
* The Encode module contains functions for encoding data and preparing
* it for use with the NOTIFY() ECL function. The module is intended to be
* used by running ECL jobs that need to send data to waiting jobs.
*
* Note that you can use the NotifyWith module functions to both encode
* the data and send the notification in one step. If those functions
* are used then the Encode functions do not need to be used separately.
*/
EXPORT Encode := MODULE

    /**
     * Converts a dataset into a string parameter suitable to be used
     * as the second argument of the NOTIFY() ECL command, so the data
     * can be passed to an ECL job waiting for it.
     *
     * Each record is serialized with TOJSON(), the records are joined
     * into one JSON array stored under the key "d", and that JSON text is
     * wrapped as <Event><JSON_DATA>...</JSON_DATA></Event>. The JSON text
     * is passed through XMLENCODE() first, so field values containing
     * characters such as '<', '>' or '&' cannot produce malformed event
     * XML. The XML parsing behind EVENTEXTRA() is expected to restore the
     * original characters on the receiving side.
     *
     * The dataset should not be too large. If you want to pass a large
     * amount of data, it would probably be better to write the data to
     * a file and then pass the logical filename of that file instead.
     *
     * @param   inDS    A dataset to encode
     *
     * @return  A string containing the encoded data.
     *
     * @see     Decode.AsAnyDataset()
     */
    EXPORT AnyDataset(inDS) := FUNCTIONMACRO
        // One JSON object per input record
        #UNIQUENAME(jsonDS);
        LOCAL %jsonDS% := PROJECT
            (
                inDS,
                TRANSFORM
                    (
                        {STRING s},
                        SELF.s := '{' + (STRING)TOJSON(LEFT) + '}'
                    )
            );

        // Collapse every row into a single comma-delimited string; the
        // always-true condition makes ROLLUP merge all adjacent rows and
        // STABLE/ORDERED keep the records in their original order
        #UNIQUENAME(rolledUpJSON);
        LOCAL %rolledUpJSON% := ROLLUP
            (
                %jsonDS%,
                TRUE,
                TRANSFORM
                    (
                        RECORDOF(LEFT),
                        SELF.s := LEFT.s + IF(LEFT.s != '', ', ', '') + RIGHT.s
                    ),
                STABLE, ORDERED(TRUE)
            );

        // Wrap the objects in an array under the "d" key; this is the key
        // Decode.AsAnyDataset() reads via XPATH('d')
        #UNIQUENAME(finalJSON);
        LOCAL %finalJSON% := '{"d": [' + %rolledUpJSON%[1].s + ']}';

        // Escape XML metacharacters so the JSON payload cannot break the
        // surrounding event markup
        #UNIQUENAME(xmlEncodedValue);
        LOCAL %xmlEncodedValue% := '<JSON_DATA>' + XMLENCODE(%finalJSON%) + '</JSON_DATA>';

        #UNIQUENAME(result);
        LOCAL %result% := '<Event>' + %xmlEncodedValue% + '</Event>';

        RETURN %result%;
    ENDMACRO;

    /**
     * Converts a SET OF STRING value into a string parameter suitable to
     * be used as the second argument of the NOTIFY() ECL command, so the
     * data can be passed to an ECL job waiting for it.
     *
     * The set is turned into a dataset with a single STRING field named
     * 's' and then encoded with AnyDataset().
     *
     * @param   inSet   A SET OF STRING to encode
     *
     * @return  A string containing the encoded data.
     *
     * @see     Decode.AsSetOfString()
     */
    EXPORT SetOfString(SET OF STRING inSet) := FUNCTION
        RETURN AnyDataset(DATASET(inSet, {STRING s}));
    END;

    /**
     * Encodes a simple string in a format suitable to be used as the
     * second argument of the NOTIFY() ECL command, so the string can be
     * passed to an ECL job waiting for it.
     *
     * The value is wrapped as
     * <Event><SIMPLE_STRING>...</SIMPLE_STRING></Event>. It is passed
     * through XMLENCODE() first, so characters such as '<', '>' or '&'
     * cannot produce malformed event XML.
     *
     * @param   inString    A STRING to encode
     *
     * @return  A string containing the encoded data.
     *
     * @see     Decode.AsSimpleString()
     */
    EXPORT SimpleString(STRING inString) := FUNCTION
        // Escape XML metacharacters before embedding the value in markup
        xmlEncodedValue := '<SIMPLE_STRING>' + XMLENCODE(inString) + '</SIMPLE_STRING>';
        result := '<Event>' + xmlEncodedValue + '</Event>';

        RETURN result;
    END;

END;
/**
* The Decode module contains functions for decoding data previously
* created by functions within the Encode module. The module is intended to
* be used by waiting ECL jobs that have just been awakened by a notification
* and need to extract passed-in data from the event.
*/
EXPORT Decode := MODULE

    /**
     * Rebuilds a dataset from the event data that Encode.AnyDataset()
     * posted, using the caller-supplied record definition.
     *
     * The JSON text is taken from the JSON_DATA element of the event via
     * EVENTEXTRA(), parsed into a wrapper record whose only field is a
     * child dataset named 'd', and that child dataset is then flattened
     * into the result. If the JSON cannot be parsed, the ONFAIL transform
     * supplies a blank wrapper row.
     *
     * @param   datasetLayout   The RECORD definition describing the
     *                          data; it is helpful to explicitly tag
     *                          attributes within the definition with
     *                          {XPATH()} options
     *
     * @return  A new dataset in datasetLayout format containing the
     *          passed data
     *
     * @see     Encode.AnyDataset()
     */
    EXPORT AsAnyDataset(datasetLayout) := FUNCTIONMACRO
        // Declare all template symbols up front
        #UNIQUENAME(eventPayload);
        #UNIQUENAME(WrapperRec);
        #UNIQUENAME(wrapperRow);
        #UNIQUENAME(childRows);

        // Raw JSON text carried by the event
        LOCAL %eventPayload% := EVENTEXTRA('JSON_DATA') : GLOBAL;

        // Mirrors the {"d": [...]} envelope produced by the encoder
        LOCAL %WrapperRec% := RECORD
            DATASET(datasetLayout) d {XPATH('d')};
        END;

        // Parse the envelope; fall back to a blank row on malformed JSON
        LOCAL %wrapperRow% := FROMJSON
            (
                %WrapperRec%,
                %eventPayload%,
                ONFAIL(TRANSFORM(%WrapperRec%, SELF := []))
            );

        // Lift the child records out of the single wrapper row
        LOCAL %childRows% := NORMALIZE
            (
                DATASET(%wrapperRow%),
                LEFT.d,
                TRANSFORM(datasetLayout, SELF := RIGHT)
            );

        RETURN %childRows%;
    ENDMACRO;

    /**
     * Rebuilds a SET OF STRING value from the event data that
     * Encode.SetOfString() posted.
     *
     * The data is decoded with AsAnyDataset() using a record containing
     * a single STRING field named 's', and the values of that field are
     * collected into a set.
     *
     * @return  A new SET OF STRING value containing the passed data
     *
     * @see     Encode.SetOfString()
     */
    EXPORT AsSetOfString() := FUNCTION
        decodedRows := AsAnyDataset({STRING s});

        RETURN SET(decodedRows, s);
    END;

    /**
     * Retrieves the STRING value that Encode.SimpleString() posted to
     * the event, read from the SIMPLE_STRING element via EVENTEXTRA().
     *
     * @return  A new STRING value containing the passed data
     *
     * @see     Encode.SimpleString()
     */
    EXPORT AsSimpleString() := FUNCTION
        passedValue := GLOBAL(EVENTEXTRA('SIMPLE_STRING'));

        RETURN passedValue;
    END;

END;
/**
* The NotifyWith module contains convenience methods for both encoding
* data to be passed to a waiting ECL job and then sending the notification
* in one step. The module is intended to be used by running ECL jobs that
* need to send data to waiting jobs.
*/
EXPORT NotifyWith := MODULE

    /**
     * Encodes a dataset and sends it as an event argument in a NOTIFY()
     * call.
     *
     * Because this is a FUNCTIONMACRO, its body is expanded in the
     * caller's scope. The encoder is therefore referenced by its fully
     * qualified name, and Useful_ECL is imported inside the macro so the
     * expansion does not depend on how (or whether) the caller imported
     * that module.
     *
     * @param   name    The name of the event; this name should match
     *                  the name the waiting ECL job is using to watch
     *                  for events within its WHEN() or WAIT() calls
     * @param   inDS    The dataset to send to the waiting ECL job
     *
     * @return  A NOTIFY() action
     *
     * @see     Encode.AnyDataset()
     */
    EXPORT AnyDataset(name, inDS) := FUNCTIONMACRO
        // Make the fully qualified reference below resolvable at the call site
        IMPORT Useful_ECL;

        // name is an untyped macro argument, so coerce it to STRING
        RETURN NOTIFY((STRING)name, Useful_ECL.NotifyHelper.Encode.AnyDataset(inDS));
    ENDMACRO;

    /**
     * Encodes a set of strings and sends it as an event argument in a
     * NOTIFY() call.
     *
     * @param   name    The name of the event; this name should match
     *                  the name the waiting ECL job is using to watch
     *                  for events within its WHEN() or WAIT() calls
     * @param   inSet   The SET OF STRING value to send to the waiting
     *                  ECL job
     *
     * @return  A NOTIFY() action
     *
     * @see     Encode.SetOfString()
     */
    EXPORT SetOfString(STRING name, SET OF STRING inSet) := FUNCTION
        // name is already declared STRING, so no cast is required
        RETURN NOTIFY(name, Encode.SetOfString(inSet));
    END;

    /**
     * Encodes a string and sends it as an event argument in a
     * NOTIFY() call.
     *
     * @param   name    The name of the event; this name should match
     *                  the name the waiting ECL job is using to watch
     *                  for events within its WHEN() or WAIT() calls
     * @param   inStr   The string value to send to the waiting ECL job
     *
     * @return  A NOTIFY() action
     *
     * @see     Encode.SimpleString()
     */
    EXPORT SimpleString(STRING name, STRING inStr) := FUNCTION
        RETURN NOTIFY(name, Encode.SimpleString(inStr));
    END;

END;
END;
//------------------------------------------------------------------------------
// Sample code (passing full datasets between ECL jobs)
//------------------------------------------------------------------------------
/*******************************************************************************
// Sample 'task runner' BWR that waits for a single notification, reads the
// passed parameters that are expected in a specific format, then outputs those
// parameters. This code should be submitted first, so that it is waiting
// for events that the 'task starter' BWR will emit. This BWR will sit in
// wait mode until the event is received, then terminate.
IMPORT Useful_ECL;
#WORKUNIT('name', 'TaskRunner');
EVENT_NAME := 'RunTestProcess'; // Must match sender
RunProcess() := FUNCTION
DataRec := RECORD
STRING a {XPATH('a')};
STRING b {XPATH('b')};
END;
paramInfo := Useful_ECL.NotifyHelper.Decode.AsAnyDataset(DataRec);
RETURN OUTPUT(paramInfo, NAMED('RetrievedParameter'));
END;
RunProcess() : WHEN(EVENT_NAME, COUNT(1));
*******************************************************************************/
/*******************************************************************************
// Sample 'task starter' BWR that creates a sample inline dataset in the
// format that the task runner BWR expects, then emits a notification with
// that data as a parameter.
IMPORT Useful_ECL;
#WORKUNIT('name', 'TaskStarter');
EVENT_NAME := 'RunTestProcess'; // Must match receiver
DataRec := RECORD
STRING a;
STRING b;
END;
ds := DATASET
(
[
{'1', '2'},
{'3', '4'},
{'5', '6'}
],
DataRec
);
Useful_ECL.NotifyHelper.NotifyWith.AnyDataset(EVENT_NAME, ds);
*******************************************************************************/