diff --git a/src/client/httplibs/juliaweb_http.jl b/src/client/httplibs/juliaweb_http.jl index 57a5759..7db730b 100644 --- a/src/client/httplibs/juliaweb_http.jl +++ b/src/client/httplibs/juliaweb_http.jl @@ -290,10 +290,16 @@ function _http_streaming_request(ctx, method, url, headers, body, timeout, bytes open_kwargs = merge(open_kwargs, (; verbose=get(ctx.client.clntoptions, :verbose, false))) end + # Capture the streaming connection so the abort-on-close watcher below can + # unblock the read task when the consumer closes `stream_to`. (Mirrors the + # `:downloads` backend, which interrupts its download task on channel close.) + io_ref = Ref{Any}(nothing) + @sync begin - @async begin + read_task = @async begin try HTTP.open(method, url, headers; open_kwargs...) do io + io_ref[] = io write(io, body) captured_response[] = http_response = startread(io) try @@ -311,7 +317,15 @@ function _http_streaming_request(ctx, method, url, headers, body, timeout, bytes end catch ex close(output) - rethrow(ex) + # When the consumer closes `stream_to`, the watcher task below aborts + # this read (close(io) + a scheduled InterruptException); that surfaces + # here as an IO error or InterruptException. Swallow it so the streaming + # request returns normally instead of propagating a spurious error. A + # read timeout or genuine network error arrives while `stream_to` is + # still open (and is not our InterruptException), so it still throws. + if isopen(stream_to) && !isa(ex, InterruptException) + rethrow(ex) + end end end @@ -337,6 +351,45 @@ function _http_streaming_request(ctx, method, url, headers, body, timeout, bytes close(stream_to) end end + + @async begin + # Abort-on-close watcher: when the consumer closes `stream_to` (e.g. a + # k8s watch is stopped via `close(stream)`), abort the read task above so + # it unblocks immediately instead of hanging on the socket until the + # read-idle timeout. Mirrors the `:downloads` backend, which interrupts + # its download task on channel close. + try + while isopen(stream_to) + wait(stream_to) + yield() + end + catch ex + isa(ex, InvalidStateException) || rethrow(ex) + end + # Best-effort close of the connection. On HTTP/2 this alone does NOT wake a + # body read parked on the flow-control timer, so we also forcibly interrupt + # the read task (as `:downloads` does for non-interruptible downloads). + io = io_ref[] + # `io` may be `nothing` if the consumer closed before the connection was + # established; in practice a consumer only stops after receiving an event + # (or a timer fires seconds later), so the connection is already up. This + # is the same theoretical hole the `:downloads` watcher has. + if io !== nothing + try + close(io) + catch + # already closed / natural EOF — nothing to abort + end + end + # `stream_to` also closes on natural EOF (the chunk reader closes it after + # the read task finishes); the `istaskdone` guard skips the interrupt in + # that case. For JuliaRun's watch consumers an interrupt of an already- + # finishing read is harmless anyway — it maps to `is_request_interrupted`, + # the same signal a read-idle timeout produces, which they already retry on. + if !istaskdone(read_task) + schedule(read_task, InterruptException(); error=true) + end + end end return http_response, output