1use std::sync::{Arc, LazyLock, atomic::AtomicU32};
7
8use arc_swap::ArcSwap;
9use opentelemetry::{
10 KeyValue,
11 metrics::{Counter, Gauge},
12};
13
14use crate::telemetry::METER;
15
16pub static APPROX_TOTAL_GAUGE: LazyLock<Gauge<u64>> = LazyLock::new(|| {
19 METER
20 .u64_gauge("syn2mas.entity.approx_total")
21 .with_description("Approximate number of entities of this type to be migrated")
22 .build()
23});
24
25pub static MIGRATED_COUNTER: LazyLock<Counter<u64>> = LazyLock::new(|| {
28 METER
29 .u64_counter("syn2mas.entity.migrated")
30 .with_description("Number of entities of this type that have been migrated so far")
31 .build()
32});
33
34pub static SKIPPED_COUNTER: LazyLock<Counter<u64>> = LazyLock::new(|| {
37 METER
38 .u64_counter("syn2mas.entity.skipped")
39 .with_description("Number of entities of this type that have been skipped so far")
40 .build()
41});
42
43#[derive(Debug, Clone, Copy)]
45pub enum EntityType {
46 Users,
48
49 Devices,
51
52 ThreePids,
54
55 ExternalIds,
57
58 NonRefreshableAccessTokens,
60
61 RefreshableTokens,
63}
64
65impl std::fmt::Display for EntityType {
66 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
67 write!(f, "{}", self.name())
68 }
69}
70
71impl EntityType {
72 pub const fn name(self) -> &'static str {
73 match self {
74 Self::Users => "users",
75 Self::Devices => "devices",
76 Self::ThreePids => "threepids",
77 Self::ExternalIds => "external_ids",
78 Self::NonRefreshableAccessTokens => "nonrefreshable_access_tokens",
79 Self::RefreshableTokens => "refreshable_tokens",
80 }
81 }
82
83 pub fn as_kv(self) -> KeyValue {
84 KeyValue::new("entity", self.name())
85 }
86}
87
88#[derive(Clone)]
93pub struct Progress {
94 current_stage: Arc<ArcSwap<ProgressStage>>,
95}
96
97#[derive(Clone)]
98pub struct ProgressCounter {
99 inner: Arc<ProgressCounterInner>,
100}
101
102struct ProgressCounterInner {
103 kv: [KeyValue; 1],
104 migrated: AtomicU32,
105 skipped: AtomicU32,
106}
107
108impl ProgressCounter {
109 fn new(entity: EntityType) -> Self {
110 Self {
111 inner: Arc::new(ProgressCounterInner {
112 kv: [entity.as_kv()],
113 migrated: AtomicU32::new(0),
114 skipped: AtomicU32::new(0),
115 }),
116 }
117 }
118
119 pub fn increment_migrated(&self) {
120 MIGRATED_COUNTER.add(1, &self.inner.kv);
121 self.inner
122 .migrated
123 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
124 }
125
126 pub fn increment_skipped(&self) {
127 SKIPPED_COUNTER.add(1, &self.inner.kv);
128 self.inner
129 .skipped
130 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
131 }
132
133 #[must_use]
134 pub fn migrated(&self) -> u32 {
135 self.inner
136 .migrated
137 .load(std::sync::atomic::Ordering::Relaxed)
138 }
139
140 #[must_use]
141 pub fn skipped(&self) -> u32 {
142 self.inner
143 .skipped
144 .load(std::sync::atomic::Ordering::Relaxed)
145 }
146}
147
148impl Progress {
149 #[must_use]
150 pub fn migrating_data(&self, entity: EntityType, approx_count: usize) -> ProgressCounter {
151 let counter = ProgressCounter::new(entity);
152 APPROX_TOTAL_GAUGE.record(approx_count as u64, &[entity.as_kv()]);
153 self.set_current_stage(ProgressStage::MigratingData {
154 entity,
155 counter: counter.clone(),
156 approx_count: approx_count as u64,
157 });
158 counter
159 }
160
161 pub fn rebuild_index(&self, index_name: String) {
162 self.set_current_stage(ProgressStage::RebuildIndex { index_name });
163 }
164
165 pub fn rebuild_constraint(&self, constraint_name: String) {
166 self.set_current_stage(ProgressStage::RebuildConstraint { constraint_name });
167 }
168
169 #[inline]
175 fn set_current_stage(&self, stage: ProgressStage) {
176 self.current_stage.store(Arc::new(stage));
177 }
178
179 #[inline]
181 #[must_use]
182 pub fn get_current_stage(&self) -> arc_swap::Guard<Arc<ProgressStage>> {
183 self.current_stage.load()
184 }
185}
186
187impl Default for Progress {
188 fn default() -> Self {
189 Self {
190 current_stage: Arc::new(ArcSwap::new(Arc::new(ProgressStage::SettingUp))),
191 }
192 }
193}
194
195pub enum ProgressStage {
196 SettingUp,
197 MigratingData {
198 entity: EntityType,
199 counter: ProgressCounter,
200 approx_count: u64,
201 },
202 RebuildIndex {
203 index_name: String,
204 },
205 RebuildConstraint {
206 constraint_name: String,
207 },
208}