diff --git a/Cargo.lock b/Cargo.lock index 729a834..73147ed 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1070,10 +1070,12 @@ dependencies = [ "system-configuration", "tokio", "tokio-native-tls", + "tokio-util", "tower-service", "url", "wasm-bindgen", "wasm-bindgen-futures", + "wasm-streams", "web-sys", "winreg", ] @@ -1860,6 +1862,19 @@ version = "0.2.90" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4d91413b1c31d7539ba5ef2451af3f0b833a005eb27a631cec32bc0635a8602b" +[[package]] +name = "wasm-streams" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4609d447824375f43e1ffbc051b50ad8f4b3ae8219680c94452ea05eb240ac7" +dependencies = [ + "futures-util", + "js-sys", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "web-sys" version = "0.3.67" diff --git a/Cargo.toml b/Cargo.toml index 3ef482e..129dd83 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,7 @@ futures = "0.3.30" lazy-regex = "3.1.0" lazy_static = "1.4.0" regex = "1.10.3" -reqwest = "0.11.23" +reqwest = { version = "0.11.23", features = ["stream"] } rocket = "0.5.0" rustls = "0.22.2" tokio = {version = "1.35.1", features = ["full"]} diff --git a/src/engines/duckduckgo.rs b/src/engines/duckduckgo.rs index 99eb11d..3397815 100644 --- a/src/engines/duckduckgo.rs +++ b/src/engines/duckduckgo.rs @@ -1,17 +1,16 @@ // Search engine parser for DuckDuckGo pub mod duckduckgo { use std::{ - cmp::min, collections::VecDeque, - pin::Pin, - str::Bytes, + pin::{pin, Pin}, task::{Context, Poll}, }; use async_trait::async_trait; - use futures::Stream; + use futures::{FutureExt, Stream, StreamExt}; use lazy_static::lazy_static; use regex::Regex; + use rocket::http::hyper::body::Bytes; use urlencoding::decode; use crate::{ @@ -35,32 +34,7 @@ pub mod duckduckgo { results_started: bool, previous_block: String, // Holds all results until consumed by iterator - pub results: VecDeque, - } - - impl DuckDuckGo { - fn slice_remaining_block(&mut self, start_position: &usize) { - let previous_block_bytes = self.previous_block.as_bytes().to_vec(); - let remaining_bytes = previous_block_bytes[*start_position..].to_vec(); - let remaining_text = String::from_utf8(remaining_bytes).unwrap(); - - self.previous_block.clear(); - self.previous_block.push_str(&remaining_text); - } - - pub fn new() -> Self { - Self { - callback: Box::new(|_: SearchResult| {}), - results_started: false, - previous_block: String::new(), - results: VecDeque::new(), - completed: false, - } - } - - pub fn set_callback(&mut self, callback: CallbackType) { - self.callback = callback; - } + pub results: Vec, } // impl Stream for DuckDuckGo { @@ -100,33 +74,80 @@ pub mod duckduckgo { // } // } + pub struct DuckDuckGoSearchStream<'a> { + pub ddg: &'a DuckDuckGo, + } + + impl<'a> DuckDuckGoSearchStream<'a> { + pub fn new(ddg: &'a DuckDuckGo) -> Self { + Self { ddg } + } + } + #[async_trait] - impl EngineBase for DuckDuckGo { - fn search(&mut self, query: &str) { - dbg!("searching duckduckgo"); + impl Stream for DuckDuckGoSearchStream<'_> { + type Item = SearchResult; - let client = Client::new("https://html.duckduckgo.com/html/"); + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Poll::Pending + } + } - let packets = client.request(&"POST").unwrap(); + impl DuckDuckGo { + pub fn get_stream<'a>(&'a self) -> impl Stream + 'a { + DuckDuckGoSearchStream { ddg: self } + } - for ii in (0..packets.len()).step_by(PACKET_SIZE) { - let end_range = min(packets.len(), ii + PACKET_SIZE); + fn slice_remaining_block(&mut self, start_position: &usize) { + let previous_block_bytes = self.previous_block.as_bytes().to_vec(); + let remaining_bytes = previous_block_bytes[*start_position..].to_vec(); + let remaining_text = String::from_utf8(remaining_bytes).unwrap(); - let slice = &packets[ii..end_range]; - self.parse_packet(slice.iter()); + self.previous_block.clear(); + self.previous_block.push_str(&remaining_text); + } - // Call callback, there is probably a better way to do this - // while self.results.len() > 0 { - // let result = self.results.pop_front().unwrap(); - // - // (self.callback)(result); - // } + pub fn new() -> Self { + Self { + callback: Box::new(|_: SearchResult| {}), + results_started: false, + previous_block: String::new(), + results: vec![], + completed: false, + } + } + + pub fn set_callback(&mut self, callback: CallbackType) { + self.callback = callback; + } + + pub async fn search(&mut self, query: &str) { + let client = reqwest::Client::new(); + + let mut stream = client + .post("https://html.duckduckgo.com/html/") + .header("Content-Type", "application/x-www-form-urlencoded") + .body("q=duck") + .send() + .await + .unwrap() + .bytes_stream(); + + while let Some(item) = stream.next().await { + let packet = item.unwrap(); + + if let Some(result) = self.parse_packet(packet.iter()) { + self.results.push(result); + } } self.completed = true; } - fn parse_packet<'a>(&mut self, packet: impl Iterator) { + fn parse_packet<'a>( + &mut self, + packet: impl Iterator, + ) -> Option { let bytes: Vec = packet.map(|bit| *bit).collect(); let raw_text = String::from_utf8_lossy(&bytes); let text = STRIP.replace_all(&raw_text, " "); @@ -159,15 +180,15 @@ pub mod duckduckgo { let end_position = captures.name("end").unwrap().end(); self.slice_remaining_block(&end_position); - // (self.callback)(result); - - self.results.push_back(result); + return Some(result); } None => {} } } else if RESULTS_START.is_match(&text) { self.results_started = true; } + + None } } } diff --git a/src/engines/engine_base.rs b/src/engines/engine_base.rs index 470d7ac..5f3a519 100644 --- a/src/engines/engine_base.rs +++ b/src/engines/engine_base.rs @@ -17,6 +17,6 @@ pub mod engine_base { #[async_trait] pub trait EngineBase { fn parse_packet<'a>(&mut self, packet: impl Iterator); - fn search(&mut self, query: &str); + async fn search(&mut self, query: &str); } } diff --git a/src/main.rs b/src/main.rs index ddd0e29..bb1b36c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -26,13 +26,10 @@ use std::{ sync::{Arc, RwLock}, }; -use rocket::response::stream::{ReaderStream, TextStream}; +use futures::lock::Mutex; +use rocket::response::stream::TextStream; -use engines::{ - duckduckgo::duckduckgo::DuckDuckGo, engine_base::engine_base::EngineBase, - engine_base::engine_base::SearchResult, -}; -use tokio::sync::Mutex; +use engines::duckduckgo::duckduckgo::DuckDuckGo; pub mod client; pub mod engines; @@ -66,55 +63,52 @@ fn search_get() -> &'static str { async fn hello<'a>(query: &str) -> TextStream![String] { let query_box = Box::new(query.to_string()); + let ddg_ref = Arc::new(Mutex::new(DuckDuckGo::new())); + let ddg_writer_ref = ddg_ref.clone(); + + tokio::spawn(async move { + let mut ddg = ddg_writer_ref.lock().await; + ddg.search(&query_box).await; + }); + + let mut current_index = 0; + TextStream! { - let start = "".to_string(); + let start = "".to_string(); yield start; - let ddg_tv = Arc::new( - Mutex::new( - DuckDuckGo::new(), - ), - ); - let ddg_tv_clone = ddg_tv.clone(); - - tokio::spawn(async move { - ddg_tv_clone.lock().await.search(&query_box); - }); - - let mut last_position: i32 = -1; - loop { - let ddg = ddg_tv.lock().await; - let len = ddg.results.len() as i32; + let ddg = ddg_ref.lock().await; - if ddg.completed && last_position == len { - break; + let len = ddg.results.len(); + + if len == 0 { + continue } - if last_position < (len - 1) { - for i in max(0, last_position)..=(len - 1) { - match ddg.results.get(i as usize).clone() { - Some(result) => { - let html = format!("

{}

{}

", result.title, result.description); - yield html; - } - None => { - break; - } - } - } - - last_position = len; + if ddg.completed && current_index == len - 1 { + break } + + for ii in (current_index + 1)..len { + let result = ddg.results.get(ii); + + dbg!(&result); + } + + // [1] -> 0 + // 1 -> [1] + current_index = len - 1; } let end = "".to_string(); - yield end; + + yield end } } #[launch] -fn rocket() -> _ { +async fn rocket() -> _ { rocket::build() .mount("/", routes![index]) .mount("/", routes![hello])