Skip to content

Commit

Permalink
Merge pull request skydive-project#2302 from lebauce/feature/elastics…
Browse files Browse the repository at this point in the history
…earch_tls_invalid_cert

Elasticsearch: define several hosts and allow invalid certs
  • Loading branch information
safchain authored Dec 3, 2020
2 parents 2a67f49 + b286a88 commit 8f0c434
Show file tree
Hide file tree
Showing 8 changed files with 167 additions and 58 deletions.
7 changes: 6 additions & 1 deletion analyzer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package analyzer

import (
"crypto/tls"
"errors"
"fmt"
"net/http"
"time"
Expand All @@ -34,6 +35,7 @@ import (
ondemand "github.com/skydive-project/skydive/flow/ondemand/client"
"github.com/skydive-project/skydive/flow/server"
"github.com/skydive-project/skydive/flow/storage"
"github.com/skydive-project/skydive/graffiti/api/rest"
etcdclient "github.com/skydive-project/skydive/graffiti/etcd/client"
etcdserver "github.com/skydive-project/skydive/graffiti/etcd/server"
"github.com/skydive-project/skydive/graffiti/graph"
Expand Down Expand Up @@ -120,7 +122,10 @@ func (s *Server) createStartupCapture() error {
logging.GetLogger().Infof("Invoke capturing of type '%s' from startup with gremlin: %s and BPF: %s", captureType, gremlin, bpf)
capture := types.NewCapture(gremlin, bpf)
capture.Type = captureType
return apiHandler.Create(capture, nil)
if err := apiHandler.Create(capture, nil); err != nil && !errors.Is(err, rest.ErrDuplicatedResource) {
return err
}
return nil
}

// Start the analyzer server
Expand Down
16 changes: 14 additions & 2 deletions analyzer/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,27 @@ func NewESConfig(name ...string) es.Config {
path += "elasticsearch"
}

cfg.ElasticHost = config.GetString(path + ".host")
// To be backwards compatible, check if .host key (old) has a string value.
// In that case, use that value as .hosts (converting the ip:port to http://ip:port)
// .host will have preference over .hosts
cfg.ElasticHosts = config.GetStringSlice(path + ".hosts")
oldElasticHost := config.GetString(path + ".host")
if oldElasticHost != "" {
cfg.ElasticHosts = []string{fmt.Sprintf("http://%s", oldElasticHost)}
}

cfg.InsecureSkipVerify = config.GetBool(path + ".ssl_insecure")
cfg.Username = config.GetString(path + ".auth.username")
cfg.Password = config.GetString(path + ".auth.password")
cfg.BulkMaxDelay = config.GetInt(path + ".bulk_maxdelay")
cfg.TotalFieldsLimit = config.GetInt(path + ".total_fields_limit")

cfg.EntriesLimit = config.GetInt(path + ".index_entries_limit")
cfg.AgeLimit = config.GetInt(path + ".index_age_limit")
cfg.IndicesLimit = config.GetInt(path + ".indices_to_keep")
cfg.NoSniffing = config.GetBool(path + ".disable_sniffing")
cfg.IndexPrefix = config.GetString(path + ".index_prefix")
cfg.NoHealthcheck = config.GetBool(path + ".disable_healthcheck")
cfg.Debug = config.GetBool(path + ".debug")

return cfg
}
Expand Down
7 changes: 5 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,9 +197,10 @@ func init() {
cfg.SetDefault("rbac.model.policy_effect", []string{"some(where (p_eft == allow)) && !some(where (p_eft == deny))"})
cfg.SetDefault("rbac.model.matchers", []string{"g(r.sub, p.sub) && r.obj == p.obj && r.act == p.act"})

// defined for backward compatibility and to set defaults
// storage section is defined for backward compatibility and to set defaults
cfg.SetDefault("storage.elasticsearch.driver", "elasticsearch")
cfg.SetDefault("storage.elasticsearch.host", "127.0.0.1:9200")
cfg.SetDefault("storage.elasticsearch.hosts", []string{"http://127.0.0.1:9200"})
cfg.SetDefault("storage.elasticsearch.ssl_insecure", false)
cfg.SetDefault("storage.elasticsearch.bulk_maxdelay", 5)
cfg.SetDefault("storage.elasticsearch.total_fields_limit", 1000)
cfg.SetDefault("storage.elasticsearch.index_age_limit", 0)
Expand All @@ -219,6 +220,8 @@ func init() {
"Metadata.OVN.Options",
"Metadata.OVN.IPv6RAConfigs",
})
cfg.SetDefault("storage.elasticsearch.disable_sniffing", false)
cfg.SetDefault("storage.elasticsearch.disable_healthcheck", false)
cfg.SetDefault("storage.memory.driver", "memory")
cfg.SetDefault("storage.orientdb.driver", "orientdb")
cfg.SetDefault("storage.orientdb.addr", "http://localhost:2480")
Expand Down
24 changes: 23 additions & 1 deletion etc/skydive.yml.default
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,17 @@ storage:
# Elasticsearch backend information.
myelasticsearch:
# driver: elasticsearch
# host: 127.0.0.1:9200
# hosts:
# - http://127.0.0.1:9200

# Disable TLS certificate verification
# Default: false
# ssl_insecure: true

# Basic auth
# auth:
# username: user
# password: secret

# Define the maximum delay before flushing document
# bulk_maxdelay: 5
Expand Down Expand Up @@ -416,6 +426,18 @@ storage:
# - Metadata.OVN.Options
# - Metadata.OVN.IPv6RAConfigs

# Snif Nodes Info API to get all the nodes in the cluster
# See https://pkg.go.dev/gopkg.in/olivere/elastic.v2?tab=doc#NewClient
# Default: false
# disable_sniffing: true

# Disable health check
# Default: false
# disable_healthcheck: true

# Debug queries
# debug: false

# OrientDB backend information.
myorientdb:
# driver: orientdb
Expand Down
1 change: 1 addition & 0 deletions graffiti/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ require (
github.com/nu7hatch/gouuid v0.0.0-20131221200532-179d4d0c4d8d
github.com/olivere/elastic/v7 v7.0.21
github.com/pierrec/xxHash v0.1.5
github.com/pkg/errors v0.9.1
github.com/pmylund/go-cache v2.1.0+incompatible
github.com/safchain/insanelock v0.0.0-20200217234559-cfbf166e05b3
github.com/skydive-project/go-debouncer v1.0.0
Expand Down
163 changes: 112 additions & 51 deletions graffiti/storage/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ package elasticsearch

import (
"context"
"errors"
"crypto/tls"
"fmt"
"net/http"
"net/url"
"sort"
"strings"
Expand All @@ -31,7 +32,7 @@ import (
version "github.com/hashicorp/go-version"
uuid "github.com/nu7hatch/gouuid"
elastic "github.com/olivere/elastic/v7"
esconfig "github.com/olivere/elastic/v7/config"
"github.com/pkg/errors"

etcd "github.com/skydive-project/skydive/graffiti/etcd/client"
"github.com/skydive-project/skydive/graffiti/filters"
Expand All @@ -41,19 +42,27 @@ import (

const (
schemaVersion = "13"
minimalVersion = "5.5"
minimalVersion = "7.0"
)

var errOutdatedVersion = errors.New("elasticsearch server doesn't match the minimal required version")

// Config describes configuration for elasticsearch
type Config struct {
ElasticHost string
BulkMaxDelay int
TotalFieldsLimit int
EntriesLimit int
AgeLimit int
IndicesLimit int
NoSniffing bool
IndexPrefix string
ElasticHosts []string
InsecureSkipVerify bool
Username string
Password string
BulkMaxDelay int
TotalFieldsLimit int
EntriesLimit int
AgeLimit int
IndicesLimit int
NoSniffing bool
IndexPrefix string
SniffingScheme string
NoHealthcheck bool
Debug bool
}

// ClientInterface describes the mechanism API of ElasticSearch database client
Expand Down Expand Up @@ -81,7 +90,6 @@ type Index struct {
type Client struct {
sync.RWMutex
Config Config
url *url.URL
esClient *elastic.Client
bulkProcessor *elastic.BulkProcessor
started atomic.Value
Expand All @@ -91,6 +99,30 @@ type Client struct {
masterElection etcd.MasterElection
}

// TraceLogger implements the oliviere/elastic Logger interface to be used with trace messages
type TraceLogger struct{}

// Printf sends elastic trace messages to skydive logger Debug
func (l TraceLogger) Printf(format string, v ...interface{}) {
logging.GetLogger().Debugf(format, v...)
}

// InfoLogger implements the oliviere/elastic Logger interface to be used with info messages
type InfoLogger struct{}

// Printf sends elastic info messages to skydive logger Info
func (l InfoLogger) Printf(format string, v ...interface{}) {
logging.GetLogger().Infof(format, v...)
}

// ErrorLogger implements the oliviere/elastic Logger interface to be used with error mesages
type ErrorLogger struct{}

// Printf sends elastic error messages to skydive logger Error
func (l ErrorLogger) Printf(format string, v ...interface{}) {
logging.GetLogger().Errorf(format, v...)
}

var (
// ErrBadConfig error bad configuration file
ErrBadConfig = func(reason string) error { return fmt.Errorf("Config file is misconfigured: %s", reason) }
Expand Down Expand Up @@ -203,20 +235,65 @@ func (c *Client) createIndices() error {
return nil
}

func (c *Client) start() error {
esConfig, err := esconfig.Parse(c.url.String())
func (c *Client) checkServerVersion(host, minimalVersion string) error {
vt, err := c.esClient.ElasticsearchVersion(host)
if err != nil {
return err
return errors.Wrapf(err, "unable to retrieve the version for '%s'", host)
}

v, err := version.NewVersion(vt)
if err != nil {
return errors.Wrapf(err, "unable to parse the version for '%s'", host)
}

if c.Config.NoSniffing {
esConfig.Sniff = new(bool)
*esConfig.Sniff = false
min, _ := version.NewVersion(minimalVersion)
if v.LessThan(min) {
return errors.Wrapf(errOutdatedVersion, "requires at least %s, found %s", minimalVersion, vt)
}

esClient, err := elastic.NewClientFromConfig(esConfig)
return nil
}

func (c *Client) start() error {
httpClient := http.DefaultClient

if c.Config.InsecureSkipVerify {
logging.GetLogger().Warning("Skipping SSL certificates verification")

tr := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}
httpClient = &http.Client{Transport: tr}
}

var traceLogger, infoLogger elastic.Logger
if c.Config.Debug {
traceLogger = TraceLogger{}
infoLogger = InfoLogger{}
}

scheme := elastic.DefaultScheme
if len(c.Config.ElasticHosts) > 0 {
url, err := url.Parse(c.Config.ElasticHosts[0])
if err != nil {
return errors.Wrap(err, "invalid host url")
}
scheme = url.Scheme
}

esClient, err := elastic.NewClient(
elastic.SetHttpClient(httpClient),
elastic.SetURL(c.Config.ElasticHosts...),
elastic.SetBasicAuth(c.Config.Username, c.Config.Password),
elastic.SetSniff(!c.Config.NoSniffing),
elastic.SetScheme(scheme),
elastic.SetHealthcheck(!c.Config.NoHealthcheck),
elastic.SetTraceLog(traceLogger),
elastic.SetInfoLog(infoLogger),
elastic.SetErrorLog(ErrorLogger{}),
)
if err != nil {
return err
return fmt.Errorf("creating elasticsearch client: %s", err)
}
c.esClient = esClient

Expand All @@ -237,24 +314,27 @@ func (c *Client) start() error {
FlushInterval(time.Duration(c.Config.BulkMaxDelay) * time.Second).
Do(context.Background())
if err != nil {
return err
return fmt.Errorf("creating elasticsearch bulk processor: %s", err)
}

c.bulkProcessor = bulkProcessor

vt, err := esClient.ElasticsearchVersion(c.url.String())
if err != nil {
return fmt.Errorf("Unable to get the version: %s", vt)
}

v, err := version.NewVersion(vt)
if err != nil {
return fmt.Errorf("Unable to parse the version: %s", vt)
// check minimal version for all servers
checkVersion := false
for _, host := range c.Config.ElasticHosts {
err := c.checkServerVersion(host, minimalVersion)
if err != nil {
if errors.Is(err, errOutdatedVersion) {
return err
}
logging.GetLogger().Warning(err)
} else {
checkVersion = true
}
}

min, _ := version.NewVersion(minimalVersion)
if v.LessThan(min) {
return fmt.Errorf("Elasticsearch backend requires a minimal version of %s, found: %s", minimalVersion, vt)
if !checkVersion {
return errors.New("failed to verify minimal versions of elasticsearch servers")
}

if c.masterElection == nil || c.masterElection.IsMaster() {
Expand Down Expand Up @@ -471,19 +551,6 @@ func (c *Client) Started() bool {
return c.started.Load() == true
}

func urlFromHost(host string) (*url.URL, error) {
urlStr := host
if !strings.HasPrefix(urlStr, "http://") && !strings.HasPrefix(urlStr, "https://") {
urlStr = "http://" + urlStr
}

url, err := url.Parse(urlStr)
if err != nil || url.Port() == "" {
return nil, ErrBadConfig(fmt.Sprintf("wrong url format, %s", urlStr))
}
return url, nil
}

// GetClient returns the elastic client object
func (c *Client) GetClient() *elastic.Client {
return c.esClient
Expand All @@ -498,11 +565,6 @@ func (c *Client) AddEventListener(listener storage.EventListener) {

// NewClient creates a new ElasticSearch client based on configuration
func NewClient(indices []Index, cfg Config, electionService etcd.MasterElectionService) (*Client, error) {
url, err := urlFromHost(cfg.ElasticHost)
if err != nil {
return nil, err
}

var names []string

indicesMap := make(map[string]Index, 0)
Expand All @@ -525,7 +587,6 @@ func NewClient(indices []Index, cfg Config, electionService etcd.MasterElectionS

client := &Client{
Config: cfg,
url: url,
indices: indicesMap,
masterElection: electionService.NewElection("/elections/es-index-creator-" + u5.String()),
}
Expand Down
4 changes: 4 additions & 0 deletions scripts/ci/run-tests-utils.sh
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
#!/bin/bash

set -v

network_setup() {
sudo iptables -F
sudo iptables -P FORWARD ACCEPT
Expand Down
Loading

0 comments on commit 8f0c434

Please sign in to comment.