From 9d50adde8c2797e6c12e31923825eb405279178f Mon Sep 17 00:00:00 2001 From: Hugefiver Date: Thu, 19 Dec 2024 21:25:45 +0800 Subject: [PATCH 01/13] fix: limit ffmpeg threads --- base/sticker.go | 124 +++++++++++++++++++++--------------- util/ffconv/convert_file.go | 8 ++- 2 files changed, 78 insertions(+), 54 deletions(-) diff --git a/base/sticker.go b/base/sticker.go index f8415b16..c43f4745 100644 --- a/base/sticker.go +++ b/base/sticker.go @@ -4,6 +4,7 @@ package base import ( "archive/zip" "bytes" + "context" "errors" "fmt" "image" @@ -301,7 +302,7 @@ func sendVideoSticker(ctx tb.Context, sticker *tb.Sticker, filename string, emoj }(reader) switch f { - case "webm": + case "", "webm": sendFile := &tb.Document{ File: tb.FromReader(reader), FileName: filename + ".webm", @@ -351,7 +352,9 @@ func sendVideoSticker(ctx tb.Context, sticker *tb.Sticker, filename string, emoj return ctx.Reply(sendFile) case "mp4": ff := ffconv.FFConv{LogCmd: true} - r, errCh := ff.ConvertPipe2File(reader, "webm", nil, filename+".mp4") + cc, cancel := context.WithCancel(context.Background()) + defer cancel() + r, errCh := ff.ConvertPipe2File(cc, reader, "webm", nil, filename+".mp4") defer func() { _ = r.Close() }() @@ -363,6 +366,7 @@ func sendVideoSticker(ctx tb.Context, sticker *tb.Sticker, filename string, emoj return errors.Join(err, err1) } case <-time.After(time.Second * 30): + cancel() log.Error("wait ffmpeg exec result timeout", zap.String("filename", filename), zap.String("convert_format", f)) return ctx.Reply("convert to mp4 failed") } @@ -409,10 +413,24 @@ func sendStickerPack(ctx tb.Context, sticker *tb.Sticker, opt stickerOpts) error lastNotify := time.Now() _ = ctx.Notify(tb.ChoosingSticker) - const MaxTask = 5 + const MaxTask = 2 + // const FFmpegThreadsPerTask = 2 taskGroup := sync.WaitGroup{} limitCh := make(chan struct{}, MaxTask) - errCh := make(chan error) + // errCh := make(chan error, len(stickerSet.Stickers)) + errArray := make([]error, 0, len(stickerSet.Stickers)) + errArrayLock := sync.Mutex{} + // errNotify := make(chan struct{}) + cc, cancel := context.WithCancelCause(context.Background()) + defer cancel(nil) + + reth := func(err error) { + // errCh <- err + errArrayLock.Lock() + errArray = append(errArray, err) + errArrayLock.Unlock() + cancel(err) + } replyMsg := "" replyMsgLock := sync.Mutex{} @@ -420,6 +438,17 @@ func sendStickerPack(ctx tb.Context, sticker *tb.Sticker, opt stickerOpts) error for i := range stickerSet.Stickers { s := &stickerSet.Stickers[i] + // limit concurrency + limitCh <- struct{}{} + taskGroup.Add(1) + + if cc.Err() != nil { + <-limitCh + taskGroup.Done() + + break + } + now := time.Now() if now.Sub(lastNotify) > time.Second*3 { _ = ctx.Notify(tb.ChoosingSticker) @@ -433,17 +462,6 @@ func sendStickerPack(ctx tb.Context, sticker *tb.Sticker, opt stickerOpts) error filename := fmt.Sprintf("%s_%03d%s%s", stickerSet.Name, i+1, emoji, opt.FileExt(s.Video)) outputFiles = append(outputFiles, filename) - // limit concurrency - limitCh <- struct{}{} - taskGroup.Add(1) - - if len(errCh) > 0 { - <-limitCh - taskGroup.Done() - - break - } - go func() { defer func() { <-limitCh @@ -456,21 +474,21 @@ func sendStickerPack(ctx tb.Context, sticker *tb.Sticker, opt stickerOpts) error of, err := os.OpenFile(path.Join(tempDir, filename), os.O_CREATE|os.O_RDWR, 0o640) if err != nil { - errCh <- err + reth(err) return } defer func() { _ = of.Close() }() fileR, err := ctx.Bot().File(&s.File) if err != nil { - errCh <- err + reth(err) return } defer func() { _ = fileR.Close() }() _, err = io.Copy(of, fileR) if err != nil { - errCh <- err + reth(err) } } else if s.Video { f := opt.VideoFormat() @@ -478,10 +496,13 @@ func sendStickerPack(ctx tb.Context, sticker *tb.Sticker, opt stickerOpts) error ff := ffconv.FFConv{LogCmd: true} var outputArgs []ffmpeg_go.KwArgs + // limit ffmpeg threads + // outputArgs = append(outputArgs, + // ffmpeg_go.KwArgs{"threads": FFmpegThreadsPerTask}) + fileR, err := ctx.Bot().File(&s.File) if err != nil { - // err2 := ctx.Reply("failed to get sticker file") - errCh <- err + reth(err) return } @@ -500,41 +521,38 @@ func sendStickerPack(ctx tb.Context, sticker *tb.Sticker, opt stickerOpts) error case "mp4": // nothing to do } - r, errCh2 := ff.ConvertPipe2File(fileR, "", input, path.Join(tempDir, filename), outputArgs...) - defer func() { - _ = r.Close() - }() + ccc, cancel := context.WithTimeout(cc, time.Second*30) + defer cancel() + _, errCh2 := ff.ConvertPipe2File(ccc, fileR, "", input, path.Join(tempDir, filename), outputArgs...) select { - case othersErr := <-errCh: - // exit when other process failed - if othersErr != nil { - errCh <- othersErr - } case err := <-errCh2: if err != nil { log.Error("failed to convert", zap.Error(err)) - errCh <- err + reth(err) } - case <-time.After(time.Second * 30): - log.Error("wait ffmpeg exec result timeout", zap.String("filename", filename), zap.String("convert_format", f)) - - replyMsgLock.Lock() - if replyMsg == "" { - replyMsg = "convert video sticker failed" + return + case <-ccc.Done(): + if errors.Is(ccc.Err(), context.DeadlineExceeded) { + log.Error("wait ffmpeg exec result timeout", zap.String("filename", filename), zap.String("convert_format", f)) + + replyMsgLock.Lock() + if replyMsg == "" { + replyMsg = "convert video sticker failed" + } + replyMsgLock.Unlock() + + // TODO use static error + // nolint: err113 + reth(errors.New("wait ffmpeg exec result timeout")) } - replyMsgLock.Unlock() - - // TODO use static error - // nolint: err113 - errCh <- errors.New("wait ffmpeg exec result timeout") } } else { f := opt.StickerFormat() fileR, err := ctx.Bot().File(&s.File) if err != nil { - // err2 := ctx.Reply("failed to get sticker file") - errCh <- err + reth(err) + return } defer func() { @@ -549,14 +567,14 @@ func sendStickerPack(ctx tb.Context, sticker *tb.Sticker, opt stickerOpts) error } replyMsgLock.Unlock() - errCh <- err1 + reth(err1) return } const ErrConvertMsg = "failed to convert image format" of, err1 := os.OpenFile(path.Join(tempDir, filename), os.O_CREATE|os.O_WRONLY, 0o640) if err1 != nil { - errCh <- errors.Join(err1 /* ctx.Reply(ErrConvertMsg) */) + reth(err1) return } defer func() { _ = of.Close() }() @@ -578,7 +596,7 @@ func sendStickerPack(ctx tb.Context, sticker *tb.Sticker, opt stickerOpts) error // TODO use static error // nolint: err113 - errCh <- errors.New("unknown target image format") + reth(errors.New("unknown target image format")) return } @@ -589,7 +607,7 @@ func sendStickerPack(ctx tb.Context, sticker *tb.Sticker, opt stickerOpts) error } replyMsgLock.Unlock() - errCh <- err2 + reth(err2) } } }() @@ -597,16 +615,16 @@ func sendStickerPack(ctx tb.Context, sticker *tb.Sticker, opt stickerOpts) error taskGroup.Wait() - if len(errCh) > 0 { - errs := make([]error, 0, len(errCh)) - for err := range errCh { - errs = append(errs, err) - } + if cc.Err() != nil { + // errs := make([]error, 0, len(errCh)) + // for err := range errCh { + // errs = append(errs, err) + // } if replyMsg == "" { replyMsg = "process failed" } _ = ctx.Reply(replyMsg) - return errors.Join(errs...) + return errors.Join(errArray...) } if len(outputFiles) == 0 { diff --git a/util/ffconv/convert_file.go b/util/ffconv/convert_file.go index 7335a4e0..6c60e5bb 100644 --- a/util/ffconv/convert_file.go +++ b/util/ffconv/convert_file.go @@ -1,6 +1,7 @@ package ffconv import ( + "context" "csust-got/log" "io" "os" @@ -13,7 +14,7 @@ import ( // ConvertPipe2File read media file from reader `r` and convert it to file with default/provided options // return the converted data readcloser and a channel of run error, and delete the temp work dir when readcloser closed -func (c *FFConv) ConvertPipe2File(r io.Reader, inputFileType string, input *ff.Stream, +func (c *FFConv) ConvertPipe2File(ctx context.Context, r io.Reader, inputFileType string, input *ff.Stream, outputFilename string, outputArgs ...ff.KwArgs) (io.ReadCloser, <-chan error) { if input == nil { input = GetPipeInputStream(inputFileType) @@ -51,6 +52,10 @@ func (c *FFConv) ConvertPipe2File(r io.Reader, inputFileType string, input *ff.S } go func() { + if ctx != nil { + runner.Context = ctx + } + if stderrCloser != nil { defer func() { _ = stderrCloser.Close() @@ -59,6 +64,7 @@ func (c *FFConv) ConvertPipe2File(r io.Reader, inputFileType string, input *ff.S err := runner.Run() resultCh <- err }() + return &OutputFileReaderImpl{ TempWorkDir: workdir, File: outputFile, From 79e9be13f18cf29e705e4460126da6f35738199c Mon Sep 17 00:00:00 2001 From: icceey Date: Fri, 20 Dec 2024 01:36:27 +0800 Subject: [PATCH 02/13] fix deadlock and ffmpeg wrong context --- base/sticker.go | 22 +++++++++++++++------- main.go | 2 +- util/ffconv/convert_file.go | 6 +++--- 3 files changed, 19 insertions(+), 11 deletions(-) diff --git a/base/sticker.go b/base/sticker.go index c43f4745..25db5e34 100644 --- a/base/sticker.go +++ b/base/sticker.go @@ -413,7 +413,7 @@ func sendStickerPack(ctx tb.Context, sticker *tb.Sticker, opt stickerOpts) error lastNotify := time.Now() _ = ctx.Notify(tb.ChoosingSticker) - const MaxTask = 2 + const MaxTask = 5 // const FFmpegThreadsPerTask = 2 taskGroup := sync.WaitGroup{} limitCh := make(chan struct{}, MaxTask) @@ -442,12 +442,12 @@ func sendStickerPack(ctx tb.Context, sticker *tb.Sticker, opt stickerOpts) error limitCh <- struct{}{} taskGroup.Add(1) - if cc.Err() != nil { - <-limitCh - taskGroup.Done() + // if cc.Err() != nil { + // <-limitCh + // taskGroup.Done() - break - } + // break + // } now := time.Now() if now.Sub(lastNotify) > time.Second*3 { @@ -662,13 +662,21 @@ func sendStickerPack(ctx tb.Context, sticker *tb.Sticker, opt stickerOpts) error return errors.Join(err1, err2) } } + fileInfo, _ := packFile.Stat() _ = compress.Close() _ = packFile.Close() + cpFile := tb.FromDisk(packFile.Name()) err = ctx.Reply(&tb.Document{ FileName: fmt.Sprintf("%s-%s%s", stickerSet.Name, stickerSet.Title, ".zip"), - File: tb.FromDisk(packFile.Name()), + File: cpFile, }) + if errors.Is(err, tb.ErrTooLarge) { + if fileInfo != nil { + return ctx.Reply(fmt.Sprintf("太...太大了...有%.2fMB辣么大", float64(fileInfo.Size())/1024/1024)) + } + return ctx.Reply("太大了,反正就是大") + } return err } diff --git a/main.go b/main.go index 5547840a..5a45140b 100644 --- a/main.go +++ b/main.go @@ -78,7 +78,7 @@ func main() { func initBot() (*Bot, error) { errorHandler := func(err error, c Context) { - log.Error("bot has error", zap.Any("context", c), zap.Error(err)) + log.Error("bot has error", zap.Any("update", c.Update()), zap.Error(err)) } httpClient := http.DefaultClient diff --git a/util/ffconv/convert_file.go b/util/ffconv/convert_file.go index 6c60e5bb..3490ec0f 100644 --- a/util/ffconv/convert_file.go +++ b/util/ffconv/convert_file.go @@ -18,6 +18,9 @@ func (c *FFConv) ConvertPipe2File(ctx context.Context, r io.Reader, inputFileTyp outputFilename string, outputArgs ...ff.KwArgs) (io.ReadCloser, <-chan error) { if input == nil { input = GetPipeInputStream(inputFileType) + if ctx != nil { + input.Context = ctx + } } stderr := io.Discard @@ -52,9 +55,6 @@ func (c *FFConv) ConvertPipe2File(ctx context.Context, r io.Reader, inputFileTyp } go func() { - if ctx != nil { - runner.Context = ctx - } if stderrCloser != nil { defer func() { From 5767733a7f8be207162dd7ade9f49f521cd3dace Mon Sep 17 00:00:00 2001 From: Hugefiver Date: Fri, 20 Dec 2024 15:09:55 +0800 Subject: [PATCH 03/13] feat(sticker): add webp and [a]png format for video sticker --- base/sticker.go | 37 ++++++++++++++++++++++++++++++++----- 1 file changed, 32 insertions(+), 5 deletions(-) diff --git a/base/sticker.go b/base/sticker.go index 25db5e34..b167349d 100644 --- a/base/sticker.go +++ b/base/sticker.go @@ -350,11 +350,26 @@ func sendVideoSticker(ctx tb.Context, sticker *tb.Sticker, filename string, emoj DisableTypeDetection: true, } return ctx.Reply(sendFile) - case "mp4": + case "mp4", "png", "apng", "webp": ff := ffconv.FFConv{LogCmd: true} + outputArgs := []ffmpeg_go.KwArgs{} + if f == "png" || f == "apng" { + outputArgs = append(outputArgs, ffmpeg_go.KwArgs{ + "plays": "0", + "f": "apng", + "c:v": "apng", + }) + } else if f == "webp" { + outputArgs = append(outputArgs, ffmpeg_go.KwArgs{ + "plays": "0", + "f": "webp", + "c:v": "libwebp", + }) + } + cc, cancel := context.WithCancel(context.Background()) defer cancel() - r, errCh := ff.ConvertPipe2File(cc, reader, "webm", nil, filename+".mp4") + r, errCh := ff.ConvertPipe2File(cc, reader, "webm", nil, filename+"."+f, outputArgs...) defer func() { _ = r.Close() }() @@ -372,7 +387,7 @@ func sendVideoSticker(ctx tb.Context, sticker *tb.Sticker, filename string, emoj } sendFile := &tb.Document{ File: tb.FromReader(r), - FileName: filename + ".mp4", + FileName: filename + "." + f, Caption: emoji, DisableTypeDetection: true, } @@ -520,6 +535,18 @@ func sendStickerPack(ctx tb.Context, sticker *tb.Sticker, opt stickerOpts) error }) case "mp4": // nothing to do + case "png", "apng": + outputArgs = append(outputArgs, ffmpeg_go.KwArgs{ + "loop": "0", + "c:v": "apng", + "f": "apng", + }) + case "webp": + outputArgs = append(outputArgs, ffmpeg_go.KwArgs{ + "loop": "0", + "c:v": "libwebp", + "f": "webp", + }) } ccc, cancel := context.WithTimeout(cc, time.Second*30) defer cancel() @@ -698,7 +725,7 @@ func parseOpts(text string) (map[string]string, error) { switch strings.ToLower(k) { case "format", "f": f := strings.ToLower(v) - if slices.Contains([]string{"", "webp", "jpg", "jpeg", "png", "mp4", "gif", "webm"}, f) { + if slices.Contains([]string{"", "webp", "jpg", "jpeg", "png", "apng", "mp4", "gif", "webm"}, f) { ret[k] = v } case "pack", "p": @@ -709,7 +736,7 @@ func parseOpts(text string) (map[string]string, error) { } case "vf", "videoformat": f := strings.ToLower(v) - if slices.Contains([]string{"", "mp4", "gif", "webm"}, f) { + if slices.Contains([]string{"", "mp4", "gif", "webm", "webp", "png", "apng"}, f) { ret[k] = v } case "sf", "stickerformat": From c4ae6b044d2f6f66750b4274e24fedebf25d967c Mon Sep 17 00:00:00 2001 From: Hugefiver Date: Fri, 20 Dec 2024 15:13:56 +0800 Subject: [PATCH 04/13] update ffmpeg to 7.1 --- Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index 042f652d..2e5bf995 100644 --- a/Dockerfile +++ b/Dockerfile @@ -21,7 +21,7 @@ RUN make deploy FROM --platform=$BUILDPLATFORM alpine RUN apk add --no-cache tzdata -COPY --from=hugefiver/ffmpeg:7.0.1-2 /ffmpeg /usr/local/bin/ffmpeg +COPY --from=hugefiver/ffmpeg:7.1 /ffmpeg /usr/local/bin/ffmpeg # COPY --from=hugefiver/ffmpeg:7.0.1 /ffprobe /usr/local/bin/ffprobe WORKDIR /app From 2f1b562ffe1a7482145eec68a2608a44b613e1a6 Mon Sep 17 00:00:00 2001 From: Hugefiver Date: Fri, 20 Dec 2024 15:26:12 +0800 Subject: [PATCH 05/13] smol fix --- base/sticker.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/base/sticker.go b/base/sticker.go index b167349d..11a0360e 100644 --- a/base/sticker.go +++ b/base/sticker.go @@ -249,6 +249,7 @@ func sendImageSticker(ctx tb.Context, sticker *tb.Sticker, filename string, emoj } bs := bytes.NewBuffer(nil) + f = strings.ToLower(f) switch f { case "jpg", "jpeg": filename += ".jpg" @@ -301,6 +302,7 @@ func sendVideoSticker(ctx tb.Context, sticker *tb.Sticker, filename string, emoj } }(reader) + f = strings.ToLower(f) switch f { case "", "webm": sendFile := &tb.Document{ @@ -548,7 +550,7 @@ func sendStickerPack(ctx tb.Context, sticker *tb.Sticker, opt stickerOpts) error "f": "webp", }) } - ccc, cancel := context.WithTimeout(cc, time.Second*30) + ccc, cancel := context.WithTimeout(cc, time.Second*120) defer cancel() _, errCh2 := ff.ConvertPipe2File(ccc, fileR, "", input, path.Join(tempDir, filename), outputArgs...) select { From db3b4ca8e8faaf18f5f79a930f8d07487f43c783 Mon Sep 17 00:00:00 2001 From: Hugefiver Date: Fri, 20 Dec 2024 15:47:28 +0800 Subject: [PATCH 06/13] dockerfile: download deps before build --- Dockerfile | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/Dockerfile b/Dockerfile index 2e5bf995..c3653b4f 100644 --- a/Dockerfile +++ b/Dockerfile @@ -8,12 +8,14 @@ ARG BRANCH ARG TAG ARG RELEASE +WORKDIR /go/src/app +COPY . . +RUN make deps + ENV BRANCH=$BRANCH ENV TAG=$TAG ENV GOARCH=$TARGETARCH -WORKDIR /go/src/app -COPY . . RUN make deploy From c7d140bb95a7476657b2f734cacb1160282b9d26 Mon Sep 17 00:00:00 2001 From: icceey Date: Fri, 20 Dec 2024 22:22:30 +0800 Subject: [PATCH 07/13] feat(logging): add log file directory configuration and create logs directory Co-authored-by: Hugefiver --- .gitignore | 2 ++ config.yaml | 1 + config/config.go | 6 ++++++ log/log.go | 19 ++++++++++++------- 4 files changed, 21 insertions(+), 7 deletions(-) diff --git a/.gitignore b/.gitignore index 6985cfb0..2f5d0f64 100644 --- a/.gitignore +++ b/.gitignore @@ -117,6 +117,7 @@ fabric.properties data/ .vscode/ +.idea/ # output got @@ -126,3 +127,4 @@ csust-got.exe a.http .idea/GitLink.xml dictionary.txt +/logs \ No newline at end of file diff --git a/config.yaml b/config.yaml index 61ddd377..62b11534 100644 --- a/config.yaml +++ b/config.yaml @@ -7,6 +7,7 @@ token: "" proxy: "" # [http:// | socks5://] host:port listen: ":7777" skip_duration: 0 # skip expired message, duration in seconds, set to 0 to disable [int] +log_file_dir: "logs" black_list: enabled: true diff --git a/config/config.go b/config/config.go index bf6291ba..c53d7efd 100644 --- a/config/config.go +++ b/config/config.go @@ -69,6 +69,7 @@ type Config struct { Listen string DebugMode bool SkipDuration int64 + LogFileDir string RedisConfig *redisConfig RestrictConfig *restrictConfig @@ -116,6 +117,7 @@ func readConfig() { BotConfig.Proxy = viper.GetString("proxy") BotConfig.Listen = viper.GetString("listen") BotConfig.SkipDuration = viper.GetInt64("skip_duration") + BotConfig.LogFileDir = viper.GetString("log_file_dir") // other BotConfig.RedisConfig.readConfig() @@ -149,6 +151,10 @@ func checkConfig() { if BotConfig.SkipDuration < 0 { BotConfig.SkipDuration = 0 } + if BotConfig.LogFileDir == "" { + BotConfig.LogFileDir = "logs" + } + BotConfig.LogFileDir = strings.TrimRight(BotConfig.LogFileDir, "/") BotConfig.RedisConfig.checkConfig() BotConfig.RestrictConfig.checkConfig() diff --git a/log/log.go b/log/log.go index 2fd9ca75..2dbd187b 100644 --- a/log/log.go +++ b/log/log.go @@ -3,6 +3,7 @@ package log import ( "csust-got/config" "csust-got/prom" + "os" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -19,6 +20,10 @@ func InitLogger() { // NewLogger new logger. func NewLogger() *zap.Logger { var logConfig zap.Config + // create log dir if not exists + if err := os.MkdirAll(config.BotConfig.LogFileDir, 0755); err != nil { + zap.L().Fatal("Create log dir failed", zap.Error(err)) + } if config.BotConfig.DebugMode { logConfig = devConfig() } else { @@ -37,7 +42,7 @@ func devConfig() zap.Config { return zap.Config{ Level: zap.NewAtomicLevelAt(zap.DebugLevel), Development: true, - Encoding: "console", + Encoding: "json", EncoderConfig: zapcore.EncoderConfig{ TimeKey: "ts", LevelKey: "level", @@ -47,13 +52,13 @@ func devConfig() zap.Config { MessageKey: "msg", StacktraceKey: "stacktrace", LineEnding: zapcore.DefaultLineEnding, - EncodeLevel: zapcore.CapitalColorLevelEncoder, + EncodeLevel: zapcore.CapitalLevelEncoder, EncodeTime: zapcore.ISO8601TimeEncoder, EncodeDuration: zapcore.StringDurationEncoder, EncodeCaller: zapcore.ShortCallerEncoder, }, - OutputPaths: []string{"stderr"}, - ErrorOutputPaths: []string{"stderr"}, + OutputPaths: []string{"stderr", config.BotConfig.LogFileDir + "/got.log"}, + ErrorOutputPaths: []string{"stderr", config.BotConfig.LogFileDir + "/got_err.log"}, } } @@ -75,13 +80,13 @@ func prodConfig() zap.Config { MessageKey: "msg", StacktraceKey: "stacktrace", LineEnding: zapcore.DefaultLineEnding, - EncodeLevel: zapcore.LowercaseLevelEncoder, + EncodeLevel: zapcore.CapitalLevelEncoder, EncodeTime: zapcore.EpochTimeEncoder, EncodeDuration: zapcore.SecondsDurationEncoder, EncodeCaller: zapcore.ShortCallerEncoder, }, - OutputPaths: []string{"stderr"}, - ErrorOutputPaths: []string{"stderr"}, + OutputPaths: []string{"stderr", config.BotConfig.LogFileDir + "/got.log"}, + ErrorOutputPaths: []string{"stderr", config.BotConfig.LogFileDir + "/got_err.log"}, } } From b90ba353f1e51d6d9def0f95f0fcf981ba2fc271 Mon Sep 17 00:00:00 2001 From: icceey Date: Fri, 20 Dec 2024 23:22:26 +0800 Subject: [PATCH 08/13] refactor(sticker): simplify concurrency handling using errgroup Co-authored-by: Hugefiver --- base/sticker.go | 98 +++++++++++++++---------------------------------- 1 file changed, 30 insertions(+), 68 deletions(-) diff --git a/base/sticker.go b/base/sticker.go index 11a0360e..8f724893 100644 --- a/base/sticker.go +++ b/base/sticker.go @@ -7,6 +7,7 @@ import ( "context" "errors" "fmt" + "golang.org/x/sync/errgroup" "image" "image/gif" "image/jpeg" @@ -20,7 +21,7 @@ import ( "time" ffmpeg_go "github.com/u2takey/ffmpeg-go" - //nolint: revive + // nolint: revive _ "golang.org/x/image/webp" "go.uber.org/zap" @@ -431,23 +432,8 @@ func sendStickerPack(ctx tb.Context, sticker *tb.Sticker, opt stickerOpts) error _ = ctx.Notify(tb.ChoosingSticker) const MaxTask = 5 - // const FFmpegThreadsPerTask = 2 - taskGroup := sync.WaitGroup{} - limitCh := make(chan struct{}, MaxTask) - // errCh := make(chan error, len(stickerSet.Stickers)) - errArray := make([]error, 0, len(stickerSet.Stickers)) - errArrayLock := sync.Mutex{} - // errNotify := make(chan struct{}) - cc, cancel := context.WithCancelCause(context.Background()) - defer cancel(nil) - - reth := func(err error) { - // errCh <- err - errArrayLock.Lock() - errArray = append(errArray, err) - errArrayLock.Unlock() - cancel(err) - } + taskGroup, cc := errgroup.WithContext(context.Background()) + taskGroup.SetLimit(MaxTask) replyMsg := "" replyMsgLock := sync.Mutex{} @@ -455,16 +441,11 @@ func sendStickerPack(ctx tb.Context, sticker *tb.Sticker, opt stickerOpts) error for i := range stickerSet.Stickers { s := &stickerSet.Stickers[i] - // limit concurrency - limitCh <- struct{}{} - taskGroup.Add(1) - - // if cc.Err() != nil { - // <-limitCh - // taskGroup.Done() - - // break - // } + select { + case <-cc.Done(): + break + default: + } now := time.Now() if now.Sub(lastNotify) > time.Second*3 { @@ -479,11 +460,7 @@ func sendStickerPack(ctx tb.Context, sticker *tb.Sticker, opt stickerOpts) error filename := fmt.Sprintf("%s_%03d%s%s", stickerSet.Name, i+1, emoji, opt.FileExt(s.Video)) outputFiles = append(outputFiles, filename) - go func() { - defer func() { - <-limitCh - taskGroup.Done() - }() + taskGroup.Go(func() error { // TODO reduce complexity by move some code to function // nolint: nestif,gocritic @@ -491,21 +468,19 @@ func sendStickerPack(ctx tb.Context, sticker *tb.Sticker, opt stickerOpts) error of, err := os.OpenFile(path.Join(tempDir, filename), os.O_CREATE|os.O_RDWR, 0o640) if err != nil { - reth(err) - return + return err } defer func() { _ = of.Close() }() fileR, err := ctx.Bot().File(&s.File) if err != nil { - reth(err) - return + return err } defer func() { _ = fileR.Close() }() _, err = io.Copy(of, fileR) if err != nil { - reth(err) + return err } } else if s.Video { f := opt.VideoFormat() @@ -519,8 +494,7 @@ func sendStickerPack(ctx tb.Context, sticker *tb.Sticker, opt stickerOpts) error fileR, err := ctx.Bot().File(&s.File) if err != nil { - reth(err) - return + return err } input := ffconv.GetPipeInputStream("webm") @@ -552,14 +526,13 @@ func sendStickerPack(ctx tb.Context, sticker *tb.Sticker, opt stickerOpts) error } ccc, cancel := context.WithTimeout(cc, time.Second*120) defer cancel() - _, errCh2 := ff.ConvertPipe2File(ccc, fileR, "", input, path.Join(tempDir, filename), outputArgs...) + _, errCh := ff.ConvertPipe2File(ccc, fileR, "", input, path.Join(tempDir, filename), outputArgs...) select { - case err := <-errCh2: + case err := <-errCh: if err != nil { log.Error("failed to convert", zap.Error(err)) - reth(err) } - return + return err case <-ccc.Done(): if errors.Is(ccc.Err(), context.DeadlineExceeded) { log.Error("wait ffmpeg exec result timeout", zap.String("filename", filename), zap.String("convert_format", f)) @@ -570,9 +543,7 @@ func sendStickerPack(ctx tb.Context, sticker *tb.Sticker, opt stickerOpts) error } replyMsgLock.Unlock() - // TODO use static error - // nolint: err113 - reth(errors.New("wait ffmpeg exec result timeout")) + return ccc.Err() } } } else { @@ -580,8 +551,7 @@ func sendStickerPack(ctx tb.Context, sticker *tb.Sticker, opt stickerOpts) error fileR, err := ctx.Bot().File(&s.File) if err != nil { - reth(err) - return + return err } defer func() { @@ -596,15 +566,12 @@ func sendStickerPack(ctx tb.Context, sticker *tb.Sticker, opt stickerOpts) error } replyMsgLock.Unlock() - reth(err1) - return + return err } - const ErrConvertMsg = "failed to convert image format" of, err1 := os.OpenFile(path.Join(tempDir, filename), os.O_CREATE|os.O_WRONLY, 0o640) if err1 != nil { - reth(err1) - return + return err } defer func() { _ = of.Close() }() @@ -625,35 +592,30 @@ func sendStickerPack(ctx tb.Context, sticker *tb.Sticker, opt stickerOpts) error // TODO use static error // nolint: err113 - reth(errors.New("unknown target image format")) - return + return errors.New("unknown target image format") } if err2 != nil { replyMsgLock.Lock() if replyMsg == "" { - replyMsg = ErrConvertMsg + replyMsg = "failed to convert image format" } replyMsgLock.Unlock() - reth(err2) + return err } } - }() - } - taskGroup.Wait() + return nil + }) + } - if cc.Err() != nil { - // errs := make([]error, 0, len(errCh)) - // for err := range errCh { - // errs = append(errs, err) - // } + if err := taskGroup.Wait(); err != nil { + log.Error("failed to convert sticker", zap.Error(err)) if replyMsg == "" { replyMsg = "process failed" } - _ = ctx.Reply(replyMsg) - return errors.Join(errArray...) + return ctx.Reply(replyMsg) } if len(outputFiles) == 0 { From bd692fa53014be8303e46bb1c5b52351d20beb9e Mon Sep 17 00:00:00 2001 From: icceey Date: Fri, 20 Dec 2024 23:45:34 +0800 Subject: [PATCH 09/13] fix(sticker): improve loop control for sticker sending Co-authored-by: Hugefiver --- base/sticker.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/base/sticker.go b/base/sticker.go index 8f724893..fb0fc83b 100644 --- a/base/sticker.go +++ b/base/sticker.go @@ -438,12 +438,13 @@ func sendStickerPack(ctx tb.Context, sticker *tb.Sticker, opt stickerOpts) error replyMsg := "" replyMsgLock := sync.Mutex{} +loop: for i := range stickerSet.Stickers { s := &stickerSet.Stickers[i] select { case <-cc.Done(): - break + break loop default: } From a2d9d5d75c5a8ecc3733562aa73e6de5a7ea97b6 Mon Sep 17 00:00:00 2001 From: icceey Date: Sun, 22 Dec 2024 19:09:17 +0800 Subject: [PATCH 10/13] feat: support local api server --- base/helper.go | 8 ++++++++ config.yaml | 1 + config/config.go | 2 ++ main.go | 4 ++++ 4 files changed, 15 insertions(+) diff --git a/base/helper.go b/base/helper.go index dc4a861b..2dac463a 100644 --- a/base/helper.go +++ b/base/helper.go @@ -29,6 +29,14 @@ func Info(ctx Context) error { msg += fmt.Sprintf("Build Time: %s\n", buildTime) msg += fmt.Sprintf("Last Boot: %s\n", lastBoot) msg += fmt.Sprintf("Go Version: %s\n", runtime.Version()) + if ctx.Bot().URL != DefaultApiURL { + msg += fmt.Sprintf("API Server: CUSTOM\n") + } else { + msg += fmt.Sprintf("API Server: OFFICIAL\n") + } + if config.BotConfig.DebugMode { + msg += fmt.Sprintf("Debug Mode: YES\n") + } msg += "```" return ctx.Send(msg, ModeMarkdownV2) diff --git a/config.yaml b/config.yaml index 62b11534..431bfffd 100644 --- a/config.yaml +++ b/config.yaml @@ -3,6 +3,7 @@ debug: false worker: 4 # number of goroutine to recv update [int] # bot config +url: "" # optional, leave empty to use telegram official api token: "" proxy: "" # [http:// | socks5://] host:port listen: ":7777" diff --git a/config/config.go b/config/config.go index c53d7efd..85306978 100644 --- a/config/config.go +++ b/config/config.go @@ -64,6 +64,7 @@ func NewBotConfig() *Config { type Config struct { Bot *Bot + URL string Token string Proxy string Listen string @@ -113,6 +114,7 @@ func initViper(configFile, envPrefix string) { func readConfig() { // base config BotConfig.DebugMode = viper.GetBool("debug") + BotConfig.URL = viper.GetString("url") BotConfig.Token = viper.GetString("token") BotConfig.Proxy = viper.GetString("proxy") BotConfig.Listen = viper.GetString("listen") diff --git a/main.go b/main.go index 5a45140b..35295011 100644 --- a/main.go +++ b/main.go @@ -101,6 +101,10 @@ func initBot() (*Bot, error) { Verbose: false, } + if config.BotConfig.URL != "" { + settings.URL = config.BotConfig.URL + } + bot, err := NewBot(settings) if err != nil { return nil, err From ee20e911f8bdf67647679460f2b02402b3210144 Mon Sep 17 00:00:00 2001 From: icceey Date: Sun, 22 Dec 2024 19:12:35 +0800 Subject: [PATCH 11/13] remove useless log --- chat/gacha_reply.go | 7 ++++++- main.go | 4 +++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/chat/gacha_reply.go b/chat/gacha_reply.go index f1e0d234..195b5f2d 100644 --- a/chat/gacha_reply.go +++ b/chat/gacha_reply.go @@ -3,6 +3,8 @@ package chat import ( "csust-got/log" "csust-got/util/gacha" + "errors" + "github.com/redis/go-redis/v9" "strings" "go.uber.org/zap" @@ -26,7 +28,10 @@ func GachaReplyHandler(ctx telebot.Context) { result, err := gacha.PerformGaCha(ctx.Chat().ID) if err != nil { - log.Error("[GaCha]: perform gacha failed", zap.Error(err)) + // gacha may not be enabled, redis.Nil is expected, ignore it + if !errors.Is(err, redis.Nil) { + log.Error("[GaCha]: perform gacha failed", zap.Error(err)) + } return } diff --git a/main.go b/main.go index 35295011..8a8433d7 100644 --- a/main.go +++ b/main.go @@ -212,7 +212,9 @@ func customHandler(ctx Context) error { cmd := entities.FromMessage(ctx.Message()) if cmd == nil { - return errInvalidCmd + // all text message will be handled by this handler, + // but only command should be processed, so return nil for non-command message + return nil } cmdText := cmd.Name() From b8b6d1f1fc73df1215d7f82fc1d26eeeee823bc8 Mon Sep 17 00:00:00 2001 From: icceey Date: Sun, 22 Dec 2024 21:16:49 +0800 Subject: [PATCH 12/13] fmt: lint --- base/helper.go | 6 +++--- main.go | 3 --- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/base/helper.go b/base/helper.go index 2dac463a..7eacb739 100644 --- a/base/helper.go +++ b/base/helper.go @@ -30,12 +30,12 @@ func Info(ctx Context) error { msg += fmt.Sprintf("Last Boot: %s\n", lastBoot) msg += fmt.Sprintf("Go Version: %s\n", runtime.Version()) if ctx.Bot().URL != DefaultApiURL { - msg += fmt.Sprintf("API Server: CUSTOM\n") + msg += "API Server: CUSTOM\n" } else { - msg += fmt.Sprintf("API Server: OFFICIAL\n") + msg += "API Server: OFFICIAL\n" } if config.BotConfig.DebugMode { - msg += fmt.Sprintf("Debug Mode: YES\n") + msg += "Debug Mode: YES\n" } msg += "```" diff --git a/main.go b/main.go index 8a8433d7..f0e92d18 100644 --- a/main.go +++ b/main.go @@ -9,7 +9,6 @@ import ( "csust-got/util/gacha" wordSeg "csust-got/word_seg" "encoding/json" - "errors" "fmt" "net/http" "net/url" @@ -28,8 +27,6 @@ import ( . "gopkg.in/telebot.v3" ) -var errInvalidCmd = errors.New("invalid command") - func main() { config.InitConfig("config.yaml", "BOT") log.InitLogger() From 78b80f617d75bc804c267dbd78171520d21ab948 Mon Sep 17 00:00:00 2001 From: icceey Date: Sun, 22 Dec 2024 23:49:05 +0800 Subject: [PATCH 13/13] feat: adjust sticker process num --- base/sticker.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/base/sticker.go b/base/sticker.go index fb0fc83b..a9a039c8 100644 --- a/base/sticker.go +++ b/base/sticker.go @@ -15,6 +15,7 @@ import ( "io" "os" "path" + "runtime" "slices" "strings" "sync" @@ -431,7 +432,7 @@ func sendStickerPack(ctx tb.Context, sticker *tb.Sticker, opt stickerOpts) error lastNotify := time.Now() _ = ctx.Notify(tb.ChoosingSticker) - const MaxTask = 5 + var MaxTask = runtime.NumCPU() + 2 taskGroup, cc := errgroup.WithContext(context.Background()) taskGroup.SetLimit(MaxTask)