Skip to content

Commit 1323505

Browse files
committed
chore: Remove platform dependency
1 parent 55f1366 commit 1323505

29 files changed

+562
-865
lines changed

compiler.go

Lines changed: 16 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -2,48 +2,25 @@ package flux
22

33
import (
44
"context"
5-
"time"
5+
"fmt"
66
)
77

8-
const (
9-
FluxCompilerType = "flux"
10-
SpecCompilerType = "spec"
11-
)
12-
13-
// AddCompilerMappings adds the Flux specific compiler mappings.
14-
func AddCompilerMappings(mappings CompilerMappings) error {
15-
if err := mappings.Add(FluxCompilerType, func() Compiler {
16-
return new(FluxCompiler)
17-
18-
}); err != nil {
19-
return err
20-
}
21-
return mappings.Add(SpecCompilerType, func() Compiler {
22-
return new(SpecCompiler)
23-
})
8+
// Compiler produces a specification for the query.
9+
type Compiler interface {
10+
// Compile produces a specification for the query.
11+
Compile(ctx context.Context) (*Spec, error)
12+
CompilerType() CompilerType
2413
}
2514

26-
// FluxCompiler compiles a Flux script into a spec.
27-
type FluxCompiler struct {
28-
Query string `json:"query"`
29-
}
30-
31-
func (c FluxCompiler) Compile(ctx context.Context) (*Spec, error) {
32-
return Compile(ctx, c.Query, time.Now())
33-
}
15+
// CompilerType is the name of a query compiler.
16+
type CompilerType string
17+
type CreateCompiler func() Compiler
18+
type CompilerMappings map[CompilerType]CreateCompiler
3419

35-
func (c FluxCompiler) CompilerType() CompilerType {
36-
return FluxCompilerType
37-
}
38-
39-
// SpecCompiler implements Compiler by returning a known spec.
40-
type SpecCompiler struct {
41-
Spec *Spec `json:"spec"`
42-
}
43-
44-
func (c SpecCompiler) Compile(ctx context.Context) (*Spec, error) {
45-
return c.Spec, nil
46-
}
47-
func (c SpecCompiler) CompilerType() CompilerType {
48-
return SpecCompilerType
20+
func (m CompilerMappings) Add(t CompilerType, c CreateCompiler) error {
21+
if _, ok := m[t]; ok {
22+
return fmt.Errorf("duplicate compiler mapping for %q", t)
23+
}
24+
m[t] = c
25+
return nil
4926
}

control/controller.go

Lines changed: 46 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ import (
3131
"github.com/influxdata/flux"
3232
"github.com/influxdata/flux/execute"
3333
"github.com/influxdata/flux/plan"
34-
"github.com/influxdata/platform"
3534
opentracing "github.com/opentracing/opentracing-go"
3635
"github.com/pkg/errors"
3736
"github.com/prometheus/client_golang/prometheus"
@@ -48,7 +47,8 @@ type Controller struct {
4847
queryDone chan *Query
4948
cancelRequest chan QueryID
5049

51-
metrics *controllerMetrics
50+
metrics *controllerMetrics
51+
labelKeys []string
5252

5353
verbose bool
5454

@@ -69,6 +69,10 @@ type Config struct {
6969
PlannerOptions []plan.Option
7070
Logger *zap.Logger
7171
Verbose bool
72+
// MetricLabelKeys is a list of labels to add to the metrics produced by the controller.
73+
// The value for a given key will be read off the context.
74+
// The context value must be a string or an implementation of the Stringer interface.
75+
MetricLabelKeys []string
7276
}
7377

7478
type QueryID uint64
@@ -90,18 +94,19 @@ func New(c Config) *Controller {
9094
pplanner: plan.NewPlanner(c.PlannerOptions...),
9195
executor: execute.NewExecutor(c.ExecutorDependencies, logger),
9296
logger: logger,
93-
metrics: newControllerMetrics(),
9497
verbose: c.Verbose,
98+
metrics: newControllerMetrics(c.MetricLabelKeys),
99+
labelKeys: c.MetricLabelKeys,
95100
}
96101
go ctrl.run()
97102
return ctrl
98103
}
99104

100105
// Query submits a query for execution returning immediately.
101106
// Done must be called on any returned Query objects.
102-
func (c *Controller) Query(ctx context.Context, req *flux.Request) (flux.Query, error) {
103-
q := c.createQuery(ctx, req.OrganizationID)
104-
if err := c.compileQuery(q, req.Compiler); err != nil {
107+
func (c *Controller) Query(ctx context.Context, compiler flux.Compiler) (flux.Query, error) {
108+
q := c.createQuery(ctx, compiler.CompilerType())
109+
if err := c.compileQuery(q, compiler); err != nil {
105110
q.parentSpan.Finish()
106111
return nil, err
107112
}
@@ -112,11 +117,28 @@ func (c *Controller) Query(ctx context.Context, req *flux.Request) (flux.Query,
112117
return q, nil
113118
}
114119

115-
func (c *Controller) createQuery(ctx context.Context, orgID platform.ID) *Query {
120+
type Stringer interface {
121+
String() string
122+
}
123+
124+
func (c *Controller) createQuery(ctx context.Context, ct flux.CompilerType) *Query {
116125
id := c.nextID()
117-
labelValues := []string{
118-
orgID.String(),
126+
labelValues := make([]string, len(c.labelKeys))
127+
compileLabelValues := make([]string, len(c.labelKeys)+1)
128+
for i, k := range c.labelKeys {
129+
value := ctx.Value(k)
130+
var str string
131+
switch v := value.(type) {
132+
case string:
133+
str = v
134+
case Stringer:
135+
str = v.String()
136+
}
137+
labelValues[i] = str
138+
compileLabelValues[i] = str
119139
}
140+
compileLabelValues[len(compileLabelValues)-1] = string(ct)
141+
120142
cctx, cancel := context.WithCancel(ctx)
121143
parentSpan, parentCtx := StartSpanFromContext(
122144
cctx,
@@ -126,16 +148,16 @@ func (c *Controller) createQuery(ctx context.Context, orgID platform.ID) *Query
126148
)
127149
ready := make(chan map[string]flux.Result, 1)
128150
return &Query{
129-
id: id,
130-
orgID: orgID,
131-
labelValues: labelValues,
132-
state: Created,
133-
c: c,
134-
now: time.Now().UTC(),
135-
ready: ready,
136-
parentCtx: parentCtx,
137-
parentSpan: parentSpan,
138-
cancel: cancel,
151+
id: id,
152+
labelValues: labelValues,
153+
compileLabelValues: compileLabelValues,
154+
state: Created,
155+
c: c,
156+
now: time.Now().UTC(),
157+
ready: ready,
158+
parentCtx: parentCtx,
159+
parentSpan: parentSpan,
160+
cancel: cancel,
139161
}
140162
}
141163

@@ -296,7 +318,7 @@ func (c *Controller) processQuery(q *Query) (pop bool, err error) {
296318
return true, errors.New("failed to transition query into executing state")
297319
}
298320
q.alloc = new(execute.Allocator)
299-
r, err := c.executor.Execute(q.executeCtx, q.orgID, q.plan, q.alloc)
321+
r, err := c.executor.Execute(q.executeCtx, q.plan, q.alloc)
300322
if err != nil {
301323
return true, errors.Wrap(err, "failed to execute query")
302324
}
@@ -338,9 +360,8 @@ func (c *Controller) PrometheusCollectors() []prometheus.Collector {
338360
type Query struct {
339361
id QueryID
340362

341-
orgID platform.ID
342-
343-
labelValues []string
363+
labelValues []string
364+
compileLabelValues []string
344365

345366
c *Controller
346367

@@ -383,10 +404,6 @@ func (q *Query) ID() QueryID {
383404
return q.id
384405
}
385406

386-
func (q *Query) OrganizationID() platform.ID {
387-
return q.orgID
388-
}
389-
390407
func (q *Query) Spec() *flux.Spec {
391408
return &q.spec
392409
}
@@ -549,8 +566,8 @@ func (q *Query) tryCompile() bool {
549566
q.compileSpan, q.compilingCtx = StartSpanFromContext(
550567
q.parentCtx,
551568
"compiling",
552-
q.c.metrics.compilingDur.WithLabelValues(q.labelValues...),
553-
q.c.metrics.compiling.WithLabelValues(q.labelValues...),
569+
q.c.metrics.compilingDur.WithLabelValues(q.compileLabelValues...),
570+
q.c.metrics.compiling.WithLabelValues(q.compileLabelValues...),
554571
)
555572

556573
q.state = Compiling

control/controller_test.go

Lines changed: 13 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import (
1010
"github.com/influxdata/flux/execute"
1111
"github.com/influxdata/flux/mock"
1212
"github.com/influxdata/flux/plan"
13-
"github.com/influxdata/platform"
1413
"github.com/pkg/errors"
1514
"github.com/prometheus/client_golang/prometheus"
1615
dto "github.com/prometheus/client_model/go"
@@ -33,18 +32,14 @@ func TestController_CompileQuery_Failure(t *testing.T) {
3332
}
3433

3534
ctrl := New(Config{})
36-
req := &flux.Request{
37-
OrganizationID: platform.ID("a"),
38-
Compiler: compiler,
39-
}
4035

4136
// Run the query. It should return an error.
42-
if _, err := ctrl.Query(context.Background(), req); err == nil {
37+
if _, err := ctrl.Query(context.Background(), compiler); err == nil {
4338
t.Fatal("expected error")
4439
}
4540

4641
// Verify the metrics say there are no queries.
47-
gauge, err := ctrl.metrics.all.GetMetricWithLabelValues(req.OrganizationID.String())
42+
gauge, err := ctrl.metrics.all.GetMetricWithLabelValues()
4843
if err != nil {
4944
t.Fatalf("unexpected error: %s", err)
5045
}
@@ -71,13 +66,9 @@ func TestController_EnqueueQuery_Failure(t *testing.T) {
7166
}
7267

7368
ctrl := New(Config{})
74-
req := &flux.Request{
75-
OrganizationID: platform.ID("a"),
76-
Compiler: compiler,
77-
}
7869

7970
// Run the query. It should return an error.
80-
if _, err := ctrl.Query(context.Background(), req); err == nil {
71+
if _, err := ctrl.Query(context.Background(), compiler); err == nil {
8172
t.Fatal("expected error")
8273
}
8374

@@ -86,7 +77,7 @@ func TestController_EnqueueQuery_Failure(t *testing.T) {
8677
"all": ctrl.metrics.all,
8778
"queueing": ctrl.metrics.queueing,
8879
} {
89-
gauge, err := gaugeVec.GetMetricWithLabelValues(req.OrganizationID.String())
80+
gauge, err := gaugeVec.GetMetricWithLabelValues()
9081
if err != nil {
9182
t.Fatalf("unexpected error: %s", err)
9283
}
@@ -104,19 +95,15 @@ func TestController_EnqueueQuery_Failure(t *testing.T) {
10495

10596
func TestController_ExecuteQuery_Failure(t *testing.T) {
10697
executor := mock.NewExecutor()
107-
executor.ExecuteFn = func(context.Context, platform.ID, *plan.PlanSpec, *execute.Allocator) (map[string]flux.Result, error) {
98+
executor.ExecuteFn = func(context.Context, *plan.PlanSpec, *execute.Allocator) (map[string]flux.Result, error) {
10899
return nil, errors.New("expected")
109100
}
110101

111102
ctrl := New(Config{})
112103
ctrl.executor = executor
113-
req := &flux.Request{
114-
OrganizationID: platform.ID("a"),
115-
Compiler: mockCompiler,
116-
}
117104

118105
// Run a query and then wait for it to be ready.
119-
q, err := ctrl.Query(context.Background(), req)
106+
q, err := ctrl.Query(context.Background(), mockCompiler)
120107
if err != nil {
121108
t.Fatalf("unexpected error: %s", err)
122109
}
@@ -134,7 +121,7 @@ func TestController_ExecuteQuery_Failure(t *testing.T) {
134121
q.Done()
135122

136123
// Verify the metrics say there are no queries.
137-
gauge, err := ctrl.metrics.all.GetMetricWithLabelValues(req.OrganizationID.String())
124+
gauge, err := ctrl.metrics.all.GetMetricWithLabelValues()
138125
if err != nil {
139126
t.Fatalf("unexpected error: %s", err)
140127
}
@@ -151,20 +138,16 @@ func TestController_ExecuteQuery_Failure(t *testing.T) {
151138

152139
func TestController_CancelQuery(t *testing.T) {
153140
executor := mock.NewExecutor()
154-
executor.ExecuteFn = func(context.Context, platform.ID, *plan.PlanSpec, *execute.Allocator) (map[string]flux.Result, error) {
141+
executor.ExecuteFn = func(context.Context, *plan.PlanSpec, *execute.Allocator) (map[string]flux.Result, error) {
155142
// Return an empty result.
156143
return map[string]flux.Result{}, nil
157144
}
158145

159146
ctrl := New(Config{})
160147
ctrl.executor = executor
161-
req := &flux.Request{
162-
OrganizationID: platform.ID("a"),
163-
Compiler: mockCompiler,
164-
}
165148

166149
// Run a query and then wait for it to be ready.
167-
q, err := ctrl.Query(context.Background(), req)
150+
q, err := ctrl.Query(context.Background(), mockCompiler)
168151
if err != nil {
169152
t.Fatalf("unexpected error: %s", err)
170153
}
@@ -179,7 +162,7 @@ func TestController_CancelQuery(t *testing.T) {
179162
q.Done()
180163

181164
// Verify the metrics say there are no queries.
182-
gauge, err := ctrl.metrics.all.GetMetricWithLabelValues(req.OrganizationID.String())
165+
gauge, err := ctrl.metrics.all.GetMetricWithLabelValues()
183166
if err != nil {
184167
t.Fatalf("unexpected error: %s", err)
185168
}
@@ -198,20 +181,16 @@ func TestController_BlockedExecutor(t *testing.T) {
198181
done := make(chan struct{})
199182

200183
executor := mock.NewExecutor()
201-
executor.ExecuteFn = func(context.Context, platform.ID, *plan.PlanSpec, *execute.Allocator) (map[string]flux.Result, error) {
184+
executor.ExecuteFn = func(context.Context, *plan.PlanSpec, *execute.Allocator) (map[string]flux.Result, error) {
202185
<-done
203186
return nil, nil
204187
}
205188

206189
ctrl := New(Config{})
207190
ctrl.executor = executor
208-
req := &flux.Request{
209-
OrganizationID: platform.ID("a"),
210-
Compiler: mockCompiler,
211-
}
212191

213192
// Run a query that will cause the controller to stall.
214-
q, err := ctrl.Query(context.Background(), req)
193+
q, err := ctrl.Query(context.Background(), mockCompiler)
215194
if err != nil {
216195
t.Fatalf("unexpected error: %s", err)
217196
}
@@ -234,7 +213,7 @@ func TestController_BlockedExecutor(t *testing.T) {
234213
}
235214
}()
236215

237-
if _, err := ctrl.Query(ctx, req); err == nil {
216+
if _, err := ctrl.Query(ctx, mockCompiler); err == nil {
238217
t.Fatal("expected error")
239218
} else if got, want := err, context.Canceled; got != want {
240219
t.Fatalf("unexpected error: got=%q want=%q", got, want)

0 commit comments

Comments
 (0)