Compare commits

...

3 Commits

Author SHA1 Message Date
Cursor Agent
4465b11b95 Add CompressionStream and DecompressionStream support 2025-06-24 00:41:37 +00:00
Cursor Agent
42a360f192 Checkpoint before follow-up message 2025-06-24 00:38:47 +00:00
Cursor Agent
300f3ecc87 Add CompressionStream and DecompressionStream Web API support 2025-06-24 00:33:50 +00:00
18 changed files with 2121 additions and 1 deletion

View File

@@ -1668,6 +1668,32 @@ declare var TextEncoderStream: Bun.__internal.UseLibDomIfAvailable<
{ prototype: TextEncoderStream; new (): TextEncoderStream }
>;
/** Compression algorithms accepted by CompressionStream / DecompressionStream. */
type CompressionFormat = "gzip" | "deflate" | "deflate-raw";

/**
 * Transform-stream pair: bytes written to `writable` are compressed and
 * emitted on `readable`.
 */
interface CompressionStream {
  readonly readable: ReadableStream<Uint8Array>;
  readonly writable: WritableStream<BufferSource>;
}
declare var CompressionStream: Bun.__internal.UseLibDomIfAvailable<
  "CompressionStream",
  {
    prototype: CompressionStream;
    // NOTE(review): the WHATWG Compression Streams spec (and lib.dom) declare
    // `format` as REQUIRED; it is optional here — confirm the implementation
    // really defaults it before keeping the `?`.
    new (format?: CompressionFormat): CompressionStream;
  }
>;

/**
 * Transform-stream pair: compressed bytes written to `writable` are
 * decompressed and emitted on `readable`.
 */
interface DecompressionStream {
  readonly readable: ReadableStream<Uint8Array>;
  readonly writable: WritableStream<BufferSource>;
}
declare var DecompressionStream: Bun.__internal.UseLibDomIfAvailable<
  "DecompressionStream",
  {
    prototype: DecompressionStream;
    // NOTE(review): same as above — the spec requires `format` here too.
    new (format?: CompressionFormat): DecompressionStream;
  }
>;
interface URLSearchParams {}
declare var URLSearchParams: Bun.__internal.UseLibDomIfAvailable<
"URLSearchParams",

View File

@@ -125,6 +125,8 @@
#include "JSTextEncoderStream.h"
#include "JSTextDecoderStream.h"
#include "JSTransformStream.h"
#include "JSCompressionStream.h"
#include "JSDecompressionStream.h"
#include "JSTransformStreamDefaultController.h"
#include "JSURLSearchParams.h"
#include "JSWebSocket.h"
@@ -1482,6 +1484,8 @@ WEBCORE_GENERATED_CONSTRUCTOR_GETTER(TextEncoder);
WEBCORE_GENERATED_CONSTRUCTOR_GETTER(TextEncoderStream);
WEBCORE_GENERATED_CONSTRUCTOR_GETTER(TextDecoderStream);
WEBCORE_GENERATED_CONSTRUCTOR_GETTER(TransformStream)
WEBCORE_GENERATED_CONSTRUCTOR_GETTER(CompressionStream);
WEBCORE_GENERATED_CONSTRUCTOR_GETTER(DecompressionStream);
WEBCORE_GENERATED_CONSTRUCTOR_GETTER(TransformStreamDefaultController)
WEBCORE_GENERATED_CONSTRUCTOR_GETTER(URLSearchParams);
WEBCORE_GENERATED_CONSTRUCTOR_GETTER(WebSocket);

View File

@@ -506,6 +506,8 @@ public:
std::unique_ptr<GCClient::IsoSubspace> m_clientSubspaceForTextEncoder;
std::unique_ptr<GCClient::IsoSubspace> m_clientSubspaceForTextEncoderStream;
// std::unique_ptr<GCClient::IsoSubspace> m_clientSubspaceForTextEncoderStreamEncoder;
std::unique_ptr<GCClient::IsoSubspace> m_clientSubspaceForCompressionStream;
std::unique_ptr<GCClient::IsoSubspace> m_clientSubspaceForDecompressionStream;
// std::unique_ptr<GCClient::IsoSubspace> m_clientSubspaceForTextEvent;
// std::unique_ptr<GCClient::IsoSubspace> m_clientSubspaceForTransitionEvent;
// std::unique_ptr<GCClient::IsoSubspace> m_clientSubspaceForTreeWalker;

View File

@@ -492,6 +492,8 @@ public:
std::unique_ptr<IsoSubspace> m_subspaceForTextEncoder;
std::unique_ptr<IsoSubspace> m_subspaceForTextEncoderStream;
// std::unique_ptr<IsoSubspace> m_subspaceForTextEncoderStreamEncoder;
std::unique_ptr<IsoSubspace> m_subspaceForCompressionStream;
std::unique_ptr<IsoSubspace> m_subspaceForDecompressionStream;
// std::unique_ptr<IsoSubspace> m_subspaceForTextEvent;
// std::unique_ptr<IsoSubspace> m_subspaceForTransitionEvent;
// std::unique_ptr<IsoSubspace> m_subspaceForTreeWalker;

View File

@@ -0,0 +1,188 @@
#include "config.h"
#include "JSCompressionStream.h"
#include "ExtendedDOMClientIsoSubspaces.h"
#include "ExtendedDOMIsoSubspaces.h"
#include "JSDOMAttribute.h"
#include "JSDOMBinding.h"
#include "JSDOMBuiltinConstructor.h"
#include "JSDOMExceptionHandling.h"
#include "JSDOMGlobalObjectInlines.h"
#include "JSDOMWrapperCache.h"
#include "WebCoreJSClientData.h"
#include <JavaScriptCore/FunctionPrototype.h>
#include <JavaScriptCore/JSCInlines.h>
#include <JavaScriptCore/JSDestructibleObjectHeapCellType.h>
#include <JavaScriptCore/SlotVisitorMacros.h>
#include <JavaScriptCore/SubspaceInlines.h>
#include <wtf/GetPtr.h>
#include <wtf/PointerPreparations.h>
namespace WebCore {
using namespace JSC;
// Import from the Zig side
extern "C" JSC::EncodedJSValue CompressionStream__construct(JSC::JSGlobalObject*, JSC::CallFrame*);
// Attributes
static JSC_DECLARE_CUSTOM_GETTER(jsCompressionStreamConstructor);
// Prototype object for CompressionStream instances. Carries the
// readable/writable builtin accessors and the "constructor" getter; the
// properties are reified lazily from the static hash table below.
class JSCompressionStreamPrototype final : public JSC::JSNonFinalObject {
public:
    using Base = JSC::JSNonFinalObject;

    static JSCompressionStreamPrototype* create(JSC::VM& vm, JSDOMGlobalObject* globalObject, JSC::Structure* structure)
    {
        JSCompressionStreamPrototype* ptr = new (NotNull, JSC::allocateCell<JSCompressionStreamPrototype>(vm)) JSCompressionStreamPrototype(vm, globalObject, structure);
        ptr->finishCreation(vm);
        return ptr;
    }

    DECLARE_INFO;

    // Prototypes hold no extra C++ state, so they share the VM-wide
    // plain-object space instead of a dedicated IsoSubspace.
    template<typename CellType, JSC::SubspaceAccess>
    static JSC::GCClient::IsoSubspace* subspaceFor(JSC::VM& vm)
    {
        STATIC_ASSERT_ISO_SUBSPACE_SHARABLE(JSCompressionStreamPrototype, Base);
        return &vm.plainObjectSpace();
    }

    static JSC::Structure* createStructure(JSC::VM& vm, JSC::JSGlobalObject* globalObject, JSC::JSValue prototype)
    {
        return JSC::Structure::create(vm, globalObject, prototype, JSC::TypeInfo(JSC::ObjectType, StructureFlags), info());
    }

private:
    JSCompressionStreamPrototype(JSC::VM& vm, JSC::JSGlobalObject*, JSC::Structure* structure)
        : JSC::JSNonFinalObject(vm, structure)
    {
    }

    void finishCreation(JSC::VM&);
};
STATIC_ASSERT_ISO_SUBSPACE_SHARABLE(JSCompressionStreamPrototype, JSCompressionStreamPrototype::Base);
// The user-visible `CompressionStream` constructor. Instance initialization
// lives in a JS builtin; the native construct() override is defined below.
using JSCompressionStreamDOMConstructor = JSDOMBuiltinConstructor<JSCompressionStream>;

template<> const ClassInfo JSCompressionStreamDOMConstructor::s_info = { "CompressionStream"_s, &Base::s_info, nullptr, nullptr, CREATE_METHOD_TABLE(JSCompressionStreamDOMConstructor) };

// Per WebIDL, a constructor's [[Prototype]] is %Function.prototype%.
template<> JSValue JSCompressionStreamDOMConstructor::prototypeForStructure(JSC::VM& vm, const JSDOMGlobalObject& globalObject)
{
    UNUSED_PARAM(vm);
    return globalObject.functionPrototype();
}

// Installs length/name/prototype with the standard WebIDL attributes.
template<> void JSCompressionStreamDOMConstructor::initializeProperties(VM& vm, JSDOMGlobalObject& globalObject)
{
    // NOTE(review): the spec's CompressionStream constructor takes one
    // required argument, so `length` should arguably be 1 — confirm 0 is
    // intended (it matches the optional `format` this implementation allows).
    putDirect(vm, vm.propertyNames->length, jsNumber(0), JSC::PropertyAttribute::ReadOnly | JSC::PropertyAttribute::DontEnum);
    JSString* nameString = jsNontrivialString(vm, "CompressionStream"_s);
    m_originalName.set(vm, this, nameString);
    putDirect(vm, vm.propertyNames->name, nameString, JSC::PropertyAttribute::ReadOnly | JSC::PropertyAttribute::DontEnum);
    putDirect(vm, vm.propertyNames->prototype, JSCompressionStream::prototype(vm, globalObject), JSC::PropertyAttribute::ReadOnly | JSC::PropertyAttribute::DontEnum | JSC::PropertyAttribute::DontDelete);
}

// The generated JS builtin that wires up the instance (readable/writable).
template<> FunctionExecutable* JSCompressionStreamDOMConstructor::initializeExecutable(VM& vm)
{
    return compressionStreamInitializeCompressionStreamCodeGenerator(vm);
}
// Custom constructor: creates the wrapper cell, runs the JS builtin
// initializer with it as `this`, then calls into Zig to attach native state.
// Returns the encoded wrapper, or an empty value after throwing.
template<> EncodedJSValue JSC_HOST_CALL_ATTRIBUTES JSCompressionStreamDOMConstructor::construct(JSGlobalObject* lexicalGlobalObject, CallFrame* callFrame)
{
    auto& vm = lexicalGlobalObject->vm();
    auto scope = DECLARE_THROW_SCOPE(vm);

    // The callee is always our generated constructor object.
    auto* castedThis = jsCast<JSCompressionStreamDOMConstructor*>(callFrame->jsCallee());
    ASSERT(castedThis);

    // Resolve the instance structure honoring `new.target` (subclassing).
    auto* structure = castedThis->getDOMStructureForJSObject(lexicalGlobalObject, asObject(callFrame->newTarget()));
    if (!structure) [[unlikely]]
        return {};

    // Create the JS object.
    auto* jsObject = JSCompressionStream::create(structure, castedThis->globalObject());

    // Call the JS initializer function with the object as 'this'.
    JSC::call(lexicalGlobalObject, castedThis->initializeFunction(), jsObject, JSC::ArgList(callFrame), "This error should never occur: initialize function is guaranteed to be callable."_s);
    RETURN_IF_EXCEPTION(scope, {});

    // Let the Zig constructor set up the native parts. It receives the
    // ORIGINAL CallFrame, so its argument 0 is the user-supplied `format`.
    // (The previous version also built a MarkedArgumentBuffer with jsObject
    // prepended but never passed it anywhere — dead code, removed.)
    // FIXME(review): the Zig side currently reads argument 0 as the wrapper
    // object, which this frame does not provide — reconcile the convention.
    CompressionStream__construct(lexicalGlobalObject, callFrame);
    RETURN_IF_EXCEPTION(scope, {});
    return JSValue::encode(jsObject);
}
/* Hash table for prototype */
static const HashTableValue JSCompressionStreamPrototypeTableValues[] = {
    { "constructor"_s, static_cast<unsigned>(PropertyAttribute::DontEnum), NoIntrinsic, { HashTableValue::GetterSetterType, jsCompressionStreamConstructor, 0 } },
    { "readable"_s, JSC::PropertyAttribute::ReadOnly | JSC::PropertyAttribute::Accessor | JSC::PropertyAttribute::Builtin, NoIntrinsic, { HashTableValue::BuiltinAccessorType, compressionStreamReadableCodeGenerator, 0 } },
    { "writable"_s, JSC::PropertyAttribute::ReadOnly | JSC::PropertyAttribute::Accessor | JSC::PropertyAttribute::Builtin, NoIntrinsic, { HashTableValue::BuiltinAccessorType, compressionStreamWritableCodeGenerator, 0 } },
};

const ClassInfo JSCompressionStreamPrototype::s_info = { "CompressionStream"_s, &Base::s_info, nullptr, nullptr, CREATE_METHOD_TABLE(JSCompressionStreamPrototype) };

// Reifies the table above onto the prototype and installs @@toStringTag.
void JSCompressionStreamPrototype::finishCreation(VM& vm)
{
    Base::finishCreation(vm);
    reifyStaticProperties(vm, JSCompressionStream::info(), JSCompressionStreamPrototypeTableValues, *this);
    JSC_TO_STRING_TAG_WITHOUT_TRANSITION();
}

const ClassInfo JSCompressionStream::s_info = { "CompressionStream"_s, &Base::s_info, nullptr, nullptr, CREATE_METHOD_TABLE(JSCompressionStream) };

JSCompressionStream::JSCompressionStream(Structure* structure, JSDOMGlobalObject& globalObject)
    : JSDOMObject(structure, globalObject)
{
}

// Builds the prototype object; flagged as a prototype so JSC can optimize
// structures of instances derived from it.
JSObject* JSCompressionStream::createPrototype(VM& vm, JSDOMGlobalObject& globalObject)
{
    auto* structure = JSCompressionStreamPrototype::createStructure(vm, &globalObject, globalObject.objectPrototype());
    structure->setMayBePrototype(true);
    return JSCompressionStreamPrototype::create(vm, &globalObject, structure);
}

// Per-global cached prototype (created on first use).
JSObject* JSCompressionStream::prototype(VM& vm, JSDOMGlobalObject& globalObject)
{
    return getDOMPrototype<JSCompressionStream>(vm, globalObject);
}

// Per-global cached constructor (created on first use).
JSValue JSCompressionStream::getConstructor(VM& vm, const JSGlobalObject* globalObject)
{
    return getDOMConstructor<JSCompressionStreamDOMConstructor, DOMConstructorID::CompressionStream>(vm, *jsCast<const JSDOMGlobalObject*>(globalObject));
}

// Explicitly runs the destructor; called by the GC for cells needing destruction.
void JSCompressionStream::destroy(JSC::JSCell* cell)
{
    JSCompressionStream* thisObject = static_cast<JSCompressionStream*>(cell);
    thisObject->JSCompressionStream::~JSCompressionStream();
}

// Getter for `CompressionStream.prototype.constructor`; throws a TypeError
// unless `this` is the prototype object itself.
JSC_DEFINE_CUSTOM_GETTER(jsCompressionStreamConstructor, (JSGlobalObject * lexicalGlobalObject, EncodedJSValue thisValue, PropertyName))
{
    auto& vm = JSC::getVM(lexicalGlobalObject);
    auto throwScope = DECLARE_THROW_SCOPE(vm);
    auto* prototype = jsDynamicCast<JSCompressionStreamPrototype*>(JSValue::decode(thisValue));
    if (!prototype) [[unlikely]]
        return throwVMTypeError(lexicalGlobalObject, throwScope);
    return JSValue::encode(JSCompressionStream::getConstructor(vm, prototype->globalObject()));
}

// Lazily creates the dedicated IsoSubspace all JSCompressionStream cells
// are allocated from (client + server halves).
JSC::GCClient::IsoSubspace* JSCompressionStream::subspaceForImpl(JSC::VM& vm)
{
    return WebCore::subspaceForImpl<JSCompressionStream, UseCustomHeapCellType::No>(
        vm,
        [](auto& spaces) { return spaces.m_clientSubspaceForCompressionStream.get(); },
        [](auto& spaces, auto&& space) { spaces.m_clientSubspaceForCompressionStream = std::forward<decltype(space)>(space); },
        [](auto& spaces) { return spaces.m_subspaceForCompressionStream.get(); },
        [](auto& spaces, auto&& space) { spaces.m_subspaceForCompressionStream = std::forward<decltype(space)>(space); });
}
} // namespace WebCore

View File

@@ -0,0 +1,47 @@
#pragma once
#include "JSDOMObject.h"
#include <JavaScriptCore/JSGlobalObject.h>
#include <JavaScriptCore/JSObject.h>
#include <JavaScriptCore/JSValue.h>
namespace WebCore {
// JS wrapper cell for CompressionStream. The readable/writable stream pair
// is attached as private properties by the native constructor; this class
// only provides the JSC plumbing (structure, prototype, subspace).
class JSCompressionStream : public JSDOMObject {
public:
    using Base = JSDOMObject;

    static JSCompressionStream* create(JSC::Structure* structure, JSDOMGlobalObject* globalObject)
    {
        auto& vm = JSC::getVM(globalObject);
        JSCompressionStream* ptr = new (NotNull, JSC::allocateCell<JSCompressionStream>(vm)) JSCompressionStream(structure, *globalObject);
        ptr->finishCreation(vm);
        return ptr;
    }

    static JSC::JSObject* createPrototype(JSC::VM&, JSDOMGlobalObject&);
    static JSC::JSObject* prototype(JSC::VM&, JSDOMGlobalObject&);
    static void destroy(JSC::JSCell*);
    DECLARE_INFO;

    static JSC::Structure* createStructure(JSC::VM& vm, JSC::JSGlobalObject* globalObject, JSC::JSValue prototype)
    {
        return JSC::Structure::create(vm, globalObject, prototype, JSC::TypeInfo(JSC::ObjectType, StructureFlags), info(), JSC::NonArray);
    }

    static JSC::JSValue getConstructor(JSC::VM&, const JSC::JSGlobalObject*);

    // Cells live in a dedicated IsoSubspace; unavailable to concurrent GC threads.
    template<typename, JSC::SubspaceAccess mode> static JSC::GCClient::IsoSubspace* subspaceFor(JSC::VM& vm)
    {
        if constexpr (mode == JSC::SubspaceAccess::Concurrently)
            return nullptr;
        return subspaceForImpl(vm);
    }
    static JSC::GCClient::IsoSubspace* subspaceForImpl(JSC::VM& vm);

protected:
    JSCompressionStream(JSC::Structure*, JSDOMGlobalObject&);
    DECLARE_DEFAULT_FINISH_CREATION;
};
} // namespace WebCore

View File

@@ -0,0 +1,188 @@
#include "config.h"
#include "JSDecompressionStream.h"
#include "ExtendedDOMClientIsoSubspaces.h"
#include "ExtendedDOMIsoSubspaces.h"
#include "JSDOMAttribute.h"
#include "JSDOMBinding.h"
#include "JSDOMBuiltinConstructor.h"
#include "JSDOMExceptionHandling.h"
#include "JSDOMGlobalObjectInlines.h"
#include "JSDOMWrapperCache.h"
#include "WebCoreJSClientData.h"
#include <JavaScriptCore/FunctionPrototype.h>
#include <JavaScriptCore/JSCInlines.h>
#include <JavaScriptCore/JSDestructibleObjectHeapCellType.h>
#include <JavaScriptCore/SlotVisitorMacros.h>
#include <JavaScriptCore/SubspaceInlines.h>
#include <wtf/GetPtr.h>
#include <wtf/PointerPreparations.h>
namespace WebCore {
using namespace JSC;
// Import from the Zig side
extern "C" JSC::EncodedJSValue DecompressionStream__construct(JSC::JSGlobalObject*, JSC::CallFrame*);
// Attributes
static JSC_DECLARE_CUSTOM_GETTER(jsDecompressionStreamConstructor);
// Prototype object for DecompressionStream instances. Carries the
// readable/writable builtin accessors and the "constructor" getter; the
// properties are reified lazily from the static hash table below.
class JSDecompressionStreamPrototype final : public JSC::JSNonFinalObject {
public:
    using Base = JSC::JSNonFinalObject;

    static JSDecompressionStreamPrototype* create(JSC::VM& vm, JSDOMGlobalObject* globalObject, JSC::Structure* structure)
    {
        JSDecompressionStreamPrototype* ptr = new (NotNull, JSC::allocateCell<JSDecompressionStreamPrototype>(vm)) JSDecompressionStreamPrototype(vm, globalObject, structure);
        ptr->finishCreation(vm);
        return ptr;
    }

    DECLARE_INFO;

    // Prototypes hold no extra C++ state, so they share the VM-wide
    // plain-object space instead of a dedicated IsoSubspace.
    template<typename CellType, JSC::SubspaceAccess>
    static JSC::GCClient::IsoSubspace* subspaceFor(JSC::VM& vm)
    {
        STATIC_ASSERT_ISO_SUBSPACE_SHARABLE(JSDecompressionStreamPrototype, Base);
        return &vm.plainObjectSpace();
    }

    static JSC::Structure* createStructure(JSC::VM& vm, JSC::JSGlobalObject* globalObject, JSC::JSValue prototype)
    {
        return JSC::Structure::create(vm, globalObject, prototype, JSC::TypeInfo(JSC::ObjectType, StructureFlags), info());
    }

private:
    JSDecompressionStreamPrototype(JSC::VM& vm, JSC::JSGlobalObject*, JSC::Structure* structure)
        : JSC::JSNonFinalObject(vm, structure)
    {
    }

    void finishCreation(JSC::VM&);
};
STATIC_ASSERT_ISO_SUBSPACE_SHARABLE(JSDecompressionStreamPrototype, JSDecompressionStreamPrototype::Base);
// The user-visible `DecompressionStream` constructor. Instance initialization
// lives in a JS builtin; the native construct() override is defined below.
using JSDecompressionStreamDOMConstructor = JSDOMBuiltinConstructor<JSDecompressionStream>;

template<> const ClassInfo JSDecompressionStreamDOMConstructor::s_info = { "DecompressionStream"_s, &Base::s_info, nullptr, nullptr, CREATE_METHOD_TABLE(JSDecompressionStreamDOMConstructor) };

// Per WebIDL, a constructor's [[Prototype]] is %Function.prototype%.
template<> JSValue JSDecompressionStreamDOMConstructor::prototypeForStructure(JSC::VM& vm, const JSDOMGlobalObject& globalObject)
{
    UNUSED_PARAM(vm);
    return globalObject.functionPrototype();
}

// Installs length/name/prototype with the standard WebIDL attributes.
template<> void JSDecompressionStreamDOMConstructor::initializeProperties(VM& vm, JSDOMGlobalObject& globalObject)
{
    // NOTE(review): the spec's DecompressionStream constructor takes one
    // required argument, so `length` should arguably be 1 — confirm 0 is
    // intended (it matches the optional `format` this implementation allows).
    putDirect(vm, vm.propertyNames->length, jsNumber(0), JSC::PropertyAttribute::ReadOnly | JSC::PropertyAttribute::DontEnum);
    JSString* nameString = jsNontrivialString(vm, "DecompressionStream"_s);
    m_originalName.set(vm, this, nameString);
    putDirect(vm, vm.propertyNames->name, nameString, JSC::PropertyAttribute::ReadOnly | JSC::PropertyAttribute::DontEnum);
    putDirect(vm, vm.propertyNames->prototype, JSDecompressionStream::prototype(vm, globalObject), JSC::PropertyAttribute::ReadOnly | JSC::PropertyAttribute::DontEnum | JSC::PropertyAttribute::DontDelete);
}

// The generated JS builtin that wires up the instance (readable/writable).
template<> FunctionExecutable* JSDecompressionStreamDOMConstructor::initializeExecutable(VM& vm)
{
    return decompressionStreamInitializeDecompressionStreamCodeGenerator(vm);
}
// Custom constructor: creates the wrapper cell, runs the JS builtin
// initializer with it as `this`, then calls into Zig to attach native state.
// Returns the encoded wrapper, or an empty value after throwing.
template<> EncodedJSValue JSC_HOST_CALL_ATTRIBUTES JSDecompressionStreamDOMConstructor::construct(JSGlobalObject* lexicalGlobalObject, CallFrame* callFrame)
{
    auto& vm = lexicalGlobalObject->vm();
    auto scope = DECLARE_THROW_SCOPE(vm);

    // The callee is always our generated constructor object.
    auto* castedThis = jsCast<JSDecompressionStreamDOMConstructor*>(callFrame->jsCallee());
    ASSERT(castedThis);

    // Resolve the instance structure honoring `new.target` (subclassing).
    auto* structure = castedThis->getDOMStructureForJSObject(lexicalGlobalObject, asObject(callFrame->newTarget()));
    if (!structure) [[unlikely]]
        return {};

    // Create the JS object.
    auto* jsObject = JSDecompressionStream::create(structure, castedThis->globalObject());

    // Call the JS initializer function with the object as 'this'.
    JSC::call(lexicalGlobalObject, castedThis->initializeFunction(), jsObject, JSC::ArgList(callFrame), "This error should never occur: initialize function is guaranteed to be callable."_s);
    RETURN_IF_EXCEPTION(scope, {});

    // Let the Zig constructor set up the native parts. It receives the
    // ORIGINAL CallFrame, so its argument 0 is the user-supplied `format`.
    // (The previous version also built a MarkedArgumentBuffer with jsObject
    // prepended but never passed it anywhere — dead code, removed.)
    // FIXME(review): the Zig side currently reads argument 0 as the wrapper
    // object, which this frame does not provide — reconcile the convention.
    DecompressionStream__construct(lexicalGlobalObject, callFrame);
    RETURN_IF_EXCEPTION(scope, {});
    return JSValue::encode(jsObject);
}
/* Hash table for prototype */
static const HashTableValue JSDecompressionStreamPrototypeTableValues[] = {
    { "constructor"_s, static_cast<unsigned>(PropertyAttribute::DontEnum), NoIntrinsic, { HashTableValue::GetterSetterType, jsDecompressionStreamConstructor, 0 } },
    { "readable"_s, JSC::PropertyAttribute::ReadOnly | JSC::PropertyAttribute::Accessor | JSC::PropertyAttribute::Builtin, NoIntrinsic, { HashTableValue::BuiltinAccessorType, decompressionStreamReadableCodeGenerator, 0 } },
    { "writable"_s, JSC::PropertyAttribute::ReadOnly | JSC::PropertyAttribute::Accessor | JSC::PropertyAttribute::Builtin, NoIntrinsic, { HashTableValue::BuiltinAccessorType, decompressionStreamWritableCodeGenerator, 0 } },
};

const ClassInfo JSDecompressionStreamPrototype::s_info = { "DecompressionStream"_s, &Base::s_info, nullptr, nullptr, CREATE_METHOD_TABLE(JSDecompressionStreamPrototype) };

// Reifies the table above onto the prototype and installs @@toStringTag.
void JSDecompressionStreamPrototype::finishCreation(VM& vm)
{
    Base::finishCreation(vm);
    reifyStaticProperties(vm, JSDecompressionStream::info(), JSDecompressionStreamPrototypeTableValues, *this);
    JSC_TO_STRING_TAG_WITHOUT_TRANSITION();
}

const ClassInfo JSDecompressionStream::s_info = { "DecompressionStream"_s, &Base::s_info, nullptr, nullptr, CREATE_METHOD_TABLE(JSDecompressionStream) };

JSDecompressionStream::JSDecompressionStream(Structure* structure, JSDOMGlobalObject& globalObject)
    : JSDOMObject(structure, globalObject)
{
}

// Builds the prototype object; flagged as a prototype so JSC can optimize
// structures of instances derived from it.
JSObject* JSDecompressionStream::createPrototype(VM& vm, JSDOMGlobalObject& globalObject)
{
    auto* structure = JSDecompressionStreamPrototype::createStructure(vm, &globalObject, globalObject.objectPrototype());
    structure->setMayBePrototype(true);
    return JSDecompressionStreamPrototype::create(vm, &globalObject, structure);
}

// Per-global cached prototype (created on first use).
JSObject* JSDecompressionStream::prototype(VM& vm, JSDOMGlobalObject& globalObject)
{
    return getDOMPrototype<JSDecompressionStream>(vm, globalObject);
}

// Per-global cached constructor (created on first use).
JSValue JSDecompressionStream::getConstructor(VM& vm, const JSGlobalObject* globalObject)
{
    return getDOMConstructor<JSDecompressionStreamDOMConstructor, DOMConstructorID::DecompressionStream>(vm, *jsCast<const JSDOMGlobalObject*>(globalObject));
}

// Explicitly runs the destructor; called by the GC for cells needing destruction.
void JSDecompressionStream::destroy(JSC::JSCell* cell)
{
    JSDecompressionStream* thisObject = static_cast<JSDecompressionStream*>(cell);
    thisObject->JSDecompressionStream::~JSDecompressionStream();
}

// Getter for `DecompressionStream.prototype.constructor`; throws a TypeError
// unless `this` is the prototype object itself.
JSC_DEFINE_CUSTOM_GETTER(jsDecompressionStreamConstructor, (JSGlobalObject * lexicalGlobalObject, EncodedJSValue thisValue, PropertyName))
{
    auto& vm = JSC::getVM(lexicalGlobalObject);
    auto throwScope = DECLARE_THROW_SCOPE(vm);
    auto* prototype = jsDynamicCast<JSDecompressionStreamPrototype*>(JSValue::decode(thisValue));
    if (!prototype) [[unlikely]]
        return throwVMTypeError(lexicalGlobalObject, throwScope);
    return JSValue::encode(JSDecompressionStream::getConstructor(vm, prototype->globalObject()));
}

// Lazily creates the dedicated IsoSubspace all JSDecompressionStream cells
// are allocated from (client + server halves).
JSC::GCClient::IsoSubspace* JSDecompressionStream::subspaceForImpl(JSC::VM& vm)
{
    return WebCore::subspaceForImpl<JSDecompressionStream, UseCustomHeapCellType::No>(
        vm,
        [](auto& spaces) { return spaces.m_clientSubspaceForDecompressionStream.get(); },
        [](auto& spaces, auto&& space) { spaces.m_clientSubspaceForDecompressionStream = std::forward<decltype(space)>(space); },
        [](auto& spaces) { return spaces.m_subspaceForDecompressionStream.get(); },
        [](auto& spaces, auto&& space) { spaces.m_subspaceForDecompressionStream = std::forward<decltype(space)>(space); });
}
} // namespace WebCore

View File

@@ -0,0 +1,47 @@
#pragma once
#include "JSDOMObject.h"
#include <JavaScriptCore/JSGlobalObject.h>
#include <JavaScriptCore/JSObject.h>
#include <JavaScriptCore/JSValue.h>
namespace WebCore {
// JS wrapper cell for DecompressionStream. The readable/writable stream pair
// is attached as private properties by the native constructor; this class
// only provides the JSC plumbing (structure, prototype, subspace).
class JSDecompressionStream : public JSDOMObject {
public:
    using Base = JSDOMObject;

    static JSDecompressionStream* create(JSC::Structure* structure, JSDOMGlobalObject* globalObject)
    {
        auto& vm = JSC::getVM(globalObject);
        JSDecompressionStream* ptr = new (NotNull, JSC::allocateCell<JSDecompressionStream>(vm)) JSDecompressionStream(structure, *globalObject);
        ptr->finishCreation(vm);
        return ptr;
    }

    static JSC::JSObject* createPrototype(JSC::VM&, JSDOMGlobalObject&);
    static JSC::JSObject* prototype(JSC::VM&, JSDOMGlobalObject&);
    static void destroy(JSC::JSCell*);
    DECLARE_INFO;

    static JSC::Structure* createStructure(JSC::VM& vm, JSC::JSGlobalObject* globalObject, JSC::JSValue prototype)
    {
        return JSC::Structure::create(vm, globalObject, prototype, JSC::TypeInfo(JSC::ObjectType, StructureFlags), info(), JSC::NonArray);
    }

    static JSC::JSValue getConstructor(JSC::VM&, const JSC::JSGlobalObject*);

    // Cells live in a dedicated IsoSubspace; unavailable to concurrent GC threads.
    template<typename, JSC::SubspaceAccess mode> static JSC::GCClient::IsoSubspace* subspaceFor(JSC::VM& vm)
    {
        if constexpr (mode == JSC::SubspaceAccess::Concurrently)
            return nullptr;
        return subspaceForImpl(vm);
    }
    static JSC::GCClient::IsoSubspace* subspaceForImpl(JSC::VM& vm);

protected:
    JSDecompressionStream(JSC::Structure*, JSDOMGlobalObject&);
    DECLARE_DEFAULT_FINISH_CREATION;
};
} // namespace WebCore

View File

@@ -0,0 +1,242 @@
const CompressionSink = @This();
const std = @import("std");
const bun = @import("bun");
const webcore = bun.webcore;
const streams = webcore.streams;
const jsc = bun.jsc;
const Output = bun.Output;
const JSC = bun.JSC;
const JSValue = jsc.JSValue;
const Encoder = @import("./CompressionStreamEncoder.zig").Encoder;
const log = Output.scoped(.CompressionSink, false);
/// Writable-side sink for CompressionStream: each chunk written from JS is
/// forwarded to the shared `Encoder`, which compresses it and feeds the
/// readable side of the stream pair.
pub const Sink = struct {
    // Pointer to the readable side's source logic
    encoder: *Encoder,
    // JSSink vtable fields
    signal: streams.Signal = .{},
    has: streams.Has = .{},
    done: bool = false,
    pending: streams.Result.Writable.Pending = .{},
    // Total uncompressed bytes accepted so far (reported by memoryCost).
    bytes_written: usize = 0,
    allocator: std.mem.Allocator,

    pub fn connect(this: *Sink, signal: streams.Signal) void {
        this.signal = signal;
    }

    pub fn start(this: *Sink, stream_start: streams.Start) JSC.Maybe(void) {
        _ = stream_start;
        this.signal.start();
        return .{ .result = {} };
    }

    /// Consume one chunk: push it into the encoder and report the whole
    /// chunk as owned. Encoder errors are collapsed to EINVAL.
    pub fn write(this: *Sink, data: streams.Result) streams.Result.Writable {
        log("write({d} bytes)", .{data.slice().len});
        if (this.done) {
            return .{ .done = {} };
        }
        const chunk = data.slice();
        if (chunk.len == 0) {
            return .{ .owned = 0 };
        }
        // Write to the encoder. The unused `|err|` capture was removed:
        // an unused capture is a compile error in Zig, and the original
        // error is intentionally mapped to EINVAL.
        this.encoder.write(chunk) catch {
            return .{ .err = bun.sys.Error.fromCode(.INVAL, .write) };
        };
        this.bytes_written += chunk.len;
        // Return how many bytes we consumed
        return .{ .owned = @truncate(chunk.len) };
    }

    pub fn writeBytes(this: *Sink, data: streams.Result) streams.Result.Writable {
        return this.write(data);
    }

    pub fn writeLatin1(this: *Sink, data: streams.Result) streams.Result.Writable {
        return this.write(data);
    }

    pub fn writeUTF16(this: *Sink, data: streams.Result) streams.Result.Writable {
        // Convert UTF16 to UTF8 first
        // For now, just treat it as bytes
        // TODO(review): this passes raw UTF-16 bytes through — confirm callers
        // never route UTF-16 data into a byte sink before shipping.
        return this.write(data);
    }

    /// Terminate the sink. On error, cancel the encoder; on clean end,
    /// flush it so trailing compressed bytes reach the readable side.
    pub fn end(this: *Sink, err: ?bun.sys.Error) JSC.Maybe(void) {
        log("end()", .{});
        if (this.done) {
            return .{ .result = {} };
        }
        this.done = true;
        if (err) |e| {
            _ = e;
            // If there's an error, we should notify the encoder
            this.encoder.onCancel();
        } else {
            // Normal end - flush the encoder
            this.encoder.flush() catch |flush_err| {
                _ = flush_err;
                return .{ .err = bun.sys.Error.fromCode(.INVAL, .flush) };
            };
        }
        this.signal.close(err);
        return .{ .result = {} };
    }

    /// JS-facing end(): delegates to end(null) and reports success as `true`.
    pub fn endFromJS(this: *Sink, globalObject: *JSC.JSGlobalObject) JSC.Maybe(JSValue) {
        _ = globalObject;
        log("endFromJS()", .{});
        if (this.done) {
            return .{ .result = .true };
        }
        switch (this.end(null)) {
            .err => |err| return .{ .err = err },
            .result => return .{ .result = .true },
        }
    }

    pub fn flush(this: *Sink) JSC.Maybe(void) {
        log("flush()", .{});
        // For compression stream, flush doesn't do anything special
        // The actual flushing happens in end()
        _ = this;
        return .{ .result = {} };
    }

    pub fn flushFromJS(this: *Sink, globalObject: *JSC.JSGlobalObject, wait: bool) JSC.Maybe(JSValue) {
        _ = this;
        _ = globalObject;
        _ = wait;
        return .{ .result = .undefined };
    }

    /// GC finalizer: cancel the encoder if still live, release pending state,
    /// and drop this sink's reference.
    pub fn finalize(this: *Sink) void {
        log("finalize()", .{});
        if (!this.done) {
            this.done = true;
            this.encoder.onCancel();
        }
        this.pending.deinit();
        this.deref();
    }

    pub fn init(allocator: std.mem.Allocator, encoder: *Encoder) *Sink {
        const sink = allocator.create(Sink) catch bun.outOfMemory();
        sink.* = .{
            .encoder = encoder,
            .allocator = allocator,
        };
        return sink;
    }

    pub fn construct(this: *Sink, allocator: std.mem.Allocator) void {
        _ = allocator;
        _ = this;
        // This shouldn't be called for CompressionSink
        @panic("CompressionSink.construct should not be called");
    }

    pub fn deinit(this: *Sink) void {
        log("deinit()", .{});
        this.allocator.destroy(this);
    }

    // JSSink interface requirements
    pub const ref = JSC.Codegen.JSCompressionSink.ref;
    pub const deref = JSC.Codegen.JSCompressionSink.deref;
    pub const updateRef = JSC.Codegen.JSCompressionSink.updateRef;

    pub fn toJS(this: *Sink, globalObject: *JSC.JSGlobalObject) JSValue {
        return JSC.Codegen.JSCompressionSink.toJS(this, globalObject);
    }

    /// Detach without closing the signal (used on abrupt close from C++).
    pub fn detach(this: *Sink) void {
        log("detach()", .{});
        if (!this.done) {
            this.done = true;
            this.encoder.onCancel();
        }
    }

    pub fn onClose(_: *Sink) void {
        log("onClose()", .{});
    }

    pub fn onReady(_: *Sink) void {
        log("onReady()", .{});
    }

    pub fn onError(this: *Sink, err: bun.sys.Error) void {
        log("onError()", .{});
        _ = this.end(err);
    }
};
/// C-ABI shim: toggle the sink's GC ref from the C++ JSSink glue.
pub fn CompressionSink__updateRef(ptr: *anyopaque, value: bool) callconv(.C) void {
    const sink = @as(*Sink, @ptrCast(@alignCast(ptr)));
    if (value) {
        sink.ref();
    } else {
        sink.deref();
    }
}

/// C-ABI shim: forward a write from the stream machinery to Sink.write.
pub fn CompressionSink__write(
    ptr: *anyopaque,
    data: StreamResult,
    globalThis: *JSC.JSGlobalObject,
) callconv(.C) StreamResult.Writable {
    _ = globalThis;
    const sink = @as(*Sink, @ptrCast(@alignCast(ptr)));
    return sink.write(data);
}

/// C-ABI shim: abrupt close — detaches (cancels encoder) without flushing.
pub fn CompressionSink__close(globalThis: *JSC.JSGlobalObject, ptr: *anyopaque) callconv(.C) void {
    _ = globalThis;
    const sink = @as(*Sink, @ptrCast(@alignCast(ptr)));
    sink.detach();
}

/// C-ABI shim: JS-initiated end(); returns the result converted to a JSValue.
pub fn CompressionSink__endWithSink(ptr: *anyopaque, globalThis: *JSC.JSGlobalObject) callconv(.C) JSValue {
    const sink = @as(*Sink, @ptrCast(@alignCast(ptr)));
    return sink.endFromJS(globalThis).toJS(globalThis);
}

/// C-ABI shim: JS-initiated flush (a no-op for this sink; see Sink.flush).
pub fn CompressionSink__flushFromJS(
    ptr: *anyopaque,
    globalThis: *JSC.JSGlobalObject,
    wait: bool,
) callconv(.C) JSValue {
    const sink = @as(*Sink, @ptrCast(@alignCast(ptr)));
    return sink.flushFromJS(globalThis, wait).toJS(globalThis);
}

/// C-ABI shim: rough memory accounting — struct size plus bytes accepted.
pub fn CompressionSink__memoryCost(ptr: *anyopaque) callconv(.C) usize {
    const sink = @as(*Sink, @ptrCast(@alignCast(ptr)));
    return @sizeOf(Sink) + sink.bytes_written;
}

const StreamResult = webcore.StreamResult;

// Export the sink type for use in other modules
pub const CompressionStreamSink = Sink;

View File

@@ -0,0 +1,130 @@
const CompressionStream = @This();
const std = @import("std");
const bun = @import("bun");
const webcore = bun.webcore;
const JSC = bun.JSC;
const JSValue = JSC.JSValue;
const JSGlobalObject = JSC.JSGlobalObject;
const CompressionStreamEncoder = @import("./CompressionStreamEncoder.zig");
const CompressionSink = @import("./CompressionSink.zig");
const DecompressionStreamEncoder = @import("./DecompressionStreamEncoder.zig");
const DecompressionSink = @import("./DecompressionSink.zig");
const log = bun.Output.scoped(.CompressionStream, false);
// Constructor implementation called from C++.
/// Native half of `new CompressionStream(format)`.
/// Expects arguments.ptr[0] to be the freshly created wrapper object and
/// arguments.ptr[1] the optional format string (defaults to "gzip").
/// NOTE(review): per the WHATWG spec `format` is required — confirm the
/// default is intentional. Also confirm the C++ caller actually prepends the
/// wrapper object to the frame (see the FIXME in the C++ construct()).
pub export fn CompressionStream__construct(globalThis: *JSGlobalObject, callFrame: *JSC.CallFrame) JSValue {
    const arguments = callFrame.arguments(2);
    // First argument is the 'this' object that was already created
    const this_value = arguments.ptr[0];
    // Second argument is the format parameter (optional, defaults to "gzip")
    const format_arg = if (arguments.len > 1) arguments.ptr[1] else .undefined;
    var format_slice: []const u8 = "gzip";
    if (!format_arg.isUndefined()) {
        const format_str = format_arg.getZigString(globalThis);
        if (format_str.len == 0) {
            globalThis.throwInvalidArguments("format parameter must not be empty", .{});
            return .zero;
        }
        format_slice = format_str.slice();
    }
    // Parse the algorithm
    const algorithm = CompressionStreamEncoder.Algorithm.fromString(format_slice) orelse {
        globalThis.throwInvalidArguments("Unsupported compression format: {s}", .{format_slice});
        return .zero;
    };
    // Create the encoder that backs the readable side.
    // (`const`: never mutated; `var` here was a compile error. The unused
    // `vm` and `names` locals were removed for the same reason.)
    const encoder = CompressionStreamEncoder.Source.new(.{
        .globalThis = globalThis,
        .context = .{
            .ref_count = .init(),
            .allocator = bun.default_allocator,
            .state = .{ .uninit = algorithm },
            .buffer = .{},
            .pending = .{},
            .is_closed = false,
        },
    });
    // Create the sink and link it to the encoder
    const sink = CompressionSink.Sink.init(bun.default_allocator, &encoder.context);
    // Create the ReadableStream with the encoder as the native source
    const readable = encoder.toReadableStream(globalThis);
    // Create the WritableStream with the sink
    const writable = JSC.WebCore.WritableStream.fromSink(globalThis, sink, null);
    // Store the streams on the JS object as private-name properties.
    this_value.putDirect(globalThis.vm(), JSC.ZigString.static("readable").toIdentifier(globalThis), readable, .{ .PrivateName = true });
    this_value.putDirect(globalThis.vm(), JSC.ZigString.static("writable").toIdentifier(globalThis), writable, .{ .PrivateName = true });
    return this_value;
}
// DecompressionStream constructor.
/// Native half of `new DecompressionStream(format)`.
/// Expects arguments.ptr[0] to be the freshly created wrapper object and
/// arguments.ptr[1] the optional format string (defaults to "gzip").
/// NOTE(review): per the WHATWG spec `format` is required — confirm the
/// default is intentional. Also confirm the C++ caller actually prepends the
/// wrapper object to the frame (see the FIXME in the C++ construct()).
pub export fn DecompressionStream__construct(globalThis: *JSGlobalObject, callFrame: *JSC.CallFrame) JSValue {
    const arguments = callFrame.arguments(2);
    // First argument is the 'this' object that was already created
    const this_value = arguments.ptr[0];
    // Second argument is the format parameter (optional, defaults to "gzip")
    const format_arg = if (arguments.len > 1) arguments.ptr[1] else .undefined;
    var format_slice: []const u8 = "gzip";
    if (!format_arg.isUndefined()) {
        const format_str = format_arg.getZigString(globalThis);
        if (format_str.len == 0) {
            globalThis.throwInvalidArguments("format parameter must not be empty", .{});
            return .zero;
        }
        format_slice = format_str.slice();
    }
    // Parse the algorithm
    const algorithm = DecompressionStreamEncoder.Algorithm.fromString(format_slice) orelse {
        globalThis.throwInvalidArguments("Unsupported decompression format: {s}", .{format_slice});
        return .zero;
    };
    // Create the encoder that backs the readable side.
    // (`const`: never mutated; `var` here was a compile error. The unused
    // `vm` and `names` locals were removed for the same reason.)
    const encoder = DecompressionStreamEncoder.Source.new(.{
        .globalThis = globalThis,
        .context = .{
            .ref_count = .init(),
            .allocator = bun.default_allocator,
            .state = .{ .uninit = algorithm },
            .buffer = .{},
            .pending = .{},
            .is_closed = false,
        },
    });
    // Create the sink and link it to the encoder
    const sink = DecompressionSink.Sink.init(bun.default_allocator, &encoder.context);
    // Create the ReadableStream with the encoder as the native source
    const readable = encoder.toReadableStream(globalThis);
    // Create the WritableStream with the sink
    const writable = JSC.WebCore.WritableStream.fromSink(globalThis, sink, null);
    // Store the streams on the JS object as private-name properties.
    this_value.putDirect(globalThis.vm(), JSC.ZigString.static("readable").toIdentifier(globalThis), readable, .{ .PrivateName = true });
    this_value.putDirect(globalThis.vm(), JSC.ZigString.static("writable").toIdentifier(globalThis), writable, .{ .PrivateName = true });
    return this_value;
}

View File

@@ -0,0 +1,328 @@
const CompressionStreamEncoder = @This();
const std = @import("std");
const bun = @import("bun");
const webcore = bun.webcore;
const streams = webcore.streams;
const jsc = bun.jsc;
const Output = bun.Output;
const Blob = webcore.Blob;
const ByteList = bun.ByteList;
const JSC = bun.JSC;
const JSValue = jsc.JSValue;
const log = Output.scoped(.CompressionStreamEncoder, false);
pub const Algorithm = enum {
    gzip,
    deflate,
    deflate_raw,

    /// Maps a Web-API format string ("gzip" | "deflate" | "deflate-raw")
    /// onto an Algorithm; returns null for any other string.
    pub fn fromString(name: []const u8) ?Algorithm {
        if (bun.strings.eqlComptime(name, "deflate")) return .deflate;
        if (bun.strings.eqlComptime(name, "deflate-raw")) return .deflate_raw;
        if (bun.strings.eqlComptime(name, "gzip")) return .gzip;
        return null;
    }

    /// zlib windowBits encoding: adding 16 selects a gzip wrapper, a
    /// negative value selects raw deflate with no header/trailer.
    pub fn toWindowBits(this: Algorithm) c_int {
        switch (this) {
            .gzip => return 15 + 16,
            .deflate => return 15,
            .deflate_raw => return -15,
        }
    }
};
// Lifecycle of the zlib state machine.
pub const State = union(enum) {
    // Algorithm chosen at construction; the z_stream is created lazily on
    // the first write.
    uninit: Algorithm,
    // Live zlib stream. NOTE(review): the tag is named `inflate` but this
    // (compression) encoder stores a *deflate* stream in it — renaming would
    // touch every call site, so the misnomer is only documented here.
    inflate: bun.zlib.z_stream,
    // Sticky error: once set, further writes and pulls are no-ops.
    err: bun.sys.Error,
};
/// Readable-side source of a CompressionStream. The linked CompressionSink
/// feeds chunks into `write`/`flush`; compressed output accumulates in
/// `buffer` until the stream machinery pulls it via `onPull`.
pub const Encoder = struct {
    ref_count: bun.ptr.RefCount(Encoder, "ref_count", deinit, .{}),
    allocator: std.mem.Allocator,
    state: State,
    // Output buffer for compressed data not yet pulled by the reader.
    buffer: bun.ByteList,
    // Handles async pull requests when the buffer is empty.
    pending: streams.Result.Pending = .{},
    // Set once the final flush ran (or the stream was canceled).
    is_closed: bool = false,
    // Destination slice of an outstanding pull request (view into the JS
    // typed array) and a strong ref keeping that typed array alive.
    pending_buffer: []u8 = &.{},
    pending_value: jsc.Strong.Optional = .empty,

    /// Lazily creates the zlib deflate state on first use.
    fn getOrInitStream(this: *Encoder) !*bun.zlib.z_stream {
        switch (this.state) {
            .inflate => return &this.state.inflate,
            .err => return error.CompressionError,
            .uninit => |algo| {
                var stream = std.mem.zeroes(bun.zlib.z_stream);
                const rc = bun.zlib.deflateInit2(
                    &stream,
                    bun.zlib.Z_DEFAULT_COMPRESSION,
                    bun.zlib.Z_DEFLATED,
                    algo.toWindowBits(),
                    8, // Default memory level
                    bun.zlib.Z_DEFAULT_STRATEGY,
                );
                if (rc != bun.zlib.Z_OK) {
                    this.state = .{ .err = bun.sys.Error.fromCode(.INVAL, .deflateInit2) };
                    return error.CompressionInitFailed;
                }
                this.state = .{ .inflate = stream };
                return &this.state.inflate;
            },
        }
    }

    /// If a reader is blocked in onPull, copies buffered output into its view
    /// and wakes it. `final` resolves the request terminally once the buffer
    /// is drained (or with `done` when nothing is buffered at all).
    fn fulfillPending(this: *Encoder, final: bool) void {
        if (this.pending.state != .pending) return;
        if (this.buffer.len == 0) {
            if (!final) return;
            this.pending.result = .{ .done = {} };
        } else {
            const to_copy = @min(this.pending_buffer.len, this.buffer.len);
            @memcpy(this.pending_buffer[0..to_copy], this.buffer.ptr[0..to_copy]);
            // Shift any leftover bytes to the front of the buffer.
            if (to_copy < this.buffer.len) {
                std.mem.copyForwards(u8, this.buffer.ptr[0 .. this.buffer.len - to_copy], this.buffer.ptr[to_copy..this.buffer.len]);
            }
            this.buffer.len -= to_copy;
            this.pending.result = if (final and this.buffer.len == 0) .{
                .into_array_and_done = .{
                    .value = this.pending_value.get() orelse .zero,
                    .len = @truncate(to_copy),
                },
            } else .{
                .into_array = .{
                    .value = this.pending_value.get() orelse .zero,
                    .len = @truncate(to_copy),
                },
            };
        }
        this.pending_buffer = &.{};
        this.pending_value.clear();
        this.pending.run();
    }

    /// Compresses `chunk` (Z_NO_FLUSH), appending output to `buffer`.
    /// Called by the linked CompressionSink for every chunk written.
    pub fn write(this: *Encoder, chunk: []const u8) !void {
        if (this.is_closed or this.state == .err) return;
        const stream = try this.getOrInitStream();
        stream.next_in = @constCast(chunk.ptr);
        stream.avail_in = @intCast(chunk.len);
        while (stream.avail_in > 0) {
            try this.buffer.ensureUnusedCapacity(this.allocator, 4096);
            stream.next_out = this.buffer.ptr + this.buffer.len;
            stream.avail_out = @intCast(this.buffer.capacity - this.buffer.len);
            const rc = bun.zlib.deflate(stream, bun.zlib.Z_NO_FLUSH);
            const produced = (this.buffer.capacity - this.buffer.len) - stream.avail_out;
            this.buffer.len += produced;
            if (rc != bun.zlib.Z_OK and rc != bun.zlib.Z_BUF_ERROR) {
                // Free zlib's internal allocations before the union tag is
                // overwritten, otherwise the z_stream state would leak.
                _ = bun.zlib.deflateEnd(stream);
                this.state = .{ .err = bun.sys.Error.fromCode(.INVAL, .deflate) };
                return error.CompressionFailed;
            }
        }
        // Wake a pending reader now that new output may be available.
        this.fulfillPending(false);
    }

    /// Finishes the stream with Z_FINISH (emitting the format trailer),
    /// releases the zlib state, and resolves any pending read terminally.
    pub fn flush(this: *Encoder) !void {
        if (this.is_closed or this.state == .err) return;
        this.is_closed = true;
        switch (this.state) {
            .err => return,
            .uninit => {
                // No data was ever written. Resolve a waiting reader instead
                // of leaving it hanging forever.
                // NOTE(review): this emits zero bytes for an empty stream,
                // while gzip/deflate normally still produce a header/trailer —
                // confirm against the Compression Streams spec.
                this.fulfillPending(true);
                return;
            },
            .inflate => |*stream| {
                stream.next_in = null;
                stream.avail_in = 0;
                while (true) {
                    try this.buffer.ensureUnusedCapacity(this.allocator, 4096);
                    stream.next_out = this.buffer.ptr + this.buffer.len;
                    stream.avail_out = @intCast(this.buffer.capacity - this.buffer.len);
                    const rc = bun.zlib.deflate(stream, bun.zlib.Z_FINISH);
                    const produced = (this.buffer.capacity - this.buffer.len) - stream.avail_out;
                    this.buffer.len += produced;
                    if (rc == bun.zlib.Z_STREAM_END) break;
                    if (rc != bun.zlib.Z_OK and rc != bun.zlib.Z_BUF_ERROR) {
                        _ = bun.zlib.deflateEnd(stream);
                        this.state = .{ .err = bun.sys.Error.fromCode(.INVAL, .deflate) };
                        return error.CompressionFailed;
                    }
                }
                _ = bun.zlib.deflateEnd(stream);
                // Drop the ended z_stream so deinit/onCancel cannot call
                // deflateEnd a second time on already-released state.
                this.state = .{ .uninit = .gzip };
                this.fulfillPending(true);
            },
        }
    }

    /// Invoked via RefCount when the last reference is dropped.
    pub fn deinit(this: *Encoder) void {
        if (this.state == .inflate) {
            _ = bun.zlib.deflateEnd(&this.state.inflate);
        }
        this.buffer.deinitWithAllocator(this.allocator);
        this.pending_value.deinit();
        this.parent().deinit();
    }

    /// The generated ReadableStream source that embeds this Encoder.
    pub fn parent(this: *Encoder) *Source {
        return @fieldParentPtr("context", this);
    }

    /// Pins/unpins the owning Source (GC lifetime control).
    pub fn setRef(this: *Encoder, ref: bool) void {
        if (ref) {
            _ = this.parent().incrementCount();
        } else {
            _ = this.parent().decrementCount();
        }
    }
};
/// ReadableStream source hook: ready immediately, since all zlib state is
/// created lazily on the first write.
pub fn onStart(this: *Encoder) streams.Start {
    _ = this; // no per-stream setup required
    log("onStart()", .{});
    return .{ .ready = {} };
}
/// ReadableStream pull hook. Copies buffered compressed bytes into the
/// caller's view; if nothing is buffered yet, parks the request in
/// `pending` so write/flush can resolve it later.
pub fn onPull(this: *Encoder, buffer: []u8, view: JSValue) streams.Result {
    log("onPull({d})", .{buffer.len});
    if (this.buffer.len > 0) {
        const to_copy = @min(buffer.len, this.buffer.len);
        // bun.ByteList exposes ptr/len/capacity (not `.items`).
        @memcpy(buffer[0..to_copy], this.buffer.ptr[0..to_copy]);
        // Shift remaining data to the front.
        if (to_copy < this.buffer.len) {
            std.mem.copyForwards(u8, this.buffer.ptr[0 .. this.buffer.len - to_copy], this.buffer.ptr[to_copy..this.buffer.len]);
        }
        this.buffer.len -= to_copy;
        if (this.is_closed and this.buffer.len == 0) {
            return .{
                .into_array_and_done = .{
                    .value = view,
                    .len = @truncate(to_copy),
                },
            };
        }
        return .{
            .into_array = .{
                .value = view,
                .len = @truncate(to_copy),
            },
        };
    }
    if (this.is_closed) {
        return .{ .done = {} };
    }
    // Nothing buffered yet: remember the destination and keep the JS typed
    // array alive until data arrives.
    this.pending_buffer = buffer;
    this.pending_value.set(this.parent().globalThis, view);
    return .{ .pending = &this.pending };
}
// ReadableStream cancel hook: tears down zlib state, drops buffered output,
// and resolves any parked pull request with `done`.
pub fn onCancel(this: *Encoder) void {
    log("onCancel()", .{});
    this.is_closed = true;
    if (this.state == .inflate) {
        _ = bun.zlib.deflateEnd(&this.state.inflate);
        // `.gzip` is only a placeholder tag here; the original format is not
        // preserved, but the stream cannot be reused after cancellation.
        this.state = .{ .uninit = .gzip };
    }
    this.buffer.clearAndFree(this.allocator);
    // Wake a reader blocked in onPull with a terminal result.
    if (this.pending.state == .pending) {
        this.pending.result = .{ .done = {} };
        this.pending_buffer = &.{};
        this.pending_value.clear();
        this.pending.run();
    }
}
/// Hands all buffered output to the caller, transferring ownership of the
/// backing allocation; the encoder is left with an empty buffer.
pub fn drain(this: *Encoder) bun.ByteList {
    if (this.buffer.len == 0) return .{};
    const out = this.buffer;
    this.buffer = .{};
    return out;
}
// Externally-retained memory reported for GC pressure accounting. Only the
// output buffer's capacity is counted; zlib's internal state is not.
pub fn memoryCost(this: *const Encoder) usize {
    return this.buffer.capacity;
}
// Buffered-conversion fast path (e.g. reading the whole stream at once).
// Not implemented for compression streams; all arguments are ignored.
// NOTE(review): returns .zero unconditionally — confirm callers treat that
// as "fall back to the generic streaming path" rather than a real value.
pub fn toBufferedValue(this: *Encoder, globalThis: *jsc.JSGlobalObject, action: streams.BufferAction.Tag) bun.JSError!jsc.JSValue {
    _ = this;
    _ = globalThis;
    _ = action;
    return .zero;
}
// Implement the ReadableStream.Source interface for Encoder.
// NewSource generates the native-source glue (wrapper type, tagging, and
// lifetime plumbing) that lets a ReadableStream be driven by this Encoder
// through the hooks listed below.
pub const Source = webcore.ReadableStream.NewSource(
    Encoder,
    "CompressionStream", // tag name used by the stream machinery
    onStart,
    onPull,
    onCancel,
    deinit,
    setRef,
    drain,
    memoryCost,
    toBufferedValue,
);

View File

@@ -0,0 +1,242 @@
const DecompressionSink = @This();
const std = @import("std");
const bun = @import("bun");
const webcore = bun.webcore;
const streams = webcore.streams;
const jsc = bun.jsc;
const Output = bun.Output;
const JSC = bun.JSC;
const JSValue = jsc.JSValue;
const Encoder = @import("./DecompressionStreamEncoder.zig").Encoder;
const log = Output.scoped(.DecompressionSink, false);
/// Writable-side sink of a DecompressionStream. Every chunk written from JS
/// is forwarded to the linked Encoder, which buffers decompressed output for
/// the readable side.
pub const Sink = struct {
    // Readable-side source that receives every chunk written to this sink.
    encoder: *Encoder,
    // JSSink vtable fields
    signal: streams.Signal = .{},
    has: streams.Has = .{},
    done: bool = false,
    pending: streams.Result.Writable.Pending = .{},
    bytes_written: usize = 0,
    allocator: std.mem.Allocator,

    /// Wires the controller signal used to report start/close/error upstream.
    pub fn connect(this: *Sink, signal: streams.Signal) void {
        this.signal = signal;
    }

    /// No per-stream setup needed; reports readiness immediately.
    pub fn start(this: *Sink, stream_start: streams.Start) JSC.Maybe(void) {
        _ = stream_start;
        this.signal.start();
        return .{ .result = {} };
    }

    /// Forwards one chunk to the encoder and reports the full chunk as
    /// consumed (the encoder buffers its output internally).
    pub fn write(this: *Sink, data: streams.Result) streams.Result.Writable {
        log("write({d} bytes)", .{data.slice().len});
        if (this.done) {
            return .{ .done = {} };
        }
        const chunk = data.slice();
        if (chunk.len == 0) {
            return .{ .owned = 0 };
        }
        // The specific failure is recorded on the encoder's own state; only
        // a generic sink error is surfaced here.
        this.encoder.write(chunk) catch {
            return .{ .err = bun.sys.Error.fromCode(.INVAL, .write) };
        };
        this.bytes_written += chunk.len;
        return .{ .owned = @truncate(chunk.len) };
    }

    pub fn writeBytes(this: *Sink, data: streams.Result) streams.Result.Writable {
        return this.write(data);
    }

    pub fn writeLatin1(this: *Sink, data: streams.Result) streams.Result.Writable {
        return this.write(data);
    }

    pub fn writeUTF16(this: *Sink, data: streams.Result) streams.Result.Writable {
        // NOTE(review): UTF-16 input is forwarded as raw bytes with no
        // transcoding — confirm callers never reach this path with text.
        return this.write(data);
    }

    /// Ends the writable side. A clean end flushes the encoder (so it can
    /// emit remaining output and resolve pending reads); an errored end
    /// cancels it instead.
    pub fn end(this: *Sink, err: ?bun.sys.Error) JSC.Maybe(void) {
        log("end()", .{});
        if (this.done) {
            return .{ .result = {} };
        }
        this.done = true;
        if (err != null) {
            // Abort: tear down the encoder without flushing.
            this.encoder.onCancel();
        } else {
            this.encoder.flush() catch {
                return .{ .err = bun.sys.Error.fromCode(.INVAL, .flush) };
            };
        }
        this.signal.close(err);
        return .{ .result = {} };
    }

    /// JS-facing close; always resolves to `true` once the sink is done.
    pub fn endFromJS(this: *Sink, globalObject: *JSC.JSGlobalObject) JSC.Maybe(JSValue) {
        _ = globalObject;
        log("endFromJS()", .{});
        if (this.done) {
            return .{ .result = .true };
        }
        switch (this.end(null)) {
            .err => |err| return .{ .err = err },
            .result => return .{ .result = .true },
        }
    }

    /// No-op: the encoder only emits its remaining output in end().
    pub fn flush(this: *Sink) JSC.Maybe(void) {
        _ = this;
        log("flush()", .{});
        return .{ .result = {} };
    }

    pub fn flushFromJS(this: *Sink, globalObject: *JSC.JSGlobalObject, wait: bool) JSC.Maybe(JSValue) {
        _ = this;
        _ = globalObject;
        _ = wait;
        return .{ .result = .undefined };
    }

    /// GC finalizer: cancels the encoder if the writable was never closed.
    pub fn finalize(this: *Sink) void {
        log("finalize()", .{});
        if (!this.done) {
            this.done = true;
            this.encoder.onCancel();
        }
        this.pending.deinit();
        this.deref();
    }

    /// Allocates a Sink linked to `encoder`. Owned by the caller until
    /// handed to the JS WritableStream machinery.
    pub fn init(allocator: std.mem.Allocator, encoder: *Encoder) *Sink {
        const sink = allocator.create(Sink) catch bun.outOfMemory();
        sink.* = .{
            .encoder = encoder,
            .allocator = allocator,
        };
        return sink;
    }

    /// Required by the JSSink interface but never used: DecompressionSinks
    /// are only created natively via init().
    pub fn construct(this: *Sink, allocator: std.mem.Allocator) void {
        _ = allocator;
        _ = this;
        @panic("DecompressionSink.construct should not be called");
    }

    pub fn deinit(this: *Sink) void {
        log("deinit()", .{});
        this.allocator.destroy(this);
    }

    // JSSink interface requirements (generated codegen glue).
    pub const ref = JSC.Codegen.JSDecompressionSink.ref;
    pub const deref = JSC.Codegen.JSDecompressionSink.deref;
    pub const updateRef = JSC.Codegen.JSDecompressionSink.updateRef;

    pub fn toJS(this: *Sink, globalObject: *JSC.JSGlobalObject) JSValue {
        return JSC.Codegen.JSDecompressionSink.toJS(this, globalObject);
    }

    /// Detach from JS without flushing (e.g. the stream was torn down).
    pub fn detach(this: *Sink) void {
        log("detach()", .{});
        if (!this.done) {
            this.done = true;
            this.encoder.onCancel();
        }
    }

    pub fn onClose(_: *Sink) void {
        log("onClose()", .{});
    }

    pub fn onReady(_: *Sink) void {
        log("onReady()", .{});
    }

    pub fn onError(this: *Sink, err: bun.sys.Error) void {
        log("onError()", .{});
        _ = this.end(err);
    }
};
// C ABI shim: recover the Sink from the opaque pointer and forward the
// refcount change.
pub fn DecompressionSink__updateRef(ptr: *anyopaque, value: bool) callconv(.C) void {
    const sink: *Sink = @ptrCast(@alignCast(ptr));
    if (value) sink.ref() else sink.deref();
}
// C ABI shim for writes; the global object is unused because Sink.write is
// pure native code.
pub fn DecompressionSink__write(
    ptr: *anyopaque,
    data: StreamResult,
    globalThis: *JSC.JSGlobalObject,
) callconv(.C) StreamResult.Writable {
    _ = globalThis;
    const sink: *Sink = @ptrCast(@alignCast(ptr));
    return sink.write(data);
}
// C ABI shim: detaches the sink (cancels the encoder if never closed).
pub fn DecompressionSink__close(globalThis: *JSC.JSGlobalObject, ptr: *anyopaque) callconv(.C) void {
    _ = globalThis;
    const sink: *Sink = @ptrCast(@alignCast(ptr));
    sink.detach();
}
// C ABI shim: ends the sink from JS and converts the result to a JSValue.
pub fn DecompressionSink__endWithSink(ptr: *anyopaque, globalThis: *JSC.JSGlobalObject) callconv(.C) JSValue {
    const sink: *Sink = @ptrCast(@alignCast(ptr));
    return sink.endFromJS(globalThis).toJS(globalThis);
}
// C ABI shim: forwards flush requests from JS (a no-op for this sink).
pub fn DecompressionSink__flushFromJS(
    ptr: *anyopaque,
    globalThis: *JSC.JSGlobalObject,
    wait: bool,
) callconv(.C) JSValue {
    const sink: *Sink = @ptrCast(@alignCast(ptr));
    return sink.flushFromJS(globalThis, wait).toJS(globalThis);
}
// C ABI shim: approximate retained memory (struct plus bytes accepted).
pub fn DecompressionSink__memoryCost(ptr: *anyopaque) callconv(.C) usize {
    const sink: *Sink = @ptrCast(@alignCast(ptr));
    return @sizeOf(Sink) + sink.bytes_written;
}
const StreamResult = webcore.StreamResult;
// Export the sink type for use in other modules
pub const DecompressionStreamSink = Sink;

View File

@@ -0,0 +1,306 @@
const DecompressionStreamEncoder = @This();
const std = @import("std");
const bun = @import("bun");
const webcore = bun.webcore;
const streams = webcore.streams;
const jsc = bun.jsc;
const Output = bun.Output;
const Blob = webcore.Blob;
const ByteList = bun.ByteList;
const JSC = bun.JSC;
const JSValue = jsc.JSValue;
const log = Output.scoped(.DecompressionStreamEncoder, false);
pub const Algorithm = enum {
    gzip,
    deflate,
    deflate_raw,

    /// Maps a Web-API format string ("gzip" | "deflate" | "deflate-raw")
    /// onto an Algorithm; returns null for any other string.
    pub fn fromString(name: []const u8) ?Algorithm {
        if (bun.strings.eqlComptime(name, "deflate")) return .deflate;
        if (bun.strings.eqlComptime(name, "deflate-raw")) return .deflate_raw;
        if (bun.strings.eqlComptime(name, "gzip")) return .gzip;
        return null;
    }

    /// zlib windowBits encoding: adding 16 selects a gzip wrapper, a
    /// negative value selects raw deflate with no header/trailer.
    pub fn toWindowBits(this: Algorithm) c_int {
        switch (this) {
            .gzip => return 15 + 16,
            .deflate => return 15,
            .deflate_raw => return -15,
        }
    }
};
// Lifecycle of the zlib state machine.
pub const State = union(enum) {
    // Algorithm chosen at construction; the z_stream is created lazily on
    // the first write.
    uninit: Algorithm,
    // Live zlib inflate stream.
    inflate: bun.zlib.z_stream,
    // Sticky error: once set, further writes and pulls are no-ops.
    err: bun.sys.Error,
};
/// Readable-side source of a DecompressionStream. The linked
/// DecompressionSink feeds compressed chunks into `write`/`flush`;
/// decompressed output accumulates in `buffer` until pulled via `onPull`.
pub const Encoder = struct {
    ref_count: bun.ptr.RefCount(Encoder, "ref_count", deinit, .{}),
    allocator: std.mem.Allocator,
    state: State,
    // Output buffer for decompressed data not yet pulled by the reader.
    buffer: bun.ByteList,
    // Handles async pull requests when the buffer is empty.
    pending: streams.Result.Pending = .{},
    // Set once the compressed stream ended, flush ran, or cancel happened.
    is_closed: bool = false,
    // Destination slice of an outstanding pull request (view into the JS
    // typed array) and a strong ref keeping that typed array alive.
    pending_buffer: []u8 = &.{},
    pending_value: jsc.Strong.Optional = .empty,

    /// Lazily creates the zlib inflate state on first use.
    fn getOrInitStream(this: *Encoder) !*bun.zlib.z_stream {
        switch (this.state) {
            .inflate => return &this.state.inflate,
            .err => return error.DecompressionError,
            .uninit => |algo| {
                var stream = std.mem.zeroes(bun.zlib.z_stream);
                const rc = bun.zlib.inflateInit2(
                    &stream,
                    algo.toWindowBits(),
                );
                if (rc != bun.zlib.Z_OK) {
                    this.state = .{ .err = bun.sys.Error.fromCode(.INVAL, .inflateInit2) };
                    return error.DecompressionInitFailed;
                }
                this.state = .{ .inflate = stream };
                return &this.state.inflate;
            },
        }
    }

    /// If a reader is blocked in onPull, copies buffered output into its view
    /// and wakes it. `final` resolves the request terminally once the buffer
    /// is drained (or with `done` when nothing is buffered at all).
    fn fulfillPending(this: *Encoder, final: bool) void {
        if (this.pending.state != .pending) return;
        if (this.buffer.len == 0) {
            if (!final) return;
            this.pending.result = .{ .done = {} };
        } else {
            const to_copy = @min(this.pending_buffer.len, this.buffer.len);
            @memcpy(this.pending_buffer[0..to_copy], this.buffer.ptr[0..to_copy]);
            // Shift any leftover bytes to the front of the buffer.
            if (to_copy < this.buffer.len) {
                std.mem.copyForwards(u8, this.buffer.ptr[0 .. this.buffer.len - to_copy], this.buffer.ptr[to_copy..this.buffer.len]);
            }
            this.buffer.len -= to_copy;
            this.pending.result = if (final and this.buffer.len == 0) .{
                .into_array_and_done = .{
                    .value = this.pending_value.get() orelse .zero,
                    .len = @truncate(to_copy),
                },
            } else .{
                .into_array = .{
                    .value = this.pending_value.get() orelse .zero,
                    .len = @truncate(to_copy),
                },
            };
        }
        this.pending_buffer = &.{};
        this.pending_value.clear();
        this.pending.run();
    }

    /// Decompresses `chunk`, appending output to `buffer`. Called by the
    /// linked DecompressionSink for every chunk written.
    pub fn write(this: *Encoder, chunk: []const u8) !void {
        if (this.is_closed or this.state == .err) return;
        const stream = try this.getOrInitStream();
        stream.next_in = @constCast(chunk.ptr);
        stream.avail_in = @intCast(chunk.len);
        while (stream.avail_in > 0) {
            try this.buffer.ensureUnusedCapacity(this.allocator, 4096);
            stream.next_out = this.buffer.ptr + this.buffer.len;
            stream.avail_out = @intCast(this.buffer.capacity - this.buffer.len);
            const rc = bun.zlib.inflate(stream, bun.zlib.Z_NO_FLUSH);
            const produced = (this.buffer.capacity - this.buffer.len) - stream.avail_out;
            this.buffer.len += produced;
            if (rc == bun.zlib.Z_STREAM_END) {
                // Compressed stream is complete; stop accepting input.
                // NOTE(review): any trailing bytes after the end-of-stream
                // marker are silently dropped here, while the Compression
                // Streams spec requires erroring on trailing data — confirm.
                this.is_closed = true;
                break;
            }
            if (rc != bun.zlib.Z_OK and rc != bun.zlib.Z_BUF_ERROR) {
                // Free zlib's internal allocations before the union tag is
                // overwritten, otherwise the z_stream state would leak.
                _ = bun.zlib.inflateEnd(stream);
                this.state = .{ .err = bun.sys.Error.fromCode(.INVAL, .inflate) };
                return error.DecompressionFailed;
            }
        }
        // Wake a pending reader; terminal if the stream just ended.
        this.fulfillPending(this.is_closed);
    }

    /// Releases the zlib state and resolves any pending read terminally.
    /// Remaining buffered output is still served through onPull.
    /// NOTE(review): a truncated compressed input (no Z_STREAM_END seen)
    /// is not reported as an error here — the spec requires one; confirm.
    pub fn flush(this: *Encoder) !void {
        if (this.state == .err) return;
        this.is_closed = true;
        switch (this.state) {
            .err => return,
            .uninit => {
                // Nothing was ever written; resolve a waiting reader instead
                // of leaving it hanging forever.
                this.fulfillPending(true);
                return;
            },
            .inflate => |*stream| {
                _ = bun.zlib.inflateEnd(stream);
                // Drop the ended z_stream so deinit/onCancel cannot call
                // inflateEnd a second time on already-released state.
                this.state = .{ .uninit = .gzip };
                this.fulfillPending(true);
            },
        }
    }

    /// Invoked via RefCount when the last reference is dropped.
    pub fn deinit(this: *Encoder) void {
        if (this.state == .inflate) {
            _ = bun.zlib.inflateEnd(&this.state.inflate);
        }
        this.buffer.deinitWithAllocator(this.allocator);
        this.pending_value.deinit();
        this.parent().deinit();
    }

    /// The generated ReadableStream source that embeds this Encoder.
    pub fn parent(this: *Encoder) *Source {
        return @fieldParentPtr("context", this);
    }

    /// Pins/unpins the owning Source (GC lifetime control).
    pub fn setRef(this: *Encoder, ref: bool) void {
        if (ref) {
            _ = this.parent().incrementCount();
        } else {
            _ = this.parent().decrementCount();
        }
    }
};
/// ReadableStream source hook: ready immediately, since all zlib state is
/// created lazily on the first write.
pub fn onStart(this: *Encoder) streams.Start {
    _ = this; // no per-stream setup required
    log("onStart()", .{});
    return .{ .ready = {} };
}
/// ReadableStream pull hook. Copies buffered decompressed bytes into the
/// caller's view; if nothing is buffered yet, parks the request in
/// `pending` so write/flush can resolve it later.
pub fn onPull(this: *Encoder, buffer: []u8, view: JSValue) streams.Result {
    log("onPull({d})", .{buffer.len});
    if (this.buffer.len > 0) {
        const to_copy = @min(buffer.len, this.buffer.len);
        // bun.ByteList exposes ptr/len/capacity (not `.items`).
        @memcpy(buffer[0..to_copy], this.buffer.ptr[0..to_copy]);
        // Shift remaining data to the front.
        if (to_copy < this.buffer.len) {
            std.mem.copyForwards(u8, this.buffer.ptr[0 .. this.buffer.len - to_copy], this.buffer.ptr[to_copy..this.buffer.len]);
        }
        this.buffer.len -= to_copy;
        if (this.is_closed and this.buffer.len == 0) {
            return .{
                .into_array_and_done = .{
                    .value = view,
                    .len = @truncate(to_copy),
                },
            };
        }
        return .{
            .into_array = .{
                .value = view,
                .len = @truncate(to_copy),
            },
        };
    }
    if (this.is_closed) {
        return .{ .done = {} };
    }
    // Nothing buffered yet: remember the destination and keep the JS typed
    // array alive until data arrives.
    this.pending_buffer = buffer;
    this.pending_value.set(this.parent().globalThis, view);
    return .{ .pending = &this.pending };
}
// ReadableStream cancel hook: tears down zlib state, drops buffered output,
// and resolves any parked pull request with `done`.
pub fn onCancel(this: *Encoder) void {
    log("onCancel()", .{});
    this.is_closed = true;
    if (this.state == .inflate) {
        _ = bun.zlib.inflateEnd(&this.state.inflate);
        // `.gzip` is only a placeholder tag here; the original format is not
        // preserved, but the stream cannot be reused after cancellation.
        this.state = .{ .uninit = .gzip };
    }
    this.buffer.clearAndFree(this.allocator);
    // Wake a reader blocked in onPull with a terminal result.
    if (this.pending.state == .pending) {
        this.pending.result = .{ .done = {} };
        this.pending_buffer = &.{};
        this.pending_value.clear();
        this.pending.run();
    }
}
/// Hands all buffered output to the caller, transferring ownership of the
/// backing allocation; the encoder is left with an empty buffer.
pub fn drain(this: *Encoder) bun.ByteList {
    if (this.buffer.len == 0) return .{};
    const out = this.buffer;
    this.buffer = .{};
    return out;
}
// Externally-retained memory reported for GC pressure accounting. Only the
// output buffer's capacity is counted; zlib's internal state is not.
pub fn memoryCost(this: *const Encoder) usize {
    return this.buffer.capacity;
}
// Buffered-conversion fast path (e.g. reading the whole stream at once).
// Not implemented for decompression streams; all arguments are ignored.
// NOTE(review): returns .zero unconditionally — confirm callers treat that
// as "fall back to the generic streaming path" rather than a real value.
pub fn toBufferedValue(this: *Encoder, globalThis: *jsc.JSGlobalObject, action: streams.BufferAction.Tag) bun.JSError!jsc.JSValue {
    _ = this;
    _ = globalThis;
    _ = action;
    return .zero;
}
// Implement the ReadableStream.Source interface for Encoder.
// NewSource generates the native-source glue (wrapper type, tagging, and
// lifetime plumbing) that lets a ReadableStream be driven by this Encoder
// through the hooks listed below.
pub const Source = webcore.ReadableStream.NewSource(
    Encoder,
    "DecompressionStream", // tag name used by the stream machinery
    onStart,
    onPull,
    onCancel,
    deinit,
    setRef,
    drain,
    memoryCost,
    toBufferedValue,
);

View File

@@ -130,6 +130,12 @@ pub fn done(this: *const ReadableStream, globalThis: *JSGlobalObject) void {
.Bytes => |source| {
source.parent().cancel();
},
.CompressionStream => |source| {
source.parent().cancel();
},
.DecompressionStream => |source| {
source.parent().cancel();
},
else => {},
}
this.detachIfPossible(globalThis);
@@ -181,6 +187,9 @@ pub const Tag = enum(i32) {
Direct = 3,
Bytes = 4,
CompressionStream = 5,
DecompressionStream = 6,
};
pub const Source = union(Tag) {
@@ -202,6 +211,9 @@ pub const Source = union(Tag) {
Direct: void,
Bytes: *webcore.ByteStream,
CompressionStream: *webcore.CompressionStreamEncoder.Encoder,
DecompressionStream: *webcore.DecompressionStreamEncoder.Encoder,
};
extern fn ReadableStreamTag__tagged(globalObject: *JSGlobalObject, possibleReadableStream: *JSValue, ptr: *?*anyopaque) Tag;
@@ -267,6 +279,20 @@ pub fn fromJS(value: JSValue, globalThis: *JSGlobalObject) ?ReadableStream {
},
},
.CompressionStream => ReadableStream{
.value = out,
.ptr = .{
.CompressionStream = @ptrCast(@alignCast(ptr.?)),
},
},
.DecompressionStream => ReadableStream{
.value = out,
.ptr = .{
.DecompressionStream = @ptrCast(@alignCast(ptr.?)),
},
},
// .HTTPRequest => ReadableStream{
// .value = out,
// .ptr = .{

View File

@@ -1,6 +1,6 @@
import { join, resolve } from "path";
const classes = ["ArrayBufferSink", "FileSink", "HTTPResponseSink", "HTTPSResponseSink", "NetworkSink"];
const classes = ["ArrayBufferSink", "FileSink", "HTTPResponseSink", "HTTPSResponseSink", "NetworkSink", "CompressionSink", "DecompressionSink"];
function names(name) {
return {

View File

@@ -0,0 +1,42 @@
// CompressionStream implementation
$getter;
export function readable(this: CompressionStream): ReadableStream {
  // Stored as a private property by the native constructor.
  const stream = $getByIdDirectPrivate(this, "readable");
  if (stream) return stream as ReadableStream;
  throw $ERR_INVALID_THIS("CompressionStream");
}
$getter;
export function writable(this: CompressionStream): WritableStream {
  // Stored as a private property by the native constructor.
  const stream = $getByIdDirectPrivate(this, "writable");
  if (stream) return stream as WritableStream;
  throw $ERR_INVALID_THIS("CompressionStream");
}
$constructor;
export function CompressionStream(this: CompressionStream, format: string = "gzip") {
  // Argument validation lives here; the native constructor does the real
  // work: it creates the encoder + sink pair, wraps them in native-backed
  // Readable/Writable streams, and stores both via putDirectPrivate.
  if (typeof format !== "string") {
    throw $ERR_INVALID_ARG_TYPE("format", "string", format);
  }
  switch (format) {
    case "gzip":
    case "deflate":
    case "deflate-raw":
      break;
    default:
      throw $ERR_INVALID_ARG_VALUE("format", format, "must be 'gzip', 'deflate', or 'deflate-raw'");
  }
  // Initialization continues in the native bindings.
  return this;
}

View File

@@ -0,0 +1,42 @@
// DecompressionStream implementation
$getter;
export function readable(this: DecompressionStream): ReadableStream {
  // Stored as a private property by the native constructor.
  const stream = $getByIdDirectPrivate(this, "readable");
  if (stream) return stream as ReadableStream;
  throw $ERR_INVALID_THIS("DecompressionStream");
}
$getter;
export function writable(this: DecompressionStream): WritableStream {
  // Stored as a private property by the native constructor.
  const stream = $getByIdDirectPrivate(this, "writable");
  if (stream) return stream as WritableStream;
  throw $ERR_INVALID_THIS("DecompressionStream");
}
$constructor;
export function DecompressionStream(this: DecompressionStream, format: string = "gzip") {
  // Argument validation lives here; the native constructor does the real
  // work: it creates the encoder + sink pair, wraps them in native-backed
  // Readable/Writable streams, and stores both via putDirectPrivate.
  if (typeof format !== "string") {
    throw $ERR_INVALID_ARG_TYPE("format", "string", format);
  }
  switch (format) {
    case "gzip":
    case "deflate":
    case "deflate-raw":
      break;
    default:
      throw $ERR_INVALID_ARG_VALUE("format", format, "must be 'gzip', 'deflate', or 'deflate-raw'");
  }
  // Initialization continues in the native bindings.
  return this;
}

View File

@@ -0,0 +1,258 @@
import { describe, it, expect, test } from "bun:test";
describe("CompressionStream", () => {
  it("should be defined globally", () => {
    expect(CompressionStream).toBeDefined();
    expect(DecompressionStream).toBeDefined();
  });
  it("should create with default gzip format", () => {
    const cs = new CompressionStream();
    expect(cs).toBeInstanceOf(CompressionStream);
    expect(cs.readable).toBeInstanceOf(ReadableStream);
    expect(cs.writable).toBeInstanceOf(WritableStream);
  });
  it("should accept valid formats", () => {
    const formats: CompressionFormat[] = ["gzip", "deflate", "deflate-raw"];
    for (const format of formats) {
      const cs = new CompressionStream(format);
      expect(cs).toBeInstanceOf(CompressionStream);
    }
  });
  it("should reject invalid formats", () => {
    expect(() => new CompressionStream("invalid" as CompressionFormat)).toThrow();
    expect(() => new CompressionStream("brotli" as CompressionFormat)).toThrow(); // Not implemented yet
  });
  it("should compress and decompress data roundtrip", async () => {
    const input = "Hello, World! ".repeat(100);
    const encoder = new TextEncoder();
    const decoder = new TextDecoder();
    // Compress
    const cs = new CompressionStream("gzip");
    const writer = cs.writable.getWriter();
    const reader = cs.readable.getReader();
    // Intentionally NOT awaited: awaiting write()/close() before reading
    // could deadlock if the transform exerts backpressure.
    writer.write(encoder.encode(input));
    writer.close();
    const chunks: Uint8Array[] = [];
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      chunks.push(value);
    }
    const compressed = new Blob(chunks);
    expect(compressed.size).toBeLessThan(input.length);
    // Decompress
    const ds = new DecompressionStream("gzip");
    const decompressWriter = ds.writable.getWriter();
    const decompressReader = ds.readable.getReader();
    for (const chunk of chunks) {
      await decompressWriter.write(chunk);
    }
    await decompressWriter.close();
    const decompressedChunks: Uint8Array[] = [];
    while (true) {
      const { done, value } = await decompressReader.read();
      if (done) break;
      decompressedChunks.push(value);
    }
    const decompressed = new Blob(decompressedChunks);
    const result = decoder.decode(await decompressed.arrayBuffer());
    expect(result).toBe(input);
  });
  it("should handle empty input", async () => {
    const cs = new CompressionStream();
    const writer = cs.writable.getWriter();
    const reader = cs.readable.getReader();
    await writer.close();
    const chunks: Uint8Array[] = [];
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      chunks.push(value);
    }
    // Requires the encoder to emit the format header/trailer even when no
    // data was ever written.
    expect(chunks.length).toBeGreaterThan(0); // Should have gzip headers at least
  });
  test("should work with pipeTo/pipeThrough", async () => {
    const input = "Test data for piping";
    const encoder = new TextEncoder();
    const inputStream = new ReadableStream({
      start(controller) {
        controller.enqueue(encoder.encode(input));
        controller.close();
      },
    });
    const cs = new CompressionStream();
    const ds = new DecompressionStream();
    // pipeThrough() returns the readable end synchronously; awaiting it
    // (as the original did) was a no-op.
    const result = inputStream
      .pipeThrough(cs)
      .pipeThrough(ds)
      .pipeThrough(new TextDecoderStream());
    const reader = result.getReader();
    let output = "";
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      output += value;
    }
    expect(output).toBe(input);
  });
  test("deflate format roundtrip", async () => {
    const input = "Deflate compression test";
    const encoder = new TextEncoder();
    const decoder = new TextDecoder();
    // Compress with deflate
    const cs = new CompressionStream("deflate");
    const writer = cs.writable.getWriter();
    const reader = cs.readable.getReader();
    await writer.write(encoder.encode(input));
    await writer.close();
    const chunks: Uint8Array[] = [];
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      chunks.push(value);
    }
    // Decompress with deflate
    const ds = new DecompressionStream("deflate");
    const decompressWriter = ds.writable.getWriter();
    const decompressReader = ds.readable.getReader();
    for (const chunk of chunks) {
      await decompressWriter.write(chunk);
    }
    await decompressWriter.close();
    const decompressedChunks: Uint8Array[] = [];
    while (true) {
      const { done, value } = await decompressReader.read();
      if (done) break;
      decompressedChunks.push(value);
    }
    const decompressed = new Blob(decompressedChunks);
    const result = decoder.decode(await decompressed.arrayBuffer());
    expect(result).toBe(input);
  });
  test("should handle large data", async () => {
    const largeData = "x".repeat(1024 * 1024); // 1MB of data
    const encoder = new TextEncoder();
    const cs = new CompressionStream();
    const writer = cs.writable.getWriter();
    const reader = cs.readable.getReader();
    await writer.write(encoder.encode(largeData));
    await writer.close();
    let compressedSize = 0;
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      compressedSize += value.length;
    }
    // Compression should significantly reduce size for repetitive data
    expect(compressedSize).toBeLessThan(largeData.length / 10);
  });
  test("should handle multiple writes", async () => {
    const chunks = ["Hello", " ", "World", "!"];
    const encoder = new TextEncoder();
    const decoder = new TextDecoder();
    const cs = new CompressionStream();
    const ds = new DecompressionStream();
    const writer = cs.writable.getWriter();
    for (const chunk of chunks) {
      await writer.write(encoder.encode(chunk));
    }
    await writer.close();
    await cs.readable.pipeTo(ds.writable);
    const reader = ds.readable.getReader();
    let result = "";
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      result += decoder.decode(value, { stream: true });
    }
    // Flush any buffered partial multi-byte sequence from the streaming
    // decoder (required after decode(..., { stream: true })).
    result += decoder.decode();
    expect(result).toBe(chunks.join(""));
  });
});
describe("DecompressionStream", () => {
  it("should reject invalid compressed data", async () => {
    const ds = new DecompressionStream();
    const writer = ds.writable.getWriter();
    const reader = ds.readable.getReader();
    // Write invalid data. Once the sink errors, the write()/close() promises
    // may themselves reject — swallow those so the test asserts on the
    // intended read() rejection below instead of failing early.
    await writer.write(new Uint8Array([1, 2, 3, 4, 5])).catch(() => {});
    await writer.close().catch(() => {});
    // Should fail when trying to decompress
    await expect(reader.read()).rejects.toThrow();
  });
  it("should handle format mismatch", async () => {
    const encoder = new TextEncoder();
    // Compress as gzip
    const cs = new CompressionStream("gzip");
    const writer = cs.writable.getWriter();
    await writer.write(encoder.encode("test"));
    await writer.close();
    const chunks: Uint8Array[] = [];
    const reader = cs.readable.getReader();
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      chunks.push(value);
    }
    // Try to decompress as deflate (should fail)
    const ds = new DecompressionStream("deflate");
    const dsWriter = ds.writable.getWriter();
    const dsReader = ds.readable.getReader();
    for (const chunk of chunks) {
      // Writes after the sink errors will reject; ignore those so the
      // assertion on read() below observes the failure.
      await dsWriter.write(chunk).catch(() => {});
    }
    await dsWriter.close().catch(() => {});
    // Should fail when reading
    await expect(dsReader.read()).rejects.toThrow();
  });
});