Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion docs/src/assets/modes.dot
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,19 @@ digraph modes {
"read" -> "panic";

"write" -> "write";
"write" -> "stop";
"write" -> "done";
"write" -> "close";
"write" -> "panic";

"stop" -> "close";
"done" -> "close";

"start" [ shape = point ];
"idle" [ shape = circle ];
"read" [ shape = circle ];
"write" [ shape = circle ];
"stop" [ shape = circle; style=bold; ];
"done" [ shape = circle; style=bold; ];
"close" [ shape = circle; style=bold; ];
"panic" [ shape = circle; style=bold; ];
}
171 changes: 102 additions & 69 deletions docs/src/assets/modes.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
3 changes: 2 additions & 1 deletion docs/src/devnotes.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ The `mode` field may be one of the following value:
- `:idle` : initial and intermediate mode, no buffered data
- `:read` : being ready to read data, data may be buffered
- `:write`: being ready to write data, data may be buffered
- `:stop` : transcoding is stopped, data may be buffered
- `:stop` : transcoding is stopped after read, data may be buffered
- `:done` : transcoding is stopped after write, data may be buffered
- `:close`: closed, no buffered data
- `:panic`: an exception has been thrown in codec, data may be buffered but we
cannot do anything
Expand Down
4 changes: 2 additions & 2 deletions src/state.jl
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ See Developer's notes for details.
"""
mutable struct State
# current stream mode
mode::Symbol # {:idle, :read, :write, :stop, :close, :panic}
mode::Symbol # {:idle, :read, :write, :stop, :done, :close, :panic}

# return code of the last method call
code::Symbol # {:ok, :end, :error}

# flag to go :stop on :end
# flag to go :stop or :done on :end
stop_on_end::Bool

# exception thrown while data processing
Expand Down
39 changes: 23 additions & 16 deletions src/stream.jl
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ function Base.isopen(stream::TranscodingStream)
end

function Base.close(stream::TranscodingStream)
stopped = stream.state.mode == :stop
stopped = stream.state.mode ∈ (:stop, :done)
if stream.state.mode != :panic
changemode!(stream, :close)
end
Expand Down Expand Up @@ -212,7 +212,9 @@ end
changemode!(stream, :read)
continue
elseif mode == :write
return eof(stream.stream)
return true
elseif mode == :done
return true
elseif mode == :close
return true
elseif mode == :stop
Expand Down Expand Up @@ -252,7 +254,7 @@ function Base.skip(stream::TranscodingStream, offset::Integer)
mode = stream.state.mode
buffer1 = stream.buffer1
skipped = 0
if mode == :read
if mode ∈ (:read, :stop)
while !eof(stream) && buffersize(buffer1) < offset - skipped
n = buffersize(buffer1)
emptybuffer!(buffer1)
Expand Down Expand Up @@ -325,15 +327,11 @@ end
# --------------

function Base.read(stream::TranscodingStream, ::Type{UInt8})
# eof and ready_to_read! are inlined here because ready_to_read! is very slow and eof is broken
eof = buffersize(stream.buffer1) == 0
state = stream.state
mode = state.mode
if !(mode == :read || mode == :stop)
changemode!(stream, :read)
end
if eof && sloweof(stream)
throw(EOFError())
if eof(stream)
ready_to_read!(stream)
Copy link
Contributor

@vtjnash vtjnash Feb 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be implied by eof

Suggested change
ready_to_read!(stream)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change means an EOFError is thrown when reading a byte from a closed stream.
This seems to match what happens with IOStream

julia> p, io = mktemp()
("/tmp/jl_GKa3GO", IOStream(<fd 21>))

julia> close(io)

julia> read(io, UInt8)
ERROR: EOFError: read end of file
Stacktrace:
 [1] read(s::IOStream, ::Type{UInt8})
   @ Base ./iostream.jl:400
 [2] top-level scope
   @ REPL[43]:1

But doesn't match with what IOBuffer does:

julia> stream = IOBuffer();

julia> close(stream)

julia> read(stream, UInt8)
ERROR: ArgumentError: read failed, IOBuffer is not readable
Stacktrace:
 [1] _throw_not_readable()
   @ Base ./iobuffer.jl:165
 [2] read(from::IOBuffer, ::Type{UInt8})
   @ Base ./iobuffer.jl:218
 [3] top-level scope
   @ REPL[49]:1

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True, though I think eof could optionally throw a state-exception in that case also sometimes. The ambiguity occurs when a stream is ambiguously readable or writable, which sometimes cannot be avoided. So seekable files usually close both ends at once (and eof is a transient condition, which changes to an error after calling close), whereas (non-seekable) streams the eof is a property of reading and isopen is a property of writing

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm undoing the change because I like how IOBuffer is handling this better than IOStream.

For example, I think read(io, String) after close should error instead of returning an empty string:

julia> p, io = mktemp()

julia> close(io)

julia> read(io, String)
""
julia> stream = IOBuffer();

julia> close(stream)

julia> read(stream, String)
ERROR: ArgumentError: read failed, IOBuffer is not readable

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but that is already eof’s job to throw when there is a state violation, so it should not be necessary for read to also redundantly check

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would break this test that was added in #32:

stream = NoopStream(IOBuffer(""))
unsafe_write(stream, C_NULL, 0)
@test eof(stream) # write
close(stream)
@test eof(stream) # close

Copy link
Member Author

@nhz2 nhz2 Feb 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that eof should throw if the stream is not in a readable state.

However this seems like a breaking change, so I will split this PR into smaller parts, and add the eof changes in v0.11 for v0.11 I would also like to fix #93, #109 and remove the seekend implementation.

if eof(stream)
throw(EOFError())
end
end
return readbyte!(stream.buffer1)
end
Expand Down Expand Up @@ -459,7 +457,7 @@ end
# Ready to read data from the stream.
function ready_to_read!(stream::TranscodingStream)
mode = stream.state.mode
if !(mode == :read || mode == :stop)
if mode ∉ (:read, :stop)
changemode!(stream, :read)
end
return
Expand Down Expand Up @@ -623,7 +621,7 @@ function flushbuffer(stream::TranscodingStream, all::Bool=false)
buffer1 = stream.buffer1
buffer2 = stream.buffer2
nflushed::Int = 0
while (all ? buffersize(buffer1) != 0 : makemargin!(buffer1, 0) == 0) && state.mode != :stop
while (all ? buffersize(buffer1) != 0 : makemargin!(buffer1, 0) == 0) && state.mode != :done
if state.code == :end
callstartproc(stream, :write)
end
Expand Down Expand Up @@ -688,7 +686,11 @@ function callprocess(stream::TranscodingStream, inbuf::Buffer, outbuf::Buffer)
# When no progress, expand the output buffer.
makemargin!(outbuf, max(16, marginsize(outbuf) * 2))
elseif state.code == :end && state.stop_on_end
changemode!(stream, :stop)
if stream.state.mode == :read
changemode!(stream, :stop)
else
changemode!(stream, :done)
end
end
return Δin, Δout
end
Expand Down Expand Up @@ -778,7 +780,7 @@ function changemode!(stream::TranscodingStream, newmode::Symbol)
return
end
elseif mode == :write
if newmode == :close || newmode == :stop
if newmode == :close || newmode == :done
if newmode == :close
flushbufferall(stream)
flushuntilend(stream)
Expand All @@ -792,6 +794,11 @@ function changemode!(stream::TranscodingStream, newmode::Symbol)
state.mode = newmode
return
end
elseif mode == :done
if newmode == :close
state.mode = newmode
return
end
elseif mode == :panic
throw_panic_error()
end
Expand Down
Loading