Skip to content

Commit 503d91c

Browse files
author
licheng
committed
optimize interface for etl jobs, add test cases and examples for etl
1 parent 48be6df commit 503d91c

4 files changed

+199
-55
lines changed

client_etl.go

+45-32
Original file line numberDiff line numberDiff line change
@@ -48,50 +48,45 @@ type ETLSink struct {
4848
}
4949

5050
type ListETLResponse struct {
51-
Total int `json:"total"`
52-
Count int `json:"count"`
51+
Total int `json:"total"`
52+
Count int `json:"count"`
5353
Results []*ETL `json:"results"`
5454
}
5555

56-
5756
func NewETL(endpoint, accessKeyId, accessKeySecret, logstore, name, project string) ETL {
5857
sink := ETLSink{
59-
AccessKeyId:accessKeyId,
60-
AccessKeySecret:accessKeySecret,
61-
Endpoint:endpoint,
62-
Logstore:logstore,
63-
Name:name,
64-
Project:project,
65-
Type: ETLSinksType,
66-
}
67-
config := ETLConfiguration {
68-
AccessKeyId:accessKeyId,
69-
AccessKeySecret:accessKeySecret,
70-
FromTime: time.Now().Unix(),
71-
Script: "e_set('new','aliyun')",
72-
Version:ETLVersion,
73-
Logstore:logstore,
74-
ETLSinks:[]ETLSink{sink},
75-
Parameters: map[string]string{},
76-
58+
AccessKeyId: accessKeyId,
59+
AccessKeySecret: accessKeySecret,
60+
Endpoint: endpoint,
61+
Logstore: logstore,
62+
Name: name,
63+
Project: project,
64+
Type: ETLSinksType,
65+
}
66+
config := ETLConfiguration{
67+
AccessKeyId: accessKeyId,
68+
AccessKeySecret: accessKeySecret,
69+
FromTime: time.Now().Unix(),
70+
Script: "e_set('new','aliyun')",
71+
Version: ETLVersion,
72+
Logstore: logstore,
73+
ETLSinks: []ETLSink{sink},
74+
Parameters: map[string]string{},
7775
}
7876
schedule := ETLSchedule{
79-
Type:"Resident",
77+
Type: "Resident",
8078
}
81-
etljob := ETL {
82-
Configuration:config,
83-
DisplayName:"displayname",
84-
Description:"go sdk case",
85-
Name:name,
86-
Schedule:schedule,
87-
Type:ETLType,
88-
79+
etljob := ETL{
80+
Configuration: config,
81+
DisplayName: "displayname",
82+
Description: "go sdk case",
83+
Name: name,
84+
Schedule: schedule,
85+
Type: ETLType,
8986
}
9087
return etljob
9188
}
9289

93-
94-
9590
func (c *Client) CreateETL(project string, etljob ETL) error {
9691
body, err := json.Marshal(etljob)
9792
if err != nil {
@@ -212,3 +207,21 @@ func (c *Client) StopETL(project, name string) error {
212207
r.Body.Close()
213208
return nil
214209
}
210+
211+
func (c *Client) RestartETL(project string, etljob ETL) error {
212+
body, err := json.Marshal(etljob)
213+
if err != nil {
214+
return NewClientError(err)
215+
}
216+
h := map[string]string{
217+
"x-log-bodyrawsize": fmt.Sprintf("%v", len(body)),
218+
"Content-Type": "application/json",
219+
}
220+
uri := fmt.Sprintf("/jobs/%s?action=RESTART", etljob.Name)
221+
r, err := c.request(project, "PUT", uri, h, body)
222+
if err != nil {
223+
return err
224+
}
225+
r.Body.Close()
226+
return nil
227+
}

client_etl_test.go

+65-23
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
package sls
22

33
import (
4-
"github.com/stretchr/testify/suite"
54
"os"
65
"testing"
76
"time"
7+
8+
"github.com/stretchr/testify/suite"
89
)
910

1011
func TestETLJobV2(t *testing.T) {
@@ -19,7 +20,6 @@ type ETLJobTestV2Suite struct {
1920
accessKeyID string
2021
accessKeySecret string
2122
targetLogstoreName string
22-
etlName string
2323
client *Client
2424
}
2525

@@ -37,12 +37,12 @@ func (s *ETLJobTestV2Suite) SetupTest() {
3737
}
3838
}
3939

40-
func (s *ETLJobTestV2Suite) createETLJobV2() error {
40+
func (s *ETLJobTestV2Suite) createETLJobV2(etlName string) error {
4141
sink := ETLSink{
4242
AccessKeyId: s.accessKeyID,
4343
AccessKeySecret: s.accessKeySecret,
4444
Endpoint: s.endpoint,
45-
Logstore: s.logstoreName,
45+
Logstore: s.targetLogstoreName,
4646
Name: "aliyun-etl-test",
4747
Project: s.projectName,
4848
}
@@ -63,86 +63,128 @@ func (s *ETLJobTestV2Suite) createETLJobV2() error {
6363
Configuration: config,
6464
DisplayName: "displayName",
6565
Description: "go sdk case",
66-
Name: s.etlName,
66+
Name: etlName,
6767
Schedule: schedule,
6868
Type: "ETL",
6969
}
7070
return s.client.CreateETL(s.projectName, etljob)
7171
}
7272

7373
func (s *ETLJobTestV2Suite) TestClient_UpdateETLJobV2() {
74-
err := s.createETLJobV2()
74+
etlName := "test_update_etl"
75+
err := s.createETLJobV2(etlName)
7576
s.Require().Nil(err)
76-
etljob, err := s.client.GetETL(s.projectName, s.etlName)
77+
etljob, err := s.client.GetETL(s.projectName, etlName)
7778
s.Require().Nil(err)
7879
etljob.DisplayName = "update"
7980
etljob.Description = "update description"
8081
etljob.Configuration.Script = "e_set('update','update')"
8182
err = s.client.UpdateETL(s.projectName, *etljob)
8283
s.Require().Nil(err)
83-
etljob, err = s.client.GetETL(s.projectName, s.etlName)
84+
etljob, err = s.client.GetETL(s.projectName, etlName)
8485
s.Require().Nil(err)
8586
s.Require().Equal("update", etljob.DisplayName)
8687
s.Require().Equal("update description", etljob.Description)
87-
err = s.client.DeleteETL(s.projectName, s.etlName)
88+
err = s.client.DeleteETL(s.projectName, etlName)
8889
s.Require().Nil(err)
8990
}
9091

9192
func (s *ETLJobTestV2Suite) TestClient_DeleteETLJobV2() {
92-
err := s.createETLJobV2()
93+
etlName := "test_delete_etl"
94+
err := s.createETLJobV2(etlName)
9395
s.Require().Nil(err)
94-
_, err = s.client.GetETL(s.projectName, s.etlName)
96+
_, err = s.client.GetETL(s.projectName, etlName)
9597
s.Require().Nil(err)
96-
err = s.client.DeleteETL(s.projectName, s.etlName)
98+
err = s.client.DeleteETL(s.projectName, etlName)
9799
s.Require().Nil(err)
98100
time.Sleep(time.Second * 100)
99-
_, err = s.client.GetETL(s.projectName, s.etlName)
101+
_, err = s.client.GetETL(s.projectName, etlName)
100102
s.Require().NotNil(err)
101-
102103
}
103104

104105
func (s *ETLJobTestV2Suite) TestClient_ListETLJobV2() {
105-
err := s.createETLJobV2()
106+
etlName := "test_list_etl"
107+
err := s.createETLJobV2(etlName)
106108
s.Require().Nil(err)
107109
etljobList, err := s.client.ListETL(s.projectName, 0, 100)
108110
s.Require().Nil(err)
109111
s.Require().Equal(1, etljobList.Total)
110112
s.Require().Equal(1, etljobList.Count)
111-
err = s.client.DeleteETL(s.projectName, s.etlName)
113+
err = s.client.DeleteETL(s.projectName, etlName)
112114
s.Require().Nil(err)
113-
114115
}
115116

116117
func (s *ETLJobTestV2Suite) TestClient_StartStopETLJobV2() {
117-
err := s.createETLJobV2()
118+
etlName := "test_start_stop_etl"
119+
err := s.createETLJobV2(etlName)
118120
s.Require().Nil(err)
119121
for {
120-
etljob, err := s.client.GetETL(s.projectName, s.etlName)
122+
etljob, err := s.client.GetETL(s.projectName, etlName)
121123
s.Require().Nil(err)
122124
time.Sleep(10 * time.Second)
123125
if etljob.Status == "RUNNING" {
124126
break
125127
}
126128
}
127129

128-
err = s.client.StopETL(s.projectName, s.etlName)
130+
err = s.client.StopETL(s.projectName, etlName)
129131
for {
130-
etljob, err := s.client.GetETL(s.projectName, s.etlName)
132+
etljob, err := s.client.GetETL(s.projectName, etlName)
131133
s.Require().Nil(err)
132134
time.Sleep(10 * time.Second)
133135
if etljob.Status == "STOPPED" {
134136
break
135137
}
136138
}
137-
err = s.client.StartETL(s.projectName, s.etlName)
139+
err = s.client.StartETL(s.projectName, etlName)
140+
for {
141+
etljob, err := s.client.GetETL(s.projectName, etlName)
142+
s.Require().Nil(err)
143+
time.Sleep(10 * time.Second)
144+
if etljob.Status == "RUNNING" {
145+
break
146+
}
147+
}
148+
err = s.client.DeleteETL(s.projectName, etlName)
149+
s.Require().Nil(err)
150+
}
151+
152+
func (s *ETLJobTestV2Suite) TestClient_RestartETLJobV2() {
153+
etlName := "test_restart_etl"
154+
err := s.createETLJobV2(etlName)
155+
s.Require().Nil(err)
138156
for {
139-
etljob, err := s.client.GetETL(s.projectName, s.etlName)
157+
etljob, err := s.client.GetETL(s.projectName, etlName)
140158
s.Require().Nil(err)
141159
time.Sleep(10 * time.Second)
142160
if etljob.Status == "RUNNING" {
143161
break
144162
}
145163
}
146164

165+
etljob, err := s.client.GetETL(s.projectName, etlName)
166+
s.Require().Nil(err)
167+
etljob.DisplayName = "update"
168+
etljob.Description = "update description"
169+
etljob.Configuration.Script = "e_set('update','update')"
170+
171+
err = s.client.RestartETL(s.projectName, *etljob)
172+
s.Require().Nil(err)
173+
174+
for {
175+
time.Sleep(10 * time.Second)
176+
etljob, err := s.client.GetETL(s.projectName, etlName)
177+
s.Require().Nil(err)
178+
if etljob.Status == "RUNNING" {
179+
break
180+
}
181+
}
182+
183+
etljob, err = s.client.GetETL(s.projectName, etlName)
184+
s.Require().Nil(err)
185+
s.Require().Equal("update", etljob.DisplayName)
186+
s.Require().Equal("update description", etljob.Description)
147187

188+
err = s.client.DeleteETL(s.projectName, etlName)
189+
s.Require().Nil(err)
148190
}

client_interface.go

+9
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,15 @@ type ClientInterface interface {
149149
RemoveConfigFromMachineGroup(project string, confName, groupName string) (err error)
150150

151151
// #################### ETL Operations #####################
152+
CreateETL(project string, etljob ETL) error
153+
UpdateETL(project string, etljob ETL) error
154+
GetETL(project string, etlName string) (ETLJob *ETL, err error)
155+
ListETL(project string, offset int, size int) (*ListETLResponse, error)
156+
DeleteETL(project string, etlName string) error
157+
StartETL(project, name string) error
158+
StopETL(project, name string) error
159+
RestartETL(project string, etljob ETL) error
160+
152161
CreateEtlMeta(project string, etlMeta *EtlMeta) (err error)
153162
UpdateEtlMeta(project string, etlMeta *EtlMeta) (err error)
154163
DeleteEtlMeta(project string, etlMetaName, etlMetaKey string) (err error)

token_auto_update_client.go

+80
Original file line numberDiff line numberDiff line change
@@ -476,6 +476,86 @@ func (c *TokenAutoUpdateClient) RemoveConfigFromMachineGroup(project string, con
476476
return
477477
}
478478

479+
func (c *TokenAutoUpdateClient) CreateETL(project string, etljob ETL) (err error) {
480+
for i := 0; i < c.maxTryTimes; i++ {
481+
err = c.logClient.CreateETL(project, etljob)
482+
if !c.processError(err) {
483+
return
484+
}
485+
}
486+
return
487+
}
488+
489+
func (c *TokenAutoUpdateClient) UpdateETL(project string, etljob ETL) (err error) {
490+
for i := 0; i < c.maxTryTimes; i++ {
491+
err = c.logClient.UpdateETL(project, etljob)
492+
if !c.processError(err) {
493+
return
494+
}
495+
}
496+
return
497+
}
498+
499+
func (c *TokenAutoUpdateClient) GetETL(project string, etlName string) (ETLJob *ETL, err error) {
500+
for i := 0; i < c.maxTryTimes; i++ {
501+
ETLJob, err = c.logClient.GetETL(project, etlName)
502+
if !c.processError(err) {
503+
return
504+
}
505+
}
506+
return
507+
}
508+
509+
func (c *TokenAutoUpdateClient) ListETL(project string, offset int, size int) (ETLResponse *ListETLResponse, err error) {
510+
for i := 0; i < c.maxTryTimes; i++ {
511+
ETLResponse, err = c.logClient.ListETL(project, offset, size)
512+
if !c.processError(err) {
513+
return
514+
}
515+
}
516+
return
517+
}
518+
519+
func (c *TokenAutoUpdateClient) DeleteETL(project string, etlName string) (err error) {
520+
for i := 0; i < c.maxTryTimes; i++ {
521+
err = c.logClient.DeleteETL(project, etlName)
522+
if !c.processError(err) {
523+
return
524+
}
525+
}
526+
return
527+
}
528+
529+
func (c *TokenAutoUpdateClient) StartETL(project string, name string) (err error) {
530+
for i := 0; i < c.maxTryTimes; i++ {
531+
err = c.logClient.StartETL(project, name)
532+
if !c.processError(err) {
533+
return
534+
}
535+
}
536+
return
537+
}
538+
539+
func (c *TokenAutoUpdateClient) StopETL(project string, name string) (err error) {
540+
for i := 0; i < c.maxTryTimes; i++ {
541+
err = c.logClient.StopETL(project, name)
542+
if !c.processError(err) {
543+
return
544+
}
545+
}
546+
return
547+
}
548+
549+
func (c *TokenAutoUpdateClient) RestartETL(project string, etljob ETL) (err error) {
550+
for i := 0; i < c.maxTryTimes; i++ {
551+
err = c.logClient.RestartETL(project, etljob)
552+
if !c.processError(err) {
553+
return
554+
}
555+
}
556+
return
557+
}
558+
479559
func (c *TokenAutoUpdateClient) CreateEtlMeta(project string, etlMeta *EtlMeta) (err error) {
480560
for i := 0; i < c.maxTryTimes; i++ {
481561
err = c.logClient.CreateEtlMeta(project, etlMeta)

0 commit comments

Comments
 (0)