Skip to content

Commit

Permalink
Move code snippet to be above the overwiew of the code snippet
Browse files Browse the repository at this point in the history
Signed-off-by: Taylor Gray <[email protected]>
  • Loading branch information
graytaylor0 committed Nov 4, 2024
1 parent d398203 commit 6de1340
Showing 1 changed file with 7 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,6 @@ These methods support managing partition CRUD operations and getting the next av

The following code snippet shows a basic source coordination workflow, using a hypothetical database in which each partition represents an individual database file. See the [full code](https://github.com/graytaylor0/data-prepper/blob/6e38dead8e9beca089381519654f329b82524b9d/data-prepper-plugins/sample-source-coordination-source/src/main/java/DatabaseWorker.java#L40) on GitHub.

The key components and workflow in the code for implementing source coordination in Data Prepper are as follows:

1. Upon starting Data Prepper, a leader partition is established. See the [code reference](https://github.com/graytaylor0/data-prepper/blob/6e38dead8e9beca089381519654f329b82524b9d/data-prepper-plugins/sample-source-coordination-source/src/main/java/SampleSource.java#L41)). The single leader partition is assigned to the Data Prepper node that successfully calls `acquireAvailablePartition(LeaderPartition.PARTITION_TYPE)`, assigning it the task of identifying new database files.
2. When a Data Prepper node owns the leader partition, it queries the hypothetical database and creates new partitions in the source coordination store, enabling all nodes running this source to access these database file partitions.
3. A database file partition is acquired for processing. In cases where no partitions need processing, an empty `Optional` is returned.
4. The database file undergoes processing, with its records written into the Data Prepper buffer as individual `Events`. Once all records have been written to the buffer, the source coordination store marks the database file partition as `COMPLETED`, ensuring that it is not processed again.

```java
public void run() {
Expand Down Expand Up @@ -113,6 +106,13 @@ public void run() {
}
```

The key components and workflow in the code for implementing source coordination in Data Prepper are as follows:

1. Upon starting Data Prepper, a leader partition is established. See the [code reference](https://github.com/graytaylor0/data-prepper/blob/6e38dead8e9beca089381519654f329b82524b9d/data-prepper-plugins/sample-source-coordination-source/src/main/java/SampleSource.java#L41)). The single leader partition is assigned to the Data Prepper node that successfully calls `acquireAvailablePartition(LeaderPartition.PARTITION_TYPE)`, assigning it the task of identifying new database files.
2. When a Data Prepper node owns the leader partition, it queries the hypothetical database and creates new partitions in the source coordination store, enabling all nodes running this source to access these database file partitions.
3. A database file partition is acquired for processing. In cases where no partitions need processing, an empty `Optional` is returned.
4. The database file undergoes processing, with its records written into the Data Prepper buffer as individual `Events`. Once all records have been written to the buffer, the source coordination store marks the database file partition as `COMPLETED`, ensuring that it is not processed again.

### Running and testing Data Prepper using source coordination

Before creating a new plugin, you must set up and run Data Prepper locally. The following steps guide you through configuring Data Prepper for streaming documents from MongoDB to OpenSearch using source coordination. While this example uses a single Data Prepper instance, the source coordination allows for scalability when running multiple instances with identical pipeline configurations and shared source coordination store settings defined in `data-prepper-config.yaml`.
Expand Down

0 comments on commit 6de1340

Please sign in to comment.