Skip to content
This repository was archived by the owner on May 24, 2024. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
- [#6603](https://github.com/influxdata/telegraf/pull/6603): Add support for per output flush jitter.
- [#6650](https://github.com/influxdata/telegraf/pull/6650): Add a nameable file tag to file input plugin.
- [#6640](https://github.com/influxdata/telegraf/pull/6640): Add Splunk MultiMetric support.
- [#6680](https://github.com/influxdata/telegraf/pull/6668): Add support for sending HTTP Basic Auth in influxdb input

#### Bugfixes

Expand Down
1 change: 1 addition & 0 deletions plugins/inputs/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/cloud_pubsub"
_ "github.com/influxdata/telegraf/plugins/inputs/cloud_pubsub_push"
_ "github.com/influxdata/telegraf/plugins/inputs/cloudwatch"
_ "github.com/influxdata/telegraf/plugins/inputs/confluence"
_ "github.com/influxdata/telegraf/plugins/inputs/conntrack"
_ "github.com/influxdata/telegraf/plugins/inputs/consul"
_ "github.com/influxdata/telegraf/plugins/inputs/couchbase"
Expand Down
22 changes: 22 additions & 0 deletions plugins/inputs/confluence/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Confluence Input Plugin

Gather data from given confluence url

### Configuration

```toml
[[inputs.confluence]]
## The confluence url
url = "https://confluence.linuxfoundation.org"
# username = "admin"
# password = "admin"

## Set response_timeout
http_timeout = "5s"

## Worker pool for confluence plugin only
## Empty this field will use default value 5
# max_connections = 5
```

TODO
145 changes: 145 additions & 0 deletions plugins/inputs/confluence/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
package confluence

import (
"context"
"encoding/json"
"fmt"
"net/http"
"strings"
)

type client struct {
baseURL string
httpClient *http.Client
username string
password string
sessionCookie *http.Cookie
semaphore chan struct{}
}

func newClient(httpClient *http.Client, url, username, password string, maxConnections int) *client {
return &client{
baseURL: url,
httpClient: httpClient,
username: username,
password: password,
semaphore: make(chan struct{}, maxConnections),
}
}

func (c *client) init() error {
// get the session cookie
req, err := http.NewRequest("GET", c.baseURL, nil)
if err != nil {
return err
}

if c.username != "" && c.password != "" {
req.SetBasicAuth(c.username, c.password)
}

res, err := c.httpClient.Do(req)
if err != nil {
return err
}
defer res.Body.Close()
for _, cookie := range res.Cookies() {
if strings.Contains(cookie.Name, "JSESSIONID") {
c.sessionCookie = cookie
break
}
}

// fetch
if err := c.doGet(context.Background(), spacePath, new(spaceResponse)); err != nil {
return err
}
return nil
}

func (c *client) doGet(ctx context.Context, url string, v interface{}) error {
req, err := createGetRequest(c.baseURL+url, c.username, c.password, c.sessionCookie)
if err != nil {
return err
}

select {
case c.semaphore <- struct{}{}:
break
case <-ctx.Done():
return ctx.Err()
}

res, err := c.httpClient.Do(req.WithContext(ctx))
if err != nil {
<-c.semaphore
return err
}
defer func() {
res.Body.Close()
<-c.semaphore
}()

// Clear invalid token if unauthorized
if res.StatusCode == http.StatusUnauthorized {
c.sessionCookie = nil
return APIError{
URL: url,
StatusCode: res.StatusCode,
Title: res.Status,
}
}
if res.StatusCode < 200 || res.StatusCode >= 300 {
return APIError{
URL: url,
StatusCode: res.StatusCode,
Title: res.Status,
}
}
if res.StatusCode == http.StatusNoContent {
return APIError{
URL: url,
StatusCode: res.StatusCode,
Title: res.Status,
}
}
if err = json.NewDecoder(res.Body).Decode(v); err != nil {
return err
}
return nil
}

type APIError struct {
URL string
StatusCode int
Title string
Description string
}

func (e APIError) Error() string {
if e.Description != "" {
return fmt.Sprintf("[%s] %s: %s", e.URL, e.Title, e.Description)
}
return fmt.Sprintf("[%s] %s", e.URL, e.Title)
}

func createGetRequest(url, username, password string, sessionCookie *http.Cookie) (*http.Request, error) {
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return nil, err
}
if username != "" && password != "" {
req.SetBasicAuth(username, password)
}
if sessionCookie != nil {
req.AddCookie(sessionCookie)
}
req.Header.Add("Accept", "application/json")
return req, nil
}

func (c *client) getAllSpaces(ctx context.Context) (spaceResp *spaceResponse, err error) {
spaceResp = new(spaceResponse)
err = c.doGet(ctx, spacePath, spaceResp)
return spaceResp, err
}
200 changes: 200 additions & 0 deletions plugins/inputs/confluence/confluence.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
package confluence

import (
"context"
"fmt"
"net/http"
"net/url"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/internal/tls"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/selfstat"
)

// measurements
const (
spacePath = "/rest/api/space"
historyPath = "/rest/api/history"
versionPath = "/rest/api/version"

measurementSpace = "confluence_space"
measurementHistory = "confluence_history"
measurementVersion = "confluence_version"
)

// Confluence plugin gathers information about various confluence objects
type Confluence struct {
URL string `toml:"url"`
Username string `toml:"username"`
Password string `toml:"password"`
// HTTP Timeout specified as a string - 3s, 1m, 1h
HTTPTimeout internal.Duration `toml:"http_timeout"`
MaxConnections int `toml:"max_connections"`

tls.ClientConfig
client *client

//client *client
RateLimit selfstat.Stat
RateLimitErrors selfstat.Stat
RateRemaining selfstat.Stat
semaphore chan struct{}
}

const sampleConfig = `
## The confluence url
url = "https://confluence.linuxfoundation.org"
# username = "admin"
# password = "admin"

## Set response_timeout
http_timeout = "5s"

## Worker pool for confluence plugin only
## Empty this field will use default value 5
# max_connections = 5
`

// SampleConfig returns sample configuration for this plugin
func (c *Confluence) SampleConfig() string {
return sampleConfig
}

// Description implements telegraf.Input interface
func (c *Confluence) Description() string {
return "Gather confluence data."
}

// Gather implements telegraf.Input interface
func (c *Confluence) Gather(acc telegraf.Accumulator) error {
if c.client == nil {
client, err := c.newHTTPClient()
if err != nil {
return err
}
if err = c.initialize(client); err != nil {
return err
}
}

c.gatherSpacesData(acc)
return nil
}

// newHTTPClient instantiates a new http client when called
func (c *Confluence) newHTTPClient() (*http.Client, error) {
tlsCfg, err := c.ClientConfig.TLSConfig()
if err != nil {
return nil, fmt.Errorf("error parse confluence config[%s]: %v", c.URL, err)
}
return &http.Client{
Transport: &http.Transport{
TLSClientConfig: tlsCfg,
MaxIdleConns: c.MaxConnections,
},
Timeout: c.HTTPTimeout.Duration,
}, nil
}

// separate the client as dependency to use httptest Client for mocking
func (c *Confluence) initialize(client *http.Client) error {
if c.MaxConnections <= 0 {
c.MaxConnections = 5
}

c.semaphore = make(chan struct{})
c.client = newClient(client, c.URL, c.Username, c.Password, c.MaxConnections)

return c.client.init()
}

type space struct {
Id int32 `json:"id"`
Key string `json:"key"`
Name string `json:"name"`
Type string `json:"type"`
Links spaceLinks `json:"_links"`
Expandable spaceExpandable `json:"_expandable"`
}

type spaceLinks struct {
WebUI string `json:"webui"`
Self string `json:"self"`
}

type spaceExpandable struct {
Metadata string `json:"metadata"`
Icon string `json:"icon"`
Description string `json:"description"`
Homepage string `json:"homepage"`
}

type spaceResponse struct {
Results []space `json:"results"`
}

func (c *Confluence) gatherSpaceData(s space, acc telegraf.Accumulator) error {

tags := map[string]string{}
if s.Name == "" {
return fmt.Errorf("error empty space name")
}

u, err := url.Parse(c.URL)
if err != nil {
return err
}
tags["source"] = u.Hostname()
tags["port"] = u.Port()

fields := make(map[string]interface{})
fields["id"] = s.Id
fields["key"] = s.Key
fields["name"] = s.Name
fields["type"] = s.Type
fields["webui"] = s.Links.WebUI
fields["self"] = s.Links.Self

acc.AddFields(measurementSpace, fields, tags)
return nil
}

func (c *Confluence) gatherSpacesData(acc telegraf.Accumulator) {
spaceResp, err := c.client.getAllSpaces(context.Background())
if err != nil {
acc.AddError(err)
return
}
// get each space's data
for _, space := range spaceResp.Results {
err = c.gatherSpaceData(space, acc)
if err == nil {
continue
}
acc.AddError(err)
}
}

// wrap the tcp request with doGet
// block tcp request if buffered channel is full
func (c *Confluence) doGet(tcp func() error) error {
c.semaphore <- struct{}{}
if err := tcp(); err != nil {
<-c.semaphore
return err
}
<-c.semaphore
return nil
}

func init() {
inputs.Add("confluence", func() telegraf.Input {
return &Confluence{
HTTPTimeout: internal.Duration{Duration: time.Duration(time.Hour)},
MaxConnections: 5,
}
})
}
4 changes: 4 additions & 0 deletions plugins/inputs/influxdb/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ InfluxDB-formatted endpoints. See below for more information.
"http://localhost:8086/debug/vars"
]

## Username and password to send using HTTP Basic Authentication.
# username = ""
# password = ""

## Optional TLS Config
# tls_ca = "/etc/telegraf/ca.pem"
# tls_cert = "/etc/telegraf/cert.pem"
Expand Down
Loading