Skip to content

Commit

Permalink
👍 Use Web Streams API for host protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
lambdalisue committed May 14, 2023
1 parent 9e3ecf2 commit 155aa08
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 149 deletions.
64 changes: 33 additions & 31 deletions denops/@denops-private/cli.ts
Original file line number Diff line number Diff line change
@@ -1,53 +1,31 @@
import { parse } from "https://deno.land/[email protected]/flags/mod.ts";
import { pop } from "https://deno.land/x/[email protected]/mod.ts";
import { using } from "https://deno.land/x/[email protected]/mod.ts#^";
import { Service } from "./service.ts";
import { Vim } from "./host/vim.ts";
import { Neovim } from "./host/nvim.ts";
import { tee } from "./tee.ts";

type Host = typeof Vim | typeof Neovim;

async function detectHost(reader: Deno.Reader): Promise<Host> {
async function detectHost(reader: ReadableStream<Uint8Array>): Promise<Host> {
const marks = new TextEncoder().encode('[{tf"0123456789');
const chunk = new Uint8Array(1);
await reader.read(chunk);
const mark = chunk.at(0);
const mark = (await pop(reader))?.at(0);
if (mark && marks.includes(mark)) {
return Vim;
}
return Neovim;
}

const { hostname, port, quiet, identity } = parse(Deno.args, {
string: ["hostname", "port"],
boolean: ["quiet", "identity"],
});

const listener = Deno.listen({
hostname: hostname ?? "127.0.0.1",
port: Number(port ?? "32123"),
});
const localAddr = listener.addr as Deno.NetAddr;

if (identity) {
console.log(`${localAddr.hostname}:${localAddr.port}`);
}
if (!quiet) {
console.log(
`Listen denops clients on ${localAddr.hostname}:${localAddr.port}`,
);
}

for await (const conn of listener) {
async function handleConn(conn: Deno.Conn): Promise<void> {
const remoteAddr = conn.remoteAddr as Deno.NetAddr;
const reader = conn;
const writer = conn;
const reader = conn.readable;
const writer = conn.writable;

const [r1, r2] = tee(reader);
const [r1, r2] = reader.tee();

// Detect host from payload
const hostClass = await detectHost(r1);
r1.close();
r1.cancel();

if (!quiet) {
console.log(
Expand All @@ -56,7 +34,7 @@ for await (const conn of listener) {
}

// Create host and service
using(new hostClass(r2, writer), async (host) => {
await using(new hostClass(r2, writer), async (host) => {
await using(new Service(host), async (service) => {
await service.host.waitClosed();
if (!quiet) {
Expand All @@ -67,3 +45,27 @@ for await (const conn of listener) {
});
});
}

const { hostname, port, quiet, identity } = parse(Deno.args, {
string: ["hostname", "port"],
boolean: ["quiet", "identity"],
});

const listener = Deno.listen({
hostname: hostname ?? "127.0.0.1",
port: Number(port ?? "32123"),
});
const localAddr = listener.addr as Deno.NetAddr;

if (identity) {
console.log(`${localAddr.hostname}:${localAddr.port}`);
}
if (!quiet) {
console.log(
`Listen denops clients on ${localAddr.hostname}:${localAddr.port}`,
);
}

for await (const conn of listener) {
handleConn(conn).catch((err) => console.error("Unexpected error", err));
}
33 changes: 21 additions & 12 deletions denops/@denops-private/host/nvim.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,30 @@ import {
assertArray,
assertString,
} from "https://deno.land/x/[email protected]/mod.ts#^";
import { Session } from "https://deno.land/x/[email protected]/mod.ts#^";
import { responseTimeout } from "../defs.ts";
import {
Client,
Session,
} from "https://deno.land/x/[email protected]/mod.ts#^";
import { Invoker, isInvokerMethod } from "./invoker.ts";
import { Host } from "./base.ts";

export class Neovim implements Host {
#session: Session;
#client: Client;

constructor(
reader: Deno.Reader & Deno.Closer,
writer: Deno.Writer,
reader: ReadableStream<Uint8Array>,
writer: WritableStream<Uint8Array>,
) {
this.#session = new Session(reader, writer, undefined, {
responseTimeout,
});
this.#session = new Session(reader, writer);
this.#session.onMessageError = (error, message) => {
if (error instanceof Error && error.name === "Interrupted") {
return;
}
console.error(`Failed to handle message ${message}`, error);
};
this.#session.start();
this.#client = new Client(this.#session);
}

redraw(_force?: boolean): Promise<void> {
Expand All @@ -25,13 +34,13 @@ export class Neovim implements Host {
}

call(fn: string, ...args: unknown[]): Promise<unknown> {
return this.#session.call("nvim_call_function", fn, args);
return this.#client.call("nvim_call_function", fn, args);
}

async batch(
...calls: [string, ...unknown[]][]
): Promise<[unknown[], string]> {
const [ret, err] = await this.#session.call(
const [ret, err] = await this.#client.call(
"nvim_call_atomic",
calls.map(([fn, ...args]) => ["nvim_call_function", [fn, args]]),
) as [unknown[], [number, number, string] | null];
Expand Down Expand Up @@ -59,11 +68,11 @@ export class Neovim implements Host {
};
}

waitClosed(): Promise<void> {
return this.#session.waitClosed();
async waitClosed(): Promise<void> {
await this.#session.wait();
}

dispose(): void {
this.#session.dispose();
this.#session.shutdown();
}
}
25 changes: 20 additions & 5 deletions denops/@denops-private/host/vim.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
import {
readerFromStreamReader,
writerFromStreamWriter,
} from "https://deno.land/[email protected]/streams/mod.ts";
import {
Session as VimSession,
} from "https://deno.land/x/[email protected]/mod.ts#^";
Expand All @@ -9,15 +13,24 @@ import { Invoker, isInvokerMethod } from "./invoker.ts";
import { Host } from "./base.ts";

export class Vim implements Host {
#reader: ReadableStreamDefaultReader<Uint8Array>;
#writer: WritableStreamDefaultWriter<Uint8Array>;
#session: VimSession;

constructor(
reader: Deno.Reader & Deno.Closer,
writer: Deno.Writer,
reader: ReadableStream<Uint8Array>,
writer: WritableStream<Uint8Array>,
) {
this.#session = new VimSession(reader, writer, undefined, {
responseTimeout,
});
this.#reader = reader.getReader();
this.#writer = writer.getWriter();
this.#session = new VimSession(
readerFromStreamReader(this.#reader),
writerFromStreamWriter(this.#writer),
undefined,
{
responseTimeout,
},
);
}

redraw(force?: boolean): Promise<void> {
Expand Down Expand Up @@ -71,6 +84,8 @@ export class Vim implements Host {
}

dispose(): void {
this.#reader.releaseLock();
this.#writer.releaseLock();
this.#session.dispose();
}
}
Expand Down
54 changes: 0 additions & 54 deletions denops/@denops-private/tee.ts

This file was deleted.

34 changes: 0 additions & 34 deletions denops/@denops-private/tee_test.ts

This file was deleted.

20 changes: 7 additions & 13 deletions denops/@denops-private/worker/script.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import {
import {
Client,
Session,
} from "https://deno.land/x/messagepack_rpc@v0.3.1/mod.ts#^";
} from "https://deno.land/x/messagepack_rpc@v1.0.0/mod.ts#^";
import {
readableStreamFromWorker,
writableStreamFromWorker,
Expand All @@ -25,12 +25,13 @@ async function main(
const session = new Session(
readableStreamFromWorker(worker),
writableStreamFromWorker(worker),
{
onRequestMessageError: onMessageError,
onResponseMessageError: onMessageError,
onNotificationMessageError: onMessageError,
},
);
session.onMessageError = (error, message) => {
if (error instanceof Error && error.name === "Interrupted") {
return;
}
console.error(`Failed to handle message ${message}`, error);
};
session.start();
const client = new Client(session);
// Protect the process itself from "Unhandled promises"
Expand Down Expand Up @@ -100,13 +101,6 @@ async function main(
self.close();
}

function onMessageError(message: unknown, error: Error): void {
if (error instanceof Error && error.name === "Interrupted") {
return;
}
console.error(`Failed to handle message ${message}`, error);
}

function isMeta(v: unknown): v is Meta {
if (!isObject(v)) {
return false;
Expand Down

0 comments on commit 155aa08

Please sign in to comment.