Skip to content

Commit

Permalink
persist active config
Browse files Browse the repository at this point in the history
  • Loading branch information
jvanbuel committed Dec 7, 2024
1 parent 93ad52d commit fa95933
Show file tree
Hide file tree
Showing 13 changed files with 114 additions and 73 deletions.
6 changes: 3 additions & 3 deletions src/airflow/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ impl AirFlowClient {
}
}

impl From<AirflowConfig> for AirFlowClient {
fn from(config: AirflowConfig) -> Self {
Self::new(config).unwrap()
impl From<&AirflowConfig> for AirFlowClient {
fn from(config: &AirflowConfig) -> Self {
Self::new(config.clone()).unwrap()
}
}

Expand Down
35 changes: 25 additions & 10 deletions src/airflow/config/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::fmt::{Display, Formatter};
use std::fs::OpenOptions;
use std::io::Write;
use std::path::Path;
use std::path::PathBuf;

use clap::ValueEnum;
use log::info;
Expand Down Expand Up @@ -35,6 +35,9 @@ impl Display for ManagedService {
pub struct FlowrsConfig {
pub servers: Option<Vec<AirflowConfig>>,
pub managed_services: Option<Vec<ManagedService>>,
pub active_server: Option<String>,
#[serde(skip_serializing)]
pub path: Option<PathBuf>,
}

#[derive(Deserialize, Serialize, Debug, Clone)]
Expand Down Expand Up @@ -64,14 +67,16 @@ pub struct TokenCmd {
}

impl FlowrsConfig {
pub fn from_file(config_path: Option<&Path>) -> Result<Self> {
pub fn from_file(config_path: &Option<PathBuf>) -> Result<Self> {
let path = match config_path {
Some(path) => path,
None => CONFIG_FILE.as_path(),
None => &CONFIG_FILE.as_path().to_path_buf(),
};

let toml_config = std::fs::read_to_string(path)?;
Self::from_str(&toml_config)
let mut config = Self::from_str(&toml_config)?;
config.path = Some(path.to_path_buf());
Ok(config)
}
pub fn from_str(config: &str) -> Result<Self> {
let mut config: FlowrsConfig = toml::from_str(config)?;
Expand Down Expand Up @@ -115,19 +120,27 @@ impl FlowrsConfig {
toml::to_string(self).map_err(|e| e.into())
}

pub fn to_file(self: FlowrsConfig, path: Option<&Path>) -> Result<()> {
let path = match path {
Some(path) => path,
None => CONFIG_FILE.as_path(),
};
pub fn write_to_file(&mut self) -> Result<()> {
let path = self
.path
.clone()
.unwrap_or(CONFIG_FILE.as_path().to_path_buf());
let mut file = OpenOptions::new()
.read(true)
.write(true)
.truncate(true)
.create(true)
.open(path)?;

file.write_all(Self::to_str(&self)?.as_bytes())?;
// Only write non-managed servers to the config file
if let Some(servers) = &mut self.servers {
*servers = servers
.iter()
.filter(|server| server.managed.is_none())
.cloned()
.collect();
}
file.write_all(Self::to_str(self)?.as_bytes())?;
Ok(())
}
}
Expand Down Expand Up @@ -184,6 +197,8 @@ password = "airflow"
managed: None,
}]),
managed_services: Some(vec![ManagedService::Conveyor]),
active_server: None,
path: None,
};

let serialized_config = config.to_str().unwrap();
Expand Down
2 changes: 1 addition & 1 deletion src/airflow/managed_services/conveyor.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::airflow::config::{AirflowAuth, AirflowConfig, ManagedService, TokenCmd};
use anyhow::Result;
use log::info;
use serde::{Deserialize, Serialize};
use anyhow::Result;

#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct ConveyorEnvironment {
Expand Down
29 changes: 18 additions & 11 deletions src/app.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
use std::{
io,
sync::{Arc, Mutex},
};
use std::sync::{Arc, Mutex};

use anyhow::Result;
use crossterm::event::{KeyCode, KeyModifiers};
use events::{custom::FlowrsEvent, generator::EventGenerator};
use log::debug;
Expand All @@ -18,21 +16,27 @@ pub mod model;
pub mod state;
pub mod worker;

pub async fn run_app<B: Backend>(
terminal: &mut Terminal<B>,
app: Arc<Mutex<App>>,
) -> io::Result<()> {
pub async fn run_app<B: Backend>(terminal: &mut Terminal<B>, app: Arc<Mutex<App>>) -> Result<()> {
let mut events = EventGenerator::new(200);
let ui_app = app.clone();
let worker_app = app.clone();

let (tx_worker, rx_worker) = tokio::sync::mpsc::channel::<WorkerMessage>(100);

log::info!("Starting worker");
let airflow_client: AirFlowClient;
let airflow_client: Option<AirFlowClient>;
{
let app = app.lock().unwrap();
airflow_client = AirFlowClient::from(app.configs.all[0].clone());
let previously_active_server = &app.config.active_server;
let airflow_config = match previously_active_server {
Some(server) => app
.config
.servers
.as_ref()
.and_then(|servers| servers.iter().find(|s| s.name == *server)),
None => None,
};
airflow_client = airflow_config.map(AirFlowClient::from);
}

log::info!("Spawning worker");
Expand Down Expand Up @@ -84,7 +88,10 @@ pub async fn run_app<B: Backend>(
}
// Handle other key events
match key.code {
KeyCode::Char('q') => return Ok(()),
KeyCode::Char('q') => {
app.config.write_to_file()?;
return Ok(());
}
KeyCode::Enter | KeyCode::Right | KeyCode::Char('l') => app.next_panel(),
KeyCode::Esc | KeyCode::Left | KeyCode::Char('h') => app.previous_panel(),
_ => {}
Expand Down
12 changes: 10 additions & 2 deletions src/app/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,23 @@ pub enum Panel {

impl App {
pub fn new(config: FlowrsConfig) -> Result<Self> {
let servers = &config.clone().servers.unwrap();
let servers = &config.clone().servers.unwrap_or_default();
let active_server = if let Some(active_server) = &config.active_server {
servers.iter().find(|server| server.name == *active_server)
} else {
None
};
Ok(App {
config,
dags: DagModel::new(),
configs: ConfigModel::new(servers.to_vec()),
dagruns: DagRunModel::new(),
task_instances: TaskInstanceModel::new(),
logs: LogModel::new(),
active_panel: Panel::Dag,
active_panel: match active_server {
Some(_) => Panel::Dag,
None => Panel::Config,
},
ticks: 0,
})
}
Expand Down
51 changes: 31 additions & 20 deletions src/app/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use tokio::sync::mpsc::Receiver;

pub struct Worker {
app: Arc<Mutex<App>>,
client: AirFlowClient,
client: Option<AirFlowClient>,
rx_worker: Receiver<WorkerMessage>,
}

Expand Down Expand Up @@ -71,7 +71,7 @@ pub enum WorkerMessage {
impl Worker {
pub fn new(
app: Arc<Mutex<App>>,
client: AirFlowClient,
client: Option<AirFlowClient>,
rx_worker: Receiver<WorkerMessage>,
) -> Self {
Worker {
Expand All @@ -82,9 +82,16 @@ impl Worker {
}

pub async fn process_message(&mut self, message: WorkerMessage) {
if self.client.is_none() {
if let WorkerMessage::ConfigSelected(idx) = message {
self.switch_airflow_client(idx);
};
return;
}
let client = self.client.as_ref().unwrap();
match message {
WorkerMessage::UpdateDags => {
let dag_list = self.client.list_dags().await;
let dag_list = client.list_dags().await;
let mut app = self.app.lock().unwrap();
match dag_list {
Ok(dag_list) => {
Expand All @@ -95,19 +102,17 @@ impl Worker {
}
}
WorkerMessage::ToggleDag { dag_id, is_paused } => {
let dag = self.client.toggle_dag(&dag_id, is_paused).await;
let dag = client.toggle_dag(&dag_id, is_paused).await;
if let Err(e) = dag {
let mut app = self.app.lock().unwrap();
app.dags.errors.push(e);
}
}
WorkerMessage::ConfigSelected(idx) => {
let mut app = self.app.lock().unwrap();
self.client = AirFlowClient::from(app.configs.filtered.items[idx].clone());
*app = App::new(app.config.clone()).unwrap();
self.switch_airflow_client(idx);
}
WorkerMessage::UpdateDagRuns { dag_id, clear } => {
let dag_runs = self.client.list_dagruns(&dag_id).await;
let dag_runs = client.list_dagruns(&dag_id).await;
let mut app = self.app.lock().unwrap();
if clear {
app.dagruns.dag_id = Some(dag_id);
Expand All @@ -125,7 +130,7 @@ impl Worker {
dag_run_id,
clear,
} => {
let task_instances = self.client.list_task_instances(&dag_id, &dag_run_id).await;
let task_instances = client.list_task_instances(&dag_id, &dag_run_id).await;
let mut app = self.app.lock().unwrap();
if clear {
app.task_instances.dag_run_id = Some(dag_run_id);
Expand All @@ -150,7 +155,7 @@ impl Worker {
.clone();
}

let dag_code = self.client.get_dag_code(&current_dag.file_token).await;
let dag_code = client.get_dag_code(&current_dag.file_token).await;
let mut app = self.app.lock().unwrap();
match dag_code {
Ok(dag_code) => {
Expand All @@ -171,7 +176,7 @@ impl Worker {
dag_ids
};
let dag_ids_str: Vec<&str> = dag_ids.iter().map(|s| s.as_str()).collect();
let dag_stats = self.client.get_dag_stats(dag_ids_str).await;
let dag_stats = client.get_dag_stats(dag_ids_str).await;
let mut app = self.app.lock().unwrap();
if clear {
app.dags.dag_stats = Default::default();
Expand All @@ -187,7 +192,7 @@ impl Worker {
}
WorkerMessage::ClearDagRun { dag_run_id, dag_id } => {
debug!("Clearing dag_run: {}", dag_run_id);
let dag_run = self.client.clear_dagrun(&dag_id, &dag_run_id).await;
let dag_run = client.clear_dagrun(&dag_id, &dag_run_id).await;
if let Err(e) = dag_run {
debug!("Error clearing dag_run: {}", e);
let mut app = self.app.lock().unwrap();
Expand All @@ -203,7 +208,7 @@ impl Worker {
debug!("Getting logs for task: {task_id}, try number {task_try}");
let logs = join_all(
(1..=task_try)
.map(|i| self.client.get_task_logs(&dag_id, &dag_run_id, &task_id, i))
.map(|i| client.get_task_logs(&dag_id, &dag_run_id, &task_id, i))
.collect::<Vec<_>>(),
)
.await;
Expand Down Expand Up @@ -233,8 +238,7 @@ impl Worker {
let mut app = self.app.lock().unwrap();
app.dagruns.mark_dag_run(&dag_run_id, &status.to_string());
}
let dag_run = self
.client
let dag_run = client
.mark_dag_run(&dag_id, &dag_run_id, &status.to_string())
.await;
if let Err(e) = dag_run {
Expand All @@ -249,8 +253,7 @@ impl Worker {
dag_run_id,
} => {
debug!("Clearing task_instance: {}", task_id);
let task_instance = self
.client
let task_instance = client
.clear_task_instance(&dag_id, &dag_run_id, &task_id)
.await;
if let Err(e) = task_instance {
Expand All @@ -272,8 +275,7 @@ impl Worker {
app.task_instances
.mark_task_instance(&task_id, &status.to_string());
}
let task_instance = self
.client
let task_instance = client
.mark_task_instance(&dag_id, &dag_run_id, &task_id, &status.to_string())
.await;
if let Err(e) = task_instance {
Expand All @@ -284,7 +286,7 @@ impl Worker {
}
WorkerMessage::TriggerDagRun { dag_id } => {
debug!("Triggering dag_run: {}", dag_id);
let dag_run = self.client.trigger_dag_run(&dag_id).await;
let dag_run = client.trigger_dag_run(&dag_id).await;
if let Err(e) = dag_run {
debug!("Error triggering dag_run: {}", e);
let mut app = self.app.lock().unwrap();
Expand All @@ -294,6 +296,15 @@ impl Worker {
}
}

pub fn switch_airflow_client(&mut self, idx: usize) {
let selected_config = self.app.lock().unwrap().configs.filtered.items[idx].clone();
self.client = Some(AirFlowClient::from(&selected_config));

let mut app = self.app.lock().unwrap();
app.config.active_server = Some(selected_config.name.clone());
*app = App::new(app.config.clone()).unwrap();
}

pub async fn run(&mut self) {
loop {
if let Some(message) = self.rx_worker.recv().await {
Expand Down
8 changes: 4 additions & 4 deletions src/commands/config/add.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::path::Path;
use std::path::PathBuf;

use inquire::Select;
use log::info;
Expand Down Expand Up @@ -62,8 +62,8 @@ impl AddCommand {
}
};

let path = self.file.as_ref().map(Path::new);
let mut config = FlowrsConfig::from_file(path)?;
let path = self.file.as_deref().map(PathBuf::from);
let mut config = FlowrsConfig::from_file(&path)?;

if let Some(mut servers) = config.servers.clone() {
servers.retain(|server| server.name != new_config.name && server.managed.is_none());
Expand All @@ -73,7 +73,7 @@ impl AddCommand {
config.servers = Some(vec![new_config]);
}

config.to_file(path)?;
config.write_to_file()?;

println!("✅ Config added successfully!");
Ok(())
Expand Down
6 changes: 3 additions & 3 deletions src/commands/config/list.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use std::path::Path;
use std::path::PathBuf;

use super::model::ListCommand;
use crate::airflow::config::FlowrsConfig;
use anyhow::Result;

impl ListCommand {
pub fn run(&self) -> Result<()> {
let path = self.file.as_ref().map(Path::new);
let config = FlowrsConfig::from_file(path)?;
let path = self.file.as_ref().map(PathBuf::from);
let config = FlowrsConfig::from_file(&path)?;
let servers = config.servers.unwrap_or_default();

if servers.is_empty() {
Expand Down
Loading

0 comments on commit fa95933

Please sign in to comment.