mas_tasks/
recovery.rs

1// Copyright 2024 New Vector Ltd.
2// Copyright 2024 The Matrix.org Foundation C.I.C.
3//
4// SPDX-License-Identifier: AGPL-3.0-only
5// Please see LICENSE in the repository root for full details.
6
7use anyhow::Context;
8use async_trait::async_trait;
9use mas_email::{Address, Mailbox};
10use mas_i18n::DataLocale;
11use mas_storage::{
12    Pagination, RepositoryAccess,
13    queue::SendAccountRecoveryEmailsJob,
14    user::{UserEmailFilter, UserRecoveryRepository},
15};
16use mas_templates::{EmailRecoveryContext, TemplateContext};
17use rand::distributions::{Alphanumeric, DistString};
18use tracing::{error, info};
19
20use crate::{
21    State,
22    new_queue::{JobContext, JobError, RunnableJob},
23};
24
25/// Job to send account recovery emails for a given recovery session.
26#[async_trait]
27impl RunnableJob for SendAccountRecoveryEmailsJob {
28    #[tracing::instrument(
29        name = "job.send_account_recovery_email",
30        fields(
31            user_recovery_session.id = %self.user_recovery_session_id(),
32            user_recovery_session.email,
33        ),
34        skip_all,
35    )]
36    async fn run(&self, state: &State, _context: JobContext) -> Result<(), JobError> {
37        let clock = state.clock();
38        let mailer = state.mailer();
39        let url_builder = state.url_builder();
40        let mut rng = state.rng();
41        let mut repo = state.repository().await.map_err(JobError::retry)?;
42
43        let session = repo
44            .user_recovery()
45            .lookup_session(self.user_recovery_session_id())
46            .await
47            .map_err(JobError::retry)?
48            .context("User recovery session not found")
49            .map_err(JobError::fail)?;
50
51        tracing::Span::current().record("user_recovery_session.email", &session.email);
52
53        if session.consumed_at.is_some() {
54            info!("Recovery session already consumed, not sending email");
55            return Ok(());
56        }
57
58        let mut cursor = Pagination::first(50);
59
60        let lang: DataLocale = session
61            .locale
62            .parse()
63            .context("Invalid locale in database on recovery session")
64            .map_err(JobError::fail)?;
65
66        loop {
67            let page = repo
68                .user_email()
69                .list(UserEmailFilter::new().for_email(&session.email), cursor)
70                .await
71                .map_err(JobError::retry)?;
72
73            for email in page.edges {
74                let ticket = Alphanumeric.sample_string(&mut rng, 32);
75
76                let ticket = repo
77                    .user_recovery()
78                    .add_ticket(&mut rng, &clock, &session, &email, ticket)
79                    .await
80                    .map_err(JobError::retry)?;
81
82                let user_email = repo
83                    .user_email()
84                    .lookup(email.id)
85                    .await
86                    .map_err(JobError::retry)?
87                    .context("User email not found")
88                    .map_err(JobError::fail)?;
89
90                let user = repo
91                    .user()
92                    .lookup(user_email.user_id)
93                    .await
94                    .map_err(JobError::retry)?
95                    .context("User not found")
96                    .map_err(JobError::fail)?;
97
98                let url = url_builder.account_recovery_link(ticket.ticket);
99
100                let address: Address = user_email.email.parse().map_err(JobError::fail)?;
101                let mailbox = Mailbox::new(Some(user.username.clone()), address);
102
103                info!("Sending recovery email to {}", mailbox);
104                let context = EmailRecoveryContext::new(user, session.clone(), url)
105                    .with_language(lang.clone());
106
107                // XXX: we only log if the email fails to send, to avoid stopping the loop
108                if let Err(e) = mailer.send_recovery_email(mailbox, &context).await {
109                    error!(
110                        error = &e as &dyn std::error::Error,
111                        "Failed to send recovery email"
112                    );
113                }
114
115                cursor = cursor.after(email.id);
116            }
117
118            if !page.has_next_page {
119                break;
120            }
121        }
122
123        repo.save().await.map_err(JobError::fail)?;
124
125        Ok(())
126    }
127}