Skip to content

Commit d92fd8b

Browse files
authored
Avoid double counting in stats out when writing to nested streams (#204)
1 parent 7988463 commit d92fd8b

File tree

7 files changed

+259
-91
lines changed

7 files changed

+259
-91
lines changed

docs/src/devnotes.md

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ default buffer size is 16KiB for each.
4141
- `error`: exception returned by the codec (`<:Error`)
4242
- `buffer1`: data buffer that is closer to the user (`<:Buffer`)
4343
- `buffer2`: data buffer that is farther to the user (`<:Buffer`)
44+
- `bytes_written_out`: number of bytes written to the underlying stream (`<:Int64`)
4445

4546
The `mode` field may be one of the following value:
4647
- `:idle` : initial and intermediate mode, no buffered data
@@ -78,8 +79,10 @@ Shared buffers
7879
Adjacent transcoding streams may share their buffers. This will reduce memory
7980
allocation and eliminate data copy between buffers.
8081

81-
`readdata!(input::IO, output::Buffer)` and `writedata!(output::IO,
82-
input::Buffer)` do the actual work of read/write data from/to the underlying
82+
If `buffer2` is shared it is considered to be owned by the underlying stream
83+
by the `stats` and `position` functions.
84+
85+
`readdata!(input::IO, output::Buffer)` and `flush_buffer2(stream::TranscodingStream)` do the actual work of read/write data from/to the underlying
8386
stream. These methods have a special pass for shared buffers.
8487

8588

fuzz/fuzz.jl

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,25 @@ end
152152
)
153153
stream = wrap_stream(kws, IOBuffer())
154154
for i in 1:length(data)
155+
position(stream) == i-1 || return false
156+
if stream isa TranscodingStream
157+
s = TranscodingStreams.stats(stream)
158+
s.in == i-1 || return false
159+
# TODO fix position(stream.stream)
160+
# s.out == position(stream.stream) || return false
161+
# s.transcoded_in == s.out || return false
162+
# s.transcoded_out == s.out || return false
163+
end
155164
write(stream, data[i]) == 1 || return false
156165
end
157-
take_all(stream) == data
166+
take_all(stream) == data || return false
167+
if stream isa TranscodingStream
168+
s = TranscodingStreams.stats(stream)
169+
s.in == length(data) || return false
170+
# TODO fix position(stream.stream)
171+
# s.out == position(stream.stream) || return false
172+
# s.transcoded_in == s.out || return false
173+
# s.transcoded_out == s.out || return false
174+
end
175+
true
158176
end

src/noop.jl

Lines changed: 30 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ end
7272
function Base.seek(stream::NoopStream, pos::Integer)
7373
mode = stream.state.mode
7474
if mode === :write
75-
flushbuffer(stream)
75+
flush_buffer2(stream)
7676
end
7777
seek(stream.stream, pos)
7878
initbuffer!(stream.buffer1)
@@ -82,7 +82,7 @@ end
8282
function Base.seekstart(stream::NoopStream)
8383
mode = stream.state.mode
8484
if mode === :write
85-
flushbuffer(stream)
85+
flush_buffer2(stream)
8686
end
8787
seekstart(stream.stream)
8888
initbuffer!(stream.buffer1)
@@ -92,7 +92,7 @@ end
9292
function Base.seekend(stream::NoopStream)
9393
mode = stream.state.mode
9494
if mode === :write
95-
flushbuffer(stream)
95+
flush_buffer2(stream)
9696
end
9797
seekend(stream.stream)
9898
initbuffer!(stream.buffer1)
@@ -103,29 +103,34 @@ function Base.write(stream::NoopStream, b::UInt8)::Int
103103
changemode!(stream, :write)
104104
if has_sharedbuf(stream)
105105
# directly write data to the underlying stream
106-
n = Int(write(stream.stream, b))
107-
return n
106+
write(stream.stream, b)
107+
stream.state.bytes_written_out += 1
108+
else
109+
buffer1 = stream.buffer1
110+
marginsize(buffer1) > 0 || flush_buffer2(stream)
111+
writebyte!(buffer1, b)
108112
end
109-
buffer1 = stream.buffer1
110-
marginsize(buffer1) > 0 || flushbuffer(stream)
111-
return writebyte!(buffer1, b)
113+
return 1
112114
end
113115

114116
function Base.unsafe_write(stream::NoopStream, input::Ptr{UInt8}, nbytes::UInt)::Int
115117
changemode!(stream, :write)
118+
Int(nbytes) # Error if nbytes > typemax Int
116119
if has_sharedbuf(stream)
117120
# directly write data to the underlying stream
118121
n = Int(unsafe_write(stream.stream, input, nbytes))
122+
stream.state.bytes_written_out += n
119123
return n
120124
end
121125
buffer = stream.buffer1
122126
if marginsize(buffer) nbytes
123127
copydata!(buffer, input, Int(nbytes))
124128
return Int(nbytes)
125129
else
126-
flushbuffer(stream)
130+
flush_buffer2(stream)
127131
# directly write data to the underlying stream
128132
n = Int(unsafe_write(stream.stream, input, nbytes))
133+
stream.state.bytes_written_out += n
129134
return n
130135
end
131136
end
@@ -148,17 +153,20 @@ function stats(stream::NoopStream)
148153
buffer = stream.buffer1
149154
@assert buffer === stream.buffer2
150155
if mode === :idle
151-
consumed = supplied = 0
156+
in = out = 0
152157
elseif mode === :read
153-
supplied = buffer.transcoded
154-
consumed = supplied - buffersize(buffer)
158+
in = buffer.transcoded
159+
out = in - buffersize(buffer)
155160
elseif mode === :write
156-
supplied = buffer.transcoded + buffersize(buffer)
157-
consumed = buffer.transcoded
161+
out = stream.state.bytes_written_out
162+
in = out
163+
if !has_sharedbuf(stream)
164+
in += buffersize(buffer)
165+
end
158166
else
159167
throw_invalid_mode(mode)
160168
end
161-
return Stats(consumed, supplied, supplied, supplied)
169+
return Stats(in, out, out, out)
162170
end
163171

164172

@@ -190,24 +198,15 @@ function fillbuffer(stream::NoopStream; eager::Bool = false)::Int
190198
return nfilled
191199
end
192200

193-
function flushbuffer(stream::NoopStream, all::Bool=false)
194-
changemode!(stream, :write)
195-
buffer = stream.buffer1
196-
@assert buffer === stream.buffer2
197-
nflushed::Int = 0
198-
if all
199-
while buffersize(buffer) > 0
200-
nflushed += writedata!(stream.stream, buffer)
201-
end
202-
else
203-
nflushed += writedata!(stream.stream, buffer)
204-
makemargin!(buffer, 0)
205-
end
206-
buffer.transcoded += nflushed
207-
return nflushed
201+
# Empty buffer1 by writing out data.
202+
# `stream` must be in :write mode.
203+
# Ensure there is margin available in buffer1 for at least one byte.
204+
function flush_buffer1(stream::NoopStream)::Nothing
205+
flush_buffer2(stream)
208206
end
209207

208+
# This is always called after `flush_buffer1(stream)`
210209
function flushuntilend(stream::NoopStream)
211-
stream.buffer1.transcoded += writedata!(stream.stream, stream.buffer1)
210+
@assert iszero(buffersize(stream.buffer1))
212211
return
213212
end

src/state.jl

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,11 @@ mutable struct State
2424
buffer1::Buffer
2525
buffer2::Buffer
2626

27+
# Number of bytes written to the underlying stream
28+
bytes_written_out::Int64
29+
2730
function State(buffer1::Buffer, buffer2::Buffer)
28-
return new(:idle, :ok, false, Error(), buffer1, buffer2)
31+
return new(:idle, :ok, false, Error(), buffer1, buffer2, Int64(0))
2932
end
3033
end
3134

src/stream.jl

Lines changed: 60 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -494,21 +494,24 @@ function Base.write(stream::TranscodingStream)
494494
return 0
495495
end
496496

497-
function Base.write(stream::TranscodingStream, b::UInt8)
497+
function Base.write(stream::TranscodingStream, b::UInt8)::Int
498498
changemode!(stream, :write)
499-
if marginsize(stream.buffer1) == 0 && flushbuffer(stream) == 0
500-
return 0
501-
end
502-
return writebyte!(stream.buffer1, b)
499+
buffer1 = stream.buffer1
500+
marginsize(buffer1) > 0 || flush_buffer1(stream)
501+
writebyte!(buffer1, b)
502+
return 1
503503
end
504504

505505
function Base.unsafe_write(stream::TranscodingStream, input::Ptr{UInt8}, nbytes::UInt)
506506
changemode!(stream, :write)
507-
state = stream.state
507+
Int(nbytes) # Error if nbytes > typemax Int
508508
buffer1 = stream.buffer1
509509
p = input
510510
p_end = p + nbytes
511-
while p < p_end && (marginsize(buffer1) > 0 || flushbuffer(stream) > 0)
511+
while p < p_end
512+
if marginsize(buffer1) 0
513+
flush_buffer1(stream)
514+
end
512515
m = min(marginsize(buffer1), p_end - p)
513516
copydata!(buffer1, p, m)
514517
p += m
@@ -535,16 +538,16 @@ const TOKEN_END = EndToken()
535538

536539
function Base.write(stream::TranscodingStream, ::EndToken)
537540
changemode!(stream, :write)
538-
flushbufferall(stream)
541+
flush_buffer1(stream)
539542
flushuntilend(stream)
540543
return 0
541544
end
542545

543546
function Base.flush(stream::TranscodingStream)
544547
checkmode(stream)
545548
if stream.state.mode == :write
546-
flushbufferall(stream)
547-
writedata!(stream.stream, stream.buffer2)
549+
flush_buffer1(stream)
550+
flush_buffer2(stream)
548551
end
549552
flush(stream.stream)
550553
end
@@ -600,9 +603,12 @@ function stats(stream::TranscodingStream)
600603
out = transcoded_out - buffersize(buffer1)
601604
elseif mode === :write
602605
transcoded_in = buffer1.transcoded
603-
transcoded_out = buffer2.transcoded
606+
out = state.bytes_written_out
607+
transcoded_out = out
608+
if !has_sharedbuf(stream)
609+
transcoded_out += buffersize(buffer2)
610+
end
604611
in = transcoded_in + buffersize(buffer1)
605-
out = transcoded_out - buffersize(buffer2)
606612
else
607613
throw_invalid_mode(mode)
608614
end
@@ -633,38 +639,37 @@ function fillbuffer(stream::TranscodingStream; eager::Bool = false)
633639
return nfilled
634640
end
635641

636-
function flushbuffer(stream::TranscodingStream, all::Bool=false)
637-
changemode!(stream, :write)
642+
# Empty buffer1 by writing out data.
643+
# `stream` must be in :write mode.
644+
# Ensure there is margin available in buffer1 for at least one byte.
645+
function flush_buffer1(stream::TranscodingStream)::Nothing
638646
state = stream.state
639647
buffer1 = stream.buffer1
640648
buffer2 = stream.buffer2
641-
nflushed::Int = 0
642-
while (all ? buffersize(buffer1) != 0 : makemargin!(buffer1, 0) == 0)
649+
while buffersize(buffer1) > 0
643650
if state.code == :end
644651
callstartproc(stream, :write)
645652
end
646-
writedata!(stream.stream, buffer2)
647-
Δin, _ = callprocess(stream, buffer1, buffer2)
648-
nflushed += Δin
653+
flush_buffer2(stream)
654+
callprocess(stream, buffer1, buffer2)
649655
end
650-
return nflushed
651-
end
652-
653-
function flushbufferall(stream::TranscodingStream)
654-
return flushbuffer(stream, true)
656+
# move positions to the start of the buffer
657+
@assert !iszero(makemargin!(buffer1, 0))
658+
return
655659
end
656660

661+
# This is always called after `flush_buffer1(stream)`
657662
function flushuntilend(stream::TranscodingStream)
658-
changemode!(stream, :write)
659663
state = stream.state
660664
buffer1 = stream.buffer1
661665
buffer2 = stream.buffer2
666+
@assert buffersize(buffer1) == 0
667+
@assert stream.state.mode === :write
662668
while state.code != :end
663-
writedata!(stream.stream, buffer2)
669+
flush_buffer2(stream)
664670
callprocess(stream, buffer1, buffer2)
665671
end
666-
writedata!(stream.stream, buffer2)
667-
@assert buffersize(buffer1) == 0
672+
flush_buffer2(stream)
668673
return
669674
end
670675

@@ -687,7 +692,7 @@ function callprocess(stream::TranscodingStream, inbuf::Buffer, outbuf::Buffer)
687692
state = stream.state
688693
input = buffermem(inbuf)
689694
GC.@preserve inbuf makemargin!(outbuf, minoutsize(stream.codec, input))
690-
Δin, Δout, state.code = GC.@preserve inbuf outbuf process(stream.codec, input, marginmem(outbuf), state.error)
695+
Δin::Int, Δout::Int, state.code = GC.@preserve inbuf outbuf process(stream.codec, input, marginmem(outbuf), state.error)
691696
@debug(
692697
"called process()",
693698
code = state.code,
@@ -698,6 +703,12 @@ function callprocess(stream::TranscodingStream, inbuf::Buffer, outbuf::Buffer)
698703
)
699704
consumed!(inbuf, Δin, transcode = true)
700705
supplied!(outbuf, Δout, transcode = true)
706+
if has_sharedbuf(stream)
707+
if stream.state.mode === :write
708+
# this must be updated before throwing any error if outbuf is shared.
709+
stream.state.bytes_written_out += Δout
710+
end
711+
end
701712
if state.code == :error
702713
changemode!(stream, :panic)
703714
elseif state.code == :ok && Δin == Δout == 0
@@ -745,20 +756,28 @@ function readdata!(input::IO, output::Buffer)::Int
745756
end
746757

747758
# Write all data to `output` from the buffer of `input`.
748-
function writedata!(output::IO, input::Buffer)
749-
if output isa TranscodingStream && output.buffer1 === input
759+
function flush_buffer2(stream::TranscodingStream)::Nothing
760+
output = stream.stream
761+
buffer2 = stream.buffer2
762+
state = stream.state
763+
@assert state.mode === :write
764+
if has_sharedbuf(stream)
750765
# Delegate the operation to the underlying stream for shared buffers.
751-
return flushbufferall(output)
752-
end
753-
nwritten::Int = 0
754-
while buffersize(input) > 0
755-
n = GC.@preserve input Base.unsafe_write(output, bufferptr(input), buffersize(input))
756-
consumed!(input, n)
757-
nwritten += n
766+
changemode!(output, :write)
767+
flush_buffer1(output)
768+
else
769+
while buffersize(buffer2) > 0
770+
n::Int = GC.@preserve buffer2 Base.unsafe_write(output, bufferptr(buffer2), buffersize(buffer2))
771+
n 0 && error("short write")
772+
consumed!(buffer2, n)
773+
state.bytes_written_out += n
774+
GC.safepoint()
775+
end
776+
# move positions to the start of the buffer
777+
@assert !iszero(makemargin!(buffer2, 0))
758778
GC.safepoint()
759779
end
760-
GC.safepoint()
761-
return nwritten
780+
nothing
762781
end
763782

764783

@@ -800,7 +819,7 @@ function changemode!(stream::TranscodingStream, newmode::Symbol)
800819
end
801820
elseif mode == :write
802821
if newmode == :close
803-
flushbufferall(stream)
822+
flush_buffer1(stream)
804823
flushuntilend(stream)
805824
state.mode = newmode
806825
finalize_codec(stream.codec, state.error)

0 commit comments

Comments
 (0)