mas_tasks/
user.rs

1// Copyright 2024 New Vector Ltd.
2// Copyright 2023, 2024 The Matrix.org Foundation C.I.C.
3//
4// SPDX-License-Identifier: AGPL-3.0-only
5// Please see LICENSE in the repository root for full details.
6
7use anyhow::Context;
8use async_trait::async_trait;
9use mas_storage::{
10    RepositoryAccess,
11    compat::CompatSessionFilter,
12    oauth2::OAuth2SessionFilter,
13    queue::{DeactivateUserJob, ReactivateUserJob},
14    user::{BrowserSessionFilter, UserEmailFilter, UserRepository},
15};
16use tracing::info;
17
18use crate::{
19    State,
20    new_queue::{JobContext, JobError, RunnableJob},
21};
22
23/// Job to deactivate a user, both locally and on the Matrix homeserver.
24#[async_trait]
25impl RunnableJob for DeactivateUserJob {
26    #[tracing::instrument(
27    name = "job.deactivate_user"
28        fields(user.id = %self.user_id(), erase = %self.hs_erase()),
29        skip_all,
30    )]
31    async fn run(&self, state: &State, _context: JobContext) -> Result<(), JobError> {
32        let clock = state.clock();
33        let matrix = state.matrix_connection();
34        let mut repo = state.repository().await.map_err(JobError::retry)?;
35
36        let user = repo
37            .user()
38            .lookup(self.user_id())
39            .await
40            .map_err(JobError::retry)?
41            .context("User not found")
42            .map_err(JobError::fail)?;
43
44        // Let's first lock & deactivate the user
45        let user = repo
46            .user()
47            .lock(&clock, user)
48            .await
49            .context("Failed to lock user")
50            .map_err(JobError::retry)?;
51
52        let user = repo
53            .user()
54            .deactivate(&clock, user)
55            .await
56            .context("Failed to deactivate user")
57            .map_err(JobError::retry)?;
58
59        // Kill all sessions for the user
60        let n = repo
61            .browser_session()
62            .finish_bulk(
63                &clock,
64                BrowserSessionFilter::new().for_user(&user).active_only(),
65            )
66            .await
67            .map_err(JobError::retry)?;
68        info!(affected = n, "Killed all browser sessions for user");
69
70        let n = repo
71            .oauth2_session()
72            .finish_bulk(
73                &clock,
74                OAuth2SessionFilter::new().for_user(&user).active_only(),
75            )
76            .await
77            .map_err(JobError::retry)?;
78        info!(affected = n, "Killed all OAuth 2.0 sessions for user");
79
80        let n = repo
81            .compat_session()
82            .finish_bulk(
83                &clock,
84                CompatSessionFilter::new().for_user(&user).active_only(),
85            )
86            .await
87            .map_err(JobError::retry)?;
88        info!(affected = n, "Killed all compatibility sessions for user");
89
90        // Delete all the email addresses for the user
91        let n = repo
92            .user_email()
93            .remove_bulk(UserEmailFilter::new().for_user(&user))
94            .await
95            .map_err(JobError::retry)?;
96        info!(affected = n, "Removed all email addresses for user");
97
98        // Before calling back to the homeserver, commit the changes to the database, as
99        // we want the user to be locked out as soon as possible
100        repo.save().await.map_err(JobError::retry)?;
101
102        let mxid = matrix.mxid(&user.username);
103        info!("Deactivating user {} on homeserver", mxid);
104        matrix
105            .delete_user(&mxid, self.hs_erase())
106            .await
107            .map_err(JobError::retry)?;
108
109        Ok(())
110    }
111}
112
113/// Job to reactivate a user, both locally and on the Matrix homeserver.
114#[async_trait]
115impl RunnableJob for ReactivateUserJob {
116    #[tracing::instrument(
117        name = "job.reactivate_user",
118        fields(user.id = %self.user_id()),
119        skip_all,
120    )]
121    async fn run(&self, state: &State, _context: JobContext) -> Result<(), JobError> {
122        let matrix = state.matrix_connection();
123        let mut repo = state.repository().await.map_err(JobError::retry)?;
124
125        let user = repo
126            .user()
127            .lookup(self.user_id())
128            .await
129            .map_err(JobError::retry)?
130            .context("User not found")
131            .map_err(JobError::fail)?;
132
133        let mxid = matrix.mxid(&user.username);
134        info!("Reactivating user {} on homeserver", mxid);
135        matrix
136            .reactivate_user(&mxid)
137            .await
138            .map_err(JobError::retry)?;
139
140        // We want to unlock the user from our side only once it has been reactivated on
141        // the homeserver
142        let _user = repo.user().unlock(user).await.map_err(JobError::retry)?;
143        repo.save().await.map_err(JobError::retry)?;
144
145        Ok(())
146    }
147}