1use anyhow::Context;
8use async_trait::async_trait;
9use mas_email::{Address, Mailbox};
10use mas_i18n::DataLocale;
11use mas_storage::{
12 Pagination, RepositoryAccess,
13 queue::SendAccountRecoveryEmailsJob,
14 user::{UserEmailFilter, UserRecoveryRepository},
15};
16use mas_templates::{EmailRecoveryContext, TemplateContext};
17use rand::distributions::{Alphanumeric, DistString};
18use tracing::{error, info};
19
20use crate::{
21 State,
22 new_queue::{JobContext, JobError, RunnableJob},
23};
24
25#[async_trait]
27impl RunnableJob for SendAccountRecoveryEmailsJob {
28 #[tracing::instrument(
29 name = "job.send_account_recovery_email",
30 fields(
31 user_recovery_session.id = %self.user_recovery_session_id(),
32 user_recovery_session.email,
33 ),
34 skip_all,
35 )]
36 async fn run(&self, state: &State, _context: JobContext) -> Result<(), JobError> {
37 let clock = state.clock();
38 let mailer = state.mailer();
39 let url_builder = state.url_builder();
40 let mut rng = state.rng();
41 let mut repo = state.repository().await.map_err(JobError::retry)?;
42
43 let session = repo
44 .user_recovery()
45 .lookup_session(self.user_recovery_session_id())
46 .await
47 .map_err(JobError::retry)?
48 .context("User recovery session not found")
49 .map_err(JobError::fail)?;
50
51 tracing::Span::current().record("user_recovery_session.email", &session.email);
52
53 if session.consumed_at.is_some() {
54 info!("Recovery session already consumed, not sending email");
55 return Ok(());
56 }
57
58 let mut cursor = Pagination::first(50);
59
60 let lang: DataLocale = session
61 .locale
62 .parse()
63 .context("Invalid locale in database on recovery session")
64 .map_err(JobError::fail)?;
65
66 loop {
67 let page = repo
68 .user_email()
69 .list(UserEmailFilter::new().for_email(&session.email), cursor)
70 .await
71 .map_err(JobError::retry)?;
72
73 for email in page.edges {
74 let ticket = Alphanumeric.sample_string(&mut rng, 32);
75
76 let ticket = repo
77 .user_recovery()
78 .add_ticket(&mut rng, &clock, &session, &email, ticket)
79 .await
80 .map_err(JobError::retry)?;
81
82 let user_email = repo
83 .user_email()
84 .lookup(email.id)
85 .await
86 .map_err(JobError::retry)?
87 .context("User email not found")
88 .map_err(JobError::fail)?;
89
90 let user = repo
91 .user()
92 .lookup(user_email.user_id)
93 .await
94 .map_err(JobError::retry)?
95 .context("User not found")
96 .map_err(JobError::fail)?;
97
98 let url = url_builder.account_recovery_link(ticket.ticket);
99
100 let address: Address = user_email.email.parse().map_err(JobError::fail)?;
101 let mailbox = Mailbox::new(Some(user.username.clone()), address);
102
103 info!("Sending recovery email to {}", mailbox);
104 let context = EmailRecoveryContext::new(user, session.clone(), url)
105 .with_language(lang.clone());
106
107 if let Err(e) = mailer.send_recovery_email(mailbox, &context).await {
109 error!(
110 error = &e as &dyn std::error::Error,
111 "Failed to send recovery email"
112 );
113 }
114
115 cursor = cursor.after(email.id);
116 }
117
118 if !page.has_next_page {
119 break;
120 }
121 }
122
123 repo.save().await.map_err(JobError::fail)?;
124
125 Ok(())
126 }
127}