-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathjob.go
127 lines (99 loc) · 3.15 KB
/
job.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package mapper
import (
"bytes"
"encoding/gob"
"github.com/captaincodeman/datastore-locker"
"golang.org/x/net/context"
"google.golang.org/appengine/datastore"
"google.golang.org/appengine/log"
)
type (
	// job is the datastore struct to control execution of a job instance.
	// It implements its own Load and Save so that the fields tagged
	// `datastore:"-"` below can be persisted alongside the tagged fields.
	job struct {
		// Lock is embedded from the datastore-locker package.
		locker.Lock
		// common is excluded from automatic struct (de)serialization; Load and
		// Save call common.Load / common.Save explicitly instead.
		common `datastore:"-"`
		// JobName is the job processor name
		JobName string `datastore:"job_name"`
		// JobSpec is the job processor instance. It is not stored as a regular
		// property: Save writes it gob-encoded as the "job_spec" property and
		// Load decodes it (or recreates it from JobName).
		JobSpec JobSpec `datastore:"-"`
		// Bucket defines the cloud storage bucket to write output to
		// if empty, then no output will be written
		Bucket string `datastore:"bucket,noindex"`
		// Abort is used to cancel a job
		Abort bool `datastore:"abort,noindex"`
		// Shards is the target number of shards to use when splitting a namespace
		Shards int `datastore:"shards,noindex"`
		// Iterating indicates if the iterator is still active
		Iterating bool `datastore:"iterating,noindex"`
		// NamespacesTotal is the total number of namespaces generated by the iterator
		NamespacesTotal int `datastore:"ns_total,noindex"`
		// NamespacesSuccessful is the number of namespaces completed successfully
		NamespacesSuccessful int `datastore:"ns_successful,noindex"`
		// NamespacesFailed is the number of namespaces failed
		NamespacesFailed int `datastore:"ns_failed,noindex"`
	}
)
// jobKind is the kind name associated with job entities.
// NOTE(review): presumably combined with the configured datastore prefix by
// callers elsewhere in the package — confirm.
const jobKind = "job"
// start creates the iterator for this job and schedules it for execution.
//
// A fresh iterator is started with the job's query, given a key whose kind is
// the configured datastore prefix plus iteratorKind and whose string ID is the
// job id, and handed to the mapper's locker to be scheduled against the
// iterator URL under the configured path. The error from scheduling is
// returned unchanged.
func (j *job) start(c context.Context, mapper *mapper) error {
	if mapper.config.LogVerbose {
		log.Debugf(c, "creating iterator for job %s", j.id)
	}

	it := new(iterator)
	it.start(j.Query)

	kind := mapper.config.DatastorePrefix + iteratorKind
	url := mapper.config.Path + iteratorURL
	itKey := datastore.NewKey(c, kind, j.id, 0, nil)

	return mapper.locker.Schedule(c, itKey, it, url, nil)
}
// completed marks the job and its lock as complete and persists the job
// under key. The error from the datastore write, if any, is returned.
func (j *job) completed(c context.Context, mapper *mapper, key *datastore.Key) error {
	j.complete()
	j.Lock.Complete()

	// Nothing else is running against this job by the time we get here,
	// so a plain Put is used rather than a transaction.
	_, err := datastore.Put(c, key, j)
	return err
}
// Load implements the datastore PropertyLoadSaver interface.
//
// The tagged struct fields are populated with datastore.LoadStruct, the
// embedded common value loads itself from the same property list, and the
// "job_spec" property (written by Save) is gob-decoded into JobSpec. When no
// usable "job_spec" property is present, JobSpec is recreated from JobName
// via CreateJobInstance.
//
// It returns an error if the struct cannot be loaded for a reason other than
// a field mismatch, if the gob payload fails to decode, or if the fallback
// CreateJobInstance call fails.
func (j *job) Load(props []datastore.Property) error {
	// LoadStruct reports an *ErrFieldMismatch for any property that has no
	// matching struct field. That is expected here: "job_spec" and the
	// properties belonging to common are not tagged fields of job and are
	// handled below. Any other error is a genuine failure.
	if err := datastore.LoadStruct(j, props); err != nil {
		if _, mismatch := err.(*datastore.ErrFieldMismatch); !mismatch {
			return err
		}
	}

	// NOTE(review): whatever common.Load returns is discarded, as before; its
	// signature is not visible here — confirm whether it should be checked.
	j.common.Load(props)

	for _, prop := range props {
		if prop.Name != "job_spec" {
			continue
		}
		// Save always writes the spec as a []byte gob payload. Anything else
		// is treated as "no stored spec" rather than panicking on the type
		// assertion, so the create-from-name fallback below applies.
		raw, ok := prop.Value.([]byte)
		if !ok {
			continue
		}
		dec := gob.NewDecoder(bytes.NewReader(raw))
		if err := dec.Decode(&j.JobSpec); err != nil {
			return err
		}
	}

	// JobSpec is still nil when Save could not gob-encode it (so no
	// "job_spec" property was stored); create an instance from the name.
	if j.JobSpec == nil {
		jobSpec, err := CreateJobInstance(j.JobName)
		if err != nil {
			return err
		}
		j.JobSpec = jobSpec
	}
	return nil
}
// Save implements the datastore PropertyLoadSaver interface.
//
// The result is the tagged struct fields, followed by the properties produced
// by the embedded common value, followed by a non-indexed "job_spec" property
// holding the gob encoding of JobSpec. If JobSpec cannot be gob-encoded the
// "job_spec" property is simply left out and no error is reported; Load
// recreates the spec from JobName in that case.
func (j *job) Save() ([]datastore.Property, error) {
	props, err := datastore.SaveStruct(j)
	if err != nil {
		return nil, err
	}

	commonProps, err := j.common.Save()
	if err != nil {
		return nil, err
	}
	props = append(props, commonProps...)

	// Best effort: an encode failure is not fatal, the spec is just not stored.
	var encoded bytes.Buffer
	if encodeErr := gob.NewEncoder(&encoded).Encode(&j.JobSpec); encodeErr != nil {
		return props, nil
	}

	specProp := datastore.Property{
		Name:    "job_spec",
		Value:   encoded.Bytes(),
		NoIndex: true,
	}
	return append(props, specProp), nil
}