@cosmo0920 cosmo0920 commented Sep 11, 2025

This change is needed because, without it, the receiver just sees unknown zstd frames like:

forward: len=12984, head=54 d1 00 da cc 74 2e 4e 20 d6 2a ce 01 be f2 c2 | compressed(opt)=2

However, the zstd specification requires every frame to start with the magic bytes, like:

forward: len=19835, head=28 b5 2f fd a0 c2 14 03 00 a4 d0 00 4a f7 30 39 | compressed(opt)=2

So, we need to put the magic bytes 28 b5 2f fd at the head of zstd-compressed payloads.
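As a minimal sketch of the check described above, the head of a payload can be formatted as hex and compared against the zstd magic bytes in plain Ruby. The helper names `hex_head` and `zstd_magic?` are hypothetical and exist only for this illustration; they are not part of Fluentd or zstd-ruby.

```ruby
# Hypothetical helpers for illustration; not part of Fluentd or zstd-ruby.

# Format the first `count` bytes of `payload` as space-separated hex.
def hex_head(payload, count = 16)
  payload.b.byteslice(0, count).bytes.map { |b| format('%02x', b) }.join(' ')
end

# True when `payload` begins with the zstd magic bytes 28 b5 2f fd.
def zstd_magic?(payload)
  magic = ['28b52ffd'].pack('H*') # magic bytes as they appear on the wire
  payload.b.start_with?(magic)
end

good = ['28b52ffda0c21403'].pack('H*') # first 8 head bytes of the valid frame above
bad  = ['54d100dacc742e4e'].pack('H*') # first 8 head bytes of the unknown frame above

puts hex_head(good)    # 28 b5 2f fd a0 c2 14 03
puts zstd_magic?(good) # true
puts zstd_magic?(bad)  # false
```

The comparison is done on binary copies of the strings (`String#b`) so that the source encoding of the payload does not affect the result.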

The head bytes can be dumped with the following debug patch:

diff --git a/lib/fluent/plugin/out_forward.rb b/lib/fluent/plugin/out_forward.rb
index 4c323bb0..f192d6e3 100644
--- a/lib/fluent/plugin/out_forward.rb
+++ b/lib/fluent/plugin/out_forward.rb
@@ -672,6 +672,9 @@ module Fluent::Plugin
         sock.write @sender.forward_header                    # array, size=3
         sock.write tag.to_msgpack                            # 1. tag: String (str)
         chunk.open(compressed: @compress) do |chunk_io|
+          head = chunk_io.read(8) || ''.b
+          @log.info "debug: forward entries head", hex: head.bytes.map { |b| "%02x" % b }.join(' ')
+          chunk_io.rewind
           entries = [0xdb, chunk_io.size].pack('CN')
           sock.write entries.force_encoding(Encoding::UTF_8) # 2. entries: String (str32)
           IO.copy_stream(chunk_io, sock)                     #    writeRawBody(packed_es)

Which issue(s) this PR fixes:
None

What this PR does / why we need it:

This may be a known issue since #4657 was merged.
With that patch applied, Fluent Bit was not able to decompress payloads that Fluentd marked as zstd-compressed.
On our side, we need to emit explicit zstd frames that start with the magic bytes: 28 b5 2f fd.

However, the stream writer of zstd-ruby does not wrap its compressed output in such zstd frames.
So, we always hit this kind of error when using the development version of Fluent Bit's in_forward with payloads marked as zstd-compressed.
With gzip-compressed payloads there is no issue, so the behavior difference between the gzip and zstd stream writer classes could be the cause.

The related Fluent Bit's PR is:
fluent/fluent-bit#10710

Docs Changes:

Release Note:

Signed-off-by: Hiroshi Hatake <[email protected]>
@cosmo0920 cosmo0920 requested review from ashie and daipom September 11, 2025 09:16

daipom commented Sep 11, 2025

@cosmo0920
Thanks for this fix!
I’ve only been able to check a little so far, but does this mean that Fluentd’s zstd implementation isn’t compliant with RFC 8878 Zstandard Frames?
(Fluentd produces data without a Magic_Number?)

When I try a simple test with Zstd::StreamWriter, it includes the Magic_Number.
I wonder why.

$ irb -rzstd-ruby -rstringio
irb(main):001> io = StringIO.new
=> #<StringIO:0x00007d7e3bab3c18>
irb(main):002> stream = Zstd::StreamWriter.new(io)
=> #<Zstd::StreamWriter:0x00007d7e3632d6b0 @io=#<StringIO:0x00007d7e3bab3c18>, @stream=#<Zstd::StreamingCompress:0x00007d7e3632a258>>
irb(main):003> stream.write("abc")
=> 12
irb(main):004> stream.finish
=> 3
irb(main):005> io.rewind
=> 0
irb(main):006> d=io.read
=> "(\xB5/\xFD\u0000X\u0018\u0000\u0000abc\u0001\u0000\u0000"
irb(main):008> d.unpack("H*")
=> ["28b52ffd0058180000616263010000"]
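To make the dump above easier to read, here is a rough sketch that splits it according to the frame layout in RFC 8878. It assumes the simplest header form seen in this output (Frame_Header_Descriptor of 0x00 followed by a one-byte Window_Descriptor) and raw blocks only; `raw_blocks_of` is a hypothetical name, and this is not a general zstd parser.

```ruby
# Sketch only: handles a frame whose Frame_Header_Descriptor is 0x00
# (so the header is descriptor + window descriptor) and whose blocks are raw.
def raw_blocks_of(bytes)
  data = bytes.b
  magic = data.byteslice(0, 4).to_s.unpack1('V') # 4 bytes, little-endian
  raise ArgumentError, 'missing zstd magic number' unless magic == 0xFD2FB528
  raise ArgumentError, 'frame header not handled by this sketch' unless data.getbyte(4) == 0

  pos = 6 # 4 magic bytes + 1 descriptor byte + 1 window descriptor byte
  blocks = []
  loop do
    header_bytes = data.byteslice(pos, 3)
    raise ArgumentError, 'truncated frame' if header_bytes.nil? || header_bytes.bytesize < 3

    b0, b1, b2 = header_bytes.bytes
    header = b0 | (b1 << 8) | (b2 << 16) # 3-byte little-endian Block_Header
    last_block = (header & 1) == 1       # bit 0
    block_type = (header >> 1) & 0b11    # bits 1-2 (0 means Raw_Block)
    block_size = header >> 3             # bits 3-23
    raise ArgumentError, 'only raw blocks are handled here' unless block_type.zero?

    blocks << data.byteslice(pos + 3, block_size)
    pos += 3 + block_size
    break if last_block
  end
  blocks
end

dump = ['28b52ffd0058180000616263010000'].pack('H*')
p raw_blocks_of(dump) # ["abc", ""]
```

Read this way, the 12 bytes returned by `write` are the magic number, the two header bytes, and one raw block holding "abc", and the 3 bytes returned by `finish` are an empty block with the Last_Block bit set.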

I’ll also try to check this with Fluentd’s behavior.


cosmo0920 commented Sep 11, 2025

How about the case of instances of a class that mixes in Enumerable?


cosmo0920 commented Sep 11, 2025

I’ve only been able to check a little so far, but does this mean that Fluentd’s zstd implementation isn’t compliant with RFC 8878 Zstandard Frames?
(Fluentd produces data without a Magic_Number?)

Yup, at least out_forward does not attach the standardized zstd magic number when using zstd compression.


cosmo0920 commented Sep 11, 2025

(Fluentd produces data without a Magic_Number?)

From RFC 8878:

3.1.1. Zstandard Frames

The structure of a single Zstandard frame is as follows:

                +--------------------+------------+
                | Magic_Number       | 4 bytes    |
                +--------------------+------------+
                | Frame_Header       | 2-14 bytes |
                +--------------------+------------+
                | Data_Block         | n bytes    |
                +--------------------+------------+
                | [More Data_Blocks] |            |
                +--------------------+------------+
                | [Content_Checksum] | 4 bytes    |
                +--------------------+------------+

                    Table 1: The Structure of a
                       Single Zstandard Frame

Magic_Number: 4 bytes, little-endian format. Value: 0xFD2FB528.

Yes, Fluentd generates payloads without this magic number when using zstd compression.
This could be a problem for other implementations of a Fluent server.

Also, because the value is little-endian, the byte sequence 0x28 0xb5 0x2f 0xfd should appear at the head of zstd-compressed payloads in the forward protocol, so that a receiver can distinguish among plain, gzip-compressed, and zstd-compressed payloads.
When using gzip compression, Fluentd already emits the gzip magic number 0x1f 0x8b.

According to https://datatracker.ietf.org/doc/html/rfc6713, gzip data carries this magic number at the head of the payload: 0x1f, 0x8b

Additional information:
Magic number(s): first two bytes are 0x1f, 0x8b.
File extension(s): gz
Macintosh file type code(s): N/A

@daipom daipom added this to the v1.19.1 milestone Sep 12, 2025

daipom commented Sep 12, 2025

Sorry, I didn’t have much time today.
I also tried to reproduce it with Fluentd, but I still couldn’t.
In my environment, the magic number is present:

  • Fluentd v1.19.0
  • Ubuntu 22.04
  • conf:
<source>
  @type sample
  tag test.foo
</source>

<match test.**>
  @type forward
  compress zstd
  <server>
    host localhost
    port 24224
  </server>
  <buffer>
    @type memory
    flush_mode interval
    flush_interval 2s
  </buffer>
</match>

<source>
  @type forward
  @label @SERVER
</source>

<label @SERVER>
  <match **>
    @type stdout
  </match>
</label>
  • patch:
diff --git a/lib/fluent/plugin/out_forward.rb b/lib/fluent/plugin/out_forward.rb
index 4c323bb0..977d99f6 100644
--- a/lib/fluent/plugin/out_forward.rb
+++ b/lib/fluent/plugin/out_forward.rb
@@ -672,6 +672,9 @@ module Fluent::Plugin
         sock.write @sender.forward_header                    # array, size=3
         sock.write tag.to_msgpack                            # 1. tag: String (str)
         chunk.open(compressed: @compress) do |chunk_io|
+          head = chunk_io.read(8) || ''.b
+          @log.warn "debug: forward entries head", hex: head.bytes.map { |b| "%02x" % b }.join(' ')
+          chunk_io.rewind
           entries = [0xdb, chunk_io.size].pack('CN')
           sock.write entries.force_encoding(Encoding::UTF_8) # 2. entries: String (str32)
           IO.copy_stream(chunk_io, sock)                     #    writeRawBody(packed_es)
  • Result:
2025-09-12 18:56:04 +0900 [info]: #0 fluentd worker is now running worker=0
2025-09-12 18:56:07 +0900 [warn]: #0 debug: forward entries head hex="28 b5 2f fd 00 58 d8 00"
2025-09-12 18:56:05.086068100 +0900 test.foo: {"message":"sample"}
2025-09-12 18:56:06.087594952 +0900 test.foo: {"message":"sample"}
2025-09-12 18:56:07.088746068 +0900 test.foo: {"message":"sample"}


daipom commented Sep 12, 2025

I’ll check more patterns, including forwarding with Fluent Bit.


cosmo0920 commented Sep 12, 2025

Hi, I rechecked and found that nothing goes wrong when zstd-compressed data is not concatenated.
The current implementation already works in that case.
But when zstd frames are concatenated, the result of the C-style concatenation is always corrupted and cannot be decompressed.
So, we need to terminate the zstd compression buffers one by one and then concatenate them C-style.

This can be reproduced with a file containing a huge number of lines, using head - /path/to/tailing_target and the in_tail plugin to ingest a large number of events at once; the problem occurs only under this kind of high volume.

To reproduce this issue, around 1700 lines of file contents need to be ingested at once.
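The idea of terminating each compressed buffer before concatenating can be illustrated with gzip from Ruby's standard library. This is only an analogy: it uses gzip, not zstd, and it is not the change made in this PR. `gunzip_members` is a hypothetical helper name. Each call to `Zlib.gzip` yields a complete, self-terminated member, so plain byte concatenation stays decodable.

```ruby
require 'zlib'
require 'stringio'

# Analogy only: gzip from the standard library, not zstd, and not this PR's fix.
# Decompresses every gzip member found in `data`, one after another.
def gunzip_members(data)
  io = StringIO.new(data)
  out = ''.b
  loop do
    reader = Zlib::GzipReader.new(io)
    out << reader.read.b
    unused = reader.unused                # bytes read past this member, or nil
    reader.finish                         # release the reader but keep io open
    io.pos -= unused.bytesize if unused   # step back to the start of the next member
    break if io.eof?
  end
  out
end

first  = Zlib.gzip("event 1\n") # each member is finished on its own
second = Zlib.gzip("event 2\n")
puts gunzip_members(first + second)
```

Both lines come back intact because each member starts with its own magic bytes and ends with its own trailer.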

@cosmo0920

How is this PR going, mate?
Should we dig deeper into dependent gems such as zstd-ruby?


daipom commented Sep 19, 2025

Sorry, I haven’t been able to make time over the past few days.
Thanks for the reproduction steps!
I'll try it and review this change.


daipom commented Sep 19, 2025

I could reproduce this! Thanks!
As you said, the issue can be reproduced when concatenating a large amount of data.
It does not occur with the concatenation of small data.

<source>
  @type sample
  tag test.foo
  size 5000 # This is important.
</source>

<match test.**>
  @type forward
  compress zstd
  <server>
    host localhost
    port 24224
  </server>
  <buffer>
    @type memory
    flush_mode interval
    flush_interval 2s
  </buffer>
</match>

<source>
  @type forward
  @label @SERVER
</source>

<label @SERVER>
  <match **>
    @type stdout
  </match>
</label>

2025-09-19 18:40:21 +0900 [info]: #0 fluentd worker is now running worker=0
2025-09-19 18:40:24 +0900 [warn]: #0 debug: forward entries head hex="0c 0f 00 24 1a 65 a5 bb"
2025-09-19 18:40:24 +0900 [error]: #0 unexpected error on reading data host="127.0.0.1" port=34744 error_class=RuntimeError error="decompress error error code: Unknown frame descriptor"
  2025-09-19 18:40:24 +0900 [error]: #0 /home/daipom/.rbenv/versions/3.2.2/lib/ruby/gems/3.2.0/gems/zstd-ruby-1.5.7.0/lib/zstd-ruby/stream_reader.rb:14:in `decompress'
  2025-09-19 18:40:24 +0900 [error]: #0 /home/daipom/.rbenv/versions/3.2.2/lib/ruby/gems/3.2.0/gems/zstd-ruby-1.5.7.0/lib/zstd-ruby/stream_reader.rb:14:in `read'
  2025-09-19 18:40:24 +0900 [error]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/compressable.rb:86:in `block in string_decompress_zstd'
  2025-09-19 18:40:24 +0900 [error]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/compressable.rb:84:in `loop'
  2025-09-19 18:40:24 +0900 [error]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/compressable.rb:84:in `string_decompress_zstd'
  2025-09-19 18:40:24 +0900 [error]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/compressable.rb:97:in `string_decompress'
  2025-09-19 18:40:24 +0900 [error]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/compressable.rb:57:in `decompress'
  2025-09-19 18:40:24 +0900 [error]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/event.rb:307:in `ensure_decompressed!'
  2025-09-19 18:40:24 +0900 [error]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/event.rb:289:in `each'
  2025-09-19 18:40:24 +0900 [error]: #0 /home/daipom/work/fluentd/fluentd/lib/fluent/plugin/in_forward.rb:373:in `check_and_skip_invalid_event'

@daipom daipom modified the milestones: v1.19.1, v1.20.0 Sep 22, 2025
@daipom daipom added the backport to v1.19 We will backport this fix to the LTS branch label Sep 22, 2025