diff --git a/Cargo.lock b/Cargo.lock index 3feb588..2bacead 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -201,6 +201,7 @@ dependencies = [ "crossterm_winapi", "derive_more", "document-features", + "futures-core", "mio", "parking_lot", "rustix 1.0.7", @@ -388,6 +389,21 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "futures" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.31" @@ -404,12 +420,34 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" +[[package]] +name = "futures-executor" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + [[package]] name = "futures-io" version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" +[[package]] +name = "futures-macro" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "futures-sink" version = "0.3.31" @@ -428,8 +466,10 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" dependencies = [ + "futures-channel", "futures-core", "futures-io", + "futures-macro", "futures-sink", "futures-task", "memchr", @@ -622,7 +662,7 @@ dependencies = [ "libc", "percent-encoding", "pin-project-lite", - "socket2", + "socket2 0.5.10", "system-configuration", "tokio", "tower-service", @@ -772,6 +812,17 @@ dependencies = [ "syn", ] +[[package]] +name = "io-uring" +version = "0.7.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "046fa2d4d00aea763528b4950358d0ead425372445dc8ff86312b3c69ff7727b" +dependencies = [ + "bitflags", + "cfg-if", + "libc", +] + [[package]] name = "ipnet" version = "2.11.0" @@ -1446,6 +1497,16 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "socket2" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "233504af464074f9d066d7b5416c5f9b894a5862a6506e306f7b816cdd6f1807" +dependencies = [ + "libc", + "windows-sys 0.59.0", +] + [[package]] name = "stable_deref_trait" version = "1.2.0" @@ -1600,17 +1661,31 @@ dependencies = [ [[package]] name = "tokio" -version = "1.45.1" +version = "1.47.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75ef51a33ef1da925cea3e4eb122833cb377c61439ca401b770f54902b806779" +checksum = "89e49afdadebb872d3145a5638b59eb0691ea23e46ca484037cfab3b76b95038" dependencies = [ "backtrace", "bytes", + "io-uring", "libc", "mio", "pin-project-lite", - "socket2", - "windows-sys 0.52.0", + "slab", + "socket2 0.6.0", + "tokio-macros", + "windows-sys 0.59.0", +] + +[[package]] +name = "tokio-macros" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e06d43f1345a3bcd39f6a56dbb7dcab2ba47e68e8ac134855e7e2bdbaf8cab8" +dependencies = [ + "proc-macro2", + "quote", + "syn", ] [[package]] @@ -1635,13 +1710,14 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.15" +version = "0.7.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "66a539a9ad6d5d281510d5bd368c973d636c02dbf8a67300bfb6b950696ad7df" +checksum = "14307c986784f72ef81c89db7d9e28d6ac26d16213b109ea501696195e6e3ce5" dependencies = [ "bytes", "futures-core", "futures-sink", + "futures-util", "pin-project-lite", "tokio", ] @@ -1716,11 +1792,14 @@ version = "0.1.0" dependencies = [ "argh", "crossterm 0.29.0", + "futures", "open", "ratatui", "reqwest", "rusqlite", "time", + "tokio", + "tokio-util", "xml-rs", ] diff --git a/Cargo.toml b/Cargo.toml index 2d8b326..57c692e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,12 +5,15 @@ edition = "2021" [dependencies] argh = "0.1.13" -crossterm = "0.29.0" +crossterm = { version = "0.29.0", features = ["event-stream"] } +futures = "0.3.31" open = "5.3.2" ratatui = "0.29.0" reqwest = { version = "0.12.20", features = ["blocking"] } rusqlite = { version = "0.36.0", features = ["bundled", "time"] } time = { version = "0.3.41", features = ["parsing"] } +tokio = { version = "1.47.1", features = ["macros", "rt", "rt-multi-thread"] } +tokio-util = { version = "0.7.16", features = ["futures-util"] } xml-rs = "0.8.26" [profile.release] diff --git a/src/main.rs b/src/main.rs index 4d7dbb4..127f5dc 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,15 +8,18 @@ pub mod parser; pub mod persistence; pub mod ui; -fn main() -> Result<()> { +#[tokio::main] +async fn main() -> Result<()> { let args = argh::from_env::(); - let mut ctx = TrsEnv::new("test3")?; + let db_name = "test3"; match args.sub_command { TrsSubCommand::AddChannel(args) => { + let mut ctx = TrsEnv::new(db_name)?; commands::add_channel(&mut ctx, &args)?; Ok(()) } TrsSubCommand::ListChannels(args) => { + let mut ctx = TrsEnv::new(db_name)?; let channels = commands::list_channels(&mut ctx, &args)?; for channel in channels { println!( @@ -27,8 +30,12 @@ fn main() -> Result<()> { Ok(()) } - TrsSubCommand::RemoveChannel(args) => commands::remove_channel(&mut ctx, &args), + TrsSubCommand::RemoveChannel(args) => { + let mut ctx = TrsEnv::new("test3")?; + commands::remove_channel(&mut ctx, &args) + } TrsSubCommand::GetArticles(args) => { + let mut ctx = TrsEnv::new(db_name)?; let channels = commands::get_articles_by_channel(&mut ctx, &args)?; for channel in channels { println!( @@ -49,7 +56,10 @@ fn main() -> Result<()> { } Ok(()) } - TrsSubCommand::MarkRead(args) => commands::mark_read(&mut ctx, &args), - TrsSubCommand::Ui(args) => ui::ui(ctx, &args), + TrsSubCommand::MarkRead(args) => { + let mut ctx = TrsEnv::new("test3")?; + commands::mark_read(&mut ctx, &args) + } + TrsSubCommand::Ui(args) => ui::ui(&args, db_name).await, } } diff --git a/src/ui.rs b/src/ui.rs index 8881b84..eda8d1b 100644 --- a/src/ui.rs +++ b/src/ui.rs @@ -8,27 +8,28 @@ pub mod title; use std::{ io::Stdout, - sync::mpsc::{channel, Receiver, Sender}, - thread, time::Duration, + sync::mpsc::{channel, Sender}, + time::Duration, }; use crate::{ - args::{self, ListChannelArgs, UiArgs}, - commands::{self, TrsEnv}, + args::{self, UiArgs}, error::{Result, TrsError}, persistence::RssChannelD, }; use articles::ArticlesWidget; use channels::ChannelsWidget; use controls::ControlsWidget; -use crossterm::event; +use crossterm::event::{self, KeyEventKind}; use debug::DebugWidget; use executor::UiCommandExecutor; +use futures::{FutureExt, StreamExt}; use ratatui::{ prelude::*, widgets::{Block, Borders}, }; use title::TitleWidget; +use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; pub struct AppState { exit: bool, @@ -42,7 +43,7 @@ pub struct AppState { show_add_channel_ui: bool, add_channel: String, dispatcher: Sender, - receiver: Receiver, + receiver: UnboundedReceiver, } #[derive(Clone, Copy, PartialEq)] @@ -82,11 +83,13 @@ pub enum UiCommandDispatchActions { AddChannel(args::AddChannelArgs), RemoveChannel(args::RemoveChannelArgs), MarkArticleRead(args::MarkReadArgs), + ListChannels(args::ListChannelArgs), } -pub fn ui(ctx: TrsEnv, args: &UiArgs) -> Result<()> { - let (tdispatch, rdispatch) = channel(); - let (tupdate, rupdate) = channel(); +pub async fn ui(args: &UiArgs, db_name: &str) -> Result<()> { + let (app_dispatch, app_recv) = channel(); + let (executor_dispatch, executor_recv) = tokio::sync::mpsc::unbounded_channel(); + let event_recv = start_event_loop(executor_recv); let mut terminal = ratatui::init(); let mut app_state = AppState { channels: Vec::new(), @@ -99,33 +102,81 @@ pub fn ui(ctx: TrsEnv, args: &UiArgs) -> Result<()> { last_action: None, show_add_channel_ui: false, add_channel: String::new(), - dispatcher: tdispatch, - receiver: rupdate, + dispatcher: app_dispatch, + receiver: event_recv, }; - let ctx_cloned = ctx.clone(); - let executor = UiCommandExecutor::new(rdispatch, tupdate); - let executor_handle = thread::spawn(move || { - executor.run(ctx_cloned); + let db_name = db_name.to_string(); + std::thread::spawn(move || { + let mut executor = UiCommandExecutor::new(app_recv, executor_dispatch); + executor.run(db_name); }); - let channels = commands::list_channels(&ctx, &ListChannelArgs { limit: None })?; - app_state.channels = channels; + app_state + .dispatcher + .send(UiCommandDispatchActions::ListChannels( + args::ListChannelArgs { limit: None }, + )) + .map_err(|e| TrsError::Error(format!("Unable to send initial app: {}", e)))?; loop { draw(&app_state, &mut terminal)?; - handle_events(&mut app_state, &ctx)?; + handle_events(&mut app_state).await?; if app_state.exit { break; } } drop(app_state); - executor_handle.join().unwrap(); ratatui::restore(); Ok(()) } +fn start_event_loop( + mut executor_recv: UnboundedReceiver, +) -> UnboundedReceiver { + let (evt_dispatch, evt_recv) = tokio::sync::mpsc::unbounded_channel(); + let _event_tx = evt_dispatch.clone(); + let _task = tokio::spawn(async move { + let mut reader = crossterm::event::EventStream::new(); + let mut tick_interval = tokio::time::interval(Duration::from_millis(250)); + loop { + let tick_delay = tick_interval.tick(); + let crossterm_event = reader.next().fuse(); + tokio::select! { + user_input = crossterm_event => { + match user_input { + Some(Ok(evt)) => { + match evt { + crossterm::event::Event::Key(key) => { + if key.kind == KeyEventKind::Press { + _event_tx.send(Event::UserInput(crossterm::event::Event::Key(key))).unwrap(); + } + }, + _ => {} + } + }, + _ => {} + } + }, + executor_event = executor_recv.recv() => { + match executor_event { + Some(backend_event) => { + _event_tx.send(Event::BackendEvent(backend_event)).unwrap(); + }, + None => {} + } + }, + _ = tick_delay => { + _event_tx.send(Event::Tick).unwrap(); + }, + } + } + }); + + evt_recv +} + fn draw(app_state: &AppState, terminal: &mut Terminal>) -> Result<()> { terminal .draw(|f| { @@ -138,20 +189,29 @@ fn draw(app_state: &AppState, terminal: &mut Terminal>) pub enum Event { UserInput(crossterm::event::Event), - ReloadState, + BackendEvent(BackendEvent), Tick, } -fn handle_events(state: &mut AppState, ctx: &TrsEnv) -> Result<()> { - let event = get_event(state)?; +pub enum BackendEvent { + ReloadState(Vec), +} + +async fn handle_events(state: &mut AppState) -> Result<()> { + let event = state.receiver.recv().await; + let Some(event) = event else { + return Ok(()); + }; + match event { Event::UserInput(event) => { handle_user_input(state, event)?; } - Event::ReloadState => { - let channels = commands::list_channels(&ctx, &ListChannelArgs { limit: None })?; - state.channels = channels; - } + Event::BackendEvent(backend_event) => match backend_event { + BackendEvent::ReloadState(channels) => { + state.channels = channels; + } + }, Event::Tick => {} }; @@ -171,21 +231,6 @@ fn handle_user_input(state: &mut AppState, event: event::Event) -> Result<()> { return Ok(()); } -fn get_event(state: &mut AppState) -> Result { - let recv_action = state.receiver.try_recv(); - if let Ok(_) = recv_action { - return Ok(Event::ReloadState); - } - - let raw_event = event::poll(Duration::from_millis(250)).map_err(|e| TrsError::TuiError(e))?; - if raw_event == false { - return Ok(Event::Tick); - } - - // It's guaranteed that an event is available now - Ok(Event::UserInput(event::read().unwrap())) -} - struct AppStateWidget<'a> { app_state: &'a AppState, } diff --git a/src/ui/executor.rs b/src/ui/executor.rs index 80cd11c..d0eaf99 100644 --- a/src/ui/executor.rs +++ b/src/ui/executor.rs @@ -1,26 +1,32 @@ -use std::sync::mpsc::{Receiver, Sender}; +use std::sync::mpsc::Receiver; + +use tokio::sync::mpsc::UnboundedSender; + +use crate::{commands::TrsEnv, ui::BackendEvent}; use super::UiCommandDispatchActions; pub struct UiCommandExecutor { - pub command_receiver: Receiver, - pub status_sender: Sender, + pub app_recv: Receiver, + pub executor_dispatch: UnboundedSender, } impl UiCommandExecutor { pub fn new( - command_receiver: Receiver, - status_sender: Sender, + app_recv: Receiver, + executor_dispatch: UnboundedSender, ) -> Self { UiCommandExecutor { - command_receiver, - status_sender, + app_recv, + executor_dispatch, } } - pub fn run(&self, ctx: crate::commands::TrsEnv) -> () { + // This one will have to run on the same thread as this manages the sqlite connection + pub fn run(&mut self, db_name: String) -> () { + let ctx = TrsEnv::new(db_name.as_str()).unwrap(); loop { - let action = self.command_receiver.recv(); + let action = self.app_recv.recv(); let Ok(action) = action else { break; }; @@ -28,22 +34,39 @@ impl UiCommandExecutor { match action { UiCommandDispatchActions::AddChannel(args) => { if let Ok(_) = crate::commands::add_channel(&ctx, &args) { - self.status_sender.send(1).unwrap_or_default(); + self.send_new_state_default(&ctx); }; } - UiCommandDispatchActions::RemoveChannel(args) => { if let Ok(_) = crate::commands::remove_channel(&ctx, &args) { - self.status_sender.send(1).unwrap_or_default(); + self.send_new_state_default(&ctx); } } - UiCommandDispatchActions::MarkArticleRead(args) => { if let Ok(_) = crate::commands::mark_read(&ctx, &args) { - self.status_sender.send(1).unwrap_or_default(); + self.send_new_state_default(&ctx); } } + UiCommandDispatchActions::ListChannels(args) => { + self.send_new_state(&ctx, args); + } } } } + + fn send_new_state_default(&mut self, ctx: &crate::commands::TrsEnv) { + self.send_new_state(ctx, crate::args::ListChannelArgs { limit: None }); + } + + fn send_new_state( + &mut self, + ctx: &crate::commands::TrsEnv, + args: crate::args::ListChannelArgs, + ) { + if let Ok(channels) = crate::commands::list_channels(ctx, &args) { + self.executor_dispatch + .send(BackendEvent::ReloadState(channels)) + .unwrap_or_default(); + } + } }