Skip to content

Commit

Permalink
improve: provide opts to set the truncate size in text mode to reduce…
Browse files Browse the repository at this point in the history
… memory cost
  • Loading branch information
yuweizzz committed Feb 10, 2025
1 parent 0c8b6c9 commit 3bd2e08
Show file tree
Hide file tree
Showing 7 changed files with 119 additions and 9 deletions.
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -228,3 +228,7 @@ format:
@clang-format -i -style=$(STYLE) kern/openssl_masterkey_3.2.h
@clang-format -i -style=$(STYLE) kern/boringssl_masterkey.h
@clang-format -i -style=$(STYLE) utils/*.c

.PHONY: test
test:
go test -v -race ./...
7 changes: 5 additions & 2 deletions cli/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,9 @@ var (
)

const (
defaultPid uint64 = 0
defaultUid uint64 = 0
defaultPid uint64 = 0
defaultUid uint64 = 0
defaultTruncateSize uint64 = 0
)

const (
Expand Down Expand Up @@ -134,6 +135,7 @@ func init() {
rootCmd.PersistentFlags().StringVarP(&globalConf.LoggerAddr, "logaddr", "l", "", "send logs to this server. -l /tmp/ecapture.log or -l tcp://127.0.0.1:8080")
rootCmd.PersistentFlags().StringVar(&globalConf.EventCollectorAddr, "eventaddr", "", "the server address that receives the captured event. --eventaddr tcp://127.0.0.1:8090, default: same as logaddr")
rootCmd.PersistentFlags().StringVar(&globalConf.Listen, "listen", eCaptureListenAddr, "listen on this address for http server, default: 127.0.0.1:28256")
rootCmd.PersistentFlags().Uint64VarP(&globalConf.TruncateSize, "tsize", "t", defaultTruncateSize, "the truncate size in text mode, default: 0, no truncate")

rootCmd.SilenceUsage = true
}
Expand All @@ -156,6 +158,7 @@ func setModConfig(globalConf config.BaseConfig, modConf config.IConfig) {
modConf.SetBTF(globalConf.BtfMode)
modConf.SetPerCpuMapSize(globalConf.PerCpuMapSize)
modConf.SetAddrType(loggerTypeStdout)
modConf.SetTruncateSize(globalConf.TruncateSize)

switch ByteCodeFiles {
case "core":
Expand Down
5 changes: 5 additions & 0 deletions pkg/event_processor/iworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,11 @@ func (ew *eventWorker) writeEvent(e event.IEventStruct) {
// 解析类型,输出
func (ew *eventWorker) parserEvents() []byte {
ew.status = ProcessStateProcessing
tsize := int(ew.processor.truncateSize)
if tsize > 0 && ew.payload.Len() > tsize {
ew.payload.Truncate(tsize)
_ = ew.writeToChan(fmt.Sprintf("Events truncated, size: %d bytes\n", tsize))
}
parser := NewParser(ew.payload.Bytes())
ew.parser = parser
n, e := ew.parser.Write(ew.payload.Bytes())
Expand Down
7 changes: 5 additions & 2 deletions pkg/event_processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ type EventProcessor struct {
errChan chan error

// output model
isHex bool
isHex bool
truncateSize uint64
}

func (ep *EventProcessor) GetLogger() io.Writer {
Expand Down Expand Up @@ -161,11 +162,13 @@ func (ep *EventProcessor) Close() error {
func (ep *EventProcessor) ErrorChan() chan error {
return ep.errChan
}
func NewEventProcessor(logger io.Writer, isHex bool) *EventProcessor {

func NewEventProcessor(logger io.Writer, isHex bool, truncateSize uint64) *EventProcessor {
var ep *EventProcessor
ep = &EventProcessor{}
ep.logger = logger
ep.isHex = isHex
ep.truncateSize = truncateSize
ep.isClosed = false
ep.init()
return ep
Expand Down
83 changes: 82 additions & 1 deletion pkg/event_processor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ func TestEventProcessor_Serve(t *testing.T) {
t.Fatal(e)
}
logger.SetOutput(f)
ep := NewEventProcessor(f, true)
// no truncate
ep := NewEventProcessor(f, true, 0)
go func() {
var err error
err = ep.Serve()
Expand Down Expand Up @@ -108,3 +109,83 @@ func TestEventProcessor_Serve(t *testing.T) {
//t.Log(string(bufString))
t.Log("done")
}

func Test_Truncated_EventProcessor_Serve(t *testing.T) {

logger := log.Default()
//var buf bytes.Buffer
//logger.SetOutput(&buf)
var output = "./output_truncated.log"
f, e := os.Create(output)
if e != nil {
t.Fatal(e)
}
logger.SetOutput(f)

// truncate 1000 bytes
ep := NewEventProcessor(f, true, 1000)
go func() {
var err error
err = ep.Serve()
if err != nil {
//log.Fatalf(err.Error())
t.Error(err)
return
}
}()

lines := []string{
// short, no truncated
`{"DataType":0,"Timestamp":952253597324253,"Pid":469929,"Tid":469929,"DataLen":308,"Comm":[119,103,101,116,0,0,0,0,0,0,0,0,0,0,0,0],"Fd":3,"Version":771}`,
// long, should truncated
`{"DataType":0,"Timestamp":952282712204824,"Pid":469953,"Tid":469953,"DataLen":4096,"Comm":[99,117,114,108,0,0,0,0,0,0,0,0,0,0,0,0],"Fd":5,"Version":771}`,
}

for _, line := range lines {
if line == "" {
continue
}
var eventSSL SSLDataEventTmp
err := json.Unmarshal([]byte(line), &eventSSL)
if err != nil {
t.Fatalf("json unmarshal error: %s, body:%v", err.Error(), line)
}
payloadFile := fmt.Sprintf("testdata/%d.bin", eventSSL.Timestamp)
b, e := os.ReadFile(payloadFile)
if e != nil {
t.Fatalf("read payload file error: %s, file:%s", e.Error(), payloadFile)
}
copy(eventSSL.Data[:], b)
ep.Write(&BaseEvent{DataLen: eventSSL.DataLen, Data: eventSSL.Data, DataType: eventSSL.DataType, Timestamp: eventSSL.Timestamp, Pid: eventSSL.Pid, Tid: eventSSL.Tid, Comm: eventSSL.Comm, Fd: eventSSL.Fd, Version: eventSSL.Version})
}

tick := time.NewTicker(time.Second * 10)
<-tick.C

err := ep.Close()
logger.SetOutput(io.Discard)
bufString, e := os.ReadFile(output)
if e != nil {
t.Fatal(e)
}

lines = strings.Split(string(bufString), "\n")
ok := true
for _, line := range lines {
// truncated once
if strings.Contains(line, "Events truncated, size:") {
t.Log(line)
}
}

if err != nil {
t.Fatalf("close error: %s", err.Error())
}

if !ok {
t.Fatalf("some errors occurred")
}
//t.Log(string(bufString))
t.Log("done")
}

18 changes: 15 additions & 3 deletions user/config/iconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ type IConfig interface {
EnableGlobalVar() bool
// Bytes serializes the configuration to JSON bytes
Bytes() []byte
// Set/Get TruncateSize
SetTruncateSize(uint64)
GetTruncateSize() uint64
}

// TLS capture mode constants defining different output formats
Expand Down Expand Up @@ -90,9 +93,10 @@ const (

// BaseConfig implements the IConfig interface and holds the basic configuration settings
type BaseConfig struct {
Pid uint64 `json:"pid"` // Process ID to monitor
Uid uint64 `json:"uid"` // User ID to monitor
Listen string `json:"listen"` // Listen address for the server (default: 127.0.0.1:28256)
Pid uint64 `json:"pid"` // Process ID to monitor
Uid uint64 `json:"uid"` // User ID to monitor
Listen string `json:"listen"` // Listen address for the server (default: 127.0.0.1:28256)
TruncateSize uint64 `json:"truncate_size"` // truncate size in text mode

// eBPF map configuration
PerCpuMapSize int `json:"per_cpu_map_size"` // Size of eBPF map per CPU core
Expand Down Expand Up @@ -173,6 +177,14 @@ func (c *BaseConfig) SetPerCpuMapSize(size int) {
c.PerCpuMapSize = size * os.Getpagesize()
}

func (c *BaseConfig) SetTruncateSize(TruncateSize uint64) {
c.TruncateSize = TruncateSize
}

func (c *BaseConfig) GetTruncateSize() uint64 {
return c.TruncateSize
}

func (c *BaseConfig) EnableGlobalVar() bool {
kv, err := kernel.HostVersion()
if err != nil {
Expand Down
4 changes: 3 additions & 1 deletion user/module/imodule.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ func (m *Module) Init(ctx context.Context, logger *zerolog.Logger, conf config.I
m.isKernelLess5_2 = false //set false default
m.eventCollector = eventCollector
//var epl = epLogger{logger: logger}
m.processor = event_processor.NewEventProcessor(eventCollector, conf.GetHex())
tsize := conf.GetTruncateSize()
m.processor = event_processor.NewEventProcessor(eventCollector, conf.GetHex(), tsize)

go func() {
// 读取错误信息
Expand All @@ -129,6 +130,7 @@ func (m *Module) Init(ctx context.Context, logger *zerolog.Logger, conf config.I
}

logger.Info().Int("Pid", os.Getpid()).Str("Kernel Info", kv.String()).Send()
logger.Info().Int("TruncateSize", int(tsize)).Send()

if conf.GetBTF() == config.BTFModeAutoDetect {
// 如果是自动检测模式
Expand Down

0 comments on commit 3bd2e08

Please sign in to comment.