-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.rs
378 lines (313 loc) · 10.4 KB
/
main.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
extern crate serde_json;
extern crate serde;
extern crate regex;
extern crate docopt;
extern crate rustc_serialize;
extern crate env_logger;
extern crate ansi_term;
#[macro_use]
extern crate error_chain;
#[macro_use]
extern crate log;
extern crate sonic;
extern crate rpassword;
extern crate pbr;
mod util {
#[cfg(feature = "serde_macros")]
include!("util.rs.in");
#[cfg(not(feature = "serde_macros"))]
include!(concat!(env!("OUT_DIR"), "/util.rs"));
}
use std::path::PathBuf;
use std::process;
use std::io::{Write, stderr, stdout};
use std::cell::RefCell;
use docopt::Docopt;
use pbr::ProgressBar;
use sonic::{Authenticate, SonicMessage};
use rpassword::read_password;
#[cfg_attr(rustfmt, rustfmt_skip)]
const USAGE: &'static str = "
.
d8b
Y8P
.d8888b .d88b. 88888b. 888 .d8888b
88K d88\"\"88b 888 \"88b 888 d88P\"
\"Y8888b. 888 888 888 888 888 888
X88 Y88..88P 888 888 888 Y88b.
88888P' \"Y88P\" 888 888 888 \"Y8888P
Usage:
sonic <source> [options] -e <query>
sonic <source> [options] -f <file>
sonic login [options]
sonic -h | --help
sonic --version
Options:
-e, --execute Run query literal
-f, --file Run file contents as query
-c <config> Override default configuration file ($HOME/.sonicrc)
-d <foo=var> ... Replace variable in query in the form of `${foo}` with value `var`
-r, --rows-only Skip printing column names
-S, --silent Disable progress bar
-v, --verbose Enable debug level logging
-h, --help Print this message
--version Print version
";
static VERSION: &'static str = env!("CARGO_PKG_VERSION");
static COMMIT: Option<&'static str> = option_env!("SONIC_COMMIT");
#[derive(Debug, RustcDecodable)]
struct Args {
arg_file: String,
arg_query: String,
flag_silent: bool,
flag_rows_only: bool,
flag_d: Vec<String>,
flag_file: bool,
flag_c: String,
flag_verbose: bool,
arg_source: String,
cmd_login: bool,
flag_execute: bool,
flag_version: bool,
flag_help: bool,
}
error_chain! {
types {
Error, ErrorKind, ChainErr, Result;
}
links {
sonic::Error, sonic::ErrorKind, SonicdError;
}
foreign_links {
::std::io::Error, IoError;
::serde_json::Error, Json;
::std::sync::mpsc::RecvError, RecvError;
}
errors {
InvalidFormat(co: String, msg: String) {
description("invalid format error")
display("invalid format {}: {}", co, msg)
}
}
}
fn hide<T: Write>(pb: &mut ProgressBar<T>) {
pb.show_bar = false;
pb.show_speed = false;
pb.show_percent = false;
pb.show_counter = false;
pb.show_time_left = false;
pb.show_tick = false;
pb.show_message = false;
}
fn show<T: Write>(pb: &mut ProgressBar<T>) {
pb.show_bar = true;
pb.show_speed = false;
pb.show_percent = true;
pb.show_counter = true;
pb.show_time_left = true;
pb.show_tick = true;
pb.show_message = true;
}
fn exec(
host: &str,
port: &u16,
query: SonicMessage,
rows_only: bool,
silent: bool
) -> Result<()> {
let out = RefCell::new(stdout());
let mut buf: Vec<SonicMessage> = Vec::new();
let mut pb = ProgressBar::on(stderr(), 0);
pb.format("╢░░_╟");
pb.tick_format("▀▐▄▌");
show(&mut pb);
let (tx, rx) = ::std::sync::mpsc::channel();
try!(sonic::stream((host, *port), query, tx));
let draw = |b: &mut Vec<SonicMessage>| {
let len = b.len();
for msg in b.drain(..len) {
let cols = match msg {
SonicMessage::OutputChunk(data) => {
data.iter().fold(String::new(),
|acc, val| format!("{}{:?}\t", acc, val))
}
SonicMessage::TypeMetadata(data) => {
data.iter().fold(String::new(),
|acc, col| format!("{}{:?}\t", acc, col.0))
}
_ => panic!("not possible!"),
};
let row = format!("{}\n", cols.trim_right());
let mut bout = out.borrow_mut();
bout.write_all(row.as_bytes()).unwrap();
bout.flush().unwrap();
}
};
let res: Result<()>;
loop {
match try!(rx.recv()) {
Ok(SonicMessage::StreamStarted(trace_id)) => {
debug!("started query with trace_id: {}", trace_id);
}
Ok(msg @ SonicMessage::TypeMetadata(_)) => {
if !rows_only {
buf.push(msg);
}
}
Ok(msg @ SonicMessage::OutputChunk(_)) => {
buf.push(msg);
if !silent && !pb.is_finish {
// tick format breaks suffix length
// so we need to disable it before finishing bar
hide(&mut pb);
pb.tick();
pb.finish();
}
draw(&mut buf);
}
Ok(SonicMessage::QueryProgress { status, progress, total, .. }) => {
if !silent && !pb.is_finish {
pb.message(&format!("{:?} ", status));
debug!("{:?}: {:?}/{:?}", status, progress, total);
if let Some(total) = total {
pb.total = total as u64;
}
if progress >= 1.0 {
pb.add(progress as u64);
} else {
pb.tick();
};
}
}
Ok(SonicMessage::StreamCompleted(None, trace_id)) => {
debug!("stream '{}' completed successfully", trace_id);
res = Ok(());
break;
}
Ok(SonicMessage::StreamCompleted(Some(cause), trace_id)) => {
debug!("stream '{}' failed with error {:?}", &cause, trace_id);
let error: Error = cause.into();
res = Err(error).chain_err(|| {
format!("error when running query; trace_id: {}", trace_id)
});
break;
}
Err(e) => {
let error: Error = e.into();
res = Err(error).chain_err(|| "unexpected channel recv error");
break;
}
Ok(a) => debug!("ignoring msg {:?}", a),
}
}
draw(&mut buf);
res
}
pub fn login(host: &str, tcp_port: &u16) -> Result<()> {
let user = std::env::var("USER").unwrap_or("unknown".to_owned());
try!(stdout().write(b"Enter key: "));
try!(stdout().flush());
let key = try!(read_password());
let (tx, rx) = ::std::sync::mpsc::channel();
let cmd = SonicMessage::AuthenticateMsg(Authenticate::new(user.to_owned(),
key,
None));
try!(sonic::stream((host, *tcp_port), cmd, tx));
let token: String;
loop {
match try!(rx.recv()) {
Ok(SonicMessage::OutputChunk(data)) => {
token = try!(util::parse_token(data));
break;
}
Ok(SonicMessage::StreamCompleted(None, trace_id)) => {
return Err(format!("protocol error: server returned no data \
for '{}'",
trace_id)
.into());
}
Ok(SonicMessage::StreamCompleted(Some(cause), trace_id)) => {
let error: Error = cause.into();
return Err(error).chain_err(|| {
format!("error when running login command; trace_id: {}",
trace_id)
});
}
Ok(_) => {}
Err(e) => return Err(e.into()),
};
}
debug!("generated token for user {:?}: {:?}", &user, &token);
let path = util::get_config_path();
let config = try!(util::read_config(&path));
let new_config = util::ClientConfig {
auth: Some(token),
..config
};
try!(util::write_config(&new_config, &path));
println!("OK");
Ok(())
}
fn _main(args: Args) -> Result<()> {
let Args { arg_file,
arg_query,
flag_silent,
flag_rows_only,
flag_d,
flag_file,
flag_c,
arg_source,
cmd_login,
flag_execute,
flag_version,
.. } = args;
let util::ClientConfig { host, tcp_port, sources, auth } = if flag_c !=
"" {
debug!("sourcing passed config in path '{:?}'", &flag_c);
try!(util::read_config(&PathBuf::from(flag_c)))
} else {
debug!("sourcing default config in path '$HOME/.sonicrc'");
try!(util::get_default_config())
};
if flag_file {
let query_str =
try!(util::read_file_contents(&PathBuf::from(&arg_file)));
let split = try!(util::split_key_value(&flag_d));
let injected = try!(util::inject_vars(&query_str, &split));
let query = try!(util::build(arg_source, sources, auth, injected));
exec(&host, &tcp_port, query, flag_rows_only, flag_silent)
} else if flag_execute {
let query = try!(util::build(arg_source, sources, auth, arg_query));
exec(&host, &tcp_port, query, flag_rows_only, flag_silent)
} else if cmd_login {
login(&host, &tcp_port)
} else if flag_version {
println!("Sonic CLI version {} ({})",
VERSION,
COMMIT.unwrap_or_else(|| "dev"));
Ok(())
} else {
panic!("unexpected args");
}
}
fn main() {
let args: Args = Docopt::new(USAGE)
.and_then(|d| d.decode())
.unwrap_or_else(|e| e.exit());
let verbose: bool = args.flag_verbose;
if verbose {
let mut builder = env_logger::LogBuilder::new();
builder.parse("debug");
builder.init().unwrap();
} else {
env_logger::init().unwrap();
}
debug!("parsed args {:?}", args);
match _main(args) {
Ok(_) => {}
Err(e) => {
util::report_error(&e, verbose);
process::exit(1)
}
}
}