From 155aa080e1966e0da70de44293aee7bfef55261b Mon Sep 17 00:00:00 2001 From: Alisue Date: Thu, 11 May 2023 23:35:02 +0900 Subject: [PATCH] :+1: Use Web Streams API for host protocol --- denops/@denops-private/cli.ts | 64 +++++++++++++------------ denops/@denops-private/host/nvim.ts | 33 ++++++++----- denops/@denops-private/host/vim.ts | 25 ++++++++-- denops/@denops-private/tee.ts | 54 --------------------- denops/@denops-private/tee_test.ts | 34 ------------- denops/@denops-private/worker/script.ts | 20 +++----- 6 files changed, 81 insertions(+), 149 deletions(-) delete mode 100644 denops/@denops-private/tee.ts delete mode 100644 denops/@denops-private/tee_test.ts diff --git a/denops/@denops-private/cli.ts b/denops/@denops-private/cli.ts index 1b558b90..f435672d 100644 --- a/denops/@denops-private/cli.ts +++ b/denops/@denops-private/cli.ts @@ -1,53 +1,31 @@ import { parse } from "https://deno.land/std@0.186.0/flags/mod.ts"; +import { pop } from "https://deno.land/x/streamtools@v0.4.1/mod.ts"; import { using } from "https://deno.land/x/disposable@v1.1.1/mod.ts#^"; import { Service } from "./service.ts"; import { Vim } from "./host/vim.ts"; import { Neovim } from "./host/nvim.ts"; -import { tee } from "./tee.ts"; type Host = typeof Vim | typeof Neovim; -async function detectHost(reader: Deno.Reader): Promise { +async function detectHost(reader: ReadableStream): Promise { const marks = new TextEncoder().encode('[{tf"0123456789'); - const chunk = new Uint8Array(1); - await reader.read(chunk); - const mark = chunk.at(0); + const mark = (await pop(reader))?.at(0); if (mark && marks.includes(mark)) { return Vim; } return Neovim; } -const { hostname, port, quiet, identity } = parse(Deno.args, { - string: ["hostname", "port"], - boolean: ["quiet", "identity"], -}); - -const listener = Deno.listen({ - hostname: hostname ?? "127.0.0.1", - port: Number(port ?? "32123"), -}); -const localAddr = listener.addr as Deno.NetAddr; - -if (identity) { - console.log(`${localAddr.hostname}:${localAddr.port}`); -} -if (!quiet) { - console.log( - `Listen denops clients on ${localAddr.hostname}:${localAddr.port}`, - ); -} - -for await (const conn of listener) { +async function handleConn(conn: Deno.Conn): Promise { const remoteAddr = conn.remoteAddr as Deno.NetAddr; - const reader = conn; - const writer = conn; + const reader = conn.readable; + const writer = conn.writable; - const [r1, r2] = tee(reader); + const [r1, r2] = reader.tee(); // Detect host from payload const hostClass = await detectHost(r1); - r1.close(); + r1.cancel(); if (!quiet) { console.log( @@ -56,7 +34,7 @@ for await (const conn of listener) { } // Create host and service - using(new hostClass(r2, writer), async (host) => { + await using(new hostClass(r2, writer), async (host) => { await using(new Service(host), async (service) => { await service.host.waitClosed(); if (!quiet) { @@ -67,3 +45,27 @@ for await (const conn of listener) { }); }); } + +const { hostname, port, quiet, identity } = parse(Deno.args, { + string: ["hostname", "port"], + boolean: ["quiet", "identity"], +}); + +const listener = Deno.listen({ + hostname: hostname ?? "127.0.0.1", + port: Number(port ?? "32123"), +}); +const localAddr = listener.addr as Deno.NetAddr; + +if (identity) { + console.log(`${localAddr.hostname}:${localAddr.port}`); +} +if (!quiet) { + console.log( + `Listen denops clients on ${localAddr.hostname}:${localAddr.port}`, + ); +} + +for await (const conn of listener) { + handleConn(conn).catch((err) => console.error("Unexpected error", err)); +} diff --git a/denops/@denops-private/host/nvim.ts b/denops/@denops-private/host/nvim.ts index f7d44fe8..ff9503a2 100644 --- a/denops/@denops-private/host/nvim.ts +++ b/denops/@denops-private/host/nvim.ts @@ -2,21 +2,30 @@ import { assertArray, assertString, } from "https://deno.land/x/unknownutil@v2.1.1/mod.ts#^"; -import { Session } from "https://deno.land/x/msgpack_rpc@v3.1.6/mod.ts#^"; -import { responseTimeout } from "../defs.ts"; +import { + Client, + Session, +} from "https://deno.land/x/messagepack_rpc@v1.0.0/mod.ts#^"; import { Invoker, isInvokerMethod } from "./invoker.ts"; import { Host } from "./base.ts"; export class Neovim implements Host { #session: Session; + #client: Client; constructor( - reader: Deno.Reader & Deno.Closer, - writer: Deno.Writer, + reader: ReadableStream, + writer: WritableStream, ) { - this.#session = new Session(reader, writer, undefined, { - responseTimeout, - }); + this.#session = new Session(reader, writer); + this.#session.onMessageError = (error, message) => { + if (error instanceof Error && error.name === "Interrupted") { + return; + } + console.error(`Failed to handle message ${message}`, error); + }; + this.#session.start(); + this.#client = new Client(this.#session); } redraw(_force?: boolean): Promise { @@ -25,13 +34,13 @@ export class Neovim implements Host { } call(fn: string, ...args: unknown[]): Promise { - return this.#session.call("nvim_call_function", fn, args); + return this.#client.call("nvim_call_function", fn, args); } async batch( ...calls: [string, ...unknown[]][] ): Promise<[unknown[], string]> { - const [ret, err] = await this.#session.call( + const [ret, err] = await this.#client.call( "nvim_call_atomic", calls.map(([fn, ...args]) => ["nvim_call_function", [fn, args]]), ) as [unknown[], [number, number, string] | null]; @@ -59,11 +68,11 @@ export class Neovim implements Host { }; } - waitClosed(): Promise { - return this.#session.waitClosed(); + async waitClosed(): Promise { + await this.#session.wait(); } dispose(): void { - this.#session.dispose(); + this.#session.shutdown(); } } diff --git a/denops/@denops-private/host/vim.ts b/denops/@denops-private/host/vim.ts index 6d67880a..4f7c60f8 100644 --- a/denops/@denops-private/host/vim.ts +++ b/denops/@denops-private/host/vim.ts @@ -1,3 +1,7 @@ +import { + readerFromStreamReader, + writerFromStreamWriter, +} from "https://deno.land/std@0.186.0/streams/mod.ts"; import { Session as VimSession, } from "https://deno.land/x/vim_channel_command@v0.7.1/mod.ts#^"; @@ -9,15 +13,24 @@ import { Invoker, isInvokerMethod } from "./invoker.ts"; import { Host } from "./base.ts"; export class Vim implements Host { + #reader: ReadableStreamDefaultReader; + #writer: WritableStreamDefaultWriter; #session: VimSession; constructor( - reader: Deno.Reader & Deno.Closer, - writer: Deno.Writer, + reader: ReadableStream, + writer: WritableStream, ) { - this.#session = new VimSession(reader, writer, undefined, { - responseTimeout, - }); + this.#reader = reader.getReader(); + this.#writer = writer.getWriter(); + this.#session = new VimSession( + readerFromStreamReader(this.#reader), + writerFromStreamWriter(this.#writer), + undefined, + { + responseTimeout, + }, + ); } redraw(force?: boolean): Promise { @@ -71,6 +84,8 @@ export class Vim implements Host { } dispose(): void { + this.#reader.releaseLock(); + this.#writer.releaseLock(); this.#session.dispose(); } } diff --git a/denops/@denops-private/tee.ts b/denops/@denops-private/tee.ts deleted file mode 100644 index 1ef2e823..00000000 --- a/denops/@denops-private/tee.ts +++ /dev/null @@ -1,54 +0,0 @@ -import { Buffer } from "https://deno.land/std@0.186.0/io/mod.ts"; -import { writeAll } from "https://deno.land/std@0.186.0/streams/mod.ts"; - -type Reader = Deno.Reader; -type ReadCloser = Reader & Deno.Closer; - -type Status = { - halfClosed: boolean; -}; - -class Tee { - #reader: R; - #inner: Buffer; - #outer: Buffer; - #status: Status; - - constructor(reader: R, inner: Buffer, outer: Buffer, status: Status) { - this.#reader = reader; - this.#inner = inner; - this.#outer = outer; - this.#status = status; - } - - async read(p: Uint8Array): Promise { - let n = await this.#inner.read(p); - if (n) { - return n; - } - n = await this.#reader.read(p); - if (n && !this.#status.halfClosed) { - await writeAll(this.#outer, p.subarray(0, n)); - } - return n; - } - - close(): void { - if (this.#status.halfClosed) { - if ("close" in this.#reader) { - this.#reader.close(); - } - } - this.#status.halfClosed = true; - this.#inner.reset(); - } -} - -export function tee( - reader: R, -): [ReadCloser, ReadCloser] { - const lhs = new Buffer(); - const rhs = new Buffer(); - const status = { halfClosed: false }; - return [new Tee(reader, lhs, rhs, status), new Tee(reader, rhs, lhs, status)]; -} diff --git a/denops/@denops-private/tee_test.ts b/denops/@denops-private/tee_test.ts deleted file mode 100644 index a83e0df5..00000000 --- a/denops/@denops-private/tee_test.ts +++ /dev/null @@ -1,34 +0,0 @@ -import { - assert, - assertEquals, -} from "https://deno.land/std@0.186.0/testing/asserts.ts"; -import { - readAll, - writeAll, -} from "https://deno.land/std@0.186.0/streams/mod.ts"; -import { Buffer } from "https://deno.land/std@0.186.0/io/mod.ts"; -import { tee } from "./tee.ts"; - -Deno.test("tee", async (t) => { - const encoder = new TextEncoder(); - - await t.step("returns tuple of two Deno.Reader & Deno.Closer", () => { - const buffer = new Buffer(); - const [r1, r2] = tee(buffer); - assert("read" in r1 && typeof r1.read === "function"); - assert("read" in r2 && typeof r2.read === "function"); - }); - - await t.step( - "each readers are independent copy of the original reader", - async () => { - const buffer = new Buffer(); - await writeAll(buffer, encoder.encode("Hello")); - await writeAll(buffer, encoder.encode("World")); - const [r1, r2] = tee(buffer); - - assertEquals(await readAll(r1), encoder.encode("HelloWorld")); - assertEquals(await readAll(r2), encoder.encode("HelloWorld")); - }, - ); -}); diff --git a/denops/@denops-private/worker/script.ts b/denops/@denops-private/worker/script.ts index c810a7fb..c6b37c54 100644 --- a/denops/@denops-private/worker/script.ts +++ b/denops/@denops-private/worker/script.ts @@ -7,7 +7,7 @@ import { import { Client, Session, -} from "https://deno.land/x/messagepack_rpc@v0.3.1/mod.ts#^"; +} from "https://deno.land/x/messagepack_rpc@v1.0.0/mod.ts#^"; import { readableStreamFromWorker, writableStreamFromWorker, @@ -25,12 +25,13 @@ async function main( const session = new Session( readableStreamFromWorker(worker), writableStreamFromWorker(worker), - { - onRequestMessageError: onMessageError, - onResponseMessageError: onMessageError, - onNotificationMessageError: onMessageError, - }, ); + session.onMessageError = (error, message) => { + if (error instanceof Error && error.name === "Interrupted") { + return; + } + console.error(`Failed to handle message ${message}`, error); + }; session.start(); const client = new Client(session); // Protect the process itself from "Unhandled promises" @@ -100,13 +101,6 @@ async function main( self.close(); } -function onMessageError(message: unknown, error: Error): void { - if (error instanceof Error && error.name === "Interrupted") { - return; - } - console.error(`Failed to handle message ${message}`, error); -} - function isMeta(v: unknown): v is Meta { if (!isObject(v)) { return false;