diff --git a/src/workerd/api/streams/readable-source-adapter.c++ b/src/workerd/api/streams/readable-source-adapter.c++ index 7ee3350a83e..daf94c4bcfd 100644 --- a/src/workerd/api/streams/readable-source-adapter.c++ +++ b/src/workerd/api/streams/readable-source-adapter.c++ @@ -201,7 +201,7 @@ jsg::Promise ReadableStreamSourceJsAd // always be at least as large as minBytes. This should be handled for us by // the jsg::BufferSource, but just to be safe, we will double-check with a // debug assert here. - KJ_DASSERT(buffer.size() % elementSize == 0); + KJ_ASSERT(buffer.size() % elementSize == 0); auto minBytes = kj::min(options.minBytes.orDefault(elementSize), buffer.size()); // We want to be sure that minBytes is a multiple of the element size @@ -221,20 +221,27 @@ jsg::Promise ReadableStreamSourceJsAd // We only pass a kj::ArrayPtr to the buffer into the read call, keeping // the actual buffer instance alive by attaching it to the JS promise // chain that follows the read in order to keep it alive. + + // Unfortunately we have to allocate a kj::Array here to read the data + // into then copy that into the provided data when the read completes. + // Why? For security! The buffer provided to us is a v8 allocated buffer + // that sits within the v8 sandbox but the read happens in kj-space outside + // of the isolate lock. + // TODO(soon): Use the memory protection key to allow kj-space to write directly + // into the v8 buffer without the copy? + auto destBuffer = kj::heapArray(buffer.size()); + auto promise = active.enqueue(kj::coCapture( - [&active, buffer = buffer.asArrayPtr(), minBytes]() mutable -> kj::Promise { - // TODO(soon): The underlying kj streams API now supports passing the - // kj::ArrayPtr directly to the read call, but ReadableStreamSource has - // not yet been updated to do so. When it is, we can update this read to - // pass `buffer` directly rather than passing the begin() and size(). + [&active, buffer = destBuffer.asPtr(), minBytes]() mutable -> kj::Promise { co_return co_await active.source->read(buffer, minBytes); })); return ioContext .awaitIo(js, kj::mv(promise), - [buffer = kj::mv(buffer), self = selfRef.addRef()](jsg::Lock& js, + [destBuffer = kj::mv(destBuffer), buffer = kj::mv(buffer), self = selfRef.addRef()]( + jsg::Lock& js, size_t bytesRead) mutable -> jsg::Promise { - // If the bytesRead is 0, that indicates the stream is closed. We will - // move the stream to a closed state and return the empty buffer. + // If the bytesRead is 0, that indicates the stream is closed and nothing was + // read. We will move the stream to a closed state and return the empty buffer. if (bytesRead == 0) { self->runIfAlive([](ReadableStreamSourceJsAdapter& self) { KJ_IF_SOME(open, self.state.tryGetActiveUnsafe()) { @@ -259,8 +266,16 @@ jsg::Promise ReadableStreamSourceJsAd buffer.getElementSize()))); } + // Now prepare the actual results. auto backing = buffer.detach(js); + + // Copy the data from the temporary destBuffer into the backing store. + backing.asArrayPtr().first(bytesRead).copyFrom(destBuffer.first(bytesRead)); + + // Now trim the backing store to the actual number of bytes read so that we can + // return the correct length buffer to the caller. backing.limit(bytesRead); + return js.resolvedPromise(ReadResult{ .buffer = jsg::BufferSource(js, kj::mv(backing)), .done = false, @@ -348,6 +363,8 @@ jsg::Promise> ReadableStreamSourceJsAdapter::readAllTe auto holder = kj::heap(); auto promise = active.enqueue([&active, &holder = *holder, limit]() -> kj::Promise { + // We are consuming the entire source here within kj space to prevent having to + // hop back and forth between kj and v8 as we read chunks of text. auto str = co_await active.source->readAllText(limit); size_t amount = str.size(); holder.result = kj::mv(str); @@ -409,6 +426,8 @@ jsg::Promise ReadableStreamSourceJsAdapter::readAllBytes( auto holder = kj::heap(); auto promise = active.enqueue([&active, &holder = *holder, limit]() -> kj::Promise { + // We are consuming the entire source here within kj space to prevent having to + // hop back and forth between kj and v8 as we read chunks of text. auto str = co_await active.source->readAllBytes(limit); size_t amount = str.size(); holder.result = kj::mv(str); @@ -1082,6 +1101,10 @@ kj::Promise ReadableSourceKjAdapter::pumpToImpl( // available data from the underlying JS stream in each iteration. This minimizes // the number of isolate lock acquisitions by getting all available data at once // rather than reading into fixed-size buffers. + // + // This does mean that the pumpToImpl does NOT use the same readImpl method as the + // regular read() call, which means that the pumpToImpl does not enforce the minReadPolicy. + // Using the DrainingReader is a significant optimization for the pump case. KJ_DASSERT(active->state.is() || active->state.is(), "pumpToImpl called when stream is not in an active state."); diff --git a/src/workerd/api/streams/readable-source-adapter.h b/src/workerd/api/streams/readable-source-adapter.h index e167798bc06..807f9e5bf97 100644 --- a/src/workerd/api/streams/readable-source-adapter.h +++ b/src/workerd/api/streams/readable-source-adapter.h @@ -113,6 +113,9 @@ namespace workerd::api::streams { // └───────────────────────────────────────────┘ // Adapts a ReadableStreamSource to a JavaScript-friendly interface. +// This is considered a heap-owned object and does not need to be held by +// an IoOwn. It takes ownership of the underlying ReadableSource and will +// ensure that *it* is held by an IoOwn. class ReadableStreamSourceJsAdapter final { public: ReadableStreamSourceJsAdapter( @@ -337,7 +340,13 @@ class ReadableSourceKjAdapter final: public ReadableSource { // implemented to avoid use-after-free if the adapter is dropped while a read // is in progress. // - // The returned promise will never resolve with more than maxBytes. + // The returned promise will never resolve with more than maxBytes + // + // While we provide an implementation here, the expectation is that most callers + // are going to be using the pumpTo method to completely consume the stream rather + // than calling read repeatedly, so we don't need to optimize this for high performance. + // The main goal of this method is to provide a safe and correct implementation of the + // ReadableSource contract. kj::Promise read(kj::ArrayPtr buffer, size_t minBytes) override; // Reads all remaining bytes from the stream and returns them. @@ -352,6 +361,9 @@ class ReadableSourceKjAdapter final: public ReadableSource { // Per the contract of pumpTo, it is the caller's responsibility to ensure // that both the WritableStreamSink and this adapter remain alive until // the returned promise resolves! + // DeferredProxy is NOT used here because the data source backing it is a + // JavaScript ReadableStream. The IoContext must be kept alive until the pump + // is fully complete. kj::Promise> pumpTo(WritableSink& output, EndAfterPump end) override; // If the stream is still active, tries to get the total length, @@ -363,7 +375,7 @@ class ReadableSourceKjAdapter final: public ReadableSource { // Cancels the underlying source if it is still active. void cancel(kj::Exception reason) override; - StreamEncoding getEncoding() override { + StreamEncoding getEncoding() override final { // Our underlying ReadableStream produces non-encoded bytes (for now) return StreamEncoding::IDENTITY; }; diff --git a/src/workerd/api/streams/writable-sink-adapter-test.c++ b/src/workerd/api/streams/writable-sink-adapter-test.c++ index 2f2028683f6..74e0aa1b03a 100644 --- a/src/workerd/api/streams/writable-sink-adapter-test.c++ +++ b/src/workerd/api/streams/writable-sink-adapter-test.c++ @@ -80,7 +80,6 @@ KJ_TEST("Basic construction with default options") { "Adapter should have default highWaterMark of 16384"); auto& options = KJ_ASSERT_NONNULL(adapter->getOptions()); KJ_ASSERT(options.highWaterMark == 16384); - KJ_ASSERT(options.detachOnWrite == false); auto readyPromise = adapter->getReady(env.js); KJ_ASSERT(readyPromise.getState(env.js) == jsg::Promise::State::FULFILLED, @@ -102,21 +101,6 @@ KJ_TEST("Construction with custom highWaterMark option") { }); } -KJ_TEST("Construction with detachOnWrite=true option") { - TestFixture fixture; - - fixture.runInIoContext([&](const TestFixture::Environment& env) { - auto sink = - newIoContextWrappedWritableSink(env.context, newWritableSink(newNullOutputStream())); - auto adapter = kj::heap(env.js, env.context, kj::mv(sink), - WritableStreamSinkJsAdapter::Options{ - .detachOnWrite = true, - }); - auto& options = KJ_ASSERT_NONNULL(adapter->getOptions()); - KJ_ASSERT(options.detachOnWrite == true); - }); -} - KJ_TEST("Construction with all custom options combined") { TestFixture fixture; @@ -126,11 +110,9 @@ KJ_TEST("Construction with all custom options combined") { auto adapter = kj::heap(env.js, env.context, kj::mv(sink), WritableStreamSinkJsAdapter::Options{ .highWaterMark = 100, - .detachOnWrite = true, }); auto& options = KJ_ASSERT_NONNULL(adapter->getOptions()); KJ_ASSERT(options.highWaterMark == 100); - KJ_ASSERT(options.detachOnWrite == true); }); } @@ -643,7 +625,7 @@ KJ_TEST("writing small ArrayBuffer") { jsg::JsValue handle(source.getHandle(env.js)); auto writePromise = adapter->write(env.js, handle); - KJ_ASSERT(state.writeCalled == 1, "Underlying sink's write() should not have been called"); + KJ_ASSERT(state.writeCalled == 1, "Underlying sink's write() should have been called"); KJ_ASSERT(KJ_ASSERT_NONNULL(adapter->getDesiredSize()) == 0, "Adapter's desired size should be 0 after writing highWaterMark bytes"); @@ -716,6 +698,70 @@ KJ_TEST("writing large ArrayBuffer") { }); } +KJ_TEST("writing arrays") { + TestFixture fixture; + + fixture.runInIoContext([&](const TestFixture::Environment& env) { + auto recordingSink = kj::heap(); + auto& state = recordingSink->getState(); + auto adapter = kj::heap(env.js, env.context, + newWritableSink(kj::mv(recordingSink)), + WritableStreamSinkJsAdapter::Options{ + .highWaterMark = 5, + }); + + auto array = env.js.arr(env.js.str("hello"_kj), env.js.str("world"_kj)); + auto writePromise = adapter->write(env.js, array); + + KJ_ASSERT(state.writeCalled == 1, "Underlying sink's write() should have been called"); + KJ_ASSERT(KJ_ASSERT_NONNULL(adapter->getDesiredSize()) == -5, + "Adapter's desired size should be negative after writing 10 bytes"); + + return env.context + .awaitJs(env.js, writePromise.then(env.js, [&state, &adapter = *adapter](jsg::Lock& js) { + KJ_ASSERT(state.writeCalled == 1, "Underlying sink's write() should have been called"); + KJ_ASSERT(KJ_ASSERT_NONNULL(adapter.getDesiredSize()) == 5, + "Back to initial desired size after write completes"); + })).attach(kj::mv(adapter)); + }); +} + +KJ_TEST("throwing iterable works correctly") { + TestFixture fixture; + + fixture.runInIoContext([&](const TestFixture::Environment& env) { + auto recordingSink = kj::heap(); + auto& state = recordingSink->getState(); + auto adapter = kj::heap(env.js, env.context, + newWritableSink(kj::mv(recordingSink)), + WritableStreamSinkJsAdapter::Options{ + .highWaterMark = 5, + }); + + // We need to construct our iterable... + auto getIter = jsg::check( + v8::Function::New(env.js.v8Context(), [](const v8::FunctionCallbackInfo& args) { + auto& js = jsg::Lock::from(args.GetIsolate()); + auto fn = jsg::check( + v8::Function::New(js.v8Context(), [](const v8::FunctionCallbackInfo& args) { + args.GetIsolate()->ThrowError(v8::String::NewFromUtf8Literal(args.GetIsolate(), "Boom")); + })); + auto obj = js.obj(); + obj.set(js, js.str("next"_kj), jsg::JsValue(fn)); + v8::Local result = obj; + args.GetReturnValue().Set(result); + })); + + auto iter = env.js.obj(); + iter.set(env.js, env.js.symbolIterator(), jsg::JsValue(getIter)); + + auto writePromise = adapter->write(env.js, iter); + KJ_ASSERT(writePromise.getState(env.js) == jsg::Promise::State::REJECTED, + "Write of errored should be rejected"); + KJ_ASSERT(state.writeCalled == 0, "Underlying sink's write() should not have been called"); + }); +} + KJ_TEST("writing the wrong types reject") { TestFixture fixture; @@ -743,6 +789,11 @@ KJ_TEST("writing the wrong types reject") { auto writeObject = adapter->write(env.js, env.js.obj()); KJ_ASSERT(writeObject.getState(env.js) == jsg::Promise::State::REJECTED, "Write of plain object should be rejected"); + + auto badArray = env.js.arr(env.js.boolean(true)); + auto writeBadArray = adapter->write(env.js, badArray); + KJ_ASSERT(writeBadArray.getState(env.js) == jsg::Promise::State::REJECTED, + "Write of array with non-string, non-ArrayBuffer elements"); }); } @@ -802,56 +853,6 @@ KJ_TEST("ready promise signals backpressure correctly") { }); } -KJ_TEST("detachOnWrite option detaches ArrayBuffer before write") { - TestFixture fixture; - - fixture.runInIoContext([&](const TestFixture::Environment& env) { - auto recordingSink = kj::heap(); - auto adapter = kj::heap(env.js, env.context, - newWritableSink(kj::mv(recordingSink)), - WritableStreamSinkJsAdapter::Options{ - .detachOnWrite = true, - }); - - auto backing = jsg::BackingStore::alloc(env.js, 10); - jsg::BufferSource source(env.js, kj::mv(backing)); - KJ_ASSERT(!source.isDetached()); - jsg::JsValue handle(source.getHandle(env.js)); - - auto writePromise = adapter->write(env.js, handle); - - jsg::BufferSource source2(env.js, handle); - KJ_ASSERT(source2.size() == 0); - - return env.context.awaitJs(env.js, kj::mv(writePromise)).attach(kj::mv(adapter)); - }); -} - -KJ_TEST("detachOnWrite option detaches Uint8Array before write") { - TestFixture fixture; - - fixture.runInIoContext([&](const TestFixture::Environment& env) { - auto recordingSink = kj::heap(); - auto adapter = kj::heap(env.js, env.context, - newWritableSink(kj::mv(recordingSink)), - WritableStreamSinkJsAdapter::Options{ - .detachOnWrite = true, - }); - - auto backing = jsg::BackingStore::alloc(env.js, 10); - jsg::BufferSource source(env.js, kj::mv(backing)); - KJ_ASSERT(!source.isDetached()); - jsg::JsValue handle(source.getHandle(env.js)); - - auto writePromise = adapter->write(env.js, handle); - - jsg::BufferSource source2(env.js, handle); - KJ_ASSERT(source2.size() == 0); - - return env.context.awaitJs(env.js, kj::mv(writePromise)).attach(kj::mv(adapter)); - }); -} - KJ_TEST("Creating adapter and dropping it with pending operations") { TestFixture fixture; diff --git a/src/workerd/api/streams/writable-sink-adapter.c++ b/src/workerd/api/streams/writable-sink-adapter.c++ index 8a0d85df3f6..04a64befbdd 100644 --- a/src/workerd/api/streams/writable-sink-adapter.c++ +++ b/src/workerd/api/streams/writable-sink-adapter.c++ @@ -3,10 +3,75 @@ #include "writable.h" #include +#include #include namespace workerd::api::streams { +namespace { +kj::Maybe> getDataFromBufferSource( + jsg::Lock& js, const jsg::JsValue& value) { + KJ_IF_SOME(view, value.tryCast()) { + return kj::heapArray(view.asArrayPtr()); + } else KJ_IF_SOME(buffer, value.tryCast()) { + return kj::heapArray(buffer.asArrayPtr()); + } else KJ_IF_SOME(buffer, value.tryCast()) { + return kj::heapArray(buffer.asArrayPtr()); + } else KJ_IF_SOME(str, value.tryCast()) { + auto utf8 = str.toString(js); + if (utf8.size() == 0) return kj::Array(); + // This allows us to deal gracefully with the null-terminator + return utf8.asBytes().asConst().attach(utf8.releaseArray()); + } else if (value.isStringObject()) { + // While Boxed strings are rare, they are technically valid buffer sources, let's + // go ahead and support them by unboxing and converting to UTF-8. + auto utf8 = value.toString(js); + if (utf8.size() == 0) return kj::Array(); + // This allows us to deal gracefully with the null-terminator + return utf8.asBytes().asConst().attach(utf8.releaseArray()); + } + return kj::none; +} + +kj::Maybe>> tryGetDataFromIterable( + jsg::Lock& js, const jsg::JsValue& value) { + if (value.isString() || value.isStringObject() || value.isArrayBuffer() || + value.isArrayBufferView() || value.isSharedArrayBuffer()) { + return kj::none; + } + KJ_IF_SOME(obj, value.tryCast()) { + // Try getting the sync iterator. + auto maybeIter = obj.get(js, js.symbolIterator()); + KJ_IF_SOME(iterFn, maybeIter.tryCast()) { + auto maybeObj = iterFn.call(js, obj); + KJ_IF_SOME(iterObj, maybeObj.tryCast()) { + kj::Vector> pieces; + auto gen = jsg::Generator(js, iterObj); + // Start iterating through the iterable, converting each yielded value to bytes + // and accumulating the results in `pieces`. We only support synch iterables; + // and we require that the iterable be finite (max 64 pieces) to avoid silly + // memory exhaustion issues. If any of the yielded values are not value byte + // sources, we throw. + while (true) { + KJ_IF_SOME(next, gen.next(js)) { + JSG_REQUIRE( + pieces.size() <= 64, TypeError, "Too many pieces yielded from the iterable."); + auto data = JSG_REQUIRE_NONNULL(getDataFromBufferSource(js, next), TypeError, + "Iterable yielded a value that is not a valid buffer source or string."); + pieces.add(kj::mv(data)); + } else { + // We're done iterating. + break; + } + } + return pieces.releaseAsArray(); + } + } + } + return kj::none; +} +} // namespace + // The Active state maintains a queue of tasks, such as write or flush operations. Each task // contains a promise-returning function object and a fulfiller. When the first task is // enqueued, the active state begins processing the queue asynchronously. Each function @@ -199,107 +264,103 @@ jsg::Promise WritableStreamSinkJsAdapter::write(jsg::Lock& js, const jsg:: // Let's process our data and write it! auto& ioContext = IoContext::current(); - // We know that a WritableStreamSink only accepts bytes, so we need to - // verify that the value is a source of bytes. We accept three possible - // types: ArrayBuffer, ArrayBufferView, and String. If it is a string, - // we convert it to UTF-8 bytes. Anything else is an error. - if (value.isArrayBufferView() || value.isArrayBuffer() || value.isSharedArrayBuffer()) { - // We can just wrap the value with a jsg::BufferSource and write it. - jsg::BufferSource source(js, value); - if (active.options.detachOnWrite && source.canDetach(js)) { - // Detach from the original ArrayBuffer... - // ... and re-wrap it with a new BufferSource that we own. - source = jsg::BufferSource(js, source.detach(js)); - } - - // Zero-length writes are a no-op. - if (source.size() == 0) { - return js.resolvedPromise(); - } - - active.bytesInFlight += source.size(); - maybeSignalBackpressure(js); - // Enqueue the actual write operation into the write queue. We pass in - // two lambdas, one that does the actual write, and one that handles - // errors. If the write fails, we need to transition the adapter to the - // errored state. If the write succeeds, we need to decrement the - // bytesInFlight counter. - // - // The promise returned by enqueue is not the actual write promise but - // a branch forked off of it. We wrap that with a JS promise that waits - // for it to complete. Once it does, we check if we can release backpressure. - // This has to be done within an Isolate lock because we need to be able - // to resolve or reject the JS promises. If the write fails, we instead - // abort the backpressure state. - // - // This slight indirection does mean that the backpressure state change - // may be slightly delayed after the actual write completes but that's - // ok. - // - // Capturing active by reference here is safe because the lambda is - // held by the write queue, which is itself held by Active. If active - // is destroyed, the write queue is destroyed along with the lambda. - auto promise = - active.enqueue(kj::coCapture([&active, source = kj::mv(source)]() -> kj::Promise { - co_await active.sink->write(source.asArrayPtr()); - active.bytesInFlight -= source.size(); - })); + static const auto handleDone = [](jsg::Lock& js, IoContext& ioContext, kj::Promise promise, + auto& self) { return ioContext - .awaitIo(js, kj::mv(promise), [self = selfRef.addRef()](jsg::Lock& js) { - // Why do we need a weak ref here? Well, because this is a JavaScript - // promise continuation. It is possible that the kj::Own holding our - // adapter can be dropped while we are waiting for the continuation - // to run. If that happens, we don't want to delay cleanup of the - // adapter just because of backpressure state management that would - // not be needed anymore, so we use a weak ref to update the backpressure - // state only if we are still alive. + .awaitIo(js, kj::mv(promise), [self = self.addRef()](jsg::Lock& js) { self->runIfAlive( [&](WritableStreamSinkJsAdapter& self) { self.maybeReleaseBackpressure(js); }); - }).catch_(js, [self = selfRef.addRef()](jsg::Lock& js, jsg::Value exception) { - auto error = jsg::JsValue(exception.getHandle(js)); + }).catch_(js, [self = self.addRef()](jsg::Lock& js, jsg::Value exception) { self->runIfAlive([&](WritableStreamSinkJsAdapter& self) { + auto error = jsg::JsValue(exception.getHandle(js)); self.abort(js, error); self.backpressureState.abort(js, error); }); js.throwException(kj::mv(exception)); }); - } else if (value.isString()) { - // Also super easy! Let's just convert the string to UTF-8 - auto str = value.toString(js); + }; - // Zero-length writes are a no-op. - if (str.size() == 0) { - return js.resolvedPromise(); - } + // We know that a WritableStreamSink only accepts bytes, so we need to verify that the + // value is a source of bytes. We accept four possible types: ArrayBuffer, ArrayBufferView, + // String, and Iterables of these. If it is a string, we convert it to UTF-8 bytes. Anything + // else is an error. + // Due to V8 sandbox rules, we cannot safely directly access the memory of + // the ArrayBuffer or SharedArrayBuffer backing store(s) from outside of the + // isolate lock, instead we need to allocate copies. + // + // Because we are copying the data here, we don't need to worry about detaching + // the buffer or it being modified concurrently while we are writing it. If we + // avoid the copy later by using memory protection keys, we'll need to revisit + // this and make sure we are properly handling those cases. + // TODO(later): We can possibly optimize this by getting the memory protection key and + // avoiding the copy. + return js.tryCatch([&]() -> jsg::Promise { + // Let's check to see if this value is an iterable of buffer sources, allowing us to + // perform a vectorized write. This is a bit of a tradeoff. We pay the cost of the + // iterator protocol and some extra bookkeeping, but in exchange we limit the number of + // promises we need to create and await on, which can be a performance drain. + KJ_IF_SOME(pieces, tryGetDataFromIterable(js, value)) { + size_t totalSize = 0; + for (auto& piece: pieces) { + totalSize += piece.size(); + } - active.bytesInFlight += str.size(); - // Make sure to account for the memory used by the string while the - // write is in-flight/pending - auto accounting = js.getExternalMemoryAdjustment(str.size()); - maybeSignalBackpressure(js); - // Just like above, enqueue the write operation into the write queue, - // ensuring that we handle both the success and failure cases. - auto promise = active.enqueue(kj::coCapture( - [&active, str = kj::mv(str), accounting = kj::mv(accounting)]() -> kj::Promise { - co_await active.sink->write(str.asBytes()); - active.bytesInFlight -= str.size(); - })); - return ioContext - .awaitIo(js, kj::mv(promise), [self = selfRef.addRef()](jsg::Lock& js) { - self->runIfAlive( - [&](WritableStreamSinkJsAdapter& self) { self.maybeReleaseBackpressure(js); }); - }).catch_(js, [self = selfRef.addRef()](jsg::Lock& js, jsg::Value exception) { - auto error = jsg::JsValue(exception.getHandle(js)); - self->runIfAlive([&](WritableStreamSinkJsAdapter& self) { - self.abort(js, error); - self.backpressureState.abort(js, error); - }); - js.throwException(kj::mv(exception)); - }); - } + if (totalSize == 0) { + return js.resolvedPromise(); + } - auto err = js.typeError("This WritableStream only supports writing byte types."_kj); - return js.rejectedPromise(err); + active.bytesInFlight += totalSize; + + maybeSignalBackpressure(js); + auto promise = active.enqueue(kj::coCapture( + [&active, source = kj::mv(pieces), totalSize]() mutable -> kj::Promise { + auto pieces = KJ_MAP(piece, source) { return piece.asPtr().asConst(); }; + co_await active.sink->write(pieces); + active.bytesInFlight -= totalSize; + })); + return handleDone(js, ioContext, kj::mv(promise), selfRef); + } else KJ_IF_SOME(source, getDataFromBufferSource(js, value)) { + // Zero-length writes are a no-op. + if (source.size() == 0) { + return js.resolvedPromise(); + } + + active.bytesInFlight += source.size(); + + maybeSignalBackpressure(js); + // Enqueue the actual write operation into the write queue. We pass in + // two lambdas, one that does the actual write, and one that handles + // errors. If the write fails, we need to transition the adapter to the + // errored state. If the write succeeds, we need to decrement the + // bytesInFlight counter. + // + // The promise returned by enqueue is not the actual write promise but + // a branch forked off of it. We wrap that with a JS promise that waits + // for it to complete. Once it does, we check if we can release backpressure. + // This has to be done within an Isolate lock because we need to be able + // to resolve or reject the JS promises. If the write fails, we instead + // abort the backpressure state. + // + // This slight indirection does mean that the backpressure state change + // may be slightly delayed after the actual write completes but that's + // ok. + // + // Capturing active by reference here is safe because the lambda is + // held by the write queue, which is itself held by Active. If active + // is destroyed, the write queue is destroyed along with the lambda. + auto promise = + active.enqueue(kj::coCapture([&active, source = kj::mv(source)]() -> kj::Promise { + co_await active.sink->write(source.asPtr()); + active.bytesInFlight -= source.size(); + })); + return handleDone(js, ioContext, kj::mv(promise), selfRef); + } + + auto err = js.typeError("This WritableStream only supports writing byte types."_kj); + return js.rejectedPromise(err); + }, [&](jsg::Value exception) -> jsg::Promise { + return js.rejectedPromise(kj::mv(exception)); + }); } jsg::Promise WritableStreamSinkJsAdapter::flush(jsg::Lock& js) { diff --git a/src/workerd/api/streams/writable-sink-adapter.h b/src/workerd/api/streams/writable-sink-adapter.h index 5a1262577f6..fd8f06b503d 100644 --- a/src/workerd/api/streams/writable-sink-adapter.h +++ b/src/workerd/api/streams/writable-sink-adapter.h @@ -112,13 +112,6 @@ class WritableStreamSinkJsAdapter final { // us to be able to buffer a bit more data in flight. So we will implement // a simple high water mark mechanism. The default is 16KB. size_t highWaterMark = 16384; - - // When detachOnWrite is true, and a write() is made with an ArrayBuffer, - // or ArrayBufferView, we will attempt to detach the underlying buffer - // before writing it to the sink. Detaching is required by the - // streams spec but our original implementation does not detach - // and it turns out there are old workers depending on that behavior. - bool detachOnWrite = false; }; WritableStreamSinkJsAdapter(jsg::Lock& js, diff --git a/src/workerd/jsg/iterator.h b/src/workerd/jsg/iterator.h index 7050ca18a51..45bc6cbab59 100644 --- a/src/workerd/jsg/iterator.h +++ b/src/workerd/jsg/iterator.h @@ -37,6 +37,29 @@ static kj::Maybe tryGetGeneratorFunction( kj::Maybe>(object)); } +template +static kj::Maybe tryGetGeneratorFunctionJsValue( + Lock& js, JsObject& object, kj::StringPtr name) { + auto val = object.get(js, name); + KJ_IF_SOME(fn, val.tryCast()) { + return Signature([fn = JsRef(js, fn), obj = JsRef(js, object)]( + jsg::Lock& js, Optional arg) -> GeneratorNext { + return js.tryCatch([&]() -> GeneratorNext { + auto result = fn.getHandle(js).call(js, obj.getHandle(js), arg.orDefault(js.undefined())); + auto obj = JSG_REQUIRE_NONNULL( + result.tryCast(), TypeError, "Generator method did not return an object"); + auto done = obj.get(js, "done"_kj).tryCast().orDefault(js.boolean(false)); + auto value = obj.get(js, "value"_kj); + return GeneratorNext{ + .done = done.value(js), + .value = kj::mv(value), + }; + }, [&](Value exception) -> GeneratorNext { js.throwException(kj::mv(exception)); }); + }); + } + return kj::none; +} + template class Generator final { // See the documentation in jsg.h @@ -105,7 +128,7 @@ class Generator final { } void visitForGc(GcVisitor& visitor) { - visitForGc(maybeActive); + visitor.visit(maybeActive); } private: @@ -137,6 +160,103 @@ class Generator final { kj::Maybe maybeActive; }; +// A variation of Generator that always returns JsValue. It will never need to unwrap +// the yielded values to C++ types so it can skip the TypeWrapper ceremony. +template <> +class Generator final { + public: + Generator(Lock& js, JsObject object): maybeActive(Active(js, object)) {} + Generator(Generator&&) = default; + Generator& operator=(Generator&&) = default; + KJ_DISALLOW_COPY(Generator); + + // If nothing is returned, the generator is complete. + kj::Maybe next(Lock& js) { + KJ_IF_SOME(active, maybeActive) { + KJ_IF_SOME(nextfn, active.maybeNext) { + return js.tryCatch([&] { + auto result = nextfn(js, js.undefined()); + if (result.done || result.value == kj::none) { + maybeActive = kj::none; + return kj::Maybe(); + } + return result.value; + }, [&](Value exception) { return throw_(js, kj::mv(exception)); }); + } + maybeActive = kj::none; + } + return kj::none; + } + + // If nothing is returned, the generator is complete. + kj::Maybe return_(Lock& js, kj::Maybe maybeValue = kj::none) { + KJ_IF_SOME(active, maybeActive) { + KJ_IF_SOME(returnFn, active.maybeReturn) { + return js.tryCatch([&] { + auto result = returnFn(js, kj::mv(maybeValue)); + if (result.done || result.value == kj::none) { + maybeActive = kj::none; + } + return result.value; + }, [&](Value exception) { return throw_(js, kj::mv(exception)); }); + } + maybeActive = kj::none; + } + return kj::none; + } + + // If nothing is returned, the generator is complete. If there + // is no throw handler in the generator, the method will throw. + // It's also possible (and even likely) that the throw handler + // will just re-throw the exception. + kj::Maybe throw_(Lock& js, Value exception) { + KJ_IF_SOME(active, maybeActive) { + KJ_IF_SOME(throwFn, active.maybeThrow) { + return js.tryCatch([&] -> kj::Maybe { + auto result = throwFn(js, JsValue(exception.getHandle(js))); + if (result.done || result.value == kj::none) { + maybeActive = kj::none; + } + return result.value; + }, [&](Value exception) -> kj::Maybe { + maybeActive = kj::none; + js.throwException(kj::mv(exception)); + }); + } + } + js.throwException(kj::mv(exception)); + } + + void visitForGc(GcVisitor& visitor) { + visitor.visit(maybeActive); + } + + private: + using Next = GeneratorNext; + using NextSignature = Function)>; + using ReturnSignature = Function)>; + using ThrowSignature = Function)>; + + struct Active final { + kj::Maybe maybeNext; + kj::Maybe maybeReturn; + kj::Maybe maybeThrow; + + Active(Lock& js, JsObject object) + : maybeNext(tryGetGeneratorFunctionJsValue(js, object, "next"_kj)), + maybeReturn(tryGetGeneratorFunctionJsValue(js, object, "return"_kj)), + maybeThrow(tryGetGeneratorFunctionJsValue(js, object, "throw"_kj)) {} + Active(Active&&) = default; + Active& operator=(Active&&) = default; + KJ_DISALLOW_COPY(Active); + + void visitForGc(GcVisitor& visitor) { + visitor.visit(maybeNext, maybeReturn, maybeThrow); + } + }; + kj::Maybe maybeActive; +}; + template class AsyncGenerator final { // See the documentation in jsg.h diff --git a/src/workerd/jsg/jsvalue.h b/src/workerd/jsg/jsvalue.h index 61481f4521d..da8c21cf9b1 100644 --- a/src/workerd/jsg/jsvalue.h +++ b/src/workerd/jsg/jsvalue.h @@ -238,6 +238,18 @@ class JsArrayBuffer final: public JsBase { using JsBase::JsBase; }; +class JsSharedArrayBuffer final: public JsBase { + public: + kj::ArrayPtr asArrayPtr() { + v8::Local inner = *this; + void* data = inner->GetBackingStore()->Data(); + size_t length = inner->ByteLength(); + return kj::ArrayPtr(static_cast(data), length); + } + + using JsBase::JsBase; +}; + class JsArrayBufferView final: public JsBase { public: template