Skip to content

Commit

Permalink
tests: topic produce test
Browse files Browse the repository at this point in the history
  • Loading branch information
fraidev committed Jul 6, 2024
1 parent 5089ed8 commit b74ae3e
Show file tree
Hide file tree
Showing 14 changed files with 284 additions and 90 deletions.
7 changes: 5 additions & 2 deletions crates/fluvio-benchmark/src/producer_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ use std::time::Instant;
use async_channel::Sender;
use anyhow::Result;

use fluvio::{TopicProducer, RecordKey, Fluvio, TopicProducerConfigBuilder};
use fluvio::{
spu::SpuSocketPool,
Fluvio, RecordKey, TopicProducer, TopicProducerConfigBuilder,
};

use crate::{
benchmark_config::{
Expand All @@ -15,7 +18,7 @@ use crate::{
};

pub struct ProducerWorker {
fluvio_producer: TopicProducer,
fluvio_producer: TopicProducer<SpuSocketPool>,
records_to_send: Option<Vec<BenchmarkRecord>>,
config: BenchmarkConfig,
producer_id: u64,
Expand Down
15 changes: 8 additions & 7 deletions crates/fluvio-cli/src/client/produce/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ mod cmd {
Compression, Fluvio, FluvioError, TopicProducer, TopicProducerConfigBuilder, RecordKey,
ProduceOutput, DeliverySemantic, SmartModuleContextData, Isolation, SmartModuleInvocation,
};
use fluvio::spu::SpuSocketPool;
use fluvio_extension_common::Terminal;
use fluvio_types::print_cli_ok;

Expand Down Expand Up @@ -349,7 +350,7 @@ mod cmd {

impl ProduceOpt {
#[cfg(feature = "producer-file-io")]
async fn process_raw_file(&self, producer: &TopicProducer) -> Result<()> {
async fn process_raw_file(&self, producer: &TopicProducer<SpuSocketPool>) -> Result<()> {
let key = self.key.clone().map(Bytes::from);
// Read all input and send as one record
let buffer = match &self.file {
Expand Down Expand Up @@ -404,7 +405,7 @@ mod cmd {

async fn produce_lines(
&self,
producer: Arc<TopicProducer>,
producer: Arc<TopicProducer<SpuSocketPool>>,
#[cfg(feature = "stats")] maybe_stats_bar: Option<&ProgressBar>,
) -> Result<()> {
#[cfg(feature = "stats")]
Expand Down Expand Up @@ -476,7 +477,7 @@ mod cmd {
Ok(())
}

async fn producer_stdin(&self, producer: &Arc<TopicProducer>) -> Result<()> {
async fn producer_stdin(&self, producer: &Arc<TopicProducer<SpuSocketPool>>) -> Result<()> {
let mut lines = BufReader::new(std::io::stdin()).lines();
if self.interactive_mode() {
eprint!("> ");
Expand Down Expand Up @@ -521,7 +522,7 @@ mod cmd {

async fn produce_line(
&self,
producer: &Arc<TopicProducer>,
producer: &Arc<TopicProducer<SpuSocketPool>>,
line: &str,
) -> Result<Option<ProduceOutput>> {
let produce_output = if let Some(separator) = &self.key_separator {
Expand All @@ -538,7 +539,7 @@ mod cmd {

async fn produce_key_value(
&self,
producer: Arc<TopicProducer>,
producer: Arc<TopicProducer<SpuSocketPool>>,
line: &str,
separator: &str,
) -> Result<Option<ProduceOutput>> {
Expand Down Expand Up @@ -587,7 +588,7 @@ mod cmd {
fn update_stats_bar(
&self,
maybe_stats_bar: Option<&ProgressBar>,
producer: &Arc<TopicProducer>,
producer: &Arc<TopicProducer<SpuSocketPool>>,
line: &str,
) {
if self.is_print_live_stats() {
Expand Down Expand Up @@ -657,7 +658,7 @@ mod cmd {
#[cfg(feature = "stats")]
/// Initialize Ctrl-C event handler to print session summary when we are collecting stats
async fn init_ctrlc(
producer: Arc<TopicProducer>,
producer: Arc<TopicProducer<SpuSocketPool>>,
maybe_stats_bar: Option<ProgressBar>,
force_print_summary: bool,
) -> Result<()> {
Expand Down
7 changes: 4 additions & 3 deletions crates/fluvio-cli/src/client/produce/stats/report.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use fluvio::stats::{
};
use indicatif::ProgressBar;
use fluvio::TopicProducer;
use fluvio::spu::SpuPool;
use crate::Result;
use crate::error::CliError;

Expand All @@ -16,7 +17,7 @@ use crate::error::CliError;
/// Column names include units
pub async fn start_csv_report(
stats_path: &PathBuf,
producer: &Arc<TopicProducer>,
producer: &Arc<TopicProducer<SpuSocketPool>>,
) -> Result<BufWriter<File>, crate::error::CliError> {
let mut stats_file = BufWriter::new(
OpenOptions::new()
Expand All @@ -40,7 +41,7 @@ pub async fn start_csv_report(
/// Writes a row of CSV data to stats report file
/// Header contents dependent on what stats producer configured to collect
pub async fn write_csv_dataframe(
producer: &Arc<TopicProducer>,
producer: &Arc<TopicProducer<SpuSocketPool>>,
last_update_check: i64,
maybe_stats_file: Option<&mut BufWriter<File>>,
) -> Result<i64, CliError> {
Expand Down Expand Up @@ -197,7 +198,7 @@ pub async fn format_summary_stats(client_stats: ClientStatsDataFrame) -> String

/// Report the producer summary to stdout with the `ProgressBar`
pub async fn producer_summary(
producer: &Arc<TopicProducer>,
producer: &Arc<TopicProducer<SpuSocketPool>>,
maybe_stats_bar: Option<&ProgressBar>,
force_print_stats: bool,
) {
Expand Down
7 changes: 4 additions & 3 deletions crates/fluvio-test-util/test_runner/test_driver/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//use fluvio::consumer::{PartitionSelectionStrategy, ConsumerConfig};

use fluvio::dataplane::link::ErrorCode;
use fluvio::spu::SpuSocketPool;
use tracing::debug;
use anyhow::Result;

Expand Down Expand Up @@ -78,7 +79,7 @@ impl TestDriver {
&self,
topic: &str,
config: TopicProducerConfig,
) -> TopicProducer {
) -> TopicProducer<SpuSocketPool> {
debug!(topic, "creating producer");
let fluvio_client = self.create_client().await.expect("cant' create client");
match fluvio_client
Expand All @@ -96,15 +97,15 @@ impl TestDriver {
}

// Wrapper to getting a producer. We keep track of the number of producers we create
pub async fn create_producer(&self, topic: &str) -> TopicProducer {
pub async fn create_producer(&self, topic: &str) -> TopicProducer<SpuSocketPool> {
self.create_producer_with_config(topic, Default::default())
.await
}

// Wrapper to producer send. We measure the latency and accumulation of message payloads sent.
pub async fn send_count(
&self,
p: &TopicProducer,
p: &TopicProducer<SpuSocketPool>,
key: RecordKey,
message: Vec<u8>,
) -> Result<()> {
Expand Down
4 changes: 2 additions & 2 deletions crates/fluvio-test/src/tests/data_generator/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use fluvio_test_util::test_runner::test_driver::TestDriver;
use fluvio_test_util::test_meta::environment::EnvDetail;
use std::time::SystemTime;
use tracing::debug;
use fluvio::{Offset, TopicProducer, TopicProducerConfigBuilder, RecordKey};
use fluvio::{spu::SpuSocketPool, Offset, RecordKey, TopicProducer, TopicProducerConfigBuilder};
use futures::StreamExt;
use std::time::Duration;

Expand Down Expand Up @@ -142,7 +142,7 @@ async fn send_record(
producer_id: u32,
records_sent: u32,
test_driver: &TestDriver,
producer: &TopicProducer,
producer: &TopicProducer<SpuSocketPool>,
) {
let record = generate_record(option.clone(), producer_id, records_sent);
test_driver
Expand Down
3 changes: 2 additions & 1 deletion crates/fluvio-test/src/tests/producer.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::any::Any;
use clap::Parser;

use fluvio::spu::SpuSocketPool;
use fluvio::{RecordKey, TopicProducer};
use fluvio_test_derive::fluvio_test;
use fluvio_test_util::test_meta::environment::EnvironmentSetup;
Expand Down Expand Up @@ -74,7 +75,7 @@ impl TestOption for ProducerTestOption {

// The total num_records should be divided up and assigned per producer
async fn producer_work(
producer: TopicProducer,
producer: TopicProducer<SpuSocketPool>,
producer_id: u32,
workload_size: u32,
record_tag_start: u32,
Expand Down
10 changes: 5 additions & 5 deletions crates/fluvio/src/consumer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use fluvio_protocol::record::Batch;
use crate::FluvioError;
use crate::metrics::ClientMetrics;
use crate::offset::{Offset, fetch_offsets};
use crate::spu::{SpuDirectory, SpuPool};
use crate::spu::{SpuDirectory, SpuSocketPool};

pub use config::{ConsumerConfig, ConsumerConfigBuilder};
pub use config::{ConsumerConfigExt, ConsumerConfigExtBuilder, OffsetManagementStrategy};
Expand All @@ -52,7 +52,7 @@ const STREAM_TO_SERVER_CHANNEL_SIZE: usize = 100;
/// [`Offset`]: struct.Offset.html
/// [`partition_consumer`]: struct.Fluvio.html#method.partition_consumer
/// [`Fluvio`]: struct.Fluvio.html
pub struct PartitionConsumer<P = SpuPool> {
pub struct PartitionConsumer<P = SpuSocketPool> {
topic: String,
partition: PartitionId,
pool: Arc<P>,
Expand Down Expand Up @@ -685,7 +685,7 @@ pub enum PartitionSelectionStrategy {
}

impl PartitionSelectionStrategy {
async fn selection(&self, spu_pool: Arc<SpuPool>) -> Result<Vec<(String, PartitionId)>> {
async fn selection(&self, spu_pool: Arc<SpuSocketPool>) -> Result<Vec<(String, PartitionId)>> {
let pairs = match self {
PartitionSelectionStrategy::All(topic) => {
let topics = spu_pool.metadata.topics();
Expand All @@ -707,14 +707,14 @@ impl PartitionSelectionStrategy {
#[derive(Clone)]
pub struct MultiplePartitionConsumer {
strategy: PartitionSelectionStrategy,
pool: Arc<SpuPool>,
pool: Arc<SpuSocketPool>,
metrics: Arc<ClientMetrics>,
}

impl MultiplePartitionConsumer {
pub(crate) fn new(
strategy: PartitionSelectionStrategy,
pool: Arc<SpuPool>,
pool: Arc<SpuSocketPool>,
metrics: Arc<ClientMetrics>,
) -> Self {
Self {
Expand Down
18 changes: 11 additions & 7 deletions crates/fluvio/src/fluvio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use fluvio_future::net::DomainConnector;
use semver::Version;

use crate::admin::FluvioAdmin;
use crate::spu::SpuPool;
use crate::TopicProducer;
use crate::PartitionConsumer;

Expand All @@ -29,15 +30,15 @@ use crate::consumer::{
};
use crate::metrics::ClientMetrics;
use crate::producer::TopicProducerConfig;
use crate::spu::SpuPool;
use crate::spu::SpuSocketPool;
use crate::sync::MetadataStores;

/// An interface for interacting with Fluvio streaming
pub struct Fluvio {
socket: SharedMultiplexerSocket,
config: Arc<ClientConfig>,
versions: Versions,
spu_pool: OnceCell<Arc<SpuPool>>,
spu_pool: OnceCell<Arc<SpuSocketPool>>,
metadata: MetadataStores,
watch_version: i16,
metric: Arc<ClientMetrics>,
Expand Down Expand Up @@ -125,12 +126,12 @@ impl Fluvio {
}

/// lazy get spu pool
async fn spu_pool(&self) -> Result<Arc<SpuPool>> {
async fn spu_pool(&self) -> Result<Arc<SpuSocketPool>> {
self.spu_pool
.get_or_try_init(|| async {
let metadata =
MetadataStores::start(self.socket.clone(), self.watch_version).await?;
let pool = SpuPool::start(self.config.clone(), metadata);
let pool = SpuSocketPool::start(self.config.clone(), metadata);
Ok(Arc::new(pool?))
})
.await
Expand All @@ -153,7 +154,10 @@ impl Fluvio {
/// # Ok(())
/// # }
/// ```
pub async fn topic_producer(&self, topic: impl Into<String>) -> Result<TopicProducer> {
    /// Convenience wrapper around [`Self::topic_producer_with_config`] that
    /// uses the default [`TopicProducerConfig`].
    pub async fn topic_producer(
        &self,
        topic: impl Into<String>,
    ) -> Result<TopicProducer<SpuSocketPool>> {
        // Delegate with a default config; topic existence is checked inside
        // `topic_producer_with_config` before the producer is constructed.
        self.topic_producer_with_config(topic, Default::default())
            .await
    }
Expand All @@ -179,12 +183,12 @@ impl Fluvio {
&self,
topic: impl Into<String>,
config: TopicProducerConfig,
) -> Result<TopicProducer> {
) -> Result<TopicProducer<SpuSocketPool>> {
let topic = topic.into();
debug!(topic = &*topic, "Creating producer");

let spu_pool = self.spu_pool().await?;
if !spu_pool.topic_exists(&topic).await? {
if !spu_pool.topic_exists(topic.clone()).await? {
return Err(FluvioError::TopicNotFound(topic).into());
}

Expand Down
4 changes: 3 additions & 1 deletion crates/fluvio/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,9 @@ const MINIMUM_PLATFORM_VERSION: &str = "0.9.0";
///
/// [`Fluvio`]: ./struct.Fluvio.html
#[instrument(skip(topic))]
pub async fn producer(topic: impl Into<String>) -> anyhow::Result<TopicProducer> {
pub async fn producer(
topic: impl Into<String>,
) -> anyhow::Result<TopicProducer<spu::SpuSocketPool>> {
let fluvio = Fluvio::connect().await?;
let producer = fluvio.topic_producer(topic).await?;
Ok(producer)
Expand Down
Loading

0 comments on commit b74ae3e

Please sign in to comment.