support for -o flag like in kafkacat #70

Open
derekjn opened this issue Oct 4, 2017 · 3 comments

derekjn (Contributor) commented Oct 4, 2017

(from @timnon, moved from pipelinedb/pipelinedb#1872)

Is it possible to start consumption at the last n messages, comparable to -o -1000 for the last 1000 messages in kafkacat? I couldn't find anything on this, and start_offset:=-1000 doesn't work.
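For reference, this is roughly what I attempted (stream and topic names are just placeholders), based on the consume_begin call I normally use:

SELECT pipeline_kafka.consume_begin('clicks_topic', 'clicks_stream',
                                    format := 'json',
                                    start_offset := -1000);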

A normal use case is to pull some messages from the queue to avoid a cold start without any history. However, starting at the beginning of the queue takes quite some time depending on how much is saved, so limiting this process to e.g. the last 1,000,000 messages would be nice.

I have also noticed that offsets are not reset when dropping the complete extension; it is then still necessary to reset them by hand via the offsets table.

derekjn (Contributor, Author) commented Oct 4, 2017

@timnon, this does seem useful, but the main complication I see is that it would only really work at a per-partition level. Special offsets (such as -1) work across partitions because they have a relative meaning.

I believe what kafkacat does when you specify a relative offset without a specific partition is simply to consume from that relative offset on every partition. So if you had a topic with 4 partitions and did something like:

kafkacat -b localhost:9092 -C -t topic -o -100

You'd potentially get 400 messages (the last 100 from each partition). Is this the behavior you have in mind here?
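(For comparison, my understanding is that a relative offset only becomes unambiguous when you pin a single partition with kafkacat's -p flag, e.g.:

kafkacat -b localhost:9092 -C -t topic -p 2 -o -100

which would consume only the last 100 messages of partition 2.)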

timnon commented Oct 5, 2017

In my use case, I analyse some click-stream history (for a recommender system) and do a lot of testing. However, every time the database schema is changed or the database is simply reset, the history collected from the Kafka stream is lost. There are probably other ways to re-ingest the data, but the easiest is to simply re-pull some saved history to avoid a cold start without any history. Starting at the beginning of the topic (even if only the last few days are retained by restricting the pipelinedb views) takes quite a while, so it would be nice to set an upper bound on the messages using some heuristic (e.g. every day has roughly 1,000,000 messages, so go back 2,000,000 messages for two days, maybe plus another 1,000,000 to be sure that two days are captured).

In the current testing setup there is only one partition so far, so there is no problem with pulling extra messages from multiple partitions. That might be an issue in a generic setting, but not here.

timnon commented Oct 12, 2017

Just in case anybody is facing a similar issue and wants to handle this entirely in psql: the following script starts the stream, waits five seconds, and then saves the current offsets into a temporary table. Afterwards the stream is restarted with a modified offset. If five seconds is not long enough for the offsets to show up, -2 will be taken as the start offset. The fixed five-second wait is clearly not a robust way to handle this, but it works for testing.

-- Recreate the stream and start consuming, so that pipeline_kafka records current offsets.
DROP STREAM IF EXISTS :stream CASCADE;
CREATE STREAM :stream ( event JSON );
SELECT pipeline_kafka.consume_begin(:'kafka_topic', :'stream', format := 'json');

-- Give the consumer a few seconds to commit its offsets, then stop it again.
SELECT pg_sleep(5);
SELECT pipeline_kafka.consume_end(:'kafka_topic', :'stream');

-- Compute "saved offset minus :start_n_messages" (cast to BIGINT to avoid overflow on large offsets);
-- fall back to the special offset -2 if that would be below 1 or if no offset was saved in time.
DROP TABLE IF EXISTS tmp_:stream;
CREATE TABLE tmp_:stream AS
(
  SELECT num FROM
  (
    SELECT
      CASE WHEN CAST("offset" AS BIGINT) - :start_n_messages >= 1
           THEN CAST("offset" AS BIGINT) - :start_n_messages
           ELSE -2
      END AS num
    FROM pipeline_kafka.offsets
    WHERE consumer_id IN (SELECT id FROM pipeline_kafka.consumers WHERE topic = :'kafka_topic')
  ) AS A
  UNION
  SELECT -2 AS num  -- guarantees a row even if no offsets were saved yet
);

-- Restart the stream from the computed offset (or -2 if nothing better was found).
DROP STREAM IF EXISTS :stream CASCADE;
CREATE STREAM :stream ( event JSON );
SELECT pipeline_kafka.consume_begin(:'kafka_topic', :'stream', format := 'json', start_offset := num)
FROM tmp_:stream ORDER BY num DESC LIMIT 1;
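For completeness, I invoke it roughly like this (the file name and values are just examples), passing the psql variables on the command line:

psql -v stream=clicks_stream -v kafka_topic=clicks_topic -v start_n_messages=1000000 -f restart_stream.sql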
