1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
mod client;
mod event;
mod metadata;
pub mod store;
use anyhow::Result;
use futures::{
future::FutureExt,
stream::{Stream, StreamExt},
};
use notify_rust::{Notification, Timeout};
use panorama_imap::{
client::{
auth::{self, Auth},
ClientBuilder, ClientConfig,
},
command::Command as ImapCommand,
response::{AttributeValue, Envelope, MailboxData, Response},
};
use tokio::{
sync::mpsc::{UnboundedReceiver, UnboundedSender},
task::JoinHandle,
};
use tokio_stream::wrappers::WatchStream;
use crate::config::{Config, ConfigWatcher, ImapAuth, MailAccountConfig, TlsMethod};
pub use self::event::MailEvent;
pub use self::metadata::EmailMetadata;
pub use self::store::MailStore;
/// Commands addressed to the mail subsystem.
///
/// `run_mail` accepts an `UnboundedReceiver<MailCommand>`, so values of this
/// type are what arrives on that channel. The enum is `#[non_exhaustive]`, so
/// code matching on it from outside this crate must include a wildcard arm.
#[derive(Debug)]
#[non_exhaustive]
pub enum MailCommand {
/// Request a refresh.
// NOTE(review): what exactly gets refreshed is not visible in this part of
// the file — confirm against the code that handles this variant.
Refresh,
/// Carries an IMAP command (`panorama_imap::command::Command`).
// NOTE(review): presumably forwarded to the server as-is; the handling
// code is not visible here — confirm.
Raw(ImapCommand),
}
pub async fn run_mail(
mail_store: MailStore,
mut config_watcher: ConfigWatcher,
ui2mail_rx: UnboundedReceiver<MailCommand>,
mail2ui_tx: UnboundedSender<MailEvent>,
) -> Result<()> {
let mut curr_conn: Vec<JoinHandle<_>> = Vec::new();
loop {
debug!("listening for configs");
let config: Config = match config_watcher.changed().await {
Ok(_) => config_watcher.borrow().clone(),
_ => break,
};
debug!("got");
debug!("dropping all connections...");
for conn in curr_conn.drain(0..) {
conn.abort();
}
for (acct_name, acct) in config.mail_accounts.clone().into_iter() {
let mail2ui_tx = mail2ui_tx.clone();
let mail_store = mail_store.clone();
let config2 = config.clone();
let handle = tokio::spawn(async move {
loop {
match client::sync_main(
config2.clone(),
&acct_name,
acct.clone(),
mail2ui_tx.clone(),
mail_store.clone(),
)
.await
{
Ok(_) => {}
Err(err) => {
error!("error from sync_main: {}", err);
for err in err.chain() {
error!("cause: {}", err);
}
}
}
warn!("connection dropped, retrying");
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
}
});
curr_conn.push(handle);
}
}
Ok(())
}