fix: Use mpsc to pass messages

This commit is contained in:
Myzel394 2024-02-21 09:24:44 +01:00
parent e02c3f4960
commit 5d076328ec
No known key found for this signature in database
GPG Key ID: DEC4AAB876F73185
5 changed files with 172 additions and 315 deletions

View File

@ -1,12 +1,19 @@
// Search engine parser for Brave Search // Search engine parser for Brave Search
// This uses the clearnet, unlocalized version of the search engine. // This uses the clearnet, unlocalized version of the search engine.
pub mod brave { pub mod brave {
use std::sync::Arc;
use futures::lock::Mutex;
use lazy_static::lazy_static; use lazy_static::lazy_static;
use regex::Regex; use regex::Regex;
use tokio::sync::mpsc::Sender;
use urlencoding::decode; use urlencoding::decode;
use crate::{ use crate::{
engines::engine_base::engine_base::{EngineBase, SearchEngine, SearchResult}, engines::engine_base::engine_base::{
EngineBase, EnginePositions, ResultsCollector, SearchEngine, SearchResult,
},
helpers::helpers::build_default_client,
utils::utils::decode_html_text, utils::utils::decode_html_text,
}; };
@ -17,68 +24,40 @@ pub mod brave {
static ref STRIP_HTML_TAGS: Regex = Regex::new(r#"<(?:"[^"]*"['"]*|'[^']*'['"]*|[^'">])+>"#).unwrap(); static ref STRIP_HTML_TAGS: Regex = Regex::new(r#"<(?:"[^"]*"['"]*|'[^']*'['"]*|[^'">])+>"#).unwrap();
} }
#[derive(Clone, Debug)]
pub struct Brave { pub struct Brave {
pub completed: bool, positions: EnginePositions,
results_started: bool,
pub previous_block: String,
pub results: Vec<SearchResult>,
}
impl Brave {
fn slice_remaining_block(&mut self, start_position: &usize) {
let previous_block_bytes = self.previous_block.as_bytes().to_vec();
let remaining_bytes = previous_block_bytes[*start_position..].to_vec();
let remaining_text = String::from_utf8(remaining_bytes).unwrap();
self.previous_block.clear();
self.previous_block.push_str(&remaining_text);
}
pub fn new() -> Self {
Self {
results_started: false,
previous_block: String::new(),
results: vec![],
completed: false,
}
}
} }
impl EngineBase for Brave { impl EngineBase for Brave {
fn add_result(&mut self, result: crate::engines::engine_base::engine_base::SearchResult) {
self.results.push(result);
}
fn parse_next<'a>(&mut self) -> Option<SearchResult> { fn parse_next<'a>(&mut self) -> Option<SearchResult> {
if self.results_started { if self.positions.started {
match SINGLE_RESULT.captures(&self.previous_block.to_owned()) { if let Some(capture) =
Some(captures) => { SINGLE_RESULT.captures(&self.positions.previous_block.to_owned())
let title = decode(captures.name("title").unwrap().as_str()) {
.unwrap() let title = decode(capture.name("title").unwrap().as_str())
.into_owned(); .unwrap()
let description_raw = .into_owned();
decode_html_text(captures.name("description").unwrap().as_str()) let description_raw =
.unwrap(); decode_html_text(capture.name("description").unwrap().as_str()).unwrap();
let description = STRIP_HTML_TAGS let description = STRIP_HTML_TAGS
.replace_all(&description_raw, "") .replace_all(&description_raw, "")
.into_owned(); .into_owned();
let url = decode(captures.name("url").unwrap().as_str()) let url = decode(capture.name("url").unwrap().as_str())
.unwrap() .unwrap()
.into_owned(); .into_owned();
let result = SearchResult { let result = SearchResult {
title, title,
description, description,
url, url,
engine: SearchEngine::DuckDuckGo, engine: SearchEngine::DuckDuckGo,
}; };
let end_position = captures.get(0).unwrap().end(); let end_position = capture.get(0).unwrap().end();
self.slice_remaining_block(&end_position); self.positions.slice_remaining_block(&end_position);
return Some(result); return Some(result);
}
None => {}
} }
} }
@ -90,15 +69,28 @@ pub mod brave {
let raw_text = String::from_utf8_lossy(&bytes); let raw_text = String::from_utf8_lossy(&bytes);
let text = STRIP.replace_all(&raw_text, " "); let text = STRIP.replace_all(&raw_text, " ");
if self.results_started { if self.positions.started {
self.previous_block.push_str(&text); self.positions.previous_block.push_str(&text);
} else { } else {
self.results_started = RESULTS_START.is_match(&text); self.positions.started = RESULTS_START.is_match(&text);
}
}
}
impl Brave {
pub fn new() -> Self {
Self {
positions: EnginePositions::new(),
} }
} }
async fn search(&mut self, query: &str) { pub async fn search(&mut self, query: &str, tx: Sender<SearchResult>) {
todo!() let client = build_default_client();
let request = client
.get(format!("https://search.brave.com/search?q={}", query))
.send();
self.handle_request(request, tx).await;
} }
} }
} }

View File

@ -1,14 +1,7 @@
// Search engine parser for DuckDuckGo // Search engine parser for DuckDuckGo
pub mod duckduckgo { pub mod duckduckgo {
use std::{
io::{Read, Write},
net::TcpStream,
sync::Arc,
};
use lazy_static::lazy_static; use lazy_static::lazy_static;
use regex::Regex; use regex::Regex;
use rustls::RootCertStore;
use urlencoding::decode; use urlencoding::decode;
use crate::{ use crate::{
@ -72,10 +65,6 @@ pub mod duckduckgo {
// } // }
impl EngineBase for DuckDuckGo { impl EngineBase for DuckDuckGo {
fn add_result(&mut self, result: SearchResult) {
self.results.push(result);
}
fn parse_next<'a>(&mut self) -> Option<SearchResult> { fn parse_next<'a>(&mut self) -> Option<SearchResult> {
if self.results_started { if self.results_started {
match SINGLE_RESULT.captures(&self.previous_block.to_owned()) { match SINGLE_RESULT.captures(&self.previous_block.to_owned()) {
@ -123,152 +112,6 @@ pub mod duckduckgo {
self.results_started = RESULTS_START.is_match(&text); self.results_started = RESULTS_START.is_match(&text);
} }
} }
// Searches DuckDuckGo for the given query
// Uses rustls as reqwest does not support accessing the raw packets
async fn search(&mut self, query: &str) {
let root_store =
RootCertStore::from_iter(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());
let mut config = rustls::ClientConfig::builder()
.with_root_certificates(root_store)
.with_no_client_auth();
// Allow using SSLKEYLOGFILE.
config.key_log = Arc::new(rustls::KeyLogFile::new());
let now = std::time::Instant::now();
let server_name = "html.duckduckgo.com".try_into().unwrap();
let mut conn = rustls::ClientConnection::new(Arc::new(config), server_name).unwrap();
let mut sock = TcpStream::connect("html.duckduckgo.com:443").unwrap();
let mut tls = rustls::Stream::new(&mut conn, &mut sock);
tls.write_all(
concat!(
"POST /html/ HTTP/1.1\r\n",
"Host: html.duckduckgo.com\r\n",
"Connection: cloSe\r\n",
"Accept-Encoding: identity\r\n",
"Content-Length: 6\r\n",
// form data
"Content-Type: application/x-www-form-urlencoded\r\n",
"\r\n",
"q=test",
)
.as_bytes(),
)
.unwrap();
let mut plaintext = Vec::new();
dbg!(now.elapsed());
loop {
let mut buf = [0; 65535];
tls.conn.complete_io(tls.sock);
let n = tls.conn.reader().read(&mut buf);
if n.is_ok() {
dbg!(&n);
let n = n.unwrap();
if n == 0 {
break;
}
println!("{}", "=================");
dbg!(now.elapsed());
// println!("{}", String::from_utf8_lossy(&buf));
plaintext.extend_from_slice(&buf);
}
}
// let root_store = RootCertStore {
// roots: webpki_roots::TLS_SERVER_ROOTS.into(),
// };
//
// let mut config = rustls::ClientConfig::builder()
// .with_root_certificates(root_store)
// .with_no_client_auth();
//
// // Allow using SSLKEYLOGFILE.
// config.key_log = Arc::new(rustls::KeyLogFile::new());
//
// let server_name = "html.duckduckgo.com".try_into().unwrap();
// let mut conn = rustls::ClientConnection::new(Arc::new(config), server_name).unwrap();
//
// let mut sock = TcpStream::connect("html.duckduckgo.com:443").unwrap();
// let mut tls = rustls::Stream::new(&mut conn, &mut sock);
// tls.write_all(
// concat!(
// "POST /html/ HTTP/1.1\r\n",
// "Host: html.duckduckgo.com\r\n",
// "Connection: close\r\n",
// "Accept-Encoding: identity\r\n",
// "Content-Length: 6\r\n",
// // form data
// "Content-Type: application/x-www-form-urlencoded\r\n",
// "\r\n",
// "q=test",
// )
// .as_bytes(),
// )
// .unwrap();
// let ciphersuite = tls.conn.negotiated_cipher_suite().unwrap();
// writeln!(
// &mut std::io::stderr(),
// "Current ciphersuite: {:?}",
// ciphersuite.suite()
// )
// .unwrap();
//
// // Iterate over the stream to read the response.
// loop {
// let mut buf = [0u8; 1024];
// let n = tls.read(&mut buf).unwrap();
// if n == 0 {
// break;
// }
//
// if let Some(result) = self.parse_packet(buf.iter()) {
// self.add_result(result);
//
// // Wait one second
// std::thread::sleep(std::time::Duration::from_millis(100));
// }
// }
//
// while let Some(result) = self.parse_next() {
// self.add_result(result);
// }
//
// dbg!("done with searching");
// let client = reqwest::Client::new();
//
// let now = std::time::Instant::now();
//
// let mut stream = client
// .post("https://html.duckduckgo.com/html/")
// .header("Content-Type", "application/x-www-form-urlencoded")
// .body(format!("q={}", query))
// .send()
// .await
// .unwrap()
// .bytes_stream();
//
// let diff = now.elapsed();
// dbg!(diff);
//
// while let Some(item) = stream.next().await {
// let packet = item.unwrap();
//
// if let Some(result) = self.parse_packet(packet.iter()) {
// self.add_result(result);
// }
// }
//
// while let Some(result) = self.parse_next() {
// self.add_result(result);
// }
//
// let second_diff = now.elapsed();
// dbg!(second_diff);
}
} }
impl DuckDuckGo { impl DuckDuckGo {

View File

@ -1,12 +1,11 @@
pub mod engine_base { pub mod engine_base {
use std::sync::Arc; use std::sync::Arc;
use bytes::Bytes; use futures::{lock::Mutex, Future, StreamExt};
use futures::{lock::Mutex, Future, Stream, StreamExt};
use lazy_static::lazy_static; use lazy_static::lazy_static;
use regex::Regex; use regex::Regex;
use reqwest::{Client, Error, Response}; use reqwest::{Error, Response};
use tokio::sync::mpsc::Sender;
lazy_static! { lazy_static! {
static ref STRIP: Regex = Regex::new(r"\s+").unwrap(); static ref STRIP: Regex = Regex::new(r"\s+").unwrap();
@ -25,23 +24,19 @@ pub mod engine_base {
pub engine: SearchEngine, pub engine: SearchEngine,
} }
pub trait EngineBase { /// ResultsCollector collects results across multiple tasks
fn add_result(&mut self, result: SearchResult); #[derive(Clone, Debug, Hash, Default)]
pub struct ResultsCollector {
pub started: bool,
pub previous_block: String,
results: Vec<SearchResult>,
current_index: usize,
}
pub trait EngineBase {
fn parse_next<'a>(&mut self) -> Option<SearchResult>; fn parse_next<'a>(&mut self) -> Option<SearchResult>;
fn push_packet<'a>(&mut self, packet: impl Iterator<Item = &'a u8>); fn push_packet<'a>(&mut self, packet: impl Iterator<Item = &'a u8>);
// fn push_packet<'a>(&mut self, packet: impl Iterator<Item = &'a u8>) {
// let bytes: Vec<u8> = packet.map(|bit| *bit).collect();
// let raw_text = String::from_utf8_lossy(&bytes);
// let text = STRIP.replace_all(&raw_text, " ");
//
// if self.results_started {
// self.previous_block.push_str(&text);
// } else {
// self.results_started = RESULTS_START.is_match(&text);
// }
// }
/// Push packet to internal block and return next available search result, if available /// Push packet to internal block and return next available search result, if available
fn parse_packet<'a>( fn parse_packet<'a>(
@ -53,11 +48,89 @@ pub mod engine_base {
self.parse_next() self.parse_next()
} }
async fn search(&mut self, query: &str); async fn handle_request(
&mut self,
request: impl Future<Output = Result<Response, Error>>,
tx: Sender<SearchResult>,
) {
let mut stream = request.await.unwrap().bytes_stream();
while let Some(chunk) = stream.next().await {
let buffer = chunk.unwrap();
self.push_packet(buffer.iter());
while let Some(result) = self.parse_next() {
tx.send(result).await;
}
}
while let Some(result) = self.parse_next() {
tx.send(result).await;
}
}
} }
#[derive(Clone, Debug, Hash, Default)] impl ResultsCollector {
pub struct ResultsCollector { pub fn new() -> Self {
results: Vec<SearchResult>, Self {
results: Vec::new(),
current_index: 0,
previous_block: String::new(),
started: false,
}
}
pub fn results(&self) -> &Vec<SearchResult> {
&self.results
}
pub fn add_result(&mut self, result: SearchResult) {
self.results.push(result);
}
pub fn get_next_items(&self) -> &[SearchResult] {
if self.current_index >= self.results.len() {
return &[];
}
&self.results[self.current_index + 1..self.results.len()]
}
pub fn update_index(&mut self) {
self.current_index = self.results.len() - 1;
}
pub fn has_more_results(&self) -> bool {
if self.results.len() == 0 {
return true;
}
self.current_index < self.results.len() - 1
}
}
#[derive(Clone, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct EnginePositions {
pub previous_block: String,
pub started: bool,
}
impl EnginePositions {
pub fn new() -> Self {
EnginePositions {
previous_block: String::new(),
started: false,
}
}
pub fn slice_remaining_block(&mut self, start_position: &usize) {
let previous_block_bytes = self.previous_block.as_bytes().to_vec();
let remaining_bytes = previous_block_bytes[*start_position..].to_vec();
let remaining_text = String::from_utf8(remaining_bytes).unwrap();
self.previous_block.clear();
self.previous_block.push_str(&remaining_text);
}
} }
} }

View File

@ -4,29 +4,18 @@
pub mod helpers { pub mod helpers {
use std::sync::Arc; use std::sync::Arc;
use futures::{lock::Mutex, Future, StreamExt}; use bytes::Bytes;
use reqwest::{Error, Response}; use futures::{lock::Mutex, Future, Stream, StreamExt};
use reqwest::{Client, ClientBuilder, Error, Response};
use crate::engines::engine_base::engine_base::EngineBase; use crate::engines::engine_base::engine_base::{EngineBase, ResultsCollector};
pub async fn run_search( const DEFAULT_USER_AGENT: &str = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.3";
request: impl Future<Output = Result<Response, Error>>,
engine_ref: Arc<Mutex<impl EngineBase>>,
) {
let response = request.await.unwrap();
let mut stream = response.bytes_stream(); pub fn build_default_client() -> Client {
while let Some(chunk) = stream.next().await { ClientBuilder::new()
let buffer = chunk.unwrap(); .user_agent(DEFAULT_USER_AGENT)
.build()
let mut engine = engine_ref.lock().await; .unwrap()
if let Some(result) = engine.parse_packet(buffer.iter()) {
engine.add_result(result);
drop(engine);
tokio::task::yield_now().await;
}
}
} }
} }

View File

@ -1,16 +1,15 @@
use std::str;
use std::sync::Arc; use std::sync::Arc;
use std::{str, thread};
use engines::brave::brave::Brave; use engines::brave::brave::Brave;
use engines::engine_base::engine_base::{ResultsCollector, SearchResult};
use futures::lock::Mutex; use futures::lock::Mutex;
use lazy_static::lazy_static; use lazy_static::lazy_static;
use reqwest::ClientBuilder;
use rocket::response::content::{RawCss, RawHtml}; use rocket::response::content::{RawCss, RawHtml};
use rocket::response::stream::TextStream; use rocket::response::stream::TextStream;
use rocket::time::Instant; use rocket::time::Instant;
use utils::utils::Yieldable; use tokio::sync::mpsc;
use crate::helpers::helpers::run_search;
use crate::static_files::static_files::read_file_contents; use crate::static_files::static_files::read_file_contents;
pub mod client; pub mod client;
@ -52,73 +51,34 @@ fn get_tailwindcss() -> RawCss<&'static str> {
async fn hello<'a>(query: &str) -> RawHtml<TextStream![String]> { async fn hello<'a>(query: &str) -> RawHtml<TextStream![String]> {
let query_box = query.to_string(); let query_box = query.to_string();
let completed_ref = Arc::new(Mutex::new(false)); let mut first_result_yielded = false;
let completed_ref_writer = completed_ref.clone(); let first_result_start = Instant::now();
let brave_ref = Arc::new(Mutex::new(Brave::new()));
let brave_ref_writer = brave_ref.clone(); let (tx, mut rx) = mpsc::channel::<SearchResult>(16);
let mut brave_first_result_has_yielded = false;
let brave_first_result_start = Instant::now();
let client = Arc::new(Box::new(
ClientBuilder::new().user_agent(USER_AGENT).build().unwrap(),
));
let client_ref = client.clone();
tokio::spawn(async move { tokio::spawn(async move {
let request = client_ref let mut brave = Brave::new();
.get(format!("https://search.brave.com/search?q={}", query_box))
.send();
run_search(request, brave_ref_writer).await; brave.search(&query_box, tx).await;
let mut completed = completed_ref_writer.lock().await;
*completed = true;
}); });
let mut current_index = 0;
RawHtml(TextStream! { RawHtml(TextStream! {
yield HTML_BEGINNING.to_string(); yield HTML_BEGINNING.to_string();
loop { while let Some(result) = rx.recv().await {
let brave = brave_ref.lock().await; if !first_result_yielded {
let diff = first_result_start.elapsed().whole_milliseconds();
let len = brave.results.len(); first_result_yielded = true;
if len == 0 {
drop(brave);
tokio::task::yield_now().await;
continue
}
let completed = completed_ref.lock().await;
if *completed && current_index == len - 1 {
break
}
drop(completed);
if !brave_first_result_has_yielded {
let diff = brave_first_result_start.elapsed().whole_milliseconds();
brave_first_result_has_yielded = true;
yield format!("<strong>Time taken: {}ms</strong>", diff); yield format!("<strong>Time taken: {}ms</strong>", diff);
} }
for ii in (current_index + 1)..len { let text = format!("<li><h1>{}</h1><p>{}</p></li>", &result.title, &result.description);
let result = brave.results.get(ii).unwrap();
let text = format!("<li><h1>{}</h1><p>{}</p></li>", &result.title, &result.description); yield text.to_string();
yield text.to_string();
}
drop(brave);
tokio::task::yield_now().await;
// [1] -> 0
// 1 -> [1]
current_index = len - 1;
} }
let diff = brave_first_result_start.elapsed().whole_milliseconds(); let diff = first_result_start.elapsed().whole_milliseconds();
yield format!("<strong>End taken: {}ms</strong>", diff); yield format!("<strong>End taken: {}ms</strong>", diff);
yield HTML_END.to_string(); yield HTML_END.to_string();
}) })