Skip to content

Commit

Permalink
VSCODE-639: Replace regex fragment matching with streaming KMP (#837)
Browse files Browse the repository at this point in the history
* Replace regex fragment matching with streaming KMP

* Simplify content capturing

* Add an explanation for FragmentMatcher

* Handle multiple code blocks per fragment
  • Loading branch information
nirinchev authored Oct 24, 2024
1 parent cace4df commit 10be1c6
Show file tree
Hide file tree
Showing 2 changed files with 246 additions and 70 deletions.
240 changes: 170 additions & 70 deletions src/participant/streamParsing.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,166 @@
function escapeRegex(str: string): string {
return str.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
// This is a stateful streaming implementation of the Knuth-Morris-Pratt algorithm
// for substring search. It supports being invoked with multiple fragments of the
// haystack and is capable of finding matches spanning multiple fragments.
class StreamingKMP {
public needle: string;
private _lookupVector: number[];

// In cases where we are fed a string that has a suffix that matches a prefix
// of the needle, we're storing the index in the needle which we last matched.
// Then when we get a new haystack, we start matching from that needle.
private _lastMatchingIndex = 0;

constructor(needle: string) {
this.needle = needle;
this._lookupVector = this._createLookupVector();
}

private _createLookupVector(): number[] {
const vector = new Array<number>(this.needle.length);
let j = 0;
vector[0] = 0;

for (let i = 1; i < this.needle.length; i++) {
while (j > 0 && this.needle[i] !== this.needle[j]) {
j = vector[j - 1];
}

if (this.needle[i] === this.needle[j]) {
j++;
}

vector[i] = j;
}

return vector;
}

// Returns the index in the haystackFragment **after** the needle.
// This is done because the match may have occurred over multiple fragments,
// so the index of the needle start would be negative.
public match(haystackFragment: string): number {
let j = this._lastMatchingIndex; // index in needle
let i = 0; // index in haystack

while (i < haystackFragment.length) {
if (haystackFragment[i] === this.needle[j]) {
i++;
j++;
}

if (j === this.needle.length) {
this._lastMatchingIndex = 0;
return i;
}

if (
i < haystackFragment.length &&
haystackFragment[i] !== this.needle[j]
) {
if (j !== 0) {
j = this._lookupVector[j - 1];
} else {
i++;
}
}
}

this._lastMatchingIndex = j;
return -1;
}

public reset(): void {
this._lastMatchingIndex = 0;
}
}

// This class is essentially a state machine that processes a stream of text fragments
// and emitting a callback with the content between each start and end identifier. The
// two states we have are:
// 1. "waiting for start identifier" - `_matchedContent === undefined`
// 2. "waiting for end identifier" - `_matchedContent !== undefined`
// with the state transitioning from one to the other when the corresponding identifier
// is matched in the fragment stream.
class FragmentMatcher {
private _startMatcher: StreamingKMP;
private _endMatcher: StreamingKMP;
private _matchedContent?: string;
private _onContentMatched: (content: string) => void;
private _onFragmentProcessed: (content: string) => void;

constructor({
identifier,
onContentMatched,
onFragmentProcessed,
}: {
identifier: {
start: string;
end: string;
};
onContentMatched: (content: string) => void;
onFragmentProcessed: (content: string) => void;
}) {
this._startMatcher = new StreamingKMP(identifier.start);
this._endMatcher = new StreamingKMP(identifier.end);
this._onContentMatched = onContentMatched;
this._onFragmentProcessed = onFragmentProcessed;
}

private _contentMatched(): void {
const content = this._matchedContent;
if (content !== undefined) {
// Strip the trailing end identifier from the matched content
this._onContentMatched(
content.slice(0, content.length - this._endMatcher.needle.length)
);
}

this._matchedContent = undefined;
this._startMatcher.reset();
this._endMatcher.reset();
}

// This needs to be invoked every time before we call `process` recursively or when `process`
// completes processing the fragment. It will emit a notification to subscribers with the partial
// fragment we've processed, regardless of whether there's a match or not.
private _partialFragmentProcessed(
fragment: string,
index: number | undefined = undefined
): void {
this._onFragmentProcessed(
index === undefined ? fragment : fragment.slice(0, index)
);
}

public process(fragment: string): void {
if (this._matchedContent === undefined) {
// We haven't matched the start identifier yet, so try and do that
const startIndex = this._startMatcher.match(fragment);
if (startIndex !== -1) {
// We found a match for the start identifier - update `_matchedContent` to an empty string
// and recursively call `process` with the remainder of the fragment.
this._matchedContent = '';
this._partialFragmentProcessed(fragment, startIndex);
this.process(fragment.slice(startIndex));
} else {
this._partialFragmentProcessed(fragment);
}
} else {
const endIndex = this._endMatcher.match(fragment);
if (endIndex !== -1) {
// We've matched the end - emit the matched content and continue processing the partial fragment
this._matchedContent += fragment.slice(0, endIndex);
this._partialFragmentProcessed(fragment, endIndex);
this._contentMatched();
this.process(fragment.slice(endIndex));
} else {
// We haven't matched the end yet - append the fragment to the matched content and wait
// for a future fragment to contain the end identifier.
this._matchedContent += fragment;
this._partialFragmentProcessed(fragment);
}
}
}
}

/**
Expand All @@ -22,74 +183,13 @@ export async function processStreamWithIdentifiers({
end: string;
};
}): Promise<void> {
const escapedIdentifierStart = escapeRegex(identifier.start);
const escapedIdentifierEnd = escapeRegex(identifier.end);
const regex = new RegExp(
`${escapedIdentifierStart}([\\s\\S]*?)${escapedIdentifierEnd}`,
'g'
);

let contentSinceLastIdentifier = '';
for await (const fragment of inputIterable) {
contentSinceLastIdentifier += fragment;
const fragmentMatcher = new FragmentMatcher({
identifier,
onContentMatched: onStreamIdentifier,
onFragmentProcessed: processStreamFragment,
});

let lastIndex = 0;
let match: RegExpExecArray | null;
while ((match = regex.exec(contentSinceLastIdentifier)) !== null) {
const endIndex = regex.lastIndex;

// Stream content up to the end of the identifier.
const contentToStream = contentSinceLastIdentifier.slice(
lastIndex,
endIndex
);
processStreamFragment(contentToStream);

const identifierContent = match[1];
onStreamIdentifier(identifierContent);

lastIndex = endIndex;
}

if (lastIndex > 0) {
// Remove all of the processed content.
contentSinceLastIdentifier = contentSinceLastIdentifier.slice(lastIndex);
// Reset the regex.
regex.lastIndex = 0;
} else {
// Clear as much of the content as we can safely.
const maxUnprocessedLength = identifier.start.length - 1;
if (contentSinceLastIdentifier.length > maxUnprocessedLength) {
const identifierIndex = contentSinceLastIdentifier.indexOf(
identifier.start
);
if (identifierIndex > -1) {
// We have an identifier, so clear up until the identifier.
const contentToStream = contentSinceLastIdentifier.slice(
0,
identifierIndex
);
processStreamFragment(contentToStream);
contentSinceLastIdentifier =
contentSinceLastIdentifier.slice(identifierIndex);
} else {
// No identifier, so clear up until the last maxUnprocessedLength.
const processUpTo =
contentSinceLastIdentifier.length - maxUnprocessedLength;
const contentToStream = contentSinceLastIdentifier.slice(
0,
processUpTo
);
processStreamFragment(contentToStream);
contentSinceLastIdentifier =
contentSinceLastIdentifier.slice(processUpTo);
}
}
}
}

// Finish up anything not streamed yet.
if (contentSinceLastIdentifier.length > 0) {
processStreamFragment(contentSinceLastIdentifier);
for await (const fragment of inputIterable) {
fragmentMatcher.process(fragment);
}
}
76 changes: 76 additions & 0 deletions src/test/suite/participant/streamParsing.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -216,4 +216,80 @@ suite('processStreamWithIdentifiers', () => {
expect(fragmentsProcessed.join('')).to.deep.equal(inputFragments.join(''));
expect(identifiersStreamed).to.deep.equal(['\ncode1\n', '\ncode2\n']);
});

test('one fragment containing multiple code blocks emits event in correct order', async () => {
// In case we have one fragment containing multiple code blocks, we want to make sure that
// fragment notifications and identifier notifications arrive in the right order so that we're
// adding code actions after the correct subfragment.
// For example:
// 'Text before code.\n```js\ncode1\n```\nText between code.\n```js\ncode2\n```\nText after code.'
//
// should emit:
//
// processStreamFragment: 'Text before code.\n```js\ncode1\n```'
// onStreamIdentifier: '\ncode1\n'
// processStreamFragment: '\nText between code.\n```js\ncode2\n```'
// onStreamIdentifier: '\ncode2\n'
// processStreamFragment: '\nText after code.'
//
// in that order to ensure we add each code action immediately after the code block
// rather than add both at the end.

const inputFragments = [
'Text before code.\n```js\ncode1\n```\nText between code.\n```js\ncode2\n```\nText after code.',
];

const inputIterable = asyncIterableFromArray<string>(inputFragments);
const identifier = { start: '```js', end: '```' };

const fragmentsEmitted: {
source: 'processStreamFragment' | 'onStreamIdentifier';
content: string;
}[] = [];

const getFragmentHandler = (
source: 'processStreamFragment' | 'onStreamIdentifier'
): ((fragment: string) => void) => {
return (fragment: string): void => {
// It's an implementation detail, but the way the code is structured today, we're splitting the emitted fragments
// whenever we find either a start or end identifier. This is irrelevant as long as we're emitting the entirety of
// the text until the end of the code block in `processStreamFragment` and then the code block itself in `onStreamIdentifier`.
// With the code below, we're combining all subfragments with the same source to make the test verify the desired
// behavior rather than the actual implementation.
const lastFragment = fragmentsEmitted[fragmentsEmitted.length - 1];
if (lastFragment?.source === source) {
lastFragment.content += fragment;
} else {
fragmentsEmitted.push({ source, content: fragment });
}
};
};

await processStreamWithIdentifiers({
processStreamFragment: getFragmentHandler('processStreamFragment'),
onStreamIdentifier: getFragmentHandler('onStreamIdentifier'),
inputIterable,
identifier,
});

expect(fragmentsEmitted).to.have.length(5);
expect(fragmentsEmitted[0].source).to.equal('processStreamFragment');
expect(fragmentsEmitted[0].content).to.equal(
'Text before code.\n```js\ncode1\n```'
);

expect(fragmentsEmitted[1].source).to.equal('onStreamIdentifier');
expect(fragmentsEmitted[1].content).to.equal('\ncode1\n');

expect(fragmentsEmitted[2].source).to.equal('processStreamFragment');
expect(fragmentsEmitted[2].content).to.equal(
'\nText between code.\n```js\ncode2\n```'
);

expect(fragmentsEmitted[3].source).to.equal('onStreamIdentifier');
expect(fragmentsEmitted[3].content).to.equal('\ncode2\n');

expect(fragmentsEmitted[4].source).to.equal('processStreamFragment');
expect(fragmentsEmitted[4].content).to.equal('\nText after code.');
});
});

0 comments on commit 10be1c6

Please sign in to comment.