Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve: provide opts to set the truncate size in text mode to reduce memory cost #731

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -228,3 +228,7 @@ format:
@clang-format -i -style=$(STYLE) kern/openssl_masterkey_3.2.h
@clang-format -i -style=$(STYLE) kern/boringssl_masterkey.h
@clang-format -i -style=$(STYLE) utils/*.c

.PHONY: test
test:
go test -v -race ./...
7 changes: 5 additions & 2 deletions cli/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,9 @@ var (
)

const (
defaultPid uint64 = 0
defaultUid uint64 = 0
defaultPid uint64 = 0
defaultUid uint64 = 0
defaultTruncateSize uint64 = 0
)

const (
Expand Down Expand Up @@ -134,6 +135,7 @@ func init() {
rootCmd.PersistentFlags().StringVarP(&globalConf.LoggerAddr, "logaddr", "l", "", "send logs to this server. -l /tmp/ecapture.log or -l tcp://127.0.0.1:8080")
rootCmd.PersistentFlags().StringVar(&globalConf.EventCollectorAddr, "eventaddr", "", "the server address that receives the captured event. --eventaddr tcp://127.0.0.1:8090, default: same as logaddr")
rootCmd.PersistentFlags().StringVar(&globalConf.Listen, "listen", eCaptureListenAddr, "listen on this address for http server, default: 127.0.0.1:28256")
rootCmd.PersistentFlags().Uint64VarP(&globalConf.TruncateSize, "tsize", "t", defaultTruncateSize, "the truncate size in text mode, default: 0 (B), no truncate")

rootCmd.SilenceUsage = true
}
Expand All @@ -156,6 +158,7 @@ func setModConfig(globalConf config.BaseConfig, modConf config.IConfig) {
modConf.SetBTF(globalConf.BtfMode)
modConf.SetPerCpuMapSize(globalConf.PerCpuMapSize)
modConf.SetAddrType(loggerTypeStdout)
modConf.SetTruncateSize(globalConf.TruncateSize)

switch ByteCodeFiles {
case "core":
Expand Down
5 changes: 5 additions & 0 deletions pkg/event_processor/iworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,11 @@ func (ew *eventWorker) writeEvent(e event.IEventStruct) {
// 解析类型,输出
func (ew *eventWorker) parserEvents() []byte {
ew.status = ProcessStateProcessing
tsize := int(ew.processor.truncateSize)
if tsize > 0 && ew.payload.Len() > tsize {
ew.payload.Truncate(tsize)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Is it just judging the size when parsing the first package? A more suitable solution should be when receiving each Event? For example
    ew.payload.Write(e.Payload())
  2. Is it the most reasonable solution to discard it directly when the payload is greater than tsize? Is it more appropriate to continue receiving after printing out?

_ = ew.writeToChan(fmt.Sprintf("Events truncated, size: %d bytes\n", tsize))
}
parser := NewParser(ew.payload.Bytes())
ew.parser = parser
n, e := ew.parser.Write(ew.payload.Bytes())
Expand Down
7 changes: 5 additions & 2 deletions pkg/event_processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ type EventProcessor struct {
errChan chan error

// output model
isHex bool
isHex bool
truncateSize uint64
}

func (ep *EventProcessor) GetLogger() io.Writer {
Expand Down Expand Up @@ -161,11 +162,13 @@ func (ep *EventProcessor) Close() error {
func (ep *EventProcessor) ErrorChan() chan error {
return ep.errChan
}
func NewEventProcessor(logger io.Writer, isHex bool) *EventProcessor {

func NewEventProcessor(logger io.Writer, isHex bool, truncateSize uint64) *EventProcessor {
var ep *EventProcessor
ep = &EventProcessor{}
ep.logger = logger
ep.isHex = isHex
ep.truncateSize = truncateSize
ep.isClosed = false
ep.init()
return ep
Expand Down
83 changes: 82 additions & 1 deletion pkg/event_processor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ func TestEventProcessor_Serve(t *testing.T) {
t.Fatal(e)
}
logger.SetOutput(f)
ep := NewEventProcessor(f, true)
// no truncate
ep := NewEventProcessor(f, true, 0)
go func() {
var err error
err = ep.Serve()
Expand Down Expand Up @@ -108,3 +109,83 @@ func TestEventProcessor_Serve(t *testing.T) {
//t.Log(string(bufString))
t.Log("done")
}

func Test_Truncated_EventProcessor_Serve(t *testing.T) {

logger := log.Default()
//var buf bytes.Buffer
//logger.SetOutput(&buf)
var output = "./output_truncated.log"
f, e := os.Create(output)
if e != nil {
t.Fatal(e)
}
logger.SetOutput(f)

// truncate 1000 bytes
ep := NewEventProcessor(f, true, 1000)
go func() {
var err error
err = ep.Serve()
if err != nil {
//log.Fatalf(err.Error())
t.Error(err)
return
}
}()

lines := []string{
// short, no truncated
`{"DataType":0,"Timestamp":952253597324253,"Pid":469929,"Tid":469929,"DataLen":308,"Comm":[119,103,101,116,0,0,0,0,0,0,0,0,0,0,0,0],"Fd":3,"Version":771}`,
// long, should truncated
`{"DataType":0,"Timestamp":952282712204824,"Pid":469953,"Tid":469953,"DataLen":4096,"Comm":[99,117,114,108,0,0,0,0,0,0,0,0,0,0,0,0],"Fd":5,"Version":771}`,
}

for _, line := range lines {
if line == "" {
continue
}
var eventSSL SSLDataEventTmp
err := json.Unmarshal([]byte(line), &eventSSL)
if err != nil {
t.Fatalf("json unmarshal error: %s, body:%v", err.Error(), line)
}
payloadFile := fmt.Sprintf("testdata/%d.bin", eventSSL.Timestamp)
b, e := os.ReadFile(payloadFile)
if e != nil {
t.Fatalf("read payload file error: %s, file:%s", e.Error(), payloadFile)
}
copy(eventSSL.Data[:], b)
ep.Write(&BaseEvent{DataLen: eventSSL.DataLen, Data: eventSSL.Data, DataType: eventSSL.DataType, Timestamp: eventSSL.Timestamp, Pid: eventSSL.Pid, Tid: eventSSL.Tid, Comm: eventSSL.Comm, Fd: eventSSL.Fd, Version: eventSSL.Version})
}

tick := time.NewTicker(time.Second * 10)
<-tick.C

err := ep.Close()
logger.SetOutput(io.Discard)
bufString, e := os.ReadFile(output)
if e != nil {
t.Fatal(e)
}

lines = strings.Split(string(bufString), "\n")
ok := true
for _, line := range lines {
// truncated once
if strings.Contains(line, "Events truncated, size:") {
t.Log(line)
}
}

if err != nil {
t.Fatalf("close error: %s", err.Error())
}

if !ok {
t.Fatalf("some errors occurred")
}
//t.Log(string(bufString))
t.Log("done")
}

18 changes: 15 additions & 3 deletions user/config/iconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ type IConfig interface {
EnableGlobalVar() bool
// Bytes serializes the configuration to JSON bytes
Bytes() []byte
// Set/Get TruncateSize
SetTruncateSize(uint64)
GetTruncateSize() uint64
}

// TLS capture mode constants defining different output formats
Expand Down Expand Up @@ -90,9 +93,10 @@ const (

// BaseConfig implements the IConfig interface and holds the basic configuration settings
type BaseConfig struct {
Pid uint64 `json:"pid"` // Process ID to monitor
Uid uint64 `json:"uid"` // User ID to monitor
Listen string `json:"listen"` // Listen address for the server (default: 127.0.0.1:28256)
Pid uint64 `json:"pid"` // Process ID to monitor
Uid uint64 `json:"uid"` // User ID to monitor
Listen string `json:"listen"` // Listen address for the server (default: 127.0.0.1:28256)
TruncateSize uint64 `json:"truncate_size"` // truncate size in text mode

// eBPF map configuration
PerCpuMapSize int `json:"per_cpu_map_size"` // Size of eBPF map per CPU core
Expand Down Expand Up @@ -173,6 +177,14 @@ func (c *BaseConfig) SetPerCpuMapSize(size int) {
c.PerCpuMapSize = size * os.Getpagesize()
}

func (c *BaseConfig) SetTruncateSize(TruncateSize uint64) {
c.TruncateSize = TruncateSize
}

func (c *BaseConfig) GetTruncateSize() uint64 {
return c.TruncateSize
}

func (c *BaseConfig) EnableGlobalVar() bool {
kv, err := kernel.HostVersion()
if err != nil {
Expand Down
4 changes: 3 additions & 1 deletion user/module/imodule.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ func (m *Module) Init(ctx context.Context, logger *zerolog.Logger, conf config.I
m.isKernelLess5_2 = false //set false default
m.eventCollector = eventCollector
//var epl = epLogger{logger: logger}
m.processor = event_processor.NewEventProcessor(eventCollector, conf.GetHex())
tsize := conf.GetTruncateSize()
m.processor = event_processor.NewEventProcessor(eventCollector, conf.GetHex(), tsize)

go func() {
// 读取错误信息
Expand All @@ -129,6 +130,7 @@ func (m *Module) Init(ctx context.Context, logger *zerolog.Logger, conf config.I
}

logger.Info().Int("Pid", os.Getpid()).Str("Kernel Info", kv.String()).Send()
logger.Info().Int("TruncateSize", int(tsize)).Str("Unit", "bytes").Send()

if conf.GetBTF() == config.BTFModeAutoDetect {
// 如果是自动检测模式
Expand Down
Loading