Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Providing the ability to output to a file instead of the default output to NATS #364

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ run: nats
go$(v) mod tidy; \
DEBUG=true GOPROXY=direct GOSUMDB=off go run main.go

.PHONY: file
## output to a YAML file
file:
go mod tidy; \
DEBUG=true GOPROXY=direct GOSUMDB=off go run . -output file

.PHONY: check
## Lint check Meshsync.
check:
Expand Down
2 changes: 1 addition & 1 deletion internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ func New(provider string) (config.Handler, error) {
err error
)
opts := configprovider.Options{
FilePath: utils.GetHome(),
FilePath: utils.GetHome() + "/.meshery",
FileType: "yaml",
FileName: "meshsync_config",
}
Expand Down
17 changes: 16 additions & 1 deletion internal/config/types.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package config

import "golang.org/x/exp/slices"
import (
"flag"

"golang.org/x/exp/slices"
)

const (
ServerKey = "server-config"
Expand All @@ -17,6 +21,17 @@ const (
InformerStore = "informer-store"
)

// Command line flag to determine the output mode
var (
OutputMode string
)

func init() {
flag.StringVar(&OutputMode, "output", "nats", "Output mode: 'file' or 'nats'")

// Parse the command=line flags to get the output mode
flag.Parse()
}
type PipelineConfigs []PipelineConfig

func (p PipelineConfigs) Add(pc PipelineConfig) PipelineConfigs {
Expand Down
2 changes: 1 addition & 1 deletion internal/pipeline/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func (ri *RegisterInformer) GetEventHandlers() cache.ResourceEventHandlerFuncs {
if err != nil {
ri.log.Error(err)
}
ri.log.Info("Received ADD event for: ", obj.(*unstructured.Unstructured).GetName(), "/", obj.(*unstructured.Unstructured).GetNamespace(), " of kind: ", obj.(*unstructured.Unstructured).GroupVersionKind().Kind)
// ri.log.Info("Received ADD event for: ", obj.(*unstructured.Unstructured).GetName(), "/", obj.(*unstructured.Unstructured).GetNamespace(), " of kind: ", obj.(*unstructured.Unstructured).GroupVersionKind().Kind)
},
UpdateFunc: func(oldObj, obj interface{}) {
oldObjCasted := oldObj.(*unstructured.Unstructured)
Expand Down
80 changes: 46 additions & 34 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,43 +99,55 @@ func main() {
os.Exit(1)
}
//Skip/Comment the below connectivity test in local environment
connectivityTest(cfg.GetKey(config.BrokerURL), log)
// Initialize Broker instance
br, err := nats.New(nats.Options{
URLS: []string{cfg.GetKey(config.BrokerURL)},
ConnectionName: "meshsync",
Username: "",
Password: "",
ReconnectWait: 2 * time.Second,
MaxReconnect: 60,
})
if err != nil {
log.Error(err)
os.Exit(1)
}

chPool := channels.NewChannelPool()
meshsyncHandler, err := meshsync.New(cfg, log, br, chPool)
if err != nil {
log.Error(err)
os.Exit(1)
// check if output mode is nats
if config.OutputMode == "nats" {

log.Info("NATS output mode selected")

connectivityTest(cfg.GetKey(config.BrokerURL), log)

// Initialize Broker instance
br, err := nats.New(nats.Options{
URLS: []string{cfg.GetKey(config.BrokerURL)},
ConnectionName: "meshsync",
Username: "",
Password: "",
ReconnectWait: 2 * time.Second,
MaxReconnect: 60,
})
if err != nil {
log.Error(err)
os.Exit(1)
}

chPool := channels.NewChannelPool()
meshsyncHandler, err := meshsync.New(cfg, log, br, chPool)
if err != nil {
log.Error(err)
os.Exit(1)
}

go meshsyncHandler.WatchCRDs()

go meshsyncHandler.Run()
go meshsyncHandler.ListenToRequests()

log.Info("Server started")
// Handle graceful shutdown
signal.Notify(chPool[channels.OS].(channels.OSChannel), syscall.SIGTERM, os.Interrupt)
select {
case <-chPool[channels.OS].(channels.OSChannel):
close(chPool[channels.Stop].(channels.StopChannel))
log.Info("Shutting down")
case <-chPool[channels.Stop].(channels.StopChannel):
close(chPool[channels.Stop].(channels.StopChannel))
log.Info("Shutting down")
}
}

go meshsyncHandler.WatchCRDs()

go meshsyncHandler.Run()
go meshsyncHandler.ListenToRequests()

log.Info("Server started")
// Handle graceful shutdown
signal.Notify(chPool[channels.OS].(channels.OSChannel), syscall.SIGTERM, os.Interrupt)
select {
case <-chPool[channels.OS].(channels.OSChannel):
close(chPool[channels.Stop].(channels.StopChannel))
log.Info("Shutting down")
case <-chPool[channels.Stop].(channels.StopChannel):
close(chPool[channels.Stop].(channels.StopChannel))
log.Info("Shutting down")
if config.OutputMode == "file" {
log.Info("File output mode is not implemented yet")
}
}

Expand Down
67 changes: 54 additions & 13 deletions meshsync/handlers.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
package meshsync

import (
"encoding/json"
"fmt"
"path/filepath"
"time"
"encoding/json"
"os"

"github.com/layer5io/meshkit/broker"
"github.com/layer5io/meshkit/utils"
"github.com/layer5io/meshkit/utils/kubernetes"
"github.com/layer5io/meshsync/internal/channels"
"github.com/layer5io/meshsync/internal/config"
config "github.com/layer5io/meshsync/internal/config"
"github.com/layer5io/meshsync/pkg/model"
"golang.org/x/net/context"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -69,6 +71,9 @@ func (h *Handler) UpdateInformer() error {
}

func (h *Handler) ListenToRequests() {



listenerConfigs := make(map[string]config.ListenerConfig, 10)
err := h.Config.GetObject(config.ListenersKey, &listenerConfigs)
if err != nil {
Expand Down Expand Up @@ -115,16 +120,27 @@ func (h *Handler) ListenToRequests() {
storeObjects := h.listStoreObjects()
splitSlices := splitIntoMultipleSlices(storeObjects, 5) // performance of NATS is bound to degrade if huge messages are sent

h.Log.Info("Publishing the data from informer stores to the subject: ", replySubject)
for _, val := range splitSlices {
err = h.Broker.Publish(replySubject, &broker.Message{
Object: val,
if config.OutputMode == "file" {
h.Log.Info("Writing the data from informer stores to the file")
for _, val := range splitSlices {
err = writeToFile(replySubject, val)
if err != nil {
h.Log.Error(err)
continue
}
}
} else {
h.Log.Info("Publishing the data from informer stores to the subject: ", replySubject)
for _, val := range splitSlices {
err = h.Broker.Publish(replySubject, &broker.Message{
Object: val,
})
if err != nil {
h.Log.Error(err)
continue
}
}
}

case broker.ReSyncDiscoveryEntity:
h.Log.Info("Resyncing")
Expand All @@ -144,13 +160,23 @@ func (h *Handler) ListenToRequests() {
continue
}
case "meshsync-meta":
h.Log.Info("Publishing MeshSync metadata to the subject")
err := h.Broker.Publish("meshsync-meta", &broker.Message{
Object: config.Server["version"],
})
if err != nil {
h.Log.Error(err)
continue
if config.OutputMode == "file" {
h.Log.Info("Writing MeshSync metadata to the file")
err = writeToFile("meshsync-meta", config.Server["version"])
if err != nil {
h.Log.Error(err)
continue
}
} else {
h.Log.Info("Publishing MeshSync metadata to the subject")

err := h.Broker.Publish("meshsync-meta", &broker.Message{
Object: config.Server["version"],
})
if err != nil {
h.Log.Error(err)
continue
}
}
}
}
Expand Down Expand Up @@ -262,3 +288,18 @@ func splitIntoMultipleSlices(s []model.KubernetesResource, maxItmsPerSlice int)

return result
}

// writing to a file
func writeToFile(filename string, data interface{}) error {
filePath := filepath.Join("data", filename+".json")
file, err := os.Create(filePath)
if err != nil {
return err
}

defer file.Close()

enc := json.NewEncoder(file)
enc.SetIndent("", " ")
return enc.Encode(data)
}
Loading