fix(WebSocket) improve WebSocket Client (#7371)

* improvements

* autobahn tests

* add all tests

* check if docker is active move autobahn to a new file

* fix non SIMD UTF8 validation

* use no trim to catch utf8 issues

* fix extended payload fragmentation

* fmt

* Update src/string_immutable.zig

Co-authored-by: Jarred Sumner <jarred@jarredsumner.com>

---------

Co-authored-by: Jarred Sumner <jarred@jarredsumner.com>
This commit is contained in:
Ciro Spaciari
2023-11-30 23:00:41 -03:00
committed by GitHub
parent 54df5c032d
commit 906ba8b2a0
5 changed files with 376 additions and 71 deletions

View File

@@ -42,8 +42,7 @@ pub const WebsocketHeader = packed struct {
len: u7,
mask: bool,
opcode: Opcode,
rsv3: u1 = 0,
rsv2: u1 = 0,
rsv: u2 = 0, //rsv2 and rsv3
compressed: bool = false, // rsv1
final: bool = true,
@@ -90,6 +89,16 @@ pub const WebsocketHeader = packed struct {
pub fn frameSizeIncludingMask(byte_length: usize) usize {
return frameSize(byte_length) + mask_length;
}
pub fn slice(self: WebsocketHeader) [2]u8 {
if (native_endian == .big) return @as([2]u8, @as(u16, @bitCast(self)));
return @as([2]u8, @bitCast(@byteSwap(@as(u16, @bitCast(self)))));
}
pub fn fromSlice(bytes: [2]u8) WebsocketHeader {
if (native_endian == .big) return @as(WebsocketHeader, @bitCast(@as(u16, @bitCast(bytes))));
return @as(WebsocketHeader, @bitCast(@byteSwap(@as(u16, @bitCast(bytes)))));
}
};
pub const WebsocketDataFrame = struct {

View File

@@ -755,7 +755,7 @@ fn parseWebSocketHeader(
// + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
// | Payload Data continued ... |
// +---------------------------------------------------------------+
const header = @as(WebsocketHeader, @bitCast(@byteSwap(@as(u16, @bitCast(bytes)))));
const header = WebsocketHeader.fromSlice(bytes);
const payload = @as(usize, header.len);
payload_length.* = payload;
receiving_type.* = header.opcode;
@@ -765,10 +765,13 @@ fn parseWebSocketHeader(
} or !header.final;
is_final.* = header.final;
need_compression.* = header.compressed;
if (header.mask and (header.opcode == .Text or header.opcode == .Binary)) {
return .need_mask;
}
// reserved bits must be 0
if (header.rsv != 0) {
return .fail;
}
return switch (header.opcode) {
.Text, .Continue, .Binary => if (payload <= 125)
@@ -890,12 +893,14 @@ pub fn NewWebSocketClient(comptime ssl: bool) type {
outgoing_websocket: ?*CppWebSocket = null,
receive_state: ReceiveState = ReceiveState.need_header,
receive_header: WebsocketHeader = @as(WebsocketHeader, @bitCast(@as(u16, 0))),
receiving_type: Opcode = Opcode.ResB,
// we need to start with final so we validate the first frame
receiving_is_final: bool = true,
ping_frame_bytes: [128 + 6]u8 = [_]u8{0} ** (128 + 6),
ping_len: u8 = 0,
ping_received: bool = false,
close_received: bool = false,
receive_frame: usize = 0,
receive_body_remain: usize = 0,
@@ -907,6 +912,8 @@ pub fn NewWebSocketClient(comptime ssl: bool) type {
globalThis: *JSC.JSGlobalObject,
poll_ref: Async.KeepAlive = Async.KeepAlive.init(),
header_fragment: ?u8 = null,
initial_data_handler: ?*InitialDataHandler = null,
pub const name = if (ssl) "WebSocketClientTLS" else "WebSocketClient";
@@ -1051,7 +1058,8 @@ pub fn NewWebSocketClient(comptime ssl: bool) type {
.Text => {
// this function encodes to UTF-16 if > 127
// so we don't need to worry about latin1 non-ascii code points
const utf16_bytes_ = strings.toUTF16Alloc(bun.default_allocator, data_, true) catch {
// we avoid trim since we wanna keep the utf8 validation intact
const utf16_bytes_ = strings.toUTF16AllocNoTrim(bun.default_allocator, data_, true) catch {
this.terminate(ErrorCode.invalid_utf8);
return;
};
@@ -1071,7 +1079,9 @@ pub fn NewWebSocketClient(comptime ssl: bool) type {
JSC.markBinding(@src());
out.didReceiveBytes(data_.ptr, data_.len, @as(u8, @intFromEnum(kind)));
},
else => unreachable,
else => {
this.terminate(ErrorCode.unexpected_opcode);
},
}
}
@@ -1081,12 +1091,18 @@ pub fn NewWebSocketClient(comptime ssl: bool) type {
// did all the data fit in the buffer?
// we can avoid copying & allocating a temporary buffer
if (is_final and data_.len == left_in_fragment and this.receive_pending_chunk_len == 0) {
this.dispatchData(data_, kind);
return data_.len;
if (this.receive_buffer.count == 0) {
this.dispatchData(data_, kind);
return data_.len;
} else if (data_.len == 0) {
this.dispatchData(this.receive_buffer.readableSlice(0), kind);
this.clearReceiveBuffers(false);
return 0;
}
}
// this must come after the above check
std.debug.assert(data_.len > 0);
if (data_.len == 0) return 0;
var writable = this.receive_buffer.writableWithSize(data_.len) catch unreachable;
@memcpy(writable[0..data_.len], data_);
@@ -1094,8 +1110,11 @@ pub fn NewWebSocketClient(comptime ssl: bool) type {
if (left_in_fragment >= data_.len and left_in_fragment - data_.len - this.receive_pending_chunk_len == 0) {
this.receive_pending_chunk_len = 0;
this.dispatchData(this.receive_buffer.readableSlice(0), kind);
this.clearReceiveBuffers(false);
this.receive_body_remain = 0;
if (is_final) {
this.dispatchData(this.receive_buffer.readableSlice(0), kind);
this.clearReceiveBuffers(false);
}
} else {
this.receive_pending_chunk_len -|= left_in_fragment;
}
@@ -1103,6 +1122,9 @@ pub fn NewWebSocketClient(comptime ssl: bool) type {
}
pub fn handleData(this: *WebSocket, socket: Socket, data_: []const u8) void {
// after receiving close we should ignore the data
if (this.close_received) return;
// This is the start of a task, so we need to drain the microtask queue at the end
defer JSC.VirtualMachine.get().drainMicrotasks();
@@ -1130,21 +1152,16 @@ pub fn NewWebSocketClient(comptime ssl: bool) type {
var is_fragmented = false;
var receiving_type = this.receiving_type;
var receive_body_remain = this.receive_body_remain;
var is_final = false;
var is_final = this.receiving_is_final;
var last_receive_data_type = receiving_type;
defer {
if (!terminated) {
if (terminated) {
this.close_received = true;
} else {
this.receive_state = receive_state;
this.receiving_type = last_receive_data_type;
this.receive_body_remain = receive_body_remain;
// if we receive multiple pings in a row
// we just send back the last one
if (this.ping_received) {
_ = this.sendPong(socket);
this.ping_received = false;
}
}
}
@@ -1173,12 +1190,23 @@ pub fn NewWebSocketClient(comptime ssl: bool) type {
// +---------------------------------------------------------------+
.need_header => {
if (data.len < 2) {
this.terminate(ErrorCode.control_frame_is_fragmented);
terminated = true;
break;
std.debug.assert(data.len > 0);
if (this.header_fragment == null) {
this.header_fragment = data[0];
break;
}
}
header_bytes[0..2].* = data[0..2].*;
if (this.header_fragment) |header_fragment| {
header_bytes[0] = header_fragment;
header_bytes[1] = data[0];
data = data[1..];
} else {
header_bytes[0..2].* = data[0..2].*;
data = data[2..];
}
this.header_fragment = null;
receive_body_remain = 0;
var need_compression = false;
is_final = false;
@@ -1191,16 +1219,27 @@ pub fn NewWebSocketClient(comptime ssl: bool) type {
&is_final,
&need_compression,
);
last_receive_data_type =
if (receiving_type == .Text or receiving_type == .Binary)
receiving_type
else
last_receive_data_type;
data = data[2..];
if (receiving_type.isControl() and is_fragmented) {
if (receiving_type == .Continue) {
// if is final is true continue is invalid
if (this.receiving_is_final) {
// nothing to continue here
this.terminate(ErrorCode.unexpected_opcode);
terminated = true;
break;
}
// only update final if is a valid continue
this.receiving_is_final = is_final;
} else if (receiving_type == .Text or receiving_type == .Binary) {
// if the last one is not final this is invalid because we are waiting a continue
if (!this.receiving_is_final) {
this.terminate(ErrorCode.unexpected_opcode);
terminated = true;
break;
}
// for text and binary frames we need to keep track of final and type
this.receiving_is_final = is_final;
last_receive_data_type = receiving_type;
} else if (receiving_type.isControl() and is_fragmented) {
// Control frames must not be fragmented.
this.terminate(ErrorCode.control_frame_is_fragmented);
terminated = true;
@@ -1257,6 +1296,8 @@ pub fn NewWebSocketClient(comptime ssl: bool) type {
.extended_payload_length_16 => @as(usize, 2),
else => unreachable,
};
// we need to wait for more data
if (data.len == 0) return;
if (data.len < byte_size) {
this.terminate(ErrorCode.control_frame_is_fragmented);
@@ -1282,21 +1323,42 @@ pub fn NewWebSocketClient(comptime ssl: bool) type {
}
},
.ping => {
const ping_len = @min(data.len, @min(receive_body_remain, 125));
this.ping_len = ping_len;
this.ping_received = true;
this.dispatchData(data[0..ping_len], .Ping);
if (ping_len > 0) {
@memcpy(this.ping_frame_bytes[6..][0..ping_len], data[0..ping_len]);
data = data[ping_len..];
if (!this.ping_received) {
if (receive_body_remain > 125) {
this.terminate(ErrorCode.invalid_control_frame);
terminated = true;
break;
}
this.ping_len = @truncate(receive_body_remain);
receive_body_remain = 0;
this.ping_received = true;
}
const ping_len = this.ping_len;
if (data.len > 0) {
// copy the data to the ping frame
const total_received = @min(ping_len, receive_body_remain + data.len);
const slice = this.ping_frame_bytes[6..][receive_body_remain..total_received];
@memcpy(slice, data[0..slice.len]);
receive_body_remain = total_received;
data = data[slice.len..];
}
const pending_body = ping_len - receive_body_remain;
if (pending_body > 0) {
// wait for more data it can be fragmented
break;
}
const ping_data = this.ping_frame_bytes[6..][0..ping_len];
this.dispatchData(ping_data, .Ping);
receive_state = .need_header;
receive_body_remain = 0;
receiving_type = last_receive_data_type;
this.ping_received = false;
// we need to send all pongs to pass autobahn tests
_ = this.sendPong(socket);
if (data.len == 0) break;
},
.pong => {
@@ -1312,22 +1374,9 @@ pub fn NewWebSocketClient(comptime ssl: bool) type {
if (data.len == 0) break;
},
.need_body => {
// Empty messages are valid, but we handle that earlier in the flow.
if (receive_body_remain == 0 and data.len > 0) {
this.terminate(ErrorCode.expected_control_frame);
terminated = true;
break;
}
if (data.len == 0) return;
const to_consume = @min(receive_body_remain, data.len);
const consumed = this.consume(data[0..to_consume], receive_body_remain, last_receive_data_type, is_final);
if (consumed == 0 and last_receive_data_type == .Text) {
this.terminate(ErrorCode.invalid_utf8);
terminated = true;
break;
}
receive_body_remain -= consumed;
data = data[to_consume..];
@@ -1340,12 +1389,37 @@ pub fn NewWebSocketClient(comptime ssl: bool) type {
},
.close => {
// closing frame data is text only.
this.close_received = true;
// 2 byte close code
if (data.len > 2 and receive_body_remain >= 2) {
_ = this.consume(data[2..receive_body_remain], receive_body_remain - 2, .Text, true);
// invalid close frame with 1 byte
if (data.len == 1 and receive_body_remain == 1) {
this.terminate(ErrorCode.invalid_control_frame);
terminated = true;
break;
}
// 2 byte close code and optional reason
if (data.len >= 2 and receive_body_remain >= 2) {
var code = std.mem.readInt(u16, data[0..2], .big);
log("Received close with code {d}", .{code});
if (code == 1001) {
// going away actual sends 1000 (normal close)
code = 1000;
} else if ((code < 1000) or (code >= 1004 and code < 1007) or (code >= 1016 and code <= 2999)) {
// invalid codes must clean close with 1002
code = 1002;
}
const reason_len = receive_body_remain - 2;
if (reason_len > 125) {
this.terminate(ErrorCode.invalid_control_frame);
terminated = true;
break;
}
var close_reason_buf: [125]u8 = undefined;
@memcpy(close_reason_buf[0..reason_len], data[2..receive_body_remain]);
this.sendCloseWithBody(socket, code, &close_reason_buf, reason_len);
data = data[receive_body_remain..];
terminated = true;
break;
}
this.sendClose();
@@ -1362,7 +1436,7 @@ pub fn NewWebSocketClient(comptime ssl: bool) type {
}
pub fn sendClose(this: *WebSocket) void {
this.sendCloseWithBody(this.tcp, 1001, null, 0);
this.sendCloseWithBody(this.tcp, 1000, null, 0);
}
fn enqueueEncodedBytes(
@@ -1448,15 +1522,16 @@ pub fn NewWebSocketClient(comptime ssl: bool) type {
var to_mask = this.ping_frame_bytes[6..][0..this.ping_len];
header.mask = to_mask.len > 0;
header.mask = true;
header.len = @as(u7, @truncate(this.ping_len));
this.ping_frame_bytes[0..2].* = @as([2]u8, @bitCast(header));
this.ping_frame_bytes[0..2].* = header.slice();
if (to_mask.len > 0) {
Mask.fill(this.globalThis, this.ping_frame_bytes[2..6], to_mask, to_mask);
return this.enqueueEncodedBytes(socket, this.ping_frame_bytes[0 .. 6 + @as(usize, this.ping_len)]);
} else {
return this.enqueueEncodedBytes(socket, this.ping_frame_bytes[0..2]);
@memset(this.ping_frame_bytes[2..6], 0); //autobahn tests require that we mask empty pongs
return this.enqueueEncodedBytes(socket, this.ping_frame_bytes[0..6]);
}
}
@@ -1481,15 +1556,21 @@ pub fn NewWebSocketClient(comptime ssl: bool) type {
header.opcode = .Close;
header.mask = true;
header.len = @as(u7, @truncate(body_len + 2));
final_body_bytes[0..2].* = @as([2]u8, @bitCast(@as(u16, @bitCast(header))));
final_body_bytes[0..2].* = header.slice();
var mask_buf: *[4]u8 = final_body_bytes[2..6];
final_body_bytes[6..8].* = @bitCast(@byteSwap(code));
final_body_bytes[6..8].* = if (native_endian == .big) @bitCast(code) else @bitCast(@byteSwap(code));
var reason = bun.String.empty;
if (body) |data| {
if (body_len > 0) {
reason = bun.String.create(data[0..body_len]);
@memcpy(final_body_bytes[8..][0..body_len], data[0..body_len]);
const body_slice = data[0..body_len];
// close is always utf8
if (!strings.isValidUTF8(body_slice)) {
this.terminate(ErrorCode.invalid_utf8);
return;
}
reason = bun.String.create(body_slice);
@memcpy(final_body_bytes[8..][0..body_len], body_slice);
}
}

View File

@@ -3336,6 +3336,58 @@ pub const AsciiVectorU1Small = @Vector(8, u1);
pub const AsciiVectorU16U1 = @Vector(ascii_u16_vector_size, u1);
pub const AsciiU16Vector = @Vector(ascii_u16_vector_size, u16);
pub const max_4_ascii: @Vector(4, u8) = @splat(@as(u8, 127));
const UTF8_ACCEPT: u8 = 0;
const UTF8_REJECT: u8 = 12;
const utf8d: [364]u8 = .{
// The first part of the table maps bytes to character classes that
// to reduce the size of the transition table and create bitmasks.
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9, 9,
7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7,
8, 8, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2,
10, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 4, 3, 3, 11, 6, 6, 6, 5, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8,
// The second part is a transition table that maps a combination
// of a state of the automaton and a character class to a state.
0, 12, 24, 36, 60, 96, 84, 12, 12, 12, 48, 72, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12, 0, 12, 12, 12, 12, 12, 0,
12, 0, 12, 12, 12, 24, 12, 12, 12, 12, 12, 24, 12, 24, 12, 12, 12, 12, 12, 12, 12, 12, 12, 24, 12, 12, 12, 12, 12, 24, 12, 12,
12, 12, 12, 12, 12, 24, 12, 12, 12, 12, 12, 12, 12, 12, 12, 36, 12, 36, 12, 12, 12, 36, 12, 12, 12, 12, 12, 36, 12, 36, 12, 12,
12, 36, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12,
};
pub fn decodeCheck(state: u8, byte: u8) u8 {
const char_type: u32 = utf8d[byte];
// we dont care about the codep
// codep = if (*state != UTF8_ACCEPT) (byte & 0x3f) | (*codep << 6) else (0xff >> char_type) & (byte);
const value = @as(u32, 256) + state + char_type;
if (value >= utf8d.len) return UTF8_REJECT;
return utf8d[value];
}
// Copyright (c) 2008-2009 Bjoern Hoehrmann <bjoern@hoehrmann.de>
// See http://bjoern.hoehrmann.de/utf-8/decoder/dfa/ for details.
pub fn isValidUTF8WithoutSIMD(slice: []const u8) bool {
var state: u8 = 0;
for (slice) |byte| {
state = decodeCheck(state, byte);
}
return state == UTF8_ACCEPT;
}
pub fn isValidUTF8(slice: []const u8) bool {
if (bun.FeatureFlags.use_simdutf)
return bun.simdutf.validate.utf8(slice);
return isValidUTF8WithoutSIMD(slice);
}
pub fn isAllASCII(slice: []const u8) bool {
if (bun.FeatureFlags.use_simdutf)
return bun.simdutf.validate.ascii(slice);

View File

@@ -0,0 +1,164 @@
import { describe, it, expect, afterAll } from "bun:test";
import { which } from "bun";
import { tempDirWithFiles } from "harness";
import child_process from "child_process";
const dockerCLI = which("docker") as string;
function isDockerEnabled(): boolean {
if (!dockerCLI) {
return false;
}
try {
const info = child_process.execSync(`${dockerCLI} info`, { stdio: "ignore" });
return info.toString().indexOf("Server Version:") !== -1;
} catch {
return false;
}
}
describe.if(isDockerEnabled())("autobahn", async () => {
const url = "ws://localhost:9001";
const agent = encodeURIComponent("bun/1.0.0");
let docker: child_process.ChildProcessWithoutNullStreams | null = null;
const { promise, resolve } = Promise.withResolvers();
// we can exclude cases by adding them to the exclude-cases array
// "exclude-cases": [
// "9.*"
// ],
const CWD = tempDirWithFiles("autobahn", {
"fuzzingserver.json": `{
"url": "ws://127.0.0.1:9001",
"outdir": "./",
"cases": ["*"],
"exclude-agent-cases": {}
}`,
"index.json": "{}",
});
docker = child_process.spawn(
dockerCLI,
[
"run",
"-t",
"--rm",
"-v",
`${CWD}:/config`,
"-v",
`${CWD}:/reports`,
"-p",
"9001:9001",
"--name",
"fuzzingserver",
"crossbario/autobahn-testsuite",
],
{
cwd: CWD,
stdout: "pipe",
stderr: "pipe",
},
) as child_process.ChildProcessWithoutNullStreams;
let out = "";
let pending = true;
docker.stdout.on("data", data => {
out += data;
if (pending) {
if (out.indexOf("Autobahn WebSocket") !== -1) {
pending = false;
resolve(true);
}
}
});
docker.on("close", code => {
if (pending) {
pending = false;
resolve(false);
}
});
const cases = await promise;
if (!cases) {
throw new Error("Autobahn WebSocket not detected");
}
function getCaseStatus(testID: number) {
return new Promise((resolve, reject) => {
const socket = new WebSocket(`${url}/getCaseStatus?case=${testID}&agent=${agent}`);
socket.binaryType = "arraybuffer";
socket.addEventListener("message", event => {
resolve(JSON.parse(event.data as string));
});
socket.addEventListener("error", event => {
reject(event);
});
});
}
function getTestCaseCount() {
return new Promise((resolve, reject) => {
const socket = new WebSocket(`${url}/getCaseCount`);
let count: number | null = null;
socket.addEventListener("message", event => {
count = parseInt(event.data as string, 10);
});
socket.addEventListener("close", event => {
if (!count) {
reject("No test count received");
}
resolve(count);
});
});
}
function getCaseInfo(testID: number) {
return new Promise((resolve, reject) => {
const socket = new WebSocket(`${url}/getCaseInfo?case=${testID}`);
socket.binaryType = "arraybuffer";
socket.addEventListener("message", event => {
resolve(JSON.parse(event.data as string));
});
socket.addEventListener("error", event => {
reject(event);
});
});
}
function runTestCase(testID: number) {
return new Promise((resolve, reject) => {
const socket = new WebSocket(`${url}/runCase?case=${testID}&agent=${agent}`);
socket.binaryType = "arraybuffer";
socket.addEventListener("message", event => {
socket.send(event.data);
});
socket.addEventListener("close", event => {
resolve(undefined);
});
socket.addEventListener("error", event => {
reject(event);
});
});
}
const count = (await getTestCaseCount()) as number;
it("should have test cases", () => {
expect(count).toBeGreaterThan(0);
});
for (let i = 1; i <= count; i++) {
const info = (await getCaseInfo(i)) as { id: string; description: string };
const test = parseInt(info.id.split(".")[0]) > 10 ? it.todo : it;
// tests > 10 are compression tests, which are not supported yet
test(`Running test case ${info.id}: ${info.description}`, async () => {
await runTestCase(i);
const result = (await getCaseStatus(i)) as { behavior: string };
expect(["OK", "INFORMATIONAL", "NON-STRICT"]).toContain(result.behavior);
});
}
afterAll(() => {
docker?.kill();
});
});

View File

@@ -1,5 +1,4 @@
import { describe, it, expect } from "bun:test";
import { unsafe, spawn, readableStreamToText } from "bun";
import { bunExe, bunEnv, gc } from "harness";
import { readFileSync } from "fs";
import { join } from "path";
@@ -170,7 +169,7 @@ describe("WebSocket", () => {
const client = WebSocket(url, { tls: { rejectUnauthorized: false } });
const { result, messages } = await testClient(client);
expect(["Hello from Bun!", "Hello from client!"]).toEqual(messages);
expect(result.code).toBe(1001);
expect(result.code).toBe(1000);
}
} finally {
server.stop(true);