Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cli/cmd/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ var createCmd = &cobra.Command{
RunE: RunECreate,
}

//RunECreate ...
// RunECreate ...
func RunECreate(cmd *cobra.Command, args []string) error {
configs, err := getCreateCmdConfig(cmd)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion cli/cmd/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ var deleteCmd = &cobra.Command{
RunE: RunEDelete,
}

//RunEDelete ...
// RunEDelete ...
func RunEDelete(cmd *cobra.Command, args []string) error {
req := connectors.ConnectorRequest{
Name: connector,
Expand Down
2 changes: 1 addition & 1 deletion cli/cmd/pause.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ var pauseCmd = &cobra.Command{
RunE: RunEPause,
}

//RunEPause ...
// RunEPause ...
func RunEPause(cmd *cobra.Command, args []string) error {
req := connectors.ConnectorRequest{
Name: connector,
Expand Down
2 changes: 1 addition & 1 deletion cli/cmd/resume.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ var resumeCmd = &cobra.Command{
RunE: RunEResume,
}

//RunEResume ...
// RunEResume ...
func RunEResume(cmd *cobra.Command, args []string) error {
req := connectors.ConnectorRequest{
Name: connector,
Expand Down
2 changes: 1 addition & 1 deletion cli/cmd/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ var updateCmd = &cobra.Command{
RunE: RunEUpdate,
}

//RunEUpdate ...
// RunEUpdate ...
func RunEUpdate(cmd *cobra.Command, args []string) error {
req := connectors.CreateConnectorRequest{}

Expand Down
66 changes: 35 additions & 31 deletions lib/connectors/base_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (c *baseClient) SetHeader(name string, value string) {
c.restClient.SetHeader(name, value)
}

//ErrorResponse is generic error returned by kafka connect
// ErrorResponse is generic error returned by kafka connect
type ErrorResponse struct {
ErrorCode int `json:"error_code,omitempty"`
Message string `json:"message,omitempty"`
Expand All @@ -68,15 +68,19 @@ func (err ErrorResponse) Error() string {
return fmt.Sprintf("error code: %d , message: %s", err.ErrorCode, err.Message)
}

func newBaseClient(url string) BaseClient {
func newBaseClient(url string, timeoutOptional ...time.Duration) BaseClient {
timeout := 10 * time.Second
if len(timeoutOptional) > 0 {
timeout = timeoutOptional[0] * time.Second
}
restClient := resty.New().
SetError(ErrorResponse{}).
SetHostURL(url).
SetHeader("Accept", "application/json").
SetRetryCount(5).
SetRetryWaitTime(500 * time.Millisecond).
SetRetryMaxWaitTime(5 * time.Second).
SetTimeout(10 * time.Second).
SetTimeout(timeout).
AddRetryCondition(func(resp *resty.Response) (bool, error) {
return resp.StatusCode() == 409, nil
})
Expand All @@ -86,52 +90,52 @@ func newBaseClient(url string) BaseClient {

// ------------- Connectors ------------

//ConnectorRequest is generic request used when interacting with connector endpoint
// ConnectorRequest is generic request used when interacting with connector endpoint
type ConnectorRequest struct {
Name string `json:"name"`
}

//EmptyResponse is response returned by multiple endpoint when only StatusCode matter
// EmptyResponse is response returned by multiple endpoint when only StatusCode matter
type EmptyResponse struct {
Code int
ErrorResponse
}

//CreateConnectorRequest is request used for creating connector
// CreateConnectorRequest is request used for creating connector
type CreateConnectorRequest struct {
ConnectorRequest
Config map[string]interface{} `json:"config"`
}

//GetAllConnectorsResponse is request used to get list of available connectors
// GetAllConnectorsResponse is request used to get list of available connectors
type GetAllConnectorsResponse struct {
EmptyResponse
Connectors []string
}

//ConnectorResponse is generic response when interacting with connector endpoint
// ConnectorResponse is generic response when interacting with connector endpoint
type ConnectorResponse struct {
EmptyResponse
Name string `json:"name"`
Config map[string]interface{} `json:"config"`
Tasks []TaskID `json:"tasks"`
}

//GetConnectorConfigResponse is response returned by GetConfig endpoint
// GetConnectorConfigResponse is response returned by GetConfig endpoint
type GetConnectorConfigResponse struct {
EmptyResponse
Config map[string]interface{}
}

//GetConnectorStatusResponse is response returned by GetStatus endpoint
// GetConnectorStatusResponse is response returned by GetStatus endpoint
type GetConnectorStatusResponse struct {
EmptyResponse
Name string `json:"name"`
ConnectorStatus map[string]string `json:"connector"`
TasksStatus []TaskStatus `json:"tasks"`
}

//GetAll gets the list of all active connectors
// GetAll gets the list of all active connectors
func (c *baseClient) GetAll() (GetAllConnectorsResponse, error) {
result := GetAllConnectorsResponse{}
var connectors []string
Expand All @@ -153,7 +157,7 @@ func (c *baseClient) GetAll() (GetAllConnectorsResponse, error) {
return result, nil
}

//GetConnector return information on specific connector
// GetConnector return information on specific connector
func (c *baseClient) GetConnector(req ConnectorRequest) (ConnectorResponse, error) {
result := ConnectorResponse{}

Expand All @@ -173,7 +177,7 @@ func (c *baseClient) GetConnector(req ConnectorRequest) (ConnectorResponse, erro
return result, nil
}

//CreateConnector create connector using specified config and name
// CreateConnector create connector using specified config and name
func (c *baseClient) CreateConnector(req CreateConnectorRequest) (ConnectorResponse, error) {
result := ConnectorResponse{}

Expand All @@ -193,7 +197,7 @@ func (c *baseClient) CreateConnector(req CreateConnectorRequest) (ConnectorRespo
return result, nil
}

//UpdateConnector update a connector config
// UpdateConnector update a connector config
func (c *baseClient) UpdateConnector(req CreateConnectorRequest) (ConnectorResponse, error) {
result := ConnectorResponse{}

Expand All @@ -214,7 +218,7 @@ func (c *baseClient) UpdateConnector(req CreateConnectorRequest) (ConnectorRespo
return result, nil
}

//DeleteConnector delete a connector
// DeleteConnector delete a connector
func (c *baseClient) DeleteConnector(req ConnectorRequest) (EmptyResponse, error) {
result := EmptyResponse{}

Expand All @@ -234,7 +238,7 @@ func (c *baseClient) DeleteConnector(req ConnectorRequest) (EmptyResponse, error
return result, nil
}

////GetConnectorConfig return config of a connector
// //GetConnectorConfig return config of a connector
func (c *baseClient) GetConnectorConfig(req ConnectorRequest) (GetConnectorConfigResponse, error) {
result := GetConnectorConfigResponse{}
var config map[string]interface{}
Expand All @@ -255,7 +259,7 @@ func (c *baseClient) GetConnectorConfig(req ConnectorRequest) (GetConnectorConfi
return result, nil
}

//GetConnectorStatus return current status of connector
// GetConnectorStatus return current status of connector
func (c *baseClient) GetConnectorStatus(req ConnectorRequest) (GetConnectorStatusResponse, error) {
result := GetConnectorStatusResponse{}

Expand All @@ -274,7 +278,7 @@ func (c *baseClient) GetConnectorStatus(req ConnectorRequest) (GetConnectorStatu
return result, nil
}

//RestartConnector restart connector
// RestartConnector restart connector
func (c *baseClient) RestartConnector(req ConnectorRequest) (EmptyResponse, error) {
result := EmptyResponse{}

Expand All @@ -293,8 +297,8 @@ func (c *baseClient) RestartConnector(req ConnectorRequest) (EmptyResponse, erro
return result, nil
}

//PauseConnector pause a running connector
//asynchronous operation
// PauseConnector pause a running connector
// asynchronous operation
func (c *baseClient) PauseConnector(req ConnectorRequest) (EmptyResponse, error) {
result := EmptyResponse{}

Expand All @@ -314,8 +318,8 @@ func (c *baseClient) PauseConnector(req ConnectorRequest) (EmptyResponse, error)
return result, nil
}

//ResumeConnector resume a paused connector
//asynchronous operation
// ResumeConnector resume a paused connector
// asynchronous operation
func (c *baseClient) ResumeConnector(req ConnectorRequest) (EmptyResponse, error) {
result := EmptyResponse{}

Expand All @@ -337,45 +341,45 @@ func (c *baseClient) ResumeConnector(req ConnectorRequest) (EmptyResponse, error

// ----------- Tasks ---------

//TaskRequest is generic request when interacting with task endpoint
// TaskRequest is generic request when interacting with task endpoint
type TaskRequest struct {
Connector string
TaskID int
}

//GetAllTasksResponse is response to get all tasks of a specific endpoint
// GetAllTasksResponse is response to get all tasks of a specific endpoint
type GetAllTasksResponse struct {
Code int
Tasks []TaskDetails
}

//TaskDetails is detail of a specific task on a specific endpoint
// TaskDetails is detail of a specific task on a specific endpoint
type TaskDetails struct {
ID TaskID `json:"id"`
Config map[string]interface{} `json:"config"`
}

//TaskID identify a task and its connector
// TaskID identify a task and its connector
type TaskID struct {
Connector string `json:"connector"`
TaskID int `json:"task"`
}

//TaskStatusResponse is response returned by get task status endpoint
// TaskStatusResponse is response returned by get task status endpoint
type TaskStatusResponse struct {
Code int
Status TaskStatus
}

//TaskStatus define task status
// TaskStatus define task status
type TaskStatus struct {
ID int `json:"id"`
State string `json:"state"`
WorkerID string `json:"worker_id"`
Trace string `json:"trace,omitempty"`
}

//GetAllTasks return list of running task
// GetAllTasks return list of running task
func (c *baseClient) GetAllTasks(req ConnectorRequest) (GetAllTasksResponse, error) {
var result GetAllTasksResponse

Expand All @@ -394,7 +398,7 @@ func (c *baseClient) GetAllTasks(req ConnectorRequest) (GetAllTasksResponse, err
return result, nil
}

//GetTaskStatus return current status of task
// GetTaskStatus return current status of task
func (c *baseClient) GetTaskStatus(req TaskRequest) (TaskStatusResponse, error) {
var result TaskStatusResponse

Expand All @@ -414,7 +418,7 @@ func (c *baseClient) GetTaskStatus(req TaskRequest) (TaskStatusResponse, error)
return result, nil
}

//RestartTask try to restart task
// RestartTask try to restart task
func (c *baseClient) RestartTask(req TaskRequest) (EmptyResponse, error) {
var result EmptyResponse

Expand Down
Loading