1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
//! Mail

mod client;
mod event;
mod metadata;
pub mod store;

use anyhow::Result;
use futures::{
    future::FutureExt,
    stream::{Stream, StreamExt},
};
use notify_rust::{Notification, Timeout};
use panorama_imap::{
    client::{
        auth::{self, Auth},
        ClientBuilder, ClientConfig,
    },
    command::Command as ImapCommand,
    response::{AttributeValue, Envelope, MailboxData, Response},
};
use tokio::{
    sync::mpsc::{UnboundedReceiver, UnboundedSender},
    task::JoinHandle,
};
use tokio_stream::wrappers::WatchStream;

use crate::config::{Config, ConfigWatcher, ImapAuth, MailAccountConfig, TlsMethod};

pub use self::event::MailEvent;
pub use self::metadata::EmailMetadata;
pub use self::store::MailStore;

/// Command sent to the mail thread by something else (i.e. UI).
///
/// Values of this type travel over the `UnboundedReceiver<MailCommand>` that
/// `run_mail` takes as its `ui2mail_rx` parameter.
///
/// The enum is `#[non_exhaustive]`, so code outside this crate that matches on
/// it must include a wildcard arm.
#[derive(Debug)]
#[non_exhaustive]
pub enum MailCommand {
    /// Refresh the list
    Refresh,

    /// Send a raw command.
    ///
    /// Carries a `panorama_imap::command::Command` (imported in this module as
    /// `ImapCommand`).
    Raw(ImapCommand),
}

/// Main entrypoint for the mail listener.
pub async fn run_mail(
    mail_store: MailStore,
    mut config_watcher: ConfigWatcher,
    ui2mail_rx: UnboundedReceiver<MailCommand>,
    mail2ui_tx: UnboundedSender<MailEvent>,
) -> Result<()> {
    let mut curr_conn: Vec<JoinHandle<_>> = Vec::new();

    // let mut config_watcher = WatchStream::new(config_watcher);
    loop {
        debug!("listening for configs");
        let config: Config = match config_watcher.changed().await {
            Ok(_) => config_watcher.borrow().clone(),
            _ => break,
        };
        debug!("got");

        // TODO: gracefully shut down connection
        // just gonna drop the connection for now
        // FUTURE TODO: possible to hash the connections and only reconn the ones that changed
        debug!("dropping all connections...");
        for conn in curr_conn.drain(0..) {
            conn.abort();
        }

        for (acct_name, acct) in config.mail_accounts.clone().into_iter() {
            let mail2ui_tx = mail2ui_tx.clone();
            let mail_store = mail_store.clone();
            let config2 = config.clone();
            let handle = tokio::spawn(async move {
                // debug!("opening imap connection for {:?}", acct);

                // this loop is to make sure accounts are restarted on error
                loop {
                    match client::sync_main(
                        config2.clone(),
                        &acct_name,
                        acct.clone(),
                        mail2ui_tx.clone(),
                        mail_store.clone(),
                    )
                    .await
                    {
                        Ok(_) => {}
                        Err(err) => {
                            error!("error from sync_main: {}", err);
                            for err in err.chain() {
                                error!("cause: {}", err);
                            }
                        }
                    }

                    warn!("connection dropped, retrying");

                    // wait a bit so we're not hitting the server really fast if the fail happens
                    // early on
                    //
                    // TODO: some kind of smart exponential backoff that considers some time
                    // threshold to be a failing case?
                    tokio::time::sleep(std::time::Duration::from_secs(5)).await;
                }
            });
            curr_conn.push(handle);
        }
    }

    Ok(())
}