more type cleanup
payload returns BaseAsyncIterReader | Uint8Array
ikreymer committed Sep 8, 2024
1 parent fa5f7e5 commit 1a5e6f3
Showing 6 changed files with 50 additions and 49 deletions.
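
At a glance, the type change that runs through the diffs below: loadPayload now returns warcio's reader type instead of a loose union of iterables. A compact restatement, not part of the commit itself (the alias names are stand-ins), in the same TypeScript:

import { type BaseAsyncIterReader } from "warcio";

// Return type of loadPayload before this commit (stand-in alias name)
type LoadPayloadResultBefore =
  | AsyncIterable<Uint8Array>
  | Iterable<Uint8Array>
  | Uint8Array
  | null
  | undefined;

// Return type of loadPayload after this commit (stand-in alias name)
type LoadPayloadResultAfter = BaseAsyncIterReader | Uint8Array | null;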
28 changes: 13 additions & 15 deletions src/archivedb.ts
@@ -24,6 +24,7 @@ import {
type ResourceEntry,
} from "./types";
import { type ArchiveRequest } from "./request";
import { type BaseAsyncIterReader } from "warcio";

const MAX_FUZZY_MATCH = 128000;
const MAX_RESULTS = 16;
@@ -76,12 +77,12 @@ export type ADBType = {
payload: {
key: string;
value: { digest: string; payload: Uint8Array | null };
indexes: { digest: string; }
indexes: { digest: string };
};
digestRef: {
key: string;
value: DigestRefCount | null;
indexes: { digest: string; }
indexes: { digest: string };
};
};
// ===========================================================================
@@ -169,12 +170,15 @@ export class ArchiveDB implements DBStore {
//urlStore.createIndex("ts", "ts");
urlStore.createIndex("mimeStatusUrl", ["mime", "status", "url"]);

const payloadStore = db.createObjectStore("payload", { keyPath: "digest" });
payloadStore.createIndex("digest", "digest", {unique: true});

const payloadStore = db.createObjectStore("payload", {
keyPath: "digest",
});
payloadStore.createIndex("digest", "digest", { unique: true });

const digestRef = db.createObjectStore("digestRef", { keyPath: "digest" });
digestRef.createIndex("digest", "digest", {unique: true});

const digestRef = db.createObjectStore("digestRef", {
keyPath: "digest",
});
digestRef.createIndex("digest", "digest", { unique: true });
}
}

@@ -740,13 +744,7 @@ export class ArchiveDB implements DBStore {
async loadPayload(
result: ResourceEntry,
_opts: ADBOpts,
): Promise<
| AsyncIterable<Uint8Array>
| Iterable<Uint8Array>
| Uint8Array
| null
| undefined
> {
): Promise<BaseAsyncIterReader | Uint8Array | null> {
if (result.digest && !result.payload) {
if (
result.digest === EMPTY_PAYLOAD_SHA256 ||
Expand All @@ -762,7 +760,7 @@ export class ArchiveDB implements DBStore {
return payload;
}

return result.payload;
return result.payload || null;
}

isSelfRedirect(url: string, result: ResourceEntry | undefined) {
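
As a consumer-side sketch of the narrowed return type above: a hypothetical helper, not part of this commit, which only relies on BaseAsyncIterReader being async-iterable over Uint8Array chunks (the same property the PartialStreamReader changes below implement):

import { type BaseAsyncIterReader } from "warcio";

// Hypothetical helper: buffer a loadPayload result into a single Uint8Array,
// whichever branch of the union it is.
async function payloadToBytes(
  payload: BaseAsyncIterReader | Uint8Array | null,
): Promise<Uint8Array | null> {
  if (!payload) {
    return null;
  }
  if (payload instanceof Uint8Array) {
    // Already fully buffered, e.g. read back from the payload object store
    return payload;
  }
  // Otherwise it is a reader: drain its async iterator and concatenate
  const chunks: Uint8Array[] = [];
  let total = 0;
  for await (const chunk of payload) {
    chunks.push(chunk);
    total += chunk.length;
  }
  const out = new Uint8Array(total);
  let offset = 0;
  for (const chunk of chunks) {
    out.set(chunk, offset);
    offset += chunk.length;
  }
  return out;
}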
41 changes: 22 additions & 19 deletions src/remotearchivedb.ts
@@ -40,18 +40,14 @@ export abstract class OnDemandPayloadArchiveDB extends ArchiveDB {
this.streamMap = new Map<string, ChunkStore>();
}

abstract loadRecordFromSource(cdx: RemoteResourceEntry): LoadRecordFromSourceType;
abstract loadRecordFromSource(
cdx: RemoteResourceEntry,
): LoadRecordFromSourceType;

override async loadPayload(
cdx: ResourceEntry,
opts: Opts,
): Promise<
| AsyncIterable<Uint8Array>
| Iterable<Uint8Array>
| Uint8Array
| null
| undefined
> {
): Promise<BaseAsyncIterReader | Uint8Array | null> {
let payload = await super.loadPayload(cdx, opts);
if (payload) {
if (
@@ -66,8 +62,6 @@ export abstract class OnDemandPayloadArchiveDB extends ArchiveDB {
const chunkstore = this.streamMap.get(cdx.url);
if (chunkstore) {
console.log(`Reuse stream for ${cdx.url}`);
// TODO @ikreymer in this class, this method can return a `PartialStreamReader`, where it can't (and doesn't make sense to) in ArchiveDB — is there a shared set of elements that would make more sense?
// @ts-expect-error
return new PartialStreamReader(chunkstore);
}

@@ -198,12 +192,10 @@ export abstract class OnDemandPayloadArchiveDB extends ArchiveDB {
// cache here only if somehow the digests don't match (wrong digest from previous versions?)
if (
origResult.digest !== remote.digest &&
// TODO @ikreymer when will payload be an async iterator?
// @ts-expect-error
!payload[Symbol.asyncIterator] &&
remote.digest
remote.digest &&
payload instanceof Uint8Array
) {
await this.commitPayload(payload as Uint8Array, remote.digest);
await this.commitPayload(payload, remote.digest);
}
}

@@ -231,7 +223,7 @@ export abstract class OnDemandPayloadArchiveDB extends ArchiveDB {
console.log("Not cacheing, too big: " + cdx.url);
}

payload = remote.payload;
payload = remote.payload || null;

if (!payload && !remote.reader) {
return null;
@@ -265,7 +257,7 @@ export abstract class OnDemandPayloadArchiveDB extends ArchiveDB {
}
}

return payload ? payload : remote.reader;
return payload ? payload : remote.reader || null;
}

async commitPayload(payload: Uint8Array | null | undefined, digest: string) {
@@ -378,12 +370,13 @@ export class RemotePrefixArchiveDB extends SimpleRemoteArchiveDB {
}

// ===========================================================================
class PartialStreamReader {
class PartialStreamReader extends BaseAsyncIterReader {
chunkstore: ChunkStore;
offset: number;
size: number;

constructor(chunkstore: ChunkStore) {
super();
this.chunkstore = chunkstore;
this.offset = 0;
this.size = this.chunkstore.totalLength;
@@ -400,7 +393,11 @@
this.size = length;
}

getReadableStream() {
async *[Symbol.asyncIterator]() {
yield* this.chunkstore.getChunkIter();
}

override getReadableStream() {
console.log(`Offset: ${this.offset}, Size: ${this.size}`);

const reader: AsyncGenerator<Uint8Array> = this.chunkstore.getChunkIter();
@@ -413,6 +410,12 @@
);
return limitreader.getReadableStream();
}

async readlineRaw(
_maxLength?: number | undefined,
): Promise<Uint8Array | null> {
throw new Error("Method not implemented.");
}
}

// ===========================================================================
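
For reference next to PartialStreamReader, a minimal sketch of what a concrete BaseAsyncIterReader subclass has to supply, assuming (as this diff suggests) that the abstract surface is [Symbol.asyncIterator] plus readlineRaw, with getReadableStream inherited. The class below is a hypothetical example, not part of the commit:

import { BaseAsyncIterReader } from "warcio";

// Hypothetical example: the smallest reader shape used in this commit,
// wrapping an in-memory list of chunks.
class ChunkListReader extends BaseAsyncIterReader {
  constructor(private chunks: Uint8Array[]) {
    super();
  }

  async *[Symbol.asyncIterator]() {
    for (const chunk of this.chunks) {
      yield chunk;
    }
  }

  // Stubbed the same way PartialStreamReader stubs it above
  async readlineRaw(_maxLength?: number): Promise<Uint8Array | null> {
    throw new Error("Method not implemented.");
  }
}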
8 changes: 3 additions & 5 deletions src/response.ts
@@ -13,7 +13,7 @@ const encoder = new TextEncoder();
const decoder = new TextDecoder();

type ArchiveResponseOpts = {
payload: AsyncIterable<Uint8Array> | Uint8Array | null;
payload: BaseAsyncIterReader | Uint8Array | null;
status: number;
statusText?: string;
headers: Headers;
@@ -120,7 +120,7 @@ class ArchiveResponse {
});
}

reader: AsyncIterReader | null;
reader: BaseAsyncIterReader | null;
buffer: Uint8Array | null;

status: number;
@@ -153,10 +153,8 @@ class ArchiveResponse {
this.buffer = null;

if (payload && payload instanceof BaseAsyncIterReader) {
// @ts-expect-error [TODO] - TS2740 - Type 'BaseAsyncIterReader' is missing the following properties from type 'AsyncIterReader': compressed, opts, inflator, _sourceIter, and 18 more.
this.reader = payload;
} else {
// @ts-expect-error [TODO] - TS2322 - Type 'Uint8Array | AsyncIterable<Uint8Array> | null' is not assignable to type 'Uint8Array | null'.
this.buffer = payload;
}

@@ -224,7 +222,7 @@ class ArchiveResponse {
this.reader = null;
}

setReader(reader: AsyncIterReader | ReadableStream) {
setReader(reader: BaseAsyncIterReader | ReadableStream) {
if (reader instanceof BaseAsyncIterReader) {
this.reader = reader;
this.buffer = null;
3 changes: 2 additions & 1 deletion src/types.ts
@@ -1,3 +1,4 @@
import { type BaseAsyncIterReader } from "warcio";
import { type ArchiveRequest } from "./request";
import { type ArchiveResponse } from "./response";

@@ -20,7 +21,7 @@ export type ResourceEntry = {
reqHeaders?: Record<string, string> | null;
recordDigest?: string | null;
payload?: Uint8Array | null;
reader?: AsyncIterable<Uint8Array> | Iterable<Uint8Array> | null;
reader?: BaseAsyncIterReader | null;
referrer?: string | null;
// [TODO]
// eslint-disable-next-line @typescript-eslint/no-explicit-any
7 changes: 4 additions & 3 deletions src/wacz/multiwacz.ts
@@ -85,7 +85,10 @@ interface MDBType extends ADBType {
}

// ==========================================================================
export class MultiWACZ extends OnDemandPayloadArchiveDB implements WACZLoadSource {
export class MultiWACZ
extends OnDemandPayloadArchiveDB
implements WACZLoadSource
{
config: Config;
waczfiles: Record<string, WACZFile>;
waczNameForHash: Record<string, string>;
@@ -401,8 +404,6 @@ export class MultiWACZ extends OnDemandPayloadArchiveDB implements WACZLoadSourc
const waczname = wacz!;

const { reader, hasher } = await this.loadFileFromNamedWACZ(
// [TODO]

waczname,
"archive/" + path,
params,
12 changes: 6 additions & 6 deletions src/warcloader.ts
@@ -1,9 +1,7 @@
import { makeHeaders, Canceled, tsToDate } from "./utils";

import {
// [TODO]
// eslint-disable-next-line @typescript-eslint/no-unused-vars
AsyncIterReader,
type BaseAsyncIterReader,
type Source,
WARCParser,
type WARCRecord,
@@ -378,7 +376,9 @@ class WARCLoader extends BaseParser {
const digest = record.warcPayloadDigest || null;

const payload = record.payload;
const reader = payload ? null : record.reader;
const reader: BaseAsyncIterReader | null = payload
? null
: (record.reader as BaseAsyncIterReader);

const entry: ResourceEntry = {
// @ts-expect-error [TODO] - TS2322 - Type 'string | undefined' is not assignable to type 'string'.
@@ -486,8 +486,8 @@
>;

// [TODO]
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
if (progressUpdate && interruptLoads && this.loadId && interruptLoads[this.loadId]) {

if (progressUpdate && this.loadId && interruptLoads[this.loadId]) {
progressUpdate(
Math.round((parser.offset / totalSize) * 95.0),
"Loading Canceled",
