
Commit

add a possibility to start reading from a certain timestamp from kafka (#7615)

GitOrigin-RevId: 0e12505ec1f4ab5cc2948fb841769442a2495f04
zxqfd555-pw authored and Manul from Pathway committed Nov 7, 2024
1 parent 52b8c49 commit 384c6f4
Showing 5 changed files with 247 additions and 20 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -5,6 +5,9 @@ All notable changes to this project will be documented in this file.
This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]

### Added
- `pw.io.kafka.read` now supports reading entries starting from a specified timestamp (see the usage sketch below).

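A minimal usage sketch of this option follows; it is an editorial illustration, not part of the commit. The broker address, consumer group, and topic name are hypothetical, while the `plaintext` format and the millisecond arithmetic mirror the integration tests added below.

```python
# Editorial illustration; broker address, group id, and topic name are made up.
import time

import pathway as pw

rdkafka_settings = {
    "bootstrap.servers": "localhost:9092",  # hypothetical broker
    "group.id": "example-reader-group",     # hypothetical consumer group
}

# Read everything produced within roughly the last 10 minutes, then keep streaming.
start_from_timestamp_ms = (int(time.time()) - 600) * 1000

table = pw.io.kafka.read(
    rdkafka_settings=rdkafka_settings,
    topic="example-topic",
    format="plaintext",
    start_from_timestamp_ms=start_from_timestamp_ms,
)

pw.io.csv.write(table, "output.csv")
pw.run()
```

Entries older than `start_from_timestamp_ms` are skipped; newer ones are read, and the connector keeps streaming afterwards, as the tests below demonstrate.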
## [0.15.3] - 2024-11-07

### Added
117 changes: 117 additions & 0 deletions integration_tests/kafka/test_simple.py
@@ -2,6 +2,8 @@

import json
import pathlib
import threading
import time

import pytest

@@ -419,3 +421,118 @@ def test_kafka_recovery(tmp_path: pathlib.Path, kafka_context: KafkaTestContext)
),
},
)


@pytest.mark.flaky(reruns=3)
def test_start_from_timestamp_ms_seek_to_middle(
tmp_path: pathlib.Path, kafka_context: KafkaTestContext
):
kafka_context.fill(["foo", "bar"])
time.sleep(10)
start_from_timestamp_ms = (int(time.time()) - 5) * 1000
kafka_context.fill(["qqq", "www"])

table = pw.io.kafka.read(
rdkafka_settings=kafka_context.default_rdkafka_settings(),
topic=kafka_context.input_topic,
format="plaintext",
autocommit_duration_ms=100,
start_from_timestamp_ms=start_from_timestamp_ms,
)

pw.io.csv.write(table, str(tmp_path / "output.csv"))

wait_result_with_checker(
expect_csv_checker(
"""
data
qqq
www
""",
tmp_path / "output.csv",
usecols=["data"],
index_col=["data"],
),
10,
)


@pytest.mark.flaky(reruns=3)
def test_start_from_timestamp_ms_seek_to_beginning(
tmp_path: pathlib.Path, kafka_context: KafkaTestContext
):
kafka_context.fill(["foo", "bar"])
start_from_timestamp_ms = (int(time.time()) - 3600) * 1000

table = pw.io.kafka.read(
rdkafka_settings=kafka_context.default_rdkafka_settings(),
topic=kafka_context.input_topic,
format="plaintext",
autocommit_duration_ms=100,
start_from_timestamp_ms=start_from_timestamp_ms,
)

pw.io.csv.write(table, str(tmp_path / "output.csv"))

wait_result_with_checker(
expect_csv_checker(
"""
data
foo
bar
""",
tmp_path / "output.csv",
usecols=["data"],
index_col=["data"],
),
10,
)


@pytest.mark.flaky(reruns=3)
def test_start_from_timestamp_ms_seek_to_end(
tmp_path: pathlib.Path, kafka_context: KafkaTestContext
):
kafka_context.fill(["foo", "bar"])
time.sleep(10)
start_from_timestamp_ms = int(time.time() - 5) * 1000

table = pw.io.kafka.read(
rdkafka_settings=kafka_context.default_rdkafka_settings(),
topic=kafka_context.input_topic,
format="plaintext",
autocommit_duration_ms=100,
start_from_timestamp_ms=start_from_timestamp_ms,
)

def stream_inputs():
for i in range(10):
kafka_context.fill([str(i)])
time.sleep(1)

t = threading.Thread(target=stream_inputs, daemon=True)
    t.start()

pw.io.csv.write(table, str(tmp_path / "output.csv"))

wait_result_with_checker(
expect_csv_checker(
"""
data
0
1
2
3
4
5
6
7
8
9
""",
tmp_path / "output.csv",
usecols=["data"],
index_col=["data"],
),
30,
)
4 changes: 4 additions & 0 deletions python/pathway/io/kafka/__init__.py
@@ -38,6 +38,7 @@ def read(
autocommit_duration_ms: int | None = 1500,
json_field_paths: dict[str, str] | None = None,
autogenerate_key: bool = False,
start_from_timestamp_ms: int | None = None,
parallel_readers: int | None = None,
persistent_id: str | None = None,
value_columns: list[str] | None = None,
@@ -78,6 +79,8 @@ def read(
        autogenerate_key: If ``True``, Pathway automatically generates a unique primary key
            for the entries read. Otherwise, it first tries to use the key from the message.
            This parameter is used only if the ``format`` is "raw" or "plaintext".
        start_from_timestamp_ms: If defined, the read starts from entries whose timestamps
            are at least the given value, specified as a UNIX timestamp in milliseconds.
            The timestamp must refer to a moment in the past.
        parallel_readers: number of copies of the reader to work in parallel. If the number
            is not specified, min{pathway_threads, total number of partitions}
            will be used. This number also can't be greater than the number of Pathway
@@ -257,6 +260,7 @@ def read(
topic=topic,
parallel_readers=parallel_readers,
persistent_id=persistent_id,
start_from_timestamp_ms=start_from_timestamp_ms,
mode=api.ConnectorMode.STREAMING,
)
schema, data_format = construct_schema_and_data_format(
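One practical note on the unit convention documented above: `start_from_timestamp_ms` is a plain UNIX timestamp expressed in milliseconds, so a wall-clock starting point needs an explicit conversion. A small sketch, with an arbitrary 15-minute look-back window:

```python
# Hypothetical example: start reading from entries at most 15 minutes old.
from datetime import datetime, timedelta, timezone

start_at = datetime.now(timezone.utc) - timedelta(minutes=15)
start_from_timestamp_ms = int(start_at.timestamp() * 1000)

# Equivalent seconds-based form, as used in the integration tests above:
# start_from_timestamp_ms = (int(time.time()) - 15 * 60) * 1000
```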
44 changes: 26 additions & 18 deletions src/connectors/data_storage.rs
@@ -761,7 +761,7 @@ pub struct KafkaReader {
consumer: BaseConsumer<DefaultConsumerContext>,
persistent_id: Option<PersistentId>,
topic: Arc<String>,
-    positions_for_seek: HashMap<i32, i64>,
+    positions_for_seek: HashMap<i32, KafkaOffset>,
}

impl Reader for KafkaReader {
@@ -774,25 +774,31 @@ impl Reader for KafkaReader {
let message_key = kafka_message.key().map(<[u8]>::to_vec);
let message_payload = kafka_message.payload().map(<[u8]>::to_vec);

-            if let Some(last_read_offset) = self.positions_for_seek.get(&kafka_message.partition())
+            if let Some(lazy_seek_offset) = self.positions_for_seek.get(&kafka_message.partition())
             {
-                if last_read_offset >= &kafka_message.offset() {
-                    if let Err(e) = self.consumer.seek(
-                        kafka_message.topic(),
-                        kafka_message.partition(),
-                        KafkaOffset::Offset(*last_read_offset + 1),
-                        None,
-                    ) {
-                        error!(
-                            "Failed to seek topic and partition ({}, {}) to offset {}: {e}",
-                            kafka_message.topic(),
-                            kafka_message.partition(),
-                            *last_read_offset + 1
-                        );
-                    }
-                    continue;
-                } else {
-                    self.positions_for_seek.remove(&kafka_message.partition());
-                }
+                info!(
+                    "Performing Kafka topic seek for ({}, {}) to {:?}",
+                    kafka_message.topic(),
+                    kafka_message.partition(),
+                    lazy_seek_offset
+                );
+                // If there is a need for seek, perform it and remove the seek requirement.
+                if let Err(e) = self.consumer.seek(
+                    kafka_message.topic(),
+                    kafka_message.partition(),
+                    *lazy_seek_offset,
+                    None,
+                ) {
+                    error!(
+                        "Failed to seek topic and partition ({}, {}) to offset {:?}: {e}",
+                        kafka_message.topic(),
+                        kafka_message.partition(),
+                        lazy_seek_offset,
+                    );
+                }
+                self.positions_for_seek.remove(&kafka_message.partition());
+                continue;
             }

let offset = {
@@ -830,7 +836,8 @@ impl Reader for KafkaReader {
to be done on behalf of the rdkafka client, taking into account other
members in its consumer group.
*/
-                    self.positions_for_seek.insert(*partition, *position);
+                    self.positions_for_seek
+                        .insert(*partition, KafkaOffset::Offset(*position + 1));
} else {
error!("Unexpected offset in Kafka frontier: ({offset_key:?}, {offset_value:?})");
}
@@ -861,12 +868,13 @@ impl KafkaReader {
consumer: BaseConsumer<DefaultConsumerContext>,
topic: String,
persistent_id: Option<PersistentId>,
+        positions_for_seek: HashMap<i32, KafkaOffset>,
) -> KafkaReader {
KafkaReader {
consumer,
persistent_id,
topic: Arc::new(topic),
-            positions_for_seek: HashMap::new(),
+            positions_for_seek,
}
}
}
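For context on the reader changes above: `KafkaReader::new` now receives pre-computed per-partition `positions_for_seek`, and the read loop applies each seek lazily when the first message from that partition arrives, then drops the entry. The timestamp-to-offset resolution itself is not shown in this excerpt; the sketch below illustrates the general Kafka mechanism (`offsets_for_times`) with the `confluent_kafka` Python client, purely as an aid to reading the diff. It is not Pathway code, and the broker, topic, and group id are hypothetical.

```python
# Illustration only (not Pathway code): resolve a starting timestamp to
# per-partition offsets, then consume from those offsets onwards.
from confluent_kafka import OFFSET_END, Consumer, TopicPartition

START_FROM_TIMESTAMP_MS = 1_700_000_000_000  # hypothetical starting point

consumer = Consumer(
    {
        "bootstrap.servers": "localhost:9092",  # hypothetical broker
        "group.id": "timestamp-seek-demo",      # hypothetical consumer group
        "enable.auto.commit": False,
    }
)

# Discover the partitions of the topic.
metadata = consumer.list_topics("example-topic", timeout=10)
partition_ids = metadata.topics["example-topic"].partitions

# offsets_for_times() treats the "offset" field of each TopicPartition as a
# timestamp in milliseconds and returns, per partition, the earliest offset
# whose message timestamp is greater than or equal to it.
query = [
    TopicPartition("example-topic", p, START_FROM_TIMESTAMP_MS)
    for p in partition_ids
]
resolved = consumer.offsets_for_times(query, timeout=10)

# Partitions with no message at or after the timestamp come back as OFFSET_END,
# so assigning the resolved list simply starts those partitions at their end.
for tp in resolved:
    if tp.offset == OFFSET_END:
        print(f"partition {tp.partition}: nothing at or after the timestamp yet")
consumer.assign(resolved)

while True:
    message = consumer.poll(1.0)
    if message is None or message.error() is not None:
        continue
    print(message.partition(), message.offset(), message.value())
```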