repurpose persistent_id to use it as a connector name (#8073)
GitOrigin-RevId: cb259294265cae0f74921a90e7c0f147114b466d
zxqfd555-pw authored and Manul from Pathway committed Jan 29, 2025
1 parent 9eb8756 commit 59a646d
Showing 82 changed files with 1,072 additions and 794 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -10,6 +10,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- methods `pw.io.postgres.write` and `pw.io.postgres.write_snapshot` now accept an additional argument `init_mode`, which allows initializing the table before writing.
- `pw.io.deltalake.read` now supports serialization and deserialization for all Pathway data types.
- New parser `pathway.xpacks.llm.parsers.DoclingParser`, which supports parsing PDFs with tables and images.
- Output connectors now include an optional `name` parameter. If provided, this name will appear in logs and monitoring dashboards.
- Automatic naming for input and output connectors has been enhanced.

### Changed
- **BREAKING**: `pw.io.deltalake.read` now requires explicit specification of primary key fields.
@@ -18,6 +20,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- **BREAKING**: When using delay in temporal behavior, current time is updated immediately, not in the next batch.
- **BREAKING**: The `Pointer` type is now serialized to Delta Tables as raw bytes.
- `pw.io.kafka.write` now allows specifying `key` and `headers` for JSON and CSV data formats.
- The `persistent_id` parameter in connectors has been renamed to `name`. The new `name` parameter lets you assign names to connectors, which appear in logs and monitoring dashboards.
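  A hedged migration sketch (the schema, paths, and connector names below are placeholders, not part of this release):

  ```python
  import pathway as pw

  class InputSchema(pw.Schema):
      word: str

  # Before: the source was identified for persistence via persistent_id.
  # words = pw.io.csv.read("inputs/", schema=InputSchema, persistent_id="words_source")

  # After: the same identifier is passed as name and also appears in logs and dashboards.
  words = pw.io.csv.read("inputs/", schema=InputSchema, name="words_source")

  # Output connectors accept the optional name as well.
  pw.io.jsonlines.write(words, "result.jsonlines", name="words_output")
  ```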

### Fixed
- `generate_class` method in `Schema` now correctly renders columns of `UnionType` and `None` types.
16 changes: 8 additions & 8 deletions docs/2.developers/4.user-guide/60.deployment/55.persistence.md
@@ -64,24 +64,24 @@ Both metadata and snapshot storages are configurable with the class [`pw.persist

The config itself is created via its constructor, which accepts a `backend` argument of type `pw.persistence.Backend` and, optionally, the snapshot interval given in milliseconds.
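For example, a minimal sketch with a filesystem backend (the `snapshot_interval_ms` keyword name is an assumption here; omit it to keep the default):

```python
import pathway as pw

# Keep both metadata and snapshots in a local directory.
backend = pw.persistence.Backend.filesystem("./PStorage")

# Optionally set how often snapshots are made, in milliseconds (keyword name assumed).
persistence_config = pw.persistence.Config(backend, snapshot_interval_ms=1000)
```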

This, however, is not the only thing needed for persistence to work, and it moves us to the persistent ids.
This, however, is not the only thing needed for persistence to work, which brings us to unique names.

### Persistent IDs
### Unique names

To persist certain input sources, Pathway requires them to have the `persistent_id` parameter. This identifier denotes the factual data source, so it is also expected not to be changed over the reruns of the program.
To persist certain input sources, Pathway uses the `name` parameter. This identifier denotes the actual data source, so it is expected to remain unchanged across reruns of the program.

The motivation behind the persistent id is that the data source can be changed as much as it's needed for the computations as long as it contains the data with the same schema. For example, it is possible that:
The motivation behind unique names is that the data source itself can change as much as needed for the computations, as long as it still provides data with the same schema. For example, it is possible that:
* a data format in the source is changed. For instance, JSON was used previously, but the newer entries arrive in CSV;
* a path to the data source is changed. For instance, the logs parsed by Pathway are now stored on a different volume;
* some fields are renamed in the data source. For example, the field "date" was renamed to "datetime" to have a more accurate name.

All in all, the variety of changes can be huge. Still, by using the same `persistent_id` the engine knows that the data still corresponds to a certain table.
All in all, the variety of possible changes is huge. Still, as long as the same `name` is used, the engine knows that the data still corresponds to the same table.

These IDs can be assigned in two different ways. The first one is automatic generation. It will assign the IDs to the sources based on the order in which they are added. For example, if the program first reads a dataset from the CSV and then it reads the stream of events from Kafka, it will have two persistent IDs automatically generated, where the first one will point to the dataset and the second one will point to the stream of events. Please note that this approach is fine in case the code is not going to be updated. Otherwise, the sources can be changed which will incur the generation of different persistent IDs, not corresponding to the old ones.
These names can be assigned in two different ways. The first one is automatic generation: the names are assigned to the sources based on the order in which they are added. For example, if the program first reads a dataset from a CSV file and then reads a stream of events from Kafka, two unique names are generated automatically, where the first one points to the dataset and the second one points to the stream of events. Please note that this approach is only fine if the code is not going to be updated. Otherwise, the sources may change, which results in the generation of different unique names that no longer correspond to the old ones.

The second way can be a bit more difficult, but it allows more flexibility: the persistent IDs can be assigned manually in the input connector. To do so, one needs to specify the string parameter `persistent_id`. For example, the word count program above relied on the automatic `persistent_id` generation. However, it would also be possible to specify it explicitly this way:
The second way requires a bit more effort but allows more flexibility: unique names can be assigned manually in the input connector by specifying the string parameter `name`. For example, the word count program above relied on automatic name generation, but the name could also be specified explicitly this way:
```python
words = pw.io.csv.read("inputs/", schema=InputSchema, persistent_id="words_source")
words = pw.io.csv.read("inputs/", schema=InputSchema, name="words_source")
```

## Requirements and assumptions
@@ -234,18 +234,18 @@ backend = pw.persistence.Backend.filesystem("./PStorage")
persistence_config = pw.persistence.Config(backend)
```

## Persistent IDs
## Unique Names

The second (and optional) thing we need to do is the persistent ID assignment. The persistent IDs are required for the engine to match the data sources between different runs.
The second (and optional) thing we need to do is to assign unique names. These unique names are used by the engine to match the data sources between different runs.

In principle, the persistent ID assignment can be done automatically by the engine. In this case, it will assign the persistent IDs to the sources in the order of their appearance and construction. However, this is not generally recommended if you need to change your Pathway program and the data source in the future.
In principle, this assignment can be done automatically by the engine: unique names are then assigned to the sources in the order of their appearance and construction. However, this is not recommended if you may need to change your Pathway program or the data source in the future.

For the sake of completeness, in this tutorial, we will demonstrate the manual persistent ID assignment. The only difference from the non-persistent variant of the input is the parameter `persistent_id` which should be passed to the `pw.io.csv.read` method. So, if we name the data source `words_data_source` the assignment may look as follows:
For the sake of completeness, this tutorial demonstrates manual assignment of unique names. The only difference from the non-persistent variant of the input is the `name` parameter, which should be passed to the `pw.io.csv.read` method. So, if we call the data source `words_data_source`, the assignment may look as follows:

```python
pw.io.csv.read(
...,
persistent_id="words_data_source"
name="words_data_source"
)
```
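With the unique name assigned and the persistence config created, the remaining step is to pass the config to the engine when the program is run. A minimal sketch, assuming the `persistence_config` built earlier in this tutorial:

```python
# Enable persistence for this run of the pipeline.
pw.run(persistence_config=persistence_config)
```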

@@ -269,7 +269,7 @@ if __name__ == "__main__":
"inputs/",
schema=InputSchema,
autocommit_duration_ms=10,
persistent_id="words_input_source", # Changed: now persistent_id is assigned here
name="words_input_source", # Changed: now name is assigned here
)
word_counts = words.groupby(words.word).reduce(words.word, count=pw.reducers.count())
pw.io.jsonlines.write(word_counts, "result.jsonlines")
@@ -172,17 +172,17 @@ Logs may arrive at varying intervals, with a cron-like job delivering data every

[Persistence](/developers/api-docs/persistence-api/) can help here. The idea is to store the state of previous calculations so that when new files are added later, there's no need to start from scratch.

So you need to do two things. First, you need to "name" input sources by assigning persistent IDs to them. This way, when the program recovers, it can accurately match the operator with the correct data dump.
So you need to do two things. First, you need to "name" the input sources by assigning unique names to them. This way, when the program recovers, it can accurately match each operator with the correct data dump.

Here, it is optional because the data processing pipeline doesn't change. It will be shown for the fullness of the example that the only difference is the parameter `persistent_id`:
Here, this step is optional because the data processing pipeline doesn't change. It is shown for completeness of the example; the only difference is the parameter `name`:


```python
access_entries = pw.io.csv.read(
"logs/",
schema=InputSchema,
mode="static",
persistent_id="logs"
name="logs"
)
```
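The second thing is to enable persistence when running the pipeline. A minimal sketch, assuming the snapshots are kept in a local `./PStorage` directory:

```python
backend = pw.persistence.Backend.filesystem("./PStorage")
pw.run(persistence_config=pw.persistence.Config(backend))
```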

@@ -225,7 +225,7 @@ if __name__ == "__main__":
"logs/",
mode="static",
schema=InputSchema,
persistent_id="logs", # Change: persistent ID assigned
name="logs", # Change: unique name assigned
)
sessions = table.windowby(
pw.this.access_time,
2 changes: 1 addition & 1 deletion integration_tests/kafka/test_backfilling.py
@@ -132,7 +132,7 @@ def run_backfilling_program(
topic=kafka_context.input_topic,
format="plaintext",
autocommit_duration_ms=5,
persistent_id="1",
name="1",
),
kwargs={"persistence_config": persistence_config},
)
4 changes: 2 additions & 2 deletions integration_tests/kafka/test_simple.py
@@ -396,7 +396,7 @@ def test_kafka_recovery(tmp_path: pathlib.Path, kafka_context: KafkaTestContext)
value_columns=["v"],
primary_key=["k"],
autocommit_duration_ms=100,
persistent_id="1",
name="1",
)

pw.io.csv.write(table, tmp_path / "output.csv")
@@ -438,7 +438,7 @@ def test_kafka_recovery(tmp_path: pathlib.Path, kafka_context: KafkaTestContext)
value_columns=["v"],
primary_key=["k"],
autocommit_duration_ms=100,
persistent_id="1",
name="1",
)

pw.io.csv.write(table, tmp_path / "output_backfilled.csv")
8 changes: 4 additions & 4 deletions integration_tests/s3/test_s3_generic.py
@@ -30,7 +30,7 @@ def test_s3_backfilling(snapshot_access, tmp_path: pathlib.Path, s3_path: str):
value_columns=["key", "value"],
mode="static",
autocommit_duration_ms=1000,
persistent_id="1",
name="1",
)
pw.io.csv.write(table, str(tmp_path / "output.csv"))
pw.run(
@@ -50,7 +50,7 @@ def test_s3_backfilling(snapshot_access, tmp_path: pathlib.Path, s3_path: str):
value_columns=["key", "value"],
mode="static",
autocommit_duration_ms=1000,
persistent_id="1",
name="1",
)
pw.io.csv.write(table, str(tmp_path / "output_backfilled.csv"))
pw.run(
@@ -74,7 +74,7 @@ def test_s3_backfilling(snapshot_access, tmp_path: pathlib.Path, s3_path: str):
value_columns=["key", "value"],
mode="static",
autocommit_duration_ms=1000,
persistent_id="1",
name="1",
)
pw.io.csv.write(table, str(output_path))
pw.run(
@@ -120,7 +120,7 @@ class InputSchema(pw.Schema):
format="json",
schema=InputSchema,
mode="static",
persistent_id="1",
name="1",
)
pw.io.jsonlines.write(table, str(output_path))
pw.run(
2 changes: 1 addition & 1 deletion integration_tests/wordcount/pw_wordcount.py
@@ -55,7 +55,7 @@ class InputSchema(pw.Schema):
schema=InputSchema,
format="json",
mode=args.mode,
persistent_id="1",
name="1",
autocommit_duration_ms=100,
)
result = words.groupby(words.word).reduce(
25 changes: 12 additions & 13 deletions python/pathway/debug/__init__.py
@@ -488,17 +488,17 @@ def run(self):


class StreamGenerator:
_persistent_id = itertools.count()
_unique_name = itertools.count()
events: dict[tuple[str, int], list[api.SnapshotEvent]] = {}

def _get_next_persistent_id(self) -> str:
return str(f"_stream_generator_{next(self._persistent_id)}")
def _get_next_unique_name(self) -> str:
return str(f"_stream_generator_{next(self._unique_name)}")

def _advance_time_for_all_workers(
self, persistent_id: str, workers: Iterable[int], timestamp: int
self, unique_name: str, workers: Iterable[int], timestamp: int
):
for worker in workers:
self.events[(persistent_id, worker)].append(
self.events[(unique_name, worker)].append(
api.SnapshotEvent.advance_time(timestamp)
)

@@ -519,10 +519,10 @@ def _table_from_dict(
batches: dictionary with specified batches to be put in the table
schema: schema of the table
"""
persistent_id = self._get_next_persistent_id()
unique_name = self._get_next_unique_name()
workers = {worker for batch in batches.values() for worker in batch}
for worker in workers:
self.events[(persistent_id, worker)] = []
self.events[(unique_name, worker)] = []

timestamps = set(batches.keys())

@@ -541,24 +541,23 @@ def _table_from_dict(
batches = {2 * timestamp: batches[timestamp] for timestamp in batches}

for timestamp in sorted(batches):
self._advance_time_for_all_workers(persistent_id, workers, timestamp)
self._advance_time_for_all_workers(unique_name, workers, timestamp)
batch = batches[timestamp]
for worker, changes in batch.items():
for diff, key, values in changes:
if diff == 1:
event = api.SnapshotEvent.insert(key, values)
self.events[(persistent_id, worker)] += [event] * diff
self.events[(unique_name, worker)] += [event] * diff
elif diff == -1:
event = api.SnapshotEvent.delete(key, values)
self.events[(persistent_id, worker)] += [event] * (-diff)
self.events[(unique_name, worker)] += [event] * (-diff)
else:
raise ValueError("only diffs of 1 and -1 are supported")

return read(
_EmptyConnectorSubject(),
persistent_id=persistent_id,
_EmptyConnectorSubject(datasource_name="debug.stream-generator"),
name=unique_name,
schema=schema,
name="debug.stream-generator",
)

def table_from_list_of_batches_by_workers(
13 changes: 5 additions & 8 deletions python/pathway/demo/__init__.py
@@ -32,7 +32,7 @@ def generate_custom_stream(
nb_rows: int | None = None,
autocommit_duration_ms: int = 1000,
input_rate: float = 1.0,
persistent_id: str | None = None,
name: str | None = None,
) -> pw.Table:
"""Generates a data stream.
@@ -104,12 +104,11 @@ def _get_row(i):
time.sleep(1.0 / input_rate)

table = pw.io.python.read(
FileStreamSubject(),
FileStreamSubject(datasource_name="demo.custom-stream"),
schema=schema,
format="json",
autocommit_duration_ms=autocommit_duration_ms,
persistent_id=persistent_id,
name="demo.custom-stream",
name=name,
)

return table
@@ -247,11 +246,10 @@ def run(self):
time.sleep(1.0 / input_rate)

return pw.io.python.read(
FileStreamSubject(),
FileStreamSubject(datasource_name="demo.replay-csv"),
schema=schema.with_types(**{name: str for name in schema.column_names()}),
autocommit_duration_ms=autocommit_ms,
format="json",
name="demo.replay-csv",
).cast_to_types(**schema.typehints())


@@ -331,9 +329,8 @@ def run(self):
self.next_json(values)

return pw.io.python.read(
FileStreamSubject(),
FileStreamSubject(datasource_name="demo.replay-csv-with-time"),
schema=schema.with_types(**{name: str for name in schema.column_names()}),
autocommit_duration_ms=autocommit_ms,
format="json",
name="demo.replay-csv-with-time",
).cast_to_types(**schema.typehints())
9 changes: 6 additions & 3 deletions python/pathway/engine.pyi
@@ -97,6 +97,7 @@ class ConnectorProperties:
commit_duration_ms: int | None = None
unsafe_trusted_ids: bool | None = False
column_properties: list[ColumnProperties] = []
unique_name: str | None = None

class Column:
"""A Column holds data and conceptually is a Dict[Universe elems, dt]
@@ -596,7 +597,7 @@ class Scope:
grouping_columns: list[ColumnPath],
reduced_columns: list[ColumnPath],
combine: Callable[[Any, Any], Any],
persistent_id: str | None,
unique_name: str | None,
table_properties: TableProperties,
) -> Table: ...
def ix_table(
@@ -671,13 +672,15 @@ class Scope:
on_change: Callable,
on_time_end: Callable,
on_end: Callable,
unique_name: str | None = None,
): ...
def output_table(
self,
table: Table,
column_paths: Iterable[ColumnPath],
data_sink: DataStorage,
data_format: DataFormat,
unique_name: str | None = None,
): ...
def export_table(
self, table: Table, column_paths: Iterable[ColumnPath]
@@ -752,7 +755,7 @@ class DataStorage:
elasticsearch_params: ElasticSearchParams | None
parallel_readers: int | None
python_subject: PythonSubject | None
persistent_id: str | None
unique_name: str | None
max_batch_size: int | None
object_pattern: str
mock_events: dict[tuple[str, int], list[SnapshotEvent]] | None
@@ -824,7 +827,7 @@ class SnapshotEvent:
FINISHED: SnapshotEvent

class LocalBinarySnapshotWriter:
def __init__(self, path: str, persistent_id: str, worker_id: int): ...
def __init__(self, path: str, unique_name: str, worker_id: int): ...
def write(self, events: list[SnapshotEvent]): ...

class TelemetryConfig:
2 changes: 1 addition & 1 deletion python/pathway/internals/column.py
@@ -544,7 +544,7 @@ class DeduplicateContext(Context):
instance: tuple[ColumnWithExpression, ...]
acceptor: Callable[[Any, Any], bool]
orig_id_column: IdColumn
persistent_id: str | None
unique_name: str | None

def column_dependencies_internal(self) -> Iterable[Column]:
return (self.value,) + self.instance
2 changes: 2 additions & 0 deletions python/pathway/internals/datasink.py
@@ -20,6 +20,7 @@ class GenericDataSink(DataSink):
datastorage: api.DataStorage
dataformat: api.DataFormat
datasink_name: str
unique_name: str | None

@property
def name(self) -> str:
@@ -33,6 +34,7 @@ class CallbackDataSink(DataSink):
on_end: Callable[[], None]
skip_persisted_batch: bool
skip_errors: bool
unique_name: str | None


@dataclass(frozen=True)
2 changes: 2 additions & 0 deletions python/pathway/internals/datasource.py
@@ -17,6 +17,7 @@
class DataSourceOptions:
commit_duration_ms: int | None = None
unsafe_trusted_ids: bool | None = False
unique_name: str | None = None


@dataclass(frozen=True, kw_only=True)
@@ -39,6 +40,7 @@ def connector_properties(self) -> api.ConnectorProperties:
commit_duration_ms=self.data_source_options.commit_duration_ms,
unsafe_trusted_ids=self.data_source_options.unsafe_trusted_ids,
column_properties=columns,
unique_name=self.data_source_options.unique_name,
)

def get_effective_schema(self) -> type[Schema]:
@@ -1251,7 +1251,7 @@ def is_different_with_state(
instance_paths,
reduced_columns_paths,
is_different_with_state,
self.context.persistent_id,
self.context.unique_name,
properties,
)
