Skip to content

Commit

Permalink
chore: add rust examples to workspace (#4609)
Browse files Browse the repository at this point in the history
* chore: add rust examples to workspace

* add missing licenses
  • Loading branch information
thibault-martinez authored Jan 6, 2025
1 parent b362fbb commit e8b9da4
Show file tree
Hide file tree
Showing 12 changed files with 2,348 additions and 989 deletions.
3,102 changes: 2,216 additions & 886 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,8 @@ members = [
"crates/typed-store-error",
"crates/typed-store-workspace-hack",
"docs/examples/rust",
"examples/custom-indexer/rust",
"examples/tic-tac-toe/cli",
"iota-execution",
"iota-execution/cut",
"iota-execution/latest/iota-adapter",
Expand Down
3 changes: 1 addition & 2 deletions examples/custom-indexer/rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
name = "custom-indexer"
version = "0.1.0"
edition = "2021"
license = "Apache-2.0"

[dependencies]
# external dependencies
Expand All @@ -21,5 +22,3 @@ path = "local_reader.rs"
[[bin]]
name = "remote_reader"
path = "remote_reader.rs"

[workspace]
44 changes: 27 additions & 17 deletions examples/custom-indexer/rust/local_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,46 +2,56 @@
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use tokio::sync::oneshot;
use std::{env, path::PathBuf};

use anyhow::Result;
use async_trait::async_trait;
use iota_types::full_checkpoint_content::CheckpointData;
use iota_data_ingestion_core as sdic;
use sdic::{Worker, WorkerPool, ReaderOptions};
use sdic::{DataIngestionMetrics, FileProgressStore, IndexerExecutor};
use iota_types::full_checkpoint_content::CheckpointData;
use prometheus::Registry;
use std::path::PathBuf;
use std::env;
use sdic::{
DataIngestionMetrics, FileProgressStore, IndexerExecutor, ReaderOptions, Worker, WorkerPool,
};
use tokio::sync::oneshot;

struct CustomWorker;

#[async_trait]
impl Worker for CustomWorker {
async fn process_checkpoint(&self, checkpoint: CheckpointData) -> Result<()> {
// custom processing logic
println!("Processing Local checkpoint: {}", checkpoint.checkpoint_summary.to_string());
println!(
"Processing Local checkpoint: {}",
checkpoint.checkpoint_summary.to_string()
);
Ok(())
}
}

#[tokio::main]
async fn main() -> Result<()> {
let concurrency = 5;
let (exit_sender, exit_receiver) = oneshot::channel();
let (_, exit_receiver) = oneshot::channel();
let metrics = DataIngestionMetrics::new(&Registry::new());
let backfill_progress_file_path =
env::var("BACKFILL_PROGRESS_FILE_PATH").unwrap_or("/tmp/local_reader_progress".to_string());
let progress_store = FileProgressStore::new(PathBuf::from(backfill_progress_file_path));
let mut executor = IndexerExecutor::new(progress_store, 1 /* number of workflow types */, metrics);
let mut executor = IndexerExecutor::new(
progress_store,
1, // number of workflow types
metrics,
);
let worker_pool = WorkerPool::new(CustomWorker, "local_reader".to_string(), concurrency);

executor.register(worker_pool).await?;
executor.run(
PathBuf::from("./chk".to_string()), // path to a local directory
None,
vec![], // optional remote store access options
ReaderOptions::default(), /* remote_read_batch_size */
exit_receiver,
).await?;
executor
.run(
PathBuf::from("./chk".to_string()), // path to a local directory
None,
vec![], // optional remote store access options
ReaderOptions::default(), // remote_read_batch_size
exit_receiver,
)
.await?;
Ok(())
}
20 changes: 12 additions & 8 deletions examples/custom-indexer/rust/remote_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@

use anyhow::Result;
use async_trait::async_trait;
use iota_types::full_checkpoint_content::CheckpointData;
use iota_data_ingestion_core::{Worker, setup_single_workflow};
use iota_types::full_checkpoint_content::CheckpointData;

struct CustomWorker;

Expand All @@ -14,20 +14,24 @@ impl Worker for CustomWorker {
async fn process_checkpoint(&self, checkpoint: CheckpointData) -> Result<()> {
// custom processing logic
// print out the checkpoint number
println!("Processing checkpoint: {}", checkpoint.checkpoint_summary.to_string());
println!(
"Processing checkpoint: {}",
checkpoint.checkpoint_summary.to_string()
);
Ok(())
}
}

#[tokio::main]
async fn main() -> Result<()> {
let (executor, term_sender) = setup_single_workflow(
let (executor, _) = setup_single_workflow(
CustomWorker,
"https://checkpoints.testnet.iota.io".to_string(),
0, /* initial checkpoint number */
5, /* concurrency */
None, /* extra reader options */
).await?;
0, // initial checkpoint number
5, // concurrency
None, // extra reader options
)
.await?;
executor.await?;
Ok(())
}
1 change: 1 addition & 0 deletions examples/tic-tac-toe/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
name = "tic-tac-toe"
version = "0.1.0"
edition = "2021"
license = "Apache-2.0"

[dependencies]
# external dependencies
Expand Down
5 changes: 3 additions & 2 deletions examples/tic-tac-toe/cli/src/board.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@
// Modifications Copyright (c) 2024 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

use serde::Deserialize;
use std::fmt;
use iota_types::base_types::{ObjectID, IotaAddress};

use iota_types::base_types::{IotaAddress, ObjectID};
use serde::Deserialize;

#[derive(Deserialize)]
pub(crate) struct Board {
Expand Down
Loading

0 comments on commit e8b9da4

Please sign in to comment.