-
Notifications
You must be signed in to change notification settings - Fork 1.7k
/
client.rs
165 lines (147 loc) · 4.81 KB
/
client.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
use thiserror::Error;
use tonic::{Code, Status};
use tracing::info;
use self::proto::turbod_client::TurbodClient;
use super::{
connector::{DaemonConnector, DaemonConnectorError},
endpoint::SocketOpenError,
};
use crate::get_version;
pub mod proto {
tonic::include_proto!("turbodprotocol");
}
#[derive(Debug)]
pub struct DaemonClient<T> {
client: TurbodClient<tonic::transport::Channel>,
connect_settings: T,
}
impl<T> DaemonClient<T> {
/// Interrogate the server for its version.
pub(super) async fn handshake(&mut self) -> Result<(), DaemonError> {
let _ret = self
.client
.hello(proto::HelloRequest {
version: get_version().to_string(),
// todo(arlyon): add session id
..Default::default()
})
.await?;
Ok(())
}
/// Stops the daemon and closes the connection, returning
/// the connection settings that were used to connect.
pub async fn stop(mut self) -> Result<T, DaemonError> {
info!("Stopping daemon");
self.client.shutdown(proto::ShutdownRequest {}).await?;
Ok(self.connect_settings)
}
}
impl DaemonClient<()> {
pub fn new(client: TurbodClient<tonic::transport::Channel>) -> Self {
Self {
client,
connect_settings: (),
}
}
/// Augment the client with the connect settings, allowing it to be
/// restarted.
pub fn with_connect_settings(
self,
connect_settings: DaemonConnector,
) -> DaemonClient<DaemonConnector> {
DaemonClient {
client: self.client,
connect_settings,
}
}
}
impl DaemonClient<DaemonConnector> {
/// Stops the daemon, closes the connection, and opens a new connection.
pub async fn restart(self) -> Result<DaemonClient<DaemonConnector>, DaemonError> {
self.stop().await?.connect().await.map_err(Into::into)
}
#[allow(dead_code)]
pub async fn get_changed_outputs(
&mut self,
hash: String,
output_globs: Vec<String>,
) -> Result<Vec<String>, DaemonError> {
Ok(self
.client
.get_changed_outputs(proto::GetChangedOutputsRequest { hash, output_globs })
.await?
.into_inner()
.changed_output_globs)
}
#[allow(dead_code)]
pub async fn notify_outputs_written(
&mut self,
hash: String,
output_globs: Vec<String>,
output_exclusion_globs: Vec<String>,
) -> Result<(), DaemonError> {
self.client
.notify_outputs_written(proto::NotifyOutputsWrittenRequest {
hash,
output_globs,
output_exclusion_globs,
})
.await?;
Ok(())
}
/// Get the status of the daemon.
pub async fn status(&mut self) -> Result<proto::DaemonStatus, DaemonError> {
self.client
.status(proto::StatusRequest {})
.await?
.into_inner()
.daemon_status
.ok_or(DaemonError::MalformedResponse)
}
pub fn pid_file(&self) -> &turbopath::AbsoluteSystemPathBuf {
&self.connect_settings.pid_file
}
pub fn sock_file(&self) -> &turbopath::AbsoluteSystemPathBuf {
&self.connect_settings.sock_file
}
}
#[derive(Error, Debug)]
pub enum DaemonError {
/// The server was connected but is now unavailable.
#[error("server is unavailable")]
Unavailable,
#[error("error opening socket: {0}")]
SocketOpen(#[from] SocketOpenError),
/// The server is running a different version of turborepo.
#[error("version mismatch")]
VersionMismatch,
/// There is an issue with the underlying grpc transport.
#[error("bad grpc transport: {0}")]
GrpcTransport(#[from] tonic::transport::Error),
/// The daemon returned an unexpected status code.
#[error("bad grpc status code: {0}")]
GrpcFailure(tonic::Code),
/// The daemon returned a malformed response.
#[error("malformed response")]
MalformedResponse,
/// There was an issue connecting to the daemon.
#[error("unable to connect: {0}")]
DaemonConnect(#[from] DaemonConnectorError),
/// The timeout specified was invalid.
#[error("invalid timeout specified ({0})")]
InvalidTimeout(String),
/// The server is unable to start file watching.
#[error("unable to start file watching")]
FileWatching(#[from] globwatch::Error),
#[error("unable to display output: {0}")]
DisplayError(#[from] serde_json::Error),
}
impl From<Status> for DaemonError {
fn from(status: Status) -> DaemonError {
match status.code() {
Code::FailedPrecondition => DaemonError::VersionMismatch,
Code::Unavailable => DaemonError::Unavailable,
c => DaemonError::GrpcFailure(c),
}
}
}