From b09ebd1239abbddb99d18adff2a09f8dd7048907 Mon Sep 17 00:00:00 2001 From: Efe Karasakal Date: Sat, 4 Jul 2026 01:27:47 +0200 Subject: [PATCH] stream: preserve half-open duplexes in async iteration Signed-off-by: Efe Karasakal --- lib/internal/streams/readable.js | 9 ++- .../test-net-allow-half-open-async-iter.js | 80 +++++++++++++++++++ ...am-readable-async-iter-half-open-duplex.js | 41 ++++++++++ 3 files changed, 129 insertions(+), 1 deletion(-) create mode 100644 test/parallel/test-net-allow-half-open-async-iter.js create mode 100644 test/parallel/test-stream-readable-async-iter-half-open-duplex.js diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js index f28e6905a9499f..b8c07443e93e2a 100644 --- a/lib/internal/streams/readable.js +++ b/lib/internal/streams/readable.js @@ -1418,9 +1418,16 @@ async function* createAsyncIterator(stream, options) { error = aggregateTwoErrors(error, err); throw error; } finally { + const preserveHalfOpenDuplex = + error === null && + stream.allowHalfOpen === true && + stream.writable === true && + stream.writableEnded !== true; + if ( (error || options?.destroyOnReturn !== false) && - (error === undefined || stream._readableState.autoDestroy) + (error === undefined || stream._readableState.autoDestroy) && + !preserveHalfOpenDuplex ) { destroyImpl.destroyer(stream, null); } else { diff --git a/test/parallel/test-net-allow-half-open-async-iter.js b/test/parallel/test-net-allow-half-open-async-iter.js new file mode 100644 index 00000000000000..30d82fc4e5b327 --- /dev/null +++ b/test/parallel/test-net-allow-half-open-async-iter.js @@ -0,0 +1,80 @@ +'use strict'; + +const common = require('../common'); +const assert = require('assert'); +const net = require('net'); + +(async function() { + let resolveServerSocket; + const serverSocketPromise = new Promise((resolve) => { + resolveServerSocket = resolve; + }); + + const server = net.createServer({ + allowHalfOpen: true, + }, common.mustCall((socket) => { + resolveServerSocket(socket); + })); + + server.on('error', common.mustNotCall()); + server.on('close', common.mustCall()); + + await new Promise((resolve) => { + server.listen(0, common.localhostIPv4, resolve); + }); + + const clientSocket = await new Promise((resolve) => { + const socket = net.createConnection({ + allowHalfOpen: true, + port: server.address().port, + host: server.address().address, + }, common.mustCall(() => { + resolve(socket); + })); + socket.on('error', common.mustNotCall()); + }); + + const serverSocket = await serverSocketPromise; + serverSocket.on('error', common.mustNotCall()); + + await new Promise((resolve, reject) => { + clientSocket.write('data written to client socket', (err) => { + if (err) reject(err); + else resolve(); + }); + }); + + await new Promise((resolve) => { + clientSocket.end(resolve); + }); + + let serverRead = ''; + for await (const chunk of serverSocket) { + serverRead += chunk; + } + + assert.strictEqual(serverRead, 'data written to client socket'); + assert.strictEqual(serverSocket.destroyed, false); + + await new Promise((resolve, reject) => { + serverSocket.write('data written to server socket', (err) => { + if (err) reject(err); + else resolve(); + }); + }); + + await new Promise((resolve) => { + serverSocket.end(resolve); + }); + + let clientRead = ''; + for await (const chunk of clientSocket) { + clientRead += chunk; + } + + assert.strictEqual(clientRead, 'data written to server socket'); + + await new Promise((resolve) => { + server.close(resolve); + }); +})().then(common.mustCall()); diff --git a/test/parallel/test-stream-readable-async-iter-half-open-duplex.js b/test/parallel/test-stream-readable-async-iter-half-open-duplex.js new file mode 100644 index 00000000000000..13e0d4b2f3dffc --- /dev/null +++ b/test/parallel/test-stream-readable-async-iter-half-open-duplex.js @@ -0,0 +1,41 @@ +'use strict'; + +const common = require('../common'); +const assert = require('assert'); +const { + Duplex, +} = require('stream'); + +{ + let written = ''; + + const duplex = new Duplex({ + allowHalfOpen: true, + read() { + this.push('hello'); + this.push(null); + }, + write(chunk, encoding, callback) { + written += chunk; + callback(); + }, + }); + + duplex.on('error', common.mustNotCall()); + duplex.on('close', common.mustCall()); + + (async () => { + let read = ''; + for await (const chunk of duplex) { + read += chunk; + } + + assert.strictEqual(read, 'hello'); + assert.strictEqual(duplex.destroyed, false); + + duplex.write('world', common.mustSucceed(() => { + assert.strictEqual(written, 'world'); + duplex.end(); + })); + })().then(common.mustCall()); +}