Skip to content

Commit

Permalink
Truncate values in the messages list, retrieve up to 50k of data when…
Browse files Browse the repository at this point in the history
… checking a message details (streamshub#104)
  • Loading branch information
riccardo-forina authored Oct 24, 2023
1 parent 02bb1a0 commit 474532f
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 18 deletions.
26 changes: 26 additions & 0 deletions ui/api/messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ export async function getTopicMessages(
value: string;
}
| undefined;
maxValueLength: number | undefined;
},
): Promise<Message[]> {
const sp = new URLSearchParams(
Expand All @@ -55,6 +56,7 @@ export async function getTopicMessages(
? "gte," + params.filter?.value
: undefined,
"page[size]": params.pageSize,
maxValueLength: Math.min(params.maxValueLength || 150, 50000),
}),
);
const consumeRecordsQuery = sp.toString();
Expand All @@ -75,3 +77,27 @@ export async function getTopicMessages(
// );
return MessageApiResponse.parse(rawData).data;
}

export async function getTopicMessage(
kafkaId: string,
topicId: string,
params: {
partition: number;
offset: number;
},
): Promise<Message | undefined> {
log.info({ kafkaId, topicId, params }, "getTopicMessage response");
const messages = await getTopicMessages(kafkaId, topicId, {
pageSize: 1,
partition: params.partition,
filter: {
type: "offset",
value: params.offset,
},
maxValueLength: 50000,
});

log.debug({ messages }, "getTopicMessage response");

return messages.length === 1 ? messages[0] : undefined;
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@ import { MessagesTable } from "./_components/MessagesTable";

export function ConnectedMessagesTable({
messages,
selectedMessage,
partitions,
params,
}: {
messages: Message[];
selectedMessage: Message | undefined;
partitions: number;
params: {
selectedOffset: number | undefined;
selected: string | undefined;
partition: number | undefined;
"filter[offset]": number | undefined;
"filter[timestamp]": string | undefined;
Expand Down Expand Up @@ -91,7 +93,7 @@ export function ConnectedMessagesTable({
startTransition(() => {
updateUrl({
...params,
selectedOffset: message.attributes.offset,
selected: `${message.attributes.partition}:${message.attributes.offset}`,
});
});
}
Expand All @@ -100,7 +102,7 @@ export function ConnectedMessagesTable({
startTransition(() => {
updateUrl({
...params,
selectedOffset: undefined,
selected: undefined,
});
});
}
Expand All @@ -111,10 +113,6 @@ export function ConnectedMessagesTable({
});
}

const selectedMessage = messages.find(
(m) => m.attributes.offset === params.selectedOffset,
);

useEffect(() => {
let interval: ReturnType<typeof setInterval>;
if (refreshInterval) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import { LimitSelector } from "./LimitSelector";
import { MessageDetails, MessageDetailsProps } from "./MessageDetails";
import { NoDataCell } from "./NoDataCell";
import { UnknownValuePreview } from "./UnknownValuePreview";
import { beautifyUnknownValue, isSameMessage } from "./utils";
import { isSameMessage } from "./utils";

const columns = [
// "partition",
Expand Down Expand Up @@ -203,9 +203,8 @@ export function MessagesTable({
case "value":
return row.attributes.value ? (
<UnknownValuePreview
value={beautifyUnknownValue(
row.attributes.value || "",
)}
value={row.attributes.value}
truncateAt={149}
onClick={() => {
setDefaultTab("value");
onSelectMessage(row);
Expand Down
26 changes: 22 additions & 4 deletions ui/app/[locale]/kafka/[kafkaId]/topics/[topicId]/messages/page.tsx
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { getTopicMessages } from "@/api/messages";
import { getTopicMessage, getTopicMessages } from "@/api/messages";
import { getTopic } from "@/api/topics";
import { NoDataEmptyState } from "@/app/[locale]/kafka/[kafkaId]/topics/[topicId]/messages/_components/NoDataEmptyState";
import { ConnectedMessagesTable } from "@/app/[locale]/kafka/[kafkaId]/topics/[topicId]/messages/ConnectedMessagesTable";
Expand All @@ -18,13 +18,30 @@ export default async function MessagesPage({
searchParams: MessagesSearchParams;
}) {
const topic = await getTopic(kafkaId, topicId);
const { limit, partition, filter, selectedOffset, offset, timestamp, epoch } =
parseSearchParams(searchParams);
const {
limit,
partition,
filter,
selectedOffset,
selectedPartition,
offset,
timestamp,
epoch,
} = parseSearchParams(searchParams);

const messages = await getTopicMessages(kafkaId, topicId, {
pageSize: limit,
partition,
filter,
maxValueLength: 150,
});
const selectedMessage =
selectedOffset !== undefined && selectedPartition !== undefined
? await getTopicMessage(kafkaId, topicId, {
offset: selectedOffset,
partition: selectedPartition,
})
: undefined;

switch (true) {
case messages === null || messages.length === 0:
Expand All @@ -33,11 +50,12 @@ export default async function MessagesPage({
return (
<ConnectedMessagesTable
messages={messages}
selectedMessage={selectedMessage}
partitions={topic.attributes.partitions.length}
params={{
limit,
partition,
selectedOffset,
selected: searchParams.selected,
"filter[timestamp]": timestamp,
"filter[epoch]": epoch,
"filter[offset]": offset,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { stringToInt } from "@/utils/stringToInt";
export type MessagesSearchParams = {
limit: string | undefined;
partition: string | undefined;
selectedOffset: string | undefined;
selected: string | undefined;
"filter[offset]": string | undefined;
"filter[timestamp]": string | undefined;
"filter[epoch]": string | undefined;
Expand All @@ -14,7 +14,7 @@ export function parseSearchParams(searchParams: MessagesSearchParams) {
const offset = stringToInt(searchParams["filter[offset]"]);
const ts = stringToInt(searchParams["filter[timestamp]"]);
const epoch = stringToInt(searchParams["filter[epoch]"]);
const selectedOffset = stringToInt(searchParams.selectedOffset);
const selected = searchParams.selected;
const partition = stringToInt(searchParams.partition);

const timeFilter = epoch ? epoch * 1000 : ts;
Expand All @@ -27,5 +27,18 @@ export function parseSearchParams(searchParams: MessagesSearchParams) {
? { type: "timestamp" as const, value: timestamp }
: undefined;

return { limit, offset, timestamp, epoch, selectedOffset, partition, filter };
const [selectedPartition, selectedOffset] = selected
? decodeURIComponent(selected).split(":").map(stringToInt)
: [undefined, undefined];

return {
limit,
offset,
timestamp,
epoch,
selectedOffset,
selectedPartition,
partition,
filter,
};
}

0 comments on commit 474532f

Please sign in to comment.