Add stream tester app

This commit is contained in:
Jarred Sumner
2023-07-14 22:29:51 -07:00
parent ba43fdf317
commit 0fd493fd78
5 changed files with 148 additions and 81 deletions

View File

@@ -867,10 +867,16 @@ fetch: $(IO_FILES)
rm -rf $(PACKAGE_DIR)/fetch.o
.PHONY: stream-tester
stream-tester: $(IO_FILES)
stream-tester:
$(ZIG) build -Doptimize=ReleaseFast stream-tester-obj
$(CXX) $(PACKAGE_DIR)/stream_tester.o -g $(OPTIMIZATION_LEVEL) -o ./misctools/stream_tester $(IO_FILES) $(DEFAULT_LINKER_FLAGS) -lc $(ARCHIVE_FILES) $(ICU_FLAGS) $(JSC_FILES) $(JSC_BINDINGS)
rm -rf $(PACKAGE_DIR)/stream_tester.o
$(CXX) $(PACKAGE_DIR)/stream-tester.o -g $(OPTIMIZATION_LEVEL) -o ./misctools/stream-tester $(DEFAULT_LINKER_FLAGS) -lc $(ARCHIVE_FILES) $(ICU_FLAGS)
rm -rf $(PACKAGE_DIR)/stream-tester.o
.PHONY: stream-tester-debug
stream-tester-debug:
$(ZIG) build stream-tester-obj -Doptimize=Debug
$(CXX) $(DEBUG_PACKAGE_DIR)/stream-tester.o -g3 -o ./misctools/stream-tester $(DEFAULT_LINKER_FLAGS) -lc $(ARCHIVE_FILES) $(ICU_FLAGS)
.PHONY: sha
sha:

View File

@@ -1784,6 +1784,16 @@ pub const ArrayBuffer = extern struct {
};
}
pub fn createFromLength(globalThis: *JSC.JSGlobalObject, len: usize, comptime kind: BinaryType) JSValue {
JSC.markBinding(@src());
return switch (comptime kind) {
.Uint8Array => Bun__createUint8ArrayForCopy(globalThis, null, len, false),
.Buffer => Bun__createUint8ArrayForCopy(globalThis, null, len, true),
.ArrayBuffer => Bun__createArrayBufferForCopy(globalThis, null, len),
else => @compileError("Not implemented yet"),
};
}
pub fn createEmpty(globalThis: *JSC.JSGlobalObject, comptime kind: JSC.JSValue.JSType) JSValue {
JSC.markBinding(@src());

View File

@@ -3,9 +3,12 @@ const std = @import("std");
const zlib = @import("./zlib.zig");
const brotli = bun.brotli;
const String = bun.String;
pub const Error = struct {
code: bun.String = bun.String.empty,
message: bun.String = bun.String.empty,
// To workaround a zig compiler bug, we avoid using String here.
code: []const u8,
message: []const u8,
};
pub const Ownership = enum { transfer, must_copy };
@@ -31,84 +34,42 @@ pub const Controller = struct {
this.pull_fn(this.ctx);
}
pub fn init(comptime Context: type, context: Context) Controller {
pub fn init(comptime ContextType: type, context: ContextType) Controller {
const Context = std.meta.Child(ContextType);
return Controller{
.ctx = @ptrCast(*anyopaque, context),
.closed = @ptrCast(*bool, &context.closed),
.receive_data_fn = @ptrCast(*const fn (*anyopaque, []const u8, Ownership, Completion) void, Context.onData),
.receive_error_fn = @ptrCast(*const fn (*anyopaque, Error) void, Context.onError),
.pull_fn = @ptrCast(*const fn (*anyopaque) void, Context.onPull),
.receive_data_fn = @ptrCast(*const fn (*anyopaque, []const u8, Ownership, Completion) void, &Context.onData),
.receive_error_fn = @ptrCast(*const fn (*anyopaque, Error) void, &Context.onError),
.pull_fn = @ptrCast(*const fn (*anyopaque) void, &Context.onPull),
};
}
};
pub const CLIFileStreamCompressor = struct {
input: std.fs.File,
output: std.fs.File,
closed: bool = false,
ready_for_more: bool = false,
has_more_output: bool = true,
pub fn controller(this: *CLIFileStreamCompressor) Controller {
return Controller.init(*CLIFileStreamCompressor, this);
}
pub fn onData(this: *CLIFileStreamCompressor, bytes: []const u8, _: Ownership, completion: Completion) void {
std.debug.assert(!this.closed);
this.output.writeAll(bytes) catch @panic("failed to write to file");
if (completion == Completion.last) {
this.ready_for_more = false;
}
}
pub fn onError(this: *CLIFileStreamCompressor, err: Error) void {
std.debug.assert(!this.closed);
std.debug.panic("Error: {}\n{}", .{ err.code, err.message });
}
pub fn onPull(this: *CLIFileStreamCompressor) void {
this.ready_for_more = true;
}
pub fn init(path: []const u8) !CLIFileStreamCompressor {
var file = try std.fs.cwd().openFile(path, .{ .mode = .read_write });
return CLIFileStreamCompressor{ .input = file, .output = std.io.getStdOut() };
}
pub fn run(this: *CLIFileStreamCompressor, stream: *Compressor) !void {
this.ready_for_more = true;
const ctrl = this.controller();
while (this.has_more_output) {
var buffer: [64 * 1024]u8 = undefined;
var to_read: []u8 = buffer[0..try this.input.readAll(&buffer)];
this.has_more_output = to_read.len != 0;
if (this.has_more_output) {
stream.write(to_read, ctrl);
}
}
stream.end(ctrl);
}
};
pub const Compressor = union(enum) {
BrotliEncoder: Brotli.Encoder,
BrotliDecoder: Brotli.Decoder,
pub fn write(this: *Compressor, data: []const u8, controller: Controller) void {
return switch (this) {
.BrotliEncoder => this.BrotliEncoder.write(data, controller),
.BrotliDecoder => this.BrotliDecoder.write(data, controller),
};
switch (this.*) {
.BrotliEncoder => {
this.BrotliEncoder.write(data, controller);
},
.BrotliDecoder => {
this.BrotliDecoder.write(data, controller);
},
}
}
pub fn end(this: *Compressor) void {
return switch (this) {
.BrotliEncoder => this.BrotliEncoder.end(),
.BrotliDecoder => this.BrotliDecoder.end(),
};
pub fn end(this: *Compressor, controller: Controller) void {
switch (this.*) {
.BrotliEncoder => {
this.BrotliEncoder.end(controller);
},
.BrotliDecoder => {
this.BrotliDecoder.end(controller);
},
}
}
pub fn initWithType(comptime Type: type, value: Type) !*Compressor {
@@ -157,7 +118,7 @@ pub const Brotli = struct {
controller.enqueue(
taken,
.must_copy,
false,
.not_last,
);
if (!state.hasMoreOutput())
@@ -195,8 +156,8 @@ pub const Brotli = struct {
.@"error" => {
const code = state.getErrorCode();
controller.fail(Error{
.code = bun.String.static(code.code()),
.message = bun.String.static(code.message()),
.code = code.code(),
.message = code.message(),
});
return;
},
@@ -233,9 +194,7 @@ pub const Brotli = struct {
pub fn end(this: *Decoder, controller: Controller) void {
var state = this.state orelse return;
this.state = null;
consume(state, controller);
std.debug.assert(state.finish(null, null, null));
consume(state, controller);
consume(state, controller, true);
state.deinit();
}
};

View File

@@ -42,16 +42,51 @@ pub const BrotliDecoderState = opaque {
return BrotliDecoderSetParameter(self, param, value) == BROTLI_TRUE;
}
pub fn write(self: *BrotliDecoderState, input: *[]const u8, output: *[]u8, total_size: ?*usize) BrotliDecoderResult {
return BrotliDecoderDecompressStream(self, &input.len, &input.ptr, &output.len, &output.ptr, total_size);
pub fn write(self: *BrotliDecoderState, input: ?*[]const u8, output: ?*[]u8, total_size: ?*usize) BrotliDecoderResult {
var input_len: usize = 0;
var input_ptr: ?[*]const u8 = null;
var output_len: usize = 0;
var output_ptr: ?[*]u8 = null;
if (input) |in| {
input_len = in.len;
input_ptr = in.ptr;
}
if (output) |out| {
output_len = out.len;
output_ptr = out.ptr;
}
defer {
if (input) |in| {
in.len = input_len;
if (input_ptr != null)
in.ptr = input_ptr.?;
}
if (output) |out| {
out.len = output_len;
if (output_ptr != null)
out.ptr = output_ptr.?;
}
}
return BrotliDecoderDecompressStream(self, &input_len, &input_ptr, &output_len, &output_ptr, total_size);
}
pub fn getErrorCode(self: *const BrotliDecoderState) BrotliDecoderErrorCode {
return BrotliDecoderGetErrorCode(self);
}
pub fn hasMoreOutput(self: *BrotliEncoderState) bool {
return BrotliEncoderHasMoreOutput(self) == BROTLI_TRUE;
pub fn hasMoreOutput(self: *const BrotliDecoderState) bool {
return BrotliDecoderHasMoreOutput(self) == BROTLI_TRUE;
}
pub fn take(self: *BrotliDecoderState, requested: usize) []const u8 {
var size: usize = requested;
var ptr = BrotliDecoderTakeOutput(self, &size) orelse return "";
return ptr[0..size];
}
};
@@ -291,7 +326,7 @@ pub extern fn BrotliDecoderAttachDictionary(state: ?*BrotliDecoderState, @"type"
pub extern fn BrotliDecoderCreateInstance(alloc_func: brotli_alloc_func, free_func: brotli_free_func, @"opaque": ?*anyopaque) *BrotliDecoderState;
pub extern fn BrotliDecoderDestroyInstance(state: ?*BrotliDecoderState) void;
pub extern fn BrotliDecoderDecompress(encoded_size: usize, encoded_buffer: [*]const u8, decoded_size: *usize, decoded_buffer: [*]u8) BrotliDecoderResult;
pub extern fn BrotliDecoderDecompressStream(state: *BrotliDecoderState, available_in: *usize, next_in: *[*]const u8, available_out: *usize, next_out: *[*]u8, total_out: ?*usize) BrotliDecoderResult;
pub extern fn BrotliDecoderDecompressStream(state: *BrotliDecoderState, available_in: *usize, next_in: *?[*]const u8, available_out: *usize, next_out: *?[*]u8, total_out: ?*usize) BrotliDecoderResult;
pub extern fn BrotliDecoderHasMoreOutput(state: *const BrotliDecoderState) c_int;
pub extern fn BrotliDecoderTakeOutput(state: *BrotliDecoderState, size: *usize) ?[*]const u8;
pub extern fn BrotliDecoderIsUsed(state: ?*const BrotliDecoderState) c_int;

View File

@@ -2,9 +2,66 @@ const compress = @import("./compress.zig");
pub const bun = @import("./bun.zig");
const std = @import("std");
const Controller = compress.Controller;
const Completion = compress.Completion;
const Ownership = compress.Ownership;
const Error = compress.Error;
pub const CLIFileStreamCompressor = struct {
input: std.fs.File,
output: std.fs.File,
closed: bool = false,
ready_for_more: bool = false,
has_more_output: bool = true,
pub fn controller(this: *CLIFileStreamCompressor) Controller {
return Controller.init(*CLIFileStreamCompressor, this);
}
pub fn onData(this: *CLIFileStreamCompressor, bytes: []const u8, _: Ownership, completion: Completion) void {
std.debug.assert(!this.closed);
this.output.writeAll(bytes) catch @panic("failed to write to file");
if (completion == Completion.last) {
this.ready_for_more = false;
}
}
pub fn onError(this: *CLIFileStreamCompressor, err: Error) void {
_ = err;
std.debug.assert(!this.closed);
// std.debug.panic("Error: {}\n{}", .{ err.code, err.message });
}
pub fn onPull(this: *CLIFileStreamCompressor) void {
this.ready_for_more = true;
}
pub fn init(path: []const u8) !CLIFileStreamCompressor {
var file = try std.fs.cwd().openFile(path, .{ .mode = .read_write });
return CLIFileStreamCompressor{ .input = file, .output = std.io.getStdOut() };
}
pub fn run(this: *CLIFileStreamCompressor, stream: *compress.Compressor) !void {
this.ready_for_more = true;
const ctrl = this.controller();
while (this.has_more_output) {
var buffer: [64 * 1024]u8 = undefined;
var to_read: []const u8 = buffer[0..try this.input.readAll(&buffer)];
this.has_more_output = to_read.len != 0;
if (this.has_more_output) {
stream.write(to_read, ctrl);
}
}
stream.end(ctrl);
}
};
pub fn main() anyerror!void {
const path: []const u8 = std.mem.span(std.os.argv[std.os.argv.len - 1]);
var file_stream = try compress.CLIFileStreamCompressor.init(path);
var file_stream = try CLIFileStreamCompressor.init(path);
var stream: *compress.Compressor = if (bun.strings.endsWith(path, ".br"))
try compress.Compressor.init(compress.Brotli.Decoder.initWithoutOptions())
else