Skip to content
This repository was archived by the owner on Jan 28, 2020. It is now read-only.

Commit 74914b8

Browse files
author
George Schneeloch
committed
Merge pull request #802 from mitodl/refactor/gs/tasks_api
Added tasks API
2 parents 49fcbde + 39aa8ea commit 74914b8

File tree

12 files changed

+923
-258
lines changed

12 files changed

+923
-258
lines changed

apiary.apib

Lines changed: 85 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,86 @@ Page numbering is 1-based. Omitting the `?page` parameter will return the first
8989
]
9090
}
9191

92+
## Group Tasks
93+
94+
## Tasks Collection [/tasks/]
95+
96+
### List All Tasks [GET]
97+
98+
List all recent tasks for a user.
99+
100+
+ Response 200 (application/json)
101+
102+
{
103+
"count": 1,
104+
"next": null,
105+
"previous": null,
106+
"results": [
107+
{
108+
"id": "45e3c830-0ff8-4d84-85b4-c0a6e3ce81b3",
109+
"status": "processing",
110+
"result": {
111+
"url": "/media/resource_exports/sarah_exports.tar.gz",
112+
"collision": false
113+
},
114+
"task_type": "resource_export",
115+
"task_info": {
116+
"repo_slug": "repo",
117+
"ids": [
118+
23517,
119+
23518
120+
]
121+
}
122+
}
123+
]
124+
}
125+
126+
### Create a New Task [POST]
127+
128+
Queue a new task.
129+
130+
+ Request (application/json)
131+
132+
{
133+
"task_info": {
134+
"repo_slug": "repo",
135+
"ids": [1]
136+
},
137+
"task_type": "resource_export"
138+
}
139+
140+
+ Response 200 (application/json)
141+
142+
{
143+
"id": "45e3c830-0ff8-4d84-85b4-c0a6e3ce81b3"
144+
}
145+
146+
## Task [/tasks/{task_id}/]
147+
148+
+ Parameters
149+
+ task_id: `45e3c830-0ff8-4d84-85b4-c0a6e3ce81b3` (string, required) - task identifier
150+
151+
### Retrieve a Task [GET]
152+
153+
+ Response 200 (application/json)
154+
155+
{
156+
"id": "45e3c830-0ff8-4d84-85b4-c0a6e3ce81b3",
157+
"status": "processing",
158+
"result": {
159+
"url": "/media/resource_exports/sarah_exports.tar.gz",
160+
"collision": false
161+
},
162+
"task_type": "resource_export",
163+
"task_info": {
164+
"repo_slug": "repo",
165+
"ids": [
166+
23517,
167+
23518
168+
]
169+
}
170+
}
171+
92172
## Group Repositories
93173

94174
## Repository Collection [/repositories/]
@@ -477,6 +557,8 @@ Clears the export list for this repository.
477557

478558
## LearningResourceExportTasks Collection [/repositories/{repo_slug}/learning_resource_export_tasks/]
479559

560+
Deprecated. Use /api/v1/tasks/ instead.
561+
480562
+ Parameters
481563
+ repo_slug: `physics-1` (string, required) - slug for the repository
482564

@@ -515,7 +597,9 @@ Queue a new LearningResourceExportTask task for the given LearningResource ids.
515597
"id": "45e3c830-0ff8-4d84-85b4-c0a6e3ce81b3"
516598
}
517599

518-
## LearningResourceExportTask [/repositories/{repo_slug}/learning_resource_export_tasks/{task_id}/
600+
## LearningResourceExportTask [/repositories/{repo_slug}/learning_resource_export_tasks/{task_id}/]
601+
602+
Deprecated. Use /api/v1/tasks/ instead.
519603

520604
+ Parameters
521605
+ repo_slug: `physics-1` (string, required) - slug for the repository

exporter/tasks.py

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
from __future__ import unicode_literals
66

7+
from django.core.files.storage import default_storage
8+
79
from lore.celery import async
810
from exporter.api import export_resources_to_tarball
911

@@ -18,8 +20,14 @@ def export_resources(learning_resources, username):
1820
LearningResources to export in tarball
1921
username (unicode): Name of user
2022
Returns:
21-
(unicode, bool):
22-
First item is newly created temp directory with files inside of it.
23-
Second item is True if a static asset collision was detected.
23+
dict:
24+
name is path of tarball.
25+
url is URL of tarball using django-storage.
26+
collision is True if a static asset collision was detected.
2427
"""
25-
return export_resources_to_tarball(learning_resources, username)
28+
name, collision = export_resources_to_tarball(learning_resources, username)
29+
return {
30+
"name": name,
31+
"url": default_storage.url(name),
32+
"collision": collision
33+
}

exporter/tests/test_export.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,8 +175,10 @@ def test_export_task(self):
175175
"""Test exporting resources task."""
176176
resources = LearningResource.objects.all()
177177

178-
path, collision = export_resources.delay(
178+
result = export_resources.delay(
179179
resources, self.user.username).get()
180+
path = result['name']
181+
collision = result['collision']
180182
tempdir = mkdtemp()
181183

182184
self.assertTrue(collision)

rest/serializers.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
SerializerMethodField,
2222
IntegerField,
2323
FloatField,
24+
DictField,
25+
BooleanField,
2426
)
2527

2628
from rest.util import LambdaDefault, RequiredBooleanField
@@ -308,6 +310,16 @@ class LearningResourceExportTaskSerializer(Serializer):
308310
id = CharField()
309311
status = CharField()
310312
url = CharField()
313+
collision = BooleanField()
314+
315+
316+
class TaskSerializer(Serializer):
317+
"""Serializer for tasks."""
318+
id = CharField()
319+
status = CharField()
320+
result = DictField()
321+
task_type = CharField()
322+
task_info = DictField()
311323

312324

313325
class RepositorySearchSerializer(Serializer):

rest/tasks.py

Lines changed: 211 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,211 @@
1+
"""
2+
Functions to manipulate tasks via REST API.
3+
"""
4+
5+
from __future__ import unicode_literals
6+
7+
from celery.result import AsyncResult
8+
from celery.states import FAILURE, SUCCESS, REVOKED
9+
from django.contrib.auth.models import User
10+
from django.http.response import Http404
11+
from rest_framework.exceptions import ValidationError
12+
13+
from exporter.tasks import export_resources
14+
from learningresources.api import get_repo
15+
from learningresources.models import LearningResource
16+
17+
TASK_KEY = 'tasks'
18+
EXPORT_TASK_TYPE = 'resource_export'
19+
EXPORTS_KEY = 'learning_resource_exports'
20+
21+
22+
def create_initial_task_dict(task, task_type, task_info):
23+
"""
24+
Create initial task data about a newly created Celery task.
25+
26+
Args:
27+
task (Task): A Celery task.
28+
task_type (unicode): Type of task.
29+
task_info (dict): Extra information about a task.
30+
Returns:
31+
dict: Initial data about task.
32+
"""
33+
34+
result = None
35+
if task.successful():
36+
result = task.get()
37+
elif task.failed():
38+
result = {'error': str(task.result)}
39+
40+
return {
41+
"id": task.id,
42+
"initial_state": task.state,
43+
"task_type": task_type,
44+
"task_info": task_info,
45+
"result": result
46+
}
47+
48+
49+
def create_task_result_dict(initial_data):
50+
"""
51+
Convert initial data we put in session to dict for REST API.
52+
This will use the id to look up current data about task to return
53+
to user.
54+
55+
Args:
56+
task (dict): Initial data about task stored in session.
57+
Returns:
58+
dict: Updated data about task.
59+
"""
60+
initial_state = initial_data['initial_state']
61+
task_id = initial_data['id']
62+
task_type = initial_data['task_type']
63+
task_info = initial_data['task_info']
64+
65+
state = "processing"
66+
result = None
67+
# initial_state is a workaround for EagerResult used in testing.
68+
# In production initial_state should usually be pending.
69+
async_result = AsyncResult(task_id)
70+
71+
if initial_state == SUCCESS:
72+
state = "success"
73+
result = initial_data['result']
74+
elif initial_state in (FAILURE, REVOKED):
75+
state = "failure"
76+
result = initial_data['result']
77+
elif async_result.successful():
78+
state = "success"
79+
result = async_result.get()
80+
elif async_result.failed():
81+
state = "failure"
82+
result = {'error': str(async_result.result)}
83+
84+
return {
85+
"id": task_id,
86+
"status": state,
87+
"result": result,
88+
"task_type": task_type,
89+
"task_info": task_info
90+
}
91+
92+
93+
def get_tasks(session):
94+
"""
95+
Get initial task data for session.
96+
97+
Args:
98+
session (SessionStore): The request session.
99+
Returns:
100+
dict:
101+
The initial task data stored in session for all user's tasks. The
102+
keys are task ids and the values are initial task data.
103+
"""
104+
try:
105+
return session[TASK_KEY]
106+
except KeyError:
107+
return {}
108+
109+
110+
def get_task(session, task_id):
111+
"""
112+
Get initial task data for a single task.
113+
114+
Args:
115+
session (SessionStore): The request session.
116+
task_id (unicode): The task id.
117+
Returns:
118+
dict: The initial task data stored in session.
119+
"""
120+
try:
121+
return session[TASK_KEY][task_id]
122+
except KeyError:
123+
return None
124+
125+
126+
def track_task(session, task, task_type, task_info):
127+
"""
128+
Add a Celery task to the session.
129+
130+
Args:
131+
session (SessionStore): The request session.
132+
task_type (unicode): The type of task being started.
133+
task_info (dict): Extra information about the task.
134+
Returns:
135+
dict: The initial task data (will also be stored in session).
136+
"""
137+
initial_data = create_initial_task_dict(task, task_type, task_info)
138+
if TASK_KEY not in session:
139+
session[TASK_KEY] = {}
140+
session[TASK_KEY][task.id] = initial_data
141+
session.modified = True
142+
return initial_data
143+
144+
145+
def create_task(session, user_id, task_type, task_info):
146+
"""
147+
Start a new Celery task from REST API.
148+
149+
Args:
150+
session (SessionStore): The request session.
151+
user_id (int): The id for user creating task.
152+
task_type (unicode): The type of task being started.
153+
task_info (dict): Extra information about the task.
154+
Returns:
155+
dict: The initial task data (will also be stored in session).
156+
"""
157+
158+
if task_type == EXPORT_TASK_TYPE:
159+
try:
160+
repo_slug = task_info['repo_slug']
161+
except KeyError:
162+
raise ValidationError("Missing repo_slug")
163+
164+
# Verify repository ownership.
165+
get_repo(repo_slug, user_id)
166+
167+
try:
168+
exports = set(session[EXPORTS_KEY][repo_slug])
169+
except KeyError:
170+
exports = set()
171+
172+
try:
173+
ids = task_info['ids']
174+
except KeyError:
175+
raise ValidationError("Missing ids")
176+
177+
for resource_id in ids:
178+
if resource_id not in exports:
179+
raise ValidationError("id {id} is not in export list".format(
180+
id=resource_id
181+
))
182+
183+
learning_resources = LearningResource.objects.filter(id__in=ids).all()
184+
user = User.objects.get(id=user_id)
185+
result = export_resources.delay(learning_resources, user.username)
186+
187+
# Put new task in session.
188+
initial_data = track_task(session, result, task_type, task_info)
189+
190+
return initial_data
191+
else:
192+
raise ValidationError("Unknown task_type {task_type}".format(
193+
task_type=task_type
194+
))
195+
196+
197+
def remove_task(session, task_id):
198+
"""
199+
Cancel task and remove task from task list.
200+
201+
Args:
202+
session (SessionStore): The request session.
203+
task_id (int): The task id.
204+
"""
205+
tasks = session.get(TASK_KEY, {})
206+
if task_id not in tasks:
207+
raise Http404
208+
209+
AsyncResult(task_id).revoke()
210+
del tasks[task_id]
211+
session[TASK_KEY] = tasks

0 commit comments

Comments
 (0)