Skip to content

Commit 8ece7a7

Browse files
kpshervazzacharo
authored andcommitted
flows: remove invenio webhooks
1 parent 90e4caa commit 8ece7a7

File tree

22 files changed

+650
-333
lines changed

22 files changed

+650
-333
lines changed

cds/modules/deposit/api.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@
6363
from ..records.tasks import create_symlinks
6464
from ..records.utils import lowercase_value
6565
from ..records.validators import PartialDraft4Validator
66-
from ..webhooks.status import (get_deposit_events, get_tasks_status_by_task,
66+
from ..webhooks.status import (get_deposit_flows, get_tasks_status_by_task,
6767
merge_tasks_status)
6868
from .errors import DiscardConflict
6969
from .resolver import get_video_pid
@@ -931,7 +931,7 @@ def edit(self, pid=None):
931931

932932
def _clean_tasks(self):
933933
"""Clean all tasks."""
934-
events = get_deposit_events(deposit_id=self['_deposit']['id'])
934+
events = get_deposit_flows(deposit_id=self['_deposit']['id'])
935935
for event in events:
936936
event.receiver.delete(event=event)
937937

@@ -1004,7 +1004,7 @@ def get_report_number_sequence(self, **kwargs):
10041004
def _current_tasks_status(self):
10051005
"""Return up-to-date tasks status."""
10061006
return get_tasks_status_by_task(
1007-
get_deposit_events(self['_deposit']['id']),
1007+
get_deposit_flows(self['_deposit']['id']),
10081008
statuses=deepcopy(self['_cds'].get('state', {})))
10091009

10101010
def generate_duration(self):

cds/modules/deposit/static/js/cds_deposit/avc/avc.module.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -102,9 +102,9 @@ function cdsDepositsConfig(
102102
iiif: '/api/iiif/v2/<%=deposit%>:<%=version_id%>:<%=key%>/full/!<%=res%>/0/default.png',
103103
categories: '/api/categories',
104104
video: '/deposit/<%=deposit%>/preview/video/<%=key%>',
105-
eventInfo: '/hooks/receivers/avc/events/<%=eventId%>',
106-
restartEvent: '/hooks/receivers/avc/events/<%=eventId%>/tasks/<%=taskId%>',
107-
taskFeedback: '/hooks/receivers/avc/events/<%=eventId%>/feedback',
105+
eventInfo: '/hooks/receivers/avc/flows/<%=eventId%>',
106+
restartEvent: '/hooks/receivers/avc/flows/<%=eventId%>/tasks/<%=taskId%>',
107+
taskFeedback: '/hooks/receivers/avc/flows/<%=eventId%>/feedback',
108108
selfVideo: '/api/deposits/video/<%=deposit%>',
109109
bucketVideo: '/api/files/<%=bucket%>',
110110
actionVideo: '/api/deposits/video/<%=deposit%>/actions/<%=action%>',

cds/modules/deposit/static/js/cds_deposit/avc/components/cdsDeposit.js

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,7 @@ function cdsDepositCtrl(
209209

210210
this.restartFailedSubformats = function(subformatKeys) {
211211
var master = that.findMasterFile();
212-
var eventId = master.tags._event_id;
212+
var eventId = master.tags._flow_id;
213213
master.subformat.forEach(
214214
function(subformat) {
215215
if (subformatKeys.includes(subformat.key)) {
@@ -223,7 +223,7 @@ function cdsDepositCtrl(
223223
var restartEvents = data.filter(function(taskInfo) {
224224
return subformatKeys.includes(taskInfo.info.payload.key);
225225
}).map(function(taskInfo) {
226-
var eventId = taskInfo.info.payload.event_id;
226+
var eventId = taskInfo.info.payload.flow_id;
227227
var taskId = taskInfo.id;
228228
return that.restartEvent(eventId, taskId);
229229
});
@@ -275,7 +275,7 @@ function cdsDepositCtrl(
275275

276276
var fetchPresetsPromise = $q.resolve();
277277
if (that.presets && that.presets.length == 0) {
278-
var eventId = masterFile.tags._event_id;
278+
var eventId = masterFile.tags._flow_id;
279279
if (eventId) {
280280
var eventUrl = urlBuilder.eventInfo({eventId: eventId});
281281
var updatePresets = function (resp) {
@@ -344,7 +344,7 @@ function cdsDepositCtrl(
344344
// Update only if it is ``draft``
345345
if (that.isDraft()){
346346
var masterFile = that.findMasterFile();
347-
var eventId = _.get(masterFile, 'tags._event_id', undefined);
347+
var eventId = _.get(masterFile, 'tags._flow_id', undefined);
348348
that.webhookEventId = eventId;
349349
if (eventId) {
350350
that.getTaskFeedback(eventId)

cds/modules/deposit/static/js/cds_deposit/avc/components/cdsUploader.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -347,7 +347,7 @@ function cdsUploaderCtrl(
347347
that.files.push(newMasterFile);
348348
that.queue.push(newMasterFile);
349349
// Upload the video file
350-
var old_event_id = old_master[0]['tags']['_event_id']
350+
var old_event_id = old_master[0]['tags']['_flow_id']
351351
that.deleteEvent(old_event_id).then(
352352
function success(response) {
353353
that.cdsDepositCtrl.previewer = null;

cds/modules/deposit/static/templates/cds_deposit/deposits.html

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -100,8 +100,8 @@ <h3>Click here to select videos to upload</h3>
100100
dropbox-enabled="true"
101101
dropbox-selector=".dropbox-upload"
102102
dropbox-app-key="{{ $ctrl.dropboxAppKey }}"
103-
remote-master-receiver="/api/hooks/receivers/avc/events/"
104-
remote-children-receiver="/api/hooks/receivers/downloader/events/"
103+
remote-master-receiver="/api/hooks/receivers/avc/flows/"
104+
remote-children-receiver="/api/hooks/receivers/downloader/flows/"
105105
>
106106
<div class="dropbox-upload"></div>
107107
</cds-remote-uploader>
@@ -138,15 +138,15 @@ <h3>Click here to select videos to upload</h3>
138138
auto-start-upload="true"
139139
files="child._files"
140140
template="/static/templates/cds_deposit/types/video/uploader.html"
141-
remote-master-receiver="/api/hooks/receivers/avc/events/"
141+
remote-master-receiver="/api/hooks/receivers/avc/flows/"
142142
>
143143
<cds-remote-uploader
144144
template="/static/templates/cds_deposit/remote_upload.html"
145145
dropbox-enabled="true"
146146
dropbox-selector=".dropbox-upload"
147147
dropbox-app-key="{{ $ctrl.dropboxAppKey }}"
148-
remote-master-receiver="/api/hooks/receivers/avc/events/"
149-
remote-children-receiver="/api/hooks/receivers/downloader/events/"
148+
remote-master-receiver="/api/hooks/receivers/avc/flows/"
149+
remote-children-receiver="/api/hooks/receivers/downloader/flows/"
150150
>
151151
<div class="dropbox-upload"></div>
152152
</cds-remote-uploader>

cds/modules/flows/api.py

Lines changed: 87 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
# as an Intergovernmental Organization or submit itself to any jurisdiction.
2525

2626
"""Invenio-Flow python API."""
27-
27+
import json
2828
import logging
2929
from functools import wraps
3030
from itertools import repeat
@@ -94,18 +94,37 @@ def payload(self):
9494
"""Get flow payload."""
9595
return self.model.payload if self.model else None
9696

97-
@property
98-
def previous_id(self):
99-
"""Get the previous run of the flow."""
100-
return self.model.previous_id if self.model else None
101-
10297
@payload.setter
10398
def payload(self, value):
10499
"""Update payload."""
105100
if self.model:
106101
self.model.payload = value
107102
db.session.merge(self.model)
108103

104+
@property
105+
def response(self):
106+
"""Get flow payload."""
107+
return self.model.response if self.model else None
108+
109+
@response.setter
110+
def response(self, value):
111+
"""Update payload."""
112+
if self.model:
113+
self.model.response = value
114+
db.session.merge(self.model)
115+
116+
@property
117+
def response_code(self):
118+
"""Get flow payload."""
119+
return self.model.response_code if self.model else None
120+
121+
@response_code.setter
122+
def response_code(self, value):
123+
"""Update payload."""
124+
if self.model:
125+
self.model.response_code = value
126+
db.session.merge(self.model)
127+
109128
@property
110129
def created(self):
111130
"""Get creation timestamp."""
@@ -117,30 +136,40 @@ def updated(self):
117136
return self.model.updated if self.model else None
118137

119138
@property
120-
def status(self):
139+
def json(self):
121140
"""Get flow status."""
122141
if self.model is None:
123142
return None
124143

125144
res = self.model.to_dict()
126-
res.update({'tasks': [t.to_dict() for t in self.model.tasks]})
145+
res.update(
146+
{'tasks': [t.to_dict() for t in self.model.tasks]}
147+
)
127148
return res
128149

150+
@property
151+
def status(self):
152+
return self.model.status if self.model else None
153+
129154
@classmethod
130155
def get_flow(cls, id_):
131156
"""Retrieve a Flow from the database by Id."""
132157
obj = FlowModel.get(id_)
133158
return cls(obj)
134159

135160
@classmethod
136-
def create(cls, name, payload=None, id_=None, previous_id=None):
161+
def create(cls, name, payload=None,
162+
id_=None, user_id=None,
163+
deposit_id=None, receiver_id=None):
137164
"""Create a new flow instance and store it in the database.."""
138165
with db.session.begin_nested():
139166
obj = FlowModel(
140167
name=name,
141168
id=id_ or uuid(),
142169
payload=payload or dict(),
143-
previous_id=previous_id,
170+
user_id=user_id,
171+
deposit_id=deposit_id,
172+
receiver_id=receiver_id
144173
)
145174
db.session.add(obj)
146175
logger.info('Created new Flow %s', obj)
@@ -163,11 +192,11 @@ def _new_task(self, task, kwargs, previous):
163192
task_id = uuid()
164193
kwargs = kwargs if kwargs else {}
165194
kwargs.update({'flow_id': str(self.id), 'task_id': task_id})
166-
kwargs.update(self.payload,) # TODO: Do we need to move this to a key?
195+
kwargs.update(self.payload, )
167196
signature = task.subtask(
168197
task_id=task_id,
169198
kwargs=kwargs,
170-
immutable=True, # TODO, ad this as an option/parameter?
199+
immutable=True,
171200
)
172201

173202
_ = TaskModel.create(
@@ -181,7 +210,7 @@ def _new_task(self, task, kwargs, previous):
181210
return signature
182211

183212
def build(self):
184-
"""."""
213+
"""Build flow."""
185214
raise NotImplementedError()
186215

187216
def assemble(self, build_func):
@@ -221,7 +250,7 @@ def assemble(self, build_func):
221250
def start(self):
222251
"""Start the flow asynchronously."""
223252
if not self._canvas:
224-
self.assemble()
253+
self.assemble(self.build)
225254
return self._canvas.apply_async()
226255

227256
def stop(self):
@@ -266,15 +295,13 @@ def restart_task(self, task_id):
266295

267296
kwargs = {'flow_id': str(self.id), 'task_id': str(task.id)}
268297
kwargs.update(task.payload)
269-
kwargs.update(self.payload) # TODO: Do we need to move this to a key?
298+
kwargs.update(self.payload)
270299
return (
271-
celery_app.tasks.get(task.name)
272-
.subtask(
273-
task_id=str(task.id),
300+
celery_app.tasks.get(task.name).subtask(
301+
task_id=str(task.id),
274302
kwargs=kwargs,
275-
immutable=True, # TODO, ad this as an option/parameter?
276-
)
277-
.apply_async()
303+
immutable=True,
304+
).apply_async()
278305
)
279306

280307

@@ -308,3 +335,42 @@ def on_success(self, retval, task_id, args, kwargs):
308335
'{}'.format(retval),
309336
)
310337
super(Task, self).on_success(retval, task_id, args, kwargs)
338+
339+
@staticmethod
340+
def build_task_json_status(task_json):
341+
"""."""
342+
from ..webhooks.status import TASK_NAMES
343+
# Get the UI name of the task
344+
task_name = TASK_NAMES.get(task_json['name'])
345+
346+
# Add the information the UI needs on the right position
347+
payload = task_json['payload']
348+
payload['type'] = task_name
349+
350+
payload['key'] = payload.get('preset_quality', payload['key'])
351+
352+
if task_name == 'file_video_metadata_extraction':
353+
# try to load message as JSON,
354+
# we only need this for this particular task
355+
try:
356+
payload['extracted_metadata'] = \
357+
json.loads(task_json['message'])
358+
except ValueError:
359+
payload['extracted_metadata'] = task_json['message']
360+
361+
task_json['message'] = 'Attached video metadata'
362+
363+
celery_task_status = 'REVOKED' if 'Not transcoding' in task_json[
364+
'message'] else task_json['status']
365+
366+
task_status = {
367+
'name': task_name,
368+
'id': task_json['id'],
369+
'status': celery_task_status,
370+
'info': {
371+
'payload': payload,
372+
'message': task_json['message'],
373+
},
374+
}
375+
376+
return task_status

cds/modules/flows/migration.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,15 +31,18 @@
3131
from ..deposit.api import deposit_video_resolver
3232
from ..records.api import CDSVideosFilesIterator
3333
from .models import Status
34+
from ..webhooks.receivers import AVCWorkflow
3435

3536

36-
def migrate_event(event):
37+
def migrate_event(deposit_id):
3738
"""Migrate an old event into Flows."""
38-
receiver = event.receiver
39-
flow = receiver._workflow(event=event)
39+
receiver = AVCWorkflow()
40+
deposit = deposit_video_resolver(deposit_id)
41+
import ipdb;ipdb.set_trace()
42+
43+
flow = receiver._workflow(deposit_id)
4044

4145
# Update flow task status depending on the content of th record
42-
deposit_id = event.payload['deposit_id']
4346
deposit = deposit_video_resolver(deposit_id)
4447

4548
original_file = CDSVideosFilesIterator.get_master_video_file(deposit)

0 commit comments

Comments
 (0)