1
+ import json
1
2
import re
2
3
from typing import Optional , Union
3
4
@@ -11,16 +12,33 @@ class DummyBackend:
11
12
and allows inspection of posted process graphs
12
13
"""
13
14
15
+ __slots__ = (
16
+ "connection" ,
17
+ "sync_requests" ,
18
+ "batch_jobs" ,
19
+ "validation_requests" ,
20
+ "next_result" ,
21
+ "next_validation_errors" ,
22
+ )
23
+
14
24
# Default result (can serve both as JSON or binary data)
15
25
DEFAULT_RESULT = b'{"what?": "Result data"}'
16
26
17
27
def __init__ (self , requests_mock , connection : Connection ):
18
28
self .connection = connection
19
29
self .sync_requests = []
20
30
self .batch_jobs = {}
31
+ self .validation_requests = []
21
32
self .next_result = self .DEFAULT_RESULT
22
- requests_mock .post (connection .build_url ("/result" ), content = self ._handle_post_result )
23
- requests_mock .post (connection .build_url ("/jobs" ), content = self ._handle_post_jobs )
33
+ self .next_validation_errors = []
34
+ requests_mock .post (
35
+ connection .build_url ("/result" ),
36
+ content = self ._handle_post_result ,
37
+ )
38
+ requests_mock .post (
39
+ connection .build_url ("/jobs" ),
40
+ content = self ._handle_post_jobs ,
41
+ )
24
42
requests_mock .post (
25
43
re .compile (connection .build_url (r"/jobs/(job-\d+)/results$" )), content = self ._handle_post_job_results
26
44
)
@@ -32,12 +50,19 @@ def __init__(self, requests_mock, connection: Connection):
32
50
re .compile (connection .build_url ("/jobs/(.*?)/results/result.data$" )),
33
51
content = self ._handle_get_job_result_asset ,
34
52
)
53
+ requests_mock .post (connection .build_url ("/validation" ), json = self ._handle_post_validation )
35
54
36
55
def _handle_post_result (self , request , context ):
37
56
"""handler of `POST /result` (synchronous execute)"""
38
57
pg = request .json ()["process" ]["process_graph" ]
39
58
self .sync_requests .append (pg )
40
- return self .next_result
59
+ result = self .next_result
60
+ if isinstance (result , (dict , list )):
61
+ result = json .dumps (result ).encode ("utf-8" )
62
+ elif isinstance (result , str ):
63
+ result = result .encode ("utf-8" )
64
+ assert isinstance (result , bytes )
65
+ return result
41
66
42
67
def _handle_post_jobs (self , request , context ):
43
68
"""handler of `POST /jobs` (create batch job)"""
@@ -83,6 +108,12 @@ def _handle_get_job_result_asset(self, request, context):
83
108
assert self .batch_jobs [job_id ]["status" ] == "finished"
84
109
return self .next_result
85
110
111
+ def _handle_post_validation (self , request , context ):
112
+ """Handler of `POST /validation` (validate process graph)."""
113
+ pg = request .json ()["process_graph" ]
114
+ self .validation_requests .append (pg )
115
+ return {"errors" : self .next_validation_errors }
116
+
86
117
def get_sync_pg (self ) -> dict :
87
118
"""Get one and only synchronous process graph"""
88
119
assert len (self .sync_requests ) == 1
0 commit comments