Commit a0b4eb0

Merge pull request #1 from globus/compute-transfer-examples: Compute and Transfer Example Flows
compute_transfer_examples/README.md

# Globus Compute and Transfer Flow Examples

This guide demonstrates how to build flows that combine Globus Compute and Globus Transfer to process and move data. You'll learn how to create two different flows that archive (tar) files and transfer them from a source collection to a destination collection.
## Prerequisites

Before starting, ensure you have a co-located GCS collection and Globus Compute endpoint.
If you haven't set these up, follow [this guide](https://docs.globus.org/globus-connect-server/v5.4/) for setting up the GCS collection, and [this guide](https://globus-compute.readthedocs.io/en/latest/endpoints/installation.html) for setting up the Globus Compute endpoint. **Note**: The GCS collection and Globus Compute endpoint must both have read/write permissions to the same filesystem location where operations will be performed.

You will also need the `globus-cli` and `globus-compute-sdk` Python packages, which you can install with:
```bash
pip install globus-cli globus-compute-sdk
```
**Note**: It is recommended that you work inside a virtual environment for this tutorial.
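For example, one way to create and activate an isolated environment before installing the packages (the `.venv` directory name is arbitrary):
```bash
# Create and activate a virtual environment, then install the packages into it
python3 -m venv .venv
source .venv/bin/activate
pip install globus-cli globus-compute-sdk
```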
## Register the Globus Compute Function

First, register the `do_tar` Compute function that your flows will invoke to create the output tarfiles. Run the provided Python script:

```bash
./register_compute_func.py
```

and save the Compute function's UUID.

**Important**: Use the same Python version for registration as the one running on your Globus Compute endpoint.
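If you're curious what the registration script does, its core is a single SDK call. Here is a minimal sketch (the `do_tar` stub stands in for the real implementation, which lives in `register_compute_func.py`):
```python
from globus_compute_sdk import Client


def do_tar(src_paths, dest_path, gcs_base_path="/"):
    ...  # stand-in for the real archiving logic described below


# Register the function with the Globus Compute service and print the
# UUID that the flow definitions need.
client = Client()
print(client.register_function(do_tar))
```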
### The `do_tar` Compute function

`do_tar` takes three parameters that the flow will need to provide:

| Parameter | Description |
|-----------|-------------|
| `src_paths` | List of paths to the files/directories to be archived |
| `dest_path` | Where to write the tar archive (directory or file path) |
| `gcs_base_path` | The co-located GCS collection's configured base path (default: `/`) |

### GCS Collection Base Paths

The `gcs_base_path` parameter is provided to the Compute function so that it can transform user-input paths into absolute paths. This is needed when the co-located GCS instance has [configured the collection's base path](https://docs.globus.org/globus-connect-server/v5/data-access-guide/#configure_collection_base_path).

**Example scenario:**
- Your GCS collection has its base path configured as `/path/to/root/`.
- A user wants to tar the files at the absolute path `/path/to/root/input_files/`.
- To both the user and the Flows service, this path appears as `/input_files/` on the GCS collection.
- However, the Compute function running on the co-located GCS instance **does not know** about the collection's configured base path and can only find the files using absolute paths.

Thus, the Compute function must be provided with the GCS collection's configured base path to perform the necessary transformations. In this example, `gcs_base_path` would need to be set to `/path/to/root/`.
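To make this concrete, here is a minimal sketch of what a function like `do_tar` could look like; the actual implementation lives in `register_compute_func.py`, and the `archive.tar.gz` filename below is purely illustrative:
```python
import os
import tarfile


def do_tar(src_paths, dest_path, gcs_base_path="/"):
    """Archive src_paths into a tar.gz; all inputs are collection-relative."""

    def to_abs(path):
        # Prepend the collection's configured base path to turn a
        # collection-relative path into an absolute filesystem path.
        return os.path.join(gcs_base_path, path.lstrip("/"))

    abs_dest = to_abs(dest_path)
    if dest_path.endswith("/") or os.path.isdir(abs_dest):
        # dest_path is a directory: pick a filename (illustrative choice)
        abs_dest = os.path.join(abs_dest, "archive.tar.gz")

    with tarfile.open(abs_dest, "w:gz") as tar:
        for src in src_paths:
            tar.add(to_abs(src), arcname=os.path.basename(src.rstrip("/")))

    # Return the archive's collection-relative path so the flow can pass
    # it straight to the Transfer action.
    return "/" + os.path.relpath(abs_dest, gcs_base_path)
```
In the example 1 flow definition below, the run's output directory is passed as `dest_path`, and the returned path is what the flow reads back via `compute_result.details.result[0]` to use as the transfer's source path.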
## Compute and Transfer Flow: Example 1

In the first example, the Compute and Transfer flow takes a user-provided list of source files that **already** exist on the co-located GCS collection, creates a tarfile from them, and transfers the tarfile to a user-provided destination collection. Specifically, the flow will:

1. Set constants for the run
2. Create an output directory named after the flow's run ID on your GCS collection
3. Invoke the Compute function `do_tar` on the source endpoint to create a tar archive from the input source files and save it in the output directory
4. Transfer the resulting tarfile to the destination collection provided in the flow input
5. Delete the output directory
### Registering the Flow

1. Edit `compute_transfer_example_1_definition.json` and replace the placeholder values:
   - `gcs_endpoint_id`: Your GCS Collection ID
   - `compute_endpoint_id`: Your Compute Endpoint ID
   - `compute_function_id`: The UUID of the registered `do_tar` function

   If your GCS collection has a configured base path, also edit `gcs_base_path`.

2. Register the flow:
   ```bash
   globus flows create "Compute and Transfer Flow Example 1" \
       ./compute_transfer_example_1_definition.json \
       --input-schema ./compute_transfer_example_1_schema.json
   ```

3. Save the flow ID returned by this command.
### Running the Flow

1. Create the flow input JSON file like so:
   ```json
   {
       "source_paths": ["/path/to/file1", "/path/to/file2"],
       "destination_path": "/path/to/your/destination/file.tar.gz",
       "destination_endpoint_id": "your-destination-endpoint-uuid"
   }
   ```

2. Start the flow:
   ```bash
   globus flows start <YOUR FLOW ID> \
       --input <YOUR FLOW INPUT FILE> \
       --label "Compute and Transfer Flow Example 1 Run"
   ```
   and save the run ID for use in the next command.

3. Monitor the run progress:
   ```bash
   globus flows run show <RUN_ID>
   ```
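If you want to script against the run status rather than read it by eye, the CLI's global output options can help; for example (assuming `globus-cli`'s standard `--format` and `--jmespath` options):
```bash
# Print just the run status field (e.g. ACTIVE, INACTIVE, SUCCEEDED)
globus flows run show <RUN_ID> --format json --jmespath 'status'
```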
When you check the status, you might see that your flow has become INACTIVE. This happens because you need to grant data access consents for any GCS collection that your flow interacts with. Run the command:

```bash
globus flows run resume <RUN_ID>
```

and you will be prompted to run a `globus session consent` command. After granting the requested consent, try resuming the run once again, and your flow should be able to proceed. As your flow interacts with other collections, it may require additional `data_access` consents, so you might need to repeat this step. Once you have granted consents for a flow, they remain in effect (until revoked) for future runs of that flow with the same client that was used to grant them.
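The exact scope string to consent to is printed by the resume command, so you don't need to construct it yourself. For reference only, a collection's `data_access` scope embeds the collection ID, so the prompted command tends to look roughly like the following (placeholder UUID, not the real scope for your run):
```bash
# Illustrative shape only; always copy the exact command the CLI prints
globus session consent \
    "urn:globus:auth:scope:transfer.api.globus.org:all[*https://auth.globus.org/scopes/<COLLECTION_ID>/data_access]"
```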
## Compute and Transfer Flow: Example 2

In the second example, the Compute and Transfer flow takes a user-provided list of source files that exist on a user-provided source collection, transfers the source files to your GCS collection, creates a tarfile from them, and transfers the tarfile to a user-provided destination collection. Specifically, the flow will:

1. Set constants for the run
2. Create an output directory named after the flow's run ID on your intermediate GCS collection
3. Iterate through the list of input source files and create the destination paths for the files on your intermediate GCS collection
4. Transfer the source paths from the user-provided source collection to the newly created output directory on your intermediate GCS collection
5. Invoke the Compute function `do_tar` on the source endpoint to create a tar archive from the input source files and save it in the output directory
6. Transfer the resulting tarfile to the destination collection provided in the flow input
7. Delete the output directory on your intermediate GCS collection

**Implementation Note**: Step 3 is implemented using six different states in the flow definition (`SetSourcePathsIteratorVariables`, `EvalShouldIterateSourcePaths`, `IterateSourcePaths`, `EvalGetSourcePath`, `GetSourcePathInfo`, and `EvalSourcePathInfo`). These states work together to create a loop that processes each source path. While this demonstrates how to implement a loop in Flows, a simpler approach would be to create a separate Compute function to handle this work (sketched below), which would significantly reduce the complexity of the flow.
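As a rough illustration of that simpler approach, a single helper Compute function could build all the destination paths in one call. This sketch is hypothetical and not part of this repository:
```python
import os


def build_destination_paths(src_paths, output_directory):
    """Map each source path to a path inside the run's output directory.

    Hypothetical replacement for the six-state loop: returns one
    destination path per source path, named after the source's basename.
    """
    return [
        os.path.join(output_directory, os.path.basename(p.rstrip("/")))
        for p in src_paths
    ]


# Example: build_destination_paths(["/data/a", "/data/b/"], "/<run_id>/")
# -> ["/<run_id>/a", "/<run_id>/b"]
```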
### Registering the Flow

1. Edit `compute_transfer_example_2_definition.json` and replace the placeholder values (same as in the first example).

2. Register as a new flow:
   ```bash
   globus flows create "Compute and Transfer Flow Example 2" \
       ./compute_transfer_example_2_definition.json \
       --input-schema ./compute_transfer_example_2_schema.json
   ```

   Or update the existing flow from example 1:
   ```bash
   globus flows update <FLOW_ID> \
       --title "Compute and Transfer Flow Example 2" \
       --definition ./compute_transfer_example_2_definition.json \
       --input-schema ./compute_transfer_example_2_schema.json
   ```

3. Save the flow ID returned by this command.
### Running the Flow

1. Create the input JSON file for your flow:
   ```json
   {
       "source_endpoint": "your-source-endpoint-uuid",
       "source_paths": ["/path/to/file1", "/path/to/file2"],
       "destination": {
           "id": "your-destination-endpoint-uuid",
           "path": "/path/to/your/destination/archive.tar.gz"
       }
   }
   ```

2. Start the flow:
   ```bash
   globus flows start <FLOW_ID> \
       --input <YOUR FLOW INPUT FILE> \
       --label "Compute and Transfer Flow Example 2 Run"
   ```
   and save the run ID for use in the next command.

3. Monitor the run progress:
   ```bash
   globus flows run show <RUN_ID>
   ```

Remember, if your flow has gone inactive, run:

```bash
globus flows run resume <RUN_ID>
```

and then run the prompted `globus session consent` command and try resuming the run again.
compute_transfer_examples/compute_transfer_example_1_definition.json
{
  "StartAt": "SetConstants",
  "States": {
    "SetConstants": {
      "Type": "ExpressionEval",
      "Parameters": {
        "gcs_endpoint_id": "<INSERT YOUR GCS ENDPOINT ID HERE>",
        "gcs_base_path": "/",
        "compute_endpoint_id": "<INSERT YOUR COMPUTE ENDPOINT ID HERE>",
        "compute_function_id": "<INSERT YOUR COMPUTE FUNCTION ID HERE>",
        "compute_output_directory.=": "'/' + _context.run_id + '/'"
      },
      "ResultPath": "$.constants",
      "Next": "MakeComputeWorkingDir"
    },
    "MakeComputeWorkingDir": {
      "Type": "Action",
      "ActionUrl": "https://transfer.actions.globus.org/mkdir",
      "Parameters": {
        "endpoint_id.$": "$.constants.gcs_endpoint_id",
        "path.$": "$.constants.compute_output_directory"
      },
      "ResultPath": "$.mkdir_result",
      "Next": "RunComputeFunction"
    },
    "RunComputeFunction": {
      "Type": "Action",
      "ActionUrl": "https://compute.actions.globus.org/v3",
      "Parameters": {
        "endpoint_id.$": "$.constants.compute_endpoint_id",
        "tasks": [
          {
            "function_id.$": "$.constants.compute_function_id",
            "args": [],
            "kwargs": {
              "src_paths.$": "$.source_paths",
              "dest_path.$": "$.constants.compute_output_directory",
              "gcs_base_path.$": "$.constants.gcs_base_path"
            }
          }
        ]
      },
      "ResultPath": "$.compute_result",
      "Next": "TransferFromComputeEndpoint"
    },
    "TransferFromComputeEndpoint": {
      "Type": "Action",
      "ActionUrl": "https://transfer.actions.globus.org/transfer",
      "Parameters": {
        "source_endpoint.$": "$.constants.gcs_endpoint_id",
        "destination_endpoint.$": "$.destination_endpoint_id",
        "DATA": [
          {
            "source_path.=": "compute_result.details.result[0]",
            "destination_path.$": "$.destination_path"
          }
        ]
      },
      "ResultPath": "$.transfer_to_dest_result",
      "Next": "CleanupComputeEndpoint"
    },
    "CleanupComputeEndpoint": {
      "Type": "Action",
      "ActionUrl": "https://transfer.actions.globus.org/delete",
      "Parameters": {
        "endpoint.$": "$.constants.gcs_endpoint_id",
        "recursive": true,
        "DATA": [
          {
            "path.$": "$.constants.compute_output_directory"
          }
        ]
      },
      "ResultPath": "$.delete_compute_output_result",
      "End": true
    }
  }
}
compute_transfer_examples/compute_transfer_example_1_schema.json
{
  "type": "object",
  "required": [
    "source_paths",
    "destination_path",
    "destination_endpoint_id"
  ],
  "properties": {
    "source_paths": {
      "type": "array",
      "title": "Source Collection Paths",
      "description": "A list of paths on the source collection for the data"
    },
    "destination_path": {
      "type": "string",
      "title": "Destination Collection Path",
      "description": "The path on the destination collection for the data"
    },
    "destination_endpoint_id": {
      "type": "string",
      "title": "Destination Collection Endpoint ID",
      "description": "The endpoint ID of the destination collection"
    }
  },
  "additionalProperties": false
}
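As a quick sanity check before starting a run, you can validate your input file against this schema locally. A small sketch using the third-party `jsonschema` package (the input filename is a placeholder):
```python
import json

import jsonschema  # pip install jsonschema

# Load the schema and a candidate flow input, then validate;
# jsonschema.validate raises ValidationError if the input doesn't conform.
with open("compute_transfer_example_1_schema.json") as f:
    schema = json.load(f)
with open("my_flow_input.json") as f:  # placeholder filename
    flow_input = json.load(f)

jsonschema.validate(instance=flow_input, schema=schema)
print("Flow input is valid.")
```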
