syn2mas/
progress.rs

1// Copyright 2025 New Vector Ltd.
2//
3// SPDX-License-Identifier: AGPL-3.0-only
4// Please see LICENSE in the repository root for full details.
5
6use std::sync::{Arc, LazyLock, atomic::AtomicU32};
7
8use arc_swap::ArcSwap;
9use opentelemetry::{
10    KeyValue,
11    metrics::{Counter, Gauge},
12};
13
14use crate::telemetry::METER;
15
16/// A gauge that tracks the approximate number of entities of a given type
17/// that will be migrated.
18pub static APPROX_TOTAL_GAUGE: LazyLock<Gauge<u64>> = LazyLock::new(|| {
19    METER
20        .u64_gauge("syn2mas.entity.approx_total")
21        .with_description("Approximate number of entities of this type to be migrated")
22        .build()
23});
24
25/// A counter that tracks the number of entities of a given type that have
26/// been migrated so far.
27pub static MIGRATED_COUNTER: LazyLock<Counter<u64>> = LazyLock::new(|| {
28    METER
29        .u64_counter("syn2mas.entity.migrated")
30        .with_description("Number of entities of this type that have been migrated so far")
31        .build()
32});
33
34/// A counter that tracks the number of entities of a given type that have
35/// been skipped so far.
36pub static SKIPPED_COUNTER: LazyLock<Counter<u64>> = LazyLock::new(|| {
37    METER
38        .u64_counter("syn2mas.entity.skipped")
39        .with_description("Number of entities of this type that have been skipped so far")
40        .build()
41});
42
/// The kinds of entities that syn2mas can migrate.
///
/// The type is a plain field-less enum, so it is `Copy` and can be compared,
/// hashed and used as a map or set key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum EntityType {
    /// Represents users
    Users,

    /// Represents devices
    Devices,

    /// Represents third-party IDs
    ThreePids,

    /// Represents external IDs
    ExternalIds,

    /// Represents non-refreshable access tokens
    NonRefreshableAccessTokens,

    /// Represents refreshable access tokens
    RefreshableTokens,
}
64
65impl std::fmt::Display for EntityType {
66    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
67        write!(f, "{}", self.name())
68    }
69}
70
71impl EntityType {
72    pub const fn name(self) -> &'static str {
73        match self {
74            Self::Users => "users",
75            Self::Devices => "devices",
76            Self::ThreePids => "threepids",
77            Self::ExternalIds => "external_ids",
78            Self::NonRefreshableAccessTokens => "nonrefreshable_access_tokens",
79            Self::RefreshableTokens => "refreshable_tokens",
80        }
81    }
82
83    pub fn as_kv(self) -> KeyValue {
84        KeyValue::new("entity", self.name())
85    }
86}
87
88/// Tracker for the progress of the migration
89///
90/// Cloning this struct intuitively gives a 'handle' to the same counters,
91/// which means it can be shared between tasks/threads.
92#[derive(Clone)]
93pub struct Progress {
94    current_stage: Arc<ArcSwap<ProgressStage>>,
95}
96
97#[derive(Clone)]
98pub struct ProgressCounter {
99    inner: Arc<ProgressCounterInner>,
100}
101
102struct ProgressCounterInner {
103    kv: [KeyValue; 1],
104    migrated: AtomicU32,
105    skipped: AtomicU32,
106}
107
108impl ProgressCounter {
109    fn new(entity: EntityType) -> Self {
110        Self {
111            inner: Arc::new(ProgressCounterInner {
112                kv: [entity.as_kv()],
113                migrated: AtomicU32::new(0),
114                skipped: AtomicU32::new(0),
115            }),
116        }
117    }
118
119    pub fn increment_migrated(&self) {
120        MIGRATED_COUNTER.add(1, &self.inner.kv);
121        self.inner
122            .migrated
123            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
124    }
125
126    pub fn increment_skipped(&self) {
127        SKIPPED_COUNTER.add(1, &self.inner.kv);
128        self.inner
129            .skipped
130            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
131    }
132
133    #[must_use]
134    pub fn migrated(&self) -> u32 {
135        self.inner
136            .migrated
137            .load(std::sync::atomic::Ordering::Relaxed)
138    }
139
140    #[must_use]
141    pub fn skipped(&self) -> u32 {
142        self.inner
143            .skipped
144            .load(std::sync::atomic::Ordering::Relaxed)
145    }
146}
147
148impl Progress {
149    #[must_use]
150    pub fn migrating_data(&self, entity: EntityType, approx_count: usize) -> ProgressCounter {
151        let counter = ProgressCounter::new(entity);
152        APPROX_TOTAL_GAUGE.record(approx_count as u64, &[entity.as_kv()]);
153        self.set_current_stage(ProgressStage::MigratingData {
154            entity,
155            counter: counter.clone(),
156            approx_count: approx_count as u64,
157        });
158        counter
159    }
160
161    pub fn rebuild_index(&self, index_name: String) {
162        self.set_current_stage(ProgressStage::RebuildIndex { index_name });
163    }
164
165    pub fn rebuild_constraint(&self, constraint_name: String) {
166        self.set_current_stage(ProgressStage::RebuildConstraint { constraint_name });
167    }
168
169    /// Sets the current stage of progress.
170    ///
171    /// This is probably not cheap enough to use for every individual row,
172    /// so use of atomic integers for the fields that will be updated is
173    /// recommended.
174    #[inline]
175    fn set_current_stage(&self, stage: ProgressStage) {
176        self.current_stage.store(Arc::new(stage));
177    }
178
179    /// Returns the current stage of progress.
180    #[inline]
181    #[must_use]
182    pub fn get_current_stage(&self) -> arc_swap::Guard<Arc<ProgressStage>> {
183        self.current_stage.load()
184    }
185}
186
187impl Default for Progress {
188    fn default() -> Self {
189        Self {
190            current_stage: Arc::new(ArcSwap::new(Arc::new(ProgressStage::SettingUp))),
191        }
192    }
193}
194
195pub enum ProgressStage {
196    SettingUp,
197    MigratingData {
198        entity: EntityType,
199        counter: ProgressCounter,
200        approx_count: u64,
201    },
202    RebuildIndex {
203        index_name: String,
204    },
205    RebuildConstraint {
206        constraint_name: String,
207    },
208}