-
Notifications
You must be signed in to change notification settings - Fork 268
/
lib.rs
235 lines (220 loc) · 9.5 KB
/
lib.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This software may be used and distributed according to the terms of the
* GNU General Public License version 2.
*/
use std::path::Path;
use std::sync::Arc;
use anyhow::Result;
use arc_swap::ArcSwap;
use async_trait::async_trait;
use cached_config::ConfigHandle;
use cached_config::ConfigStore;
use cached_config::ConfigUpdateWatcher;
use cloned::cloned;
use futures::future::join_all;
use metaconfig_parser::config::configerator_config_handle;
use metaconfig_parser::config::load_configs_from_raw;
use metaconfig_parser::RepoConfigs;
use metaconfig_parser::StorageConfigs;
use repos::RawRepoConfigs;
use slog::error;
use slog::info;
use slog::trace;
use slog::Logger;
use stats::prelude::*;
use tokio::runtime::Handle;
use tokio::task::JoinHandle;
const LIVENESS_INTERVAL: u64 = 300;
type Swappable<T> = Arc<ArcSwap<T>>;
define_stats! {
prefix = "mononoke.config_refresh";
refresh_failure_count: timeseries(Average, Sum, Count),
liveness_count: timeseries(Average, Sum, Count),
}
/// Configuration provider and update notifier for all of Mononoke services
/// and jobs. The configurations provided by this struct are always up-to-date
/// with its source.
pub struct MononokeConfigs {
repo_configs: Swappable<RepoConfigs>,
storage_configs: Swappable<StorageConfigs>,
update_receivers: Swappable<Vec<Arc<dyn ConfigUpdateReceiver>>>,
maybe_config_updater: Option<JoinHandle<()>>,
maybe_liveness_updater: Option<JoinHandle<()>>,
maybe_config_handle: Option<ConfigHandle<RawRepoConfigs>>,
}
impl MononokeConfigs {
/// Create a new instance of MononokeConfigs with configurations backed via ConfigStore.
/// If the config path points to a dynamic config source (e.g. configerator), this enables
/// auto-refresh of those configurations.
pub fn new(
config_path: impl AsRef<Path>,
config_store: &ConfigStore,
runtime_handle: Handle,
logger: Logger,
) -> Result<Self> {
let storage_configs = metaconfig_parser::load_storage_configs(&config_path, config_store)?;
let storage_configs = Arc::new(ArcSwap::from_pointee(storage_configs));
let repo_configs = metaconfig_parser::load_repo_configs(&config_path, config_store)?;
let repo_configs = Arc::new(ArcSwap::from_pointee(repo_configs));
let update_receivers = Arc::new(ArcSwap::from_pointee(vec![]));
let maybe_config_handle = configerator_config_handle(config_path.as_ref(), config_store)?;
let maybe_config_watcher = maybe_config_handle
.as_ref()
.map(|config_handle| config_handle.watcher())
.transpose()?;
// If we are dynamically updating the config, we need to have a liveness updater process in place.
let maybe_liveness_updater = maybe_config_watcher
.as_ref()
.map(|_| runtime_handle.spawn(liveness_updater()));
// If the configuration is backed by a static source, the config update watcher
// and the config updater handle will be None.
let maybe_config_updater = maybe_config_watcher.map(|config_watcher| {
cloned!(storage_configs, repo_configs, update_receivers);
runtime_handle.spawn(watch_and_update(
repo_configs,
storage_configs,
update_receivers,
config_watcher,
logger,
))
});
Ok(Self {
repo_configs,
storage_configs,
update_receivers,
maybe_config_updater,
maybe_config_handle,
maybe_liveness_updater,
})
}
/// The latest repo configs fetched from the underlying configuration store.
pub fn repo_configs(&self) -> Arc<RepoConfigs> {
// Load full since there could be lots of calls to repo_configs.
self.repo_configs.load_full()
}
/// The latest storage configs fetched from the underlying configuration store.
pub fn storage_configs(&self) -> Arc<StorageConfigs> {
// Load full since there could be lots of calls to storage_configs.
self.storage_configs.load_full()
}
/// Is automatic update of the underlying configuration enabled?
pub fn auto_update_enabled(&self) -> bool {
// If the config updater handle is none, configs won't be updated.
self.maybe_config_updater.is_some()
}
// Config watcher that can be used to get notified of the latest
// changes in the underlying config and to act on it. This is useful
// if the processing to be performed is long running which is not supported
// via ConfigUpdateReceivers
pub fn config_watcher(&self) -> Option<ConfigUpdateWatcher<RawRepoConfigs>> {
self.maybe_config_handle
.as_ref()
.and_then(|config_handle| config_handle.watcher().ok())
}
/// Register an instance of ConfigUpdateReceiver to receive notifications of updates to
/// the underlying configs which can then be used to perform further actions. Note that
/// the operation performed by the ConfigUpdateReceiver should not be too long running.
/// If that's the case, use config_watcher method instead.
pub fn register_for_update(&self, update_receiver: Arc<dyn ConfigUpdateReceiver>) {
let mut update_receivers = Vec::from_iter(self.update_receivers.load().iter().cloned());
update_receivers.push(update_receiver);
self.update_receivers.store(Arc::new(update_receivers));
}
}
impl Drop for MononokeConfigs {
// If MononokeConfigs is getting dropped, then we need to terminate the updater
// process as well.
fn drop(&mut self) {
// If the updater process exists, abort it.
if let Some(updater_handle) = self.maybe_config_updater.as_ref() {
updater_handle.abort();
}
// If the liveness updater process exists, abort it.
if let Some(liveness_updater) = self.maybe_liveness_updater.as_ref() {
liveness_updater.abort();
}
}
}
async fn liveness_updater() {
loop {
STATS::liveness_count.add_value(1);
tokio::time::sleep(tokio::time::Duration::from_secs(LIVENESS_INTERVAL)).await;
}
}
async fn watch_and_update(
repo_configs: Swappable<RepoConfigs>,
storage_configs: Swappable<StorageConfigs>,
update_receivers: Swappable<Vec<Arc<dyn ConfigUpdateReceiver>>>,
mut config_watcher: ConfigUpdateWatcher<RawRepoConfigs>,
logger: Logger,
) {
loop {
match config_watcher.wait_for_next().await {
Ok(raw_repo_configs) => {
info!(
logger,
"Raw Repo Configs changed in config store, applying update"
);
trace!(logger, "Applied configs: {:?}", raw_repo_configs);
match load_configs_from_raw(Arc::unwrap_or_clone(raw_repo_configs)) {
Ok((new_repo_configs, new_storage_configs)) => {
let new_repo_configs = Arc::new(new_repo_configs);
let new_storage_configs = Arc::new(new_storage_configs);
repo_configs.store(new_repo_configs.clone());
storage_configs.store(new_storage_configs.clone());
let receivers = update_receivers.load();
let update_tasks = receivers.iter().map(|receiver| {
receiver
.apply_update(new_repo_configs.clone(), new_storage_configs.clone())
});
if let Err(e) = join_all(update_tasks)
.await
.into_iter()
.collect::<Result<Vec<_>>>()
{
error!(
logger,
"Failure in sending config update to receivers. Error: {:?}", e
);
STATS::refresh_failure_count.add_value(1);
} else {
info!(logger, "Successfully applied config update");
// Need to publish a value of 0 to keep the counter alive
STATS::refresh_failure_count.add_value(0);
}
}
Err(e) => {
error!(
logger,
"Failure in parsing config from raw config. Error: {:?}", e
);
STATS::refresh_failure_count.add_value(1);
}
}
}
Err(e) => {
error!(
logger,
"Failure in fetching latest config change. Error: {:?}", e
);
STATS::refresh_failure_count.add_value(1);
}
}
}
}
/// Trait defining methods related to config update notification. A struct implementing
/// this trait can be configured to receive the most updated config value everytime the
/// underlying config changes.
#[async_trait]
pub trait ConfigUpdateReceiver: Send + Sync {
/// Method containing the logic to be executed when the configuration is updated. This
/// should not be too long running since the config updates will wait for all update
/// receivers before checking for the next config update.
async fn apply_update(
&self,
repo_configs: Arc<RepoConfigs>,
storage_configs: Arc<StorageConfigs>,
) -> Result<()>;
}