Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 32 additions & 9 deletions src/workerd/api/streams/readable-source-adapter.c++
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ jsg::Promise<ReadableStreamSourceJsAdapter::ReadResult> ReadableStreamSourceJsAd
// always be at least as large as minBytes. This should be handled for us by
// the jsg::BufferSource, but just to be safe, we will double-check with an
// assert here.
KJ_DASSERT(buffer.size() % elementSize == 0);
KJ_ASSERT(buffer.size() % elementSize == 0);

auto minBytes = kj::min(options.minBytes.orDefault(elementSize), buffer.size());
// We want to be sure that minBytes is a multiple of the element size
Expand All @@ -221,20 +221,27 @@ jsg::Promise<ReadableStreamSourceJsAdapter::ReadResult> ReadableStreamSourceJsAd
// We only pass a kj::ArrayPtr to the buffer into the read call; the actual
// buffer instance is kept alive by attaching it to the JS promise chain
// that follows the read.

// Unfortunately we have to allocate a temporary kj::Array here to read the
// data into, and then copy it into the caller-provided buffer when the read
// completes. Why? For security! The buffer provided to us is a v8-allocated
// buffer that sits within the v8 sandbox, but the read happens in kj-space
// outside of the isolate lock.
// TODO(soon): Use the memory protection key to allow kj-space to write directly
// into the v8 buffer without the copy?
auto destBuffer = kj::heapArray<kj::byte>(buffer.size());

auto promise = active.enqueue(kj::coCapture(
[&active, buffer = buffer.asArrayPtr(), minBytes]() mutable -> kj::Promise<size_t> {
// TODO(soon): The underlying kj streams API now supports passing the
// kj::ArrayPtr directly to the read call, but ReadableStreamSource has
// not yet been updated to do so. When it is, we can update this read to
// pass `buffer` directly rather than passing the begin() and size().
[&active, buffer = destBuffer.asPtr(), minBytes]() mutable -> kj::Promise<size_t> {
co_return co_await active.source->read(buffer, minBytes);
}));
return ioContext
.awaitIo(js, kj::mv(promise),
[buffer = kj::mv(buffer), self = selfRef.addRef()](jsg::Lock& js,
[destBuffer = kj::mv(destBuffer), buffer = kj::mv(buffer), self = selfRef.addRef()](
jsg::Lock& js,
size_t bytesRead) mutable -> jsg::Promise<ReadableStreamSourceJsAdapter::ReadResult> {
// If the bytesRead is 0, that indicates the stream is closed. We will
// move the stream to a closed state and return the empty buffer.
// If the bytesRead is 0, that indicates the stream is closed and nothing was
// read. We will move the stream to a closed state and return the empty buffer.
if (bytesRead == 0) {
self->runIfAlive([](ReadableStreamSourceJsAdapter& self) {
KJ_IF_SOME(open, self.state.tryGetActiveUnsafe()) {
Expand All @@ -259,8 +266,16 @@ jsg::Promise<ReadableStreamSourceJsAdapter::ReadResult> ReadableStreamSourceJsAd
buffer.getElementSize())));
}

// Now prepare the actual results.
auto backing = buffer.detach(js);

// Copy the data from the temporary destBuffer into the backing store.
backing.asArrayPtr().first(bytesRead).copyFrom(destBuffer.first(bytesRead));

// Now trim the backing store to the actual number of bytes read so that we can
// return the correct length buffer to the caller.
backing.limit(bytesRead);

return js.resolvedPromise(ReadResult{
.buffer = jsg::BufferSource(js, kj::mv(backing)),
.done = false,
Expand Down Expand Up @@ -348,6 +363,8 @@ jsg::Promise<jsg::JsRef<jsg::JsString>> ReadableStreamSourceJsAdapter::readAllTe
auto holder = kj::heap<Holder>();

auto promise = active.enqueue([&active, &holder = *holder, limit]() -> kj::Promise<size_t> {
// We are consuming the entire source here within kj space to prevent having to
// hop back and forth between kj and v8 as we read chunks of text.
auto str = co_await active.source->readAllText(limit);
size_t amount = str.size();
holder.result = kj::mv(str);
Expand Down Expand Up @@ -409,6 +426,8 @@ jsg::Promise<jsg::BufferSource> ReadableStreamSourceJsAdapter::readAllBytes(
auto holder = kj::heap<Holder>();

auto promise = active.enqueue([&active, &holder = *holder, limit]() -> kj::Promise<size_t> {
// We are consuming the entire source here within kj space to prevent having to
// hop back and forth between kj and v8 as we read chunks of bytes.
auto str = co_await active.source->readAllBytes(limit);
size_t amount = str.size();
holder.result = kj::mv(str);
Expand Down Expand Up @@ -1082,6 +1101,10 @@ kj::Promise<void> ReadableSourceKjAdapter::pumpToImpl(
// available data from the underlying JS stream in each iteration. This minimizes
// the number of isolate lock acquisitions by getting all available data at once
// rather than reading into fixed-size buffers.
//
// This does mean that the pumpToImpl does NOT use the same readImpl method as the
// regular read() call, which means that the pumpToImpl does not enforce the minReadPolicy.
// Using the DrainingReader is a significant optimization for the pump case.

KJ_DASSERT(active->state.is<Active::Idle>() || active->state.is<Active::Readable>(),
"pumpToImpl called when stream is not in an active state.");
Expand Down
16 changes: 14 additions & 2 deletions src/workerd/api/streams/readable-source-adapter.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ namespace workerd::api::streams {
// └───────────────────────────────────────────┘

// Adapts a ReadableStreamSource to a JavaScript-friendly interface.
// This is considered a heap-owned object and does not need to be held by
// an IoOwn. It takes ownership of the underlying ReadableSource and will
// ensure that *it* is held by an IoOwn.
class ReadableStreamSourceJsAdapter final {
public:
ReadableStreamSourceJsAdapter(
Expand Down Expand Up @@ -337,7 +340,13 @@ class ReadableSourceKjAdapter final: public ReadableSource {
// implemented to avoid use-after-free if the adapter is dropped while a read
// is in progress.
//
// The returned promise will never resolve with more than maxBytes.
// The returned promise will never resolve with more than maxBytes.
//
// While we provide an implementation here, the expectation is that most callers
// are going to be using the pumpTo method to completely consume the stream rather
// than calling read repeatedly, so we don't need to optimize this for high performance.
// The main goal of this method is to provide a safe and correct implementation of the
// ReadableSource contract.
kj::Promise<size_t> read(kj::ArrayPtr<kj::byte> buffer, size_t minBytes) override;

// Reads all remaining bytes from the stream and returns them.
Expand All @@ -352,6 +361,9 @@ class ReadableSourceKjAdapter final: public ReadableSource {
// Per the contract of pumpTo, it is the caller's responsibility to ensure
// that both the WritableStreamSink and this adapter remain alive until
// the returned promise resolves!
// Deferred proxying is NOT used here, even though the return type is a
// DeferredProxy, because the data source backing this adapter is a
// JavaScript ReadableStream. The IoContext must be kept alive until the pump
// is fully complete.
kj::Promise<DeferredProxy<void>> pumpTo(WritableSink& output, EndAfterPump end) override;

// If the stream is still active, tries to get the total length,
Expand All @@ -363,7 +375,7 @@ class ReadableSourceKjAdapter final: public ReadableSource {
// Cancels the underlying source if it is still active.
void cancel(kj::Exception reason) override;

StreamEncoding getEncoding() override {
StreamEncoding getEncoding() override final {
// Our underlying ReadableStream produces non-encoded bytes (for now)
return StreamEncoding::IDENTITY;
};
Expand Down
139 changes: 70 additions & 69 deletions src/workerd/api/streams/writable-sink-adapter-test.c++
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ KJ_TEST("Basic construction with default options") {
"Adapter should have default highWaterMark of 16384");
auto& options = KJ_ASSERT_NONNULL(adapter->getOptions());
KJ_ASSERT(options.highWaterMark == 16384);
KJ_ASSERT(options.detachOnWrite == false);

auto readyPromise = adapter->getReady(env.js);
KJ_ASSERT(readyPromise.getState(env.js) == jsg::Promise<void>::State::FULFILLED,
Expand All @@ -102,21 +101,6 @@ KJ_TEST("Construction with custom highWaterMark option") {
});
}

// Checks that an adapter constructed with Options{.detachOnWrite = true}
// reports detachOnWrite == true through getOptions().
KJ_TEST("Construction with detachOnWrite=true option") {
TestFixture fixture;

fixture.runInIoContext([&](const TestFixture::Environment& env) {
// The sink is built from newNullOutputStream(); this test never calls write().
auto sink =
newIoContextWrappedWritableSink(env.context, newWritableSink(newNullOutputStream()));
auto adapter = kj::heap<WritableStreamSinkJsAdapter>(env.js, env.context, kj::mv(sink),
WritableStreamSinkJsAdapter::Options{
.detachOnWrite = true,
});
// KJ_ASSERT_NONNULL fails the test if getOptions() yields no value.
auto& options = KJ_ASSERT_NONNULL(adapter->getOptions());
KJ_ASSERT(options.detachOnWrite == true);
});
}

KJ_TEST("Construction with all custom options combined") {
TestFixture fixture;

Expand All @@ -126,11 +110,9 @@ KJ_TEST("Construction with all custom options combined") {
auto adapter = kj::heap<WritableStreamSinkJsAdapter>(env.js, env.context, kj::mv(sink),
WritableStreamSinkJsAdapter::Options{
.highWaterMark = 100,
.detachOnWrite = true,
});
auto& options = KJ_ASSERT_NONNULL(adapter->getOptions());
KJ_ASSERT(options.highWaterMark == 100);
KJ_ASSERT(options.detachOnWrite == true);
});
}

Expand Down Expand Up @@ -643,7 +625,7 @@ KJ_TEST("writing small ArrayBuffer") {
jsg::JsValue handle(source.getHandle(env.js));

auto writePromise = adapter->write(env.js, handle);
KJ_ASSERT(state.writeCalled == 1, "Underlying sink's write() should not have been called");
KJ_ASSERT(state.writeCalled == 1, "Underlying sink's write() should have been called");
KJ_ASSERT(KJ_ASSERT_NONNULL(adapter->getDesiredSize()) == 0,
"Adapter's desired size should be 0 after writing highWaterMark bytes");

Expand Down Expand Up @@ -716,6 +698,70 @@ KJ_TEST("writing large ArrayBuffer") {
});
}

// Writes a JS array of two strings ("hello", "world") through an adapter
// configured with highWaterMark = 5 and checks that:
//   * the underlying sink's write() is invoked exactly once for the array,
//   * the desired size is -5 while the write is pending, and
//   * the desired size returns to 5 once the write promise resolves.
KJ_TEST("writing arrays") {
TestFixture fixture;

fixture.runInIoContext([&](const TestFixture::Environment& env) {
auto recordingSink = kj::heap<SimpleEventRecordingSink>();
// Grab a reference to the sink's recorded state before ownership of the
// sink is moved into the adapter below.
auto& state = recordingSink->getState();
auto adapter = kj::heap<WritableStreamSinkJsAdapter>(env.js, env.context,
newWritableSink(kj::mv(recordingSink)),
WritableStreamSinkJsAdapter::Options{
.highWaterMark = 5,
});

auto array = env.js.arr(env.js.str("hello"_kj), env.js.str("world"_kj));
auto writePromise = adapter->write(env.js, array);

// These checks run immediately after write() returns, before the write
// promise has been awaited.
KJ_ASSERT(state.writeCalled == 1, "Underlying sink's write() should have been called");
KJ_ASSERT(KJ_ASSERT_NONNULL(adapter->getDesiredSize()) == -5,
"Adapter's desired size should be negative after writing 10 bytes");

// The continuation captures the adapter by reference, so ownership of the
// adapter is moved onto the returned promise via attach().
return env.context
.awaitJs(env.js, writePromise.then(env.js, [&state, &adapter = *adapter](jsg::Lock& js) {
// Still a single sink write; the desired size is back at the highWaterMark.
KJ_ASSERT(state.writeCalled == 1, "Underlying sink's write() should have been called");
KJ_ASSERT(KJ_ASSERT_NONNULL(adapter.getDesiredSize()) == 5,
"Back to initial desired size after write completes");
})).attach(kj::mv(adapter));
});
}

// Writes an object whose iterator throws from next() and checks that the
// write promise is already rejected when write() returns and that the
// underlying sink's write() is never invoked.
KJ_TEST("throwing iterable works correctly") {
TestFixture fixture;

fixture.runInIoContext([&](const TestFixture::Environment& env) {
auto recordingSink = kj::heap<SimpleEventRecordingSink>();
// Grab a reference to the sink's recorded state before ownership of the
// sink is moved into the adapter below.
auto& state = recordingSink->getState();
auto adapter = kj::heap<WritableStreamSinkJsAdapter>(env.js, env.context,
newWritableSink(kj::mv(recordingSink)),
WritableStreamSinkJsAdapter::Options{
.highWaterMark = 5,
});

// We need to construct our iterable...
// getIter is the function installed under the iterator symbol: when called
// it returns a fresh object whose "next" property is a function that throws
// an error with the message "Boom".
auto getIter = jsg::check(
v8::Function::New(env.js.v8Context(), [](const v8::FunctionCallbackInfo<v8::Value>& args) {
auto& js = jsg::Lock::from(args.GetIsolate());
auto fn = jsg::check(
v8::Function::New(js.v8Context(), [](const v8::FunctionCallbackInfo<v8::Value>& args) {
args.GetIsolate()->ThrowError(v8::String::NewFromUtf8Literal(args.GetIsolate(), "Boom"));
}));
auto obj = js.obj();
obj.set(js, js.str("next"_kj), jsg::JsValue(fn));
v8::Local<v8::Value> result = obj;
args.GetReturnValue().Set(result);
}));

// The iterable itself: a plain object with getIter stored under the key
// returned by symbolIterator().
auto iter = env.js.obj();
iter.set(env.js, env.js.symbolIterator(), jsg::JsValue(getIter));

// The rejection is observed synchronously, without awaiting the promise.
auto writePromise = adapter->write(env.js, iter);
KJ_ASSERT(writePromise.getState(env.js) == jsg::Promise<void>::State::REJECTED,
"Write of errored should be rejected");
KJ_ASSERT(state.writeCalled == 0, "Underlying sink's write() should not have been called");
});
}

KJ_TEST("writing the wrong types reject") {
TestFixture fixture;

Expand Down Expand Up @@ -743,6 +789,11 @@ KJ_TEST("writing the wrong types reject") {
auto writeObject = adapter->write(env.js, env.js.obj());
KJ_ASSERT(writeObject.getState(env.js) == jsg::Promise<void>::State::REJECTED,
"Write of plain object should be rejected");

auto badArray = env.js.arr(env.js.boolean(true));
auto writeBadArray = adapter->write(env.js, badArray);
KJ_ASSERT(writeBadArray.getState(env.js) == jsg::Promise<void>::State::REJECTED,
"Write of array with non-string, non-ArrayBuffer elements");
});
}

Expand Down Expand Up @@ -802,56 +853,6 @@ KJ_TEST("ready promise signals backpressure correctly") {
});
}

// With detachOnWrite = true, writes a 10-byte ArrayBuffer and checks that a
// BufferSource re-created from the same JS handle reports size 0 immediately
// after write() returns.
KJ_TEST("detachOnWrite option detaches ArrayBuffer before write") {
TestFixture fixture;

fixture.runInIoContext([&](const TestFixture::Environment& env) {
auto recordingSink = kj::heap<SimpleEventRecordingSink>();
auto adapter = kj::heap<WritableStreamSinkJsAdapter>(env.js, env.context,
newWritableSink(kj::mv(recordingSink)),
WritableStreamSinkJsAdapter::Options{
.detachOnWrite = true,
});

// Allocate a 10-byte ArrayBuffer-backed source and confirm it starts out attached.
auto backing = jsg::BackingStore::alloc<v8::ArrayBuffer>(env.js, 10);
jsg::BufferSource source(env.js, kj::mv(backing));
KJ_ASSERT(!source.isDetached());
jsg::JsValue handle(source.getHandle(env.js));

auto writePromise = adapter->write(env.js, handle);

// Re-wrap the same JS handle; a size of 0 is what this test treats as
// evidence that write() detached the buffer.
jsg::BufferSource source2(env.js, handle);
KJ_ASSERT(source2.size() == 0);

// Ownership of the adapter is moved onto the awaited promise via attach().
return env.context.awaitJs(env.js, kj::mv(writePromise)).attach(kj::mv(adapter));
});
}

// With detachOnWrite = true, writes a 10-byte Uint8Array and checks that a
// BufferSource re-created from the same JS handle reports size 0 immediately
// after write() returns.
KJ_TEST("detachOnWrite option detaches Uint8Array before write") {
TestFixture fixture;

fixture.runInIoContext([&](const TestFixture::Environment& env) {
auto recordingSink = kj::heap<SimpleEventRecordingSink>();
auto adapter = kj::heap<WritableStreamSinkJsAdapter>(env.js, env.context,
newWritableSink(kj::mv(recordingSink)),
WritableStreamSinkJsAdapter::Options{
.detachOnWrite = true,
});

// Allocate a 10-byte Uint8Array-backed source and confirm it starts out attached.
auto backing = jsg::BackingStore::alloc<v8::Uint8Array>(env.js, 10);
jsg::BufferSource source(env.js, kj::mv(backing));
KJ_ASSERT(!source.isDetached());
jsg::JsValue handle(source.getHandle(env.js));

auto writePromise = adapter->write(env.js, handle);

// Re-wrap the same JS handle; a size of 0 is what this test treats as
// evidence that write() detached the view's buffer.
jsg::BufferSource source2(env.js, handle);
KJ_ASSERT(source2.size() == 0);

// Ownership of the adapter is moved onto the awaited promise via attach().
return env.context.awaitJs(env.js, kj::mv(writePromise)).attach(kj::mv(adapter));
});
}

KJ_TEST("Creating adapter and dropping it with pending operations") {
TestFixture fixture;

Expand Down
Loading
Loading