athena_cli/commands/inspect/
detail.rs1use super::download::download_from_s3;
2use super::fields::{get_field_value, get_inspect_fields, InspectField};
3use crate::cli::InspectArgs;
4use crate::context::Context;
5use anyhow::Result;
6use aws_sdk_s3;
7use owo_colors::OwoColorize;
8use polars::prelude::*;
9use prettytable::{format, Cell, Row, Table};
10
11pub async fn detail(ctx: &Context, args: &InspectArgs) -> Result<()> {
12 let client = ctx.create_athena_client();
13 let query_id = args.query_id.clone();
14
15 let quiet_mode = args.quiet || ctx.quiet();
17
18 let execution_future = client
20 .get_query_execution()
21 .query_execution_id(&query_id)
22 .send();
23
24 let runtime_stats_future = client
25 .get_query_runtime_statistics()
26 .query_execution_id(&query_id)
27 .send();
28
29 let results_future = if !quiet_mode {
31 Some(
32 client
33 .get_query_results()
34 .query_execution_id(&query_id)
35 .max_results(11) .send(),
37 )
38 } else {
39 None
40 };
41
42 let (execution_result, runtime_stats_result, results_result) =
43 tokio::join!(execution_future, runtime_stats_future, async {
44 if let Some(fut) = results_future {
45 Some(fut.await)
46 } else {
47 None
48 }
49 });
50
51 let execution_output = execution_result?;
53 let execution = execution_output
54 .query_execution()
55 .ok_or_else(|| anyhow::anyhow!("No query execution found with ID: {}", query_id))?;
56
57 let is_succeeded = execution
59 .status()
60 .and_then(|status| status.state())
61 .map(|state| state.as_str() == "SUCCEEDED")
62 .unwrap_or(false);
63
64 let total_rows: Option<i64> = if is_succeeded {
65 runtime_stats_result.ok().and_then(|output| {
66 output
67 .query_runtime_statistics()
68 .and_then(|stats| stats.rows())
69 .and_then(|rows| rows.output_rows())
70 })
71 } else {
72 None
73 };
74
75 if !quiet_mode {
76 println!("\n{}", "Query Execution Details".bold());
77 println!("ID: {}\n", query_id.bright_green());
78 let mut table = Table::new();
80
81 table.set_format(*format::consts::FORMAT_CLEAN); let fields = get_inspect_fields();
86
87 table.add_row(Row::new(vec![
89 Cell::new("Field").style_spec("Fb"), Cell::new("Value").style_spec("Fb"), ]));
92
93 for field in fields {
95 let value = if field == InspectField::TotalRows {
96 total_rows
97 .map(|c| c.to_string())
98 .unwrap_or_else(|| "N/A".to_string())
99 } else {
100 get_field_value(execution, field)
101 };
102
103 let formatted_value = match field {
104 InspectField::Status => match value.as_str() {
105 "SUCCEEDED" => value.bright_green().to_string(),
106 "FAILED" => value.bright_red().to_string(),
107 _ => value.yellow().to_string(),
108 },
109 InspectField::DataScanned => value.bright_cyan().to_string(),
110 InspectField::TotalRows => value.bright_magenta().to_string(),
111 _ => value,
112 };
113
114 table.add_row(Row::new(vec![
115 Cell::new(&field.to_string()).style_spec("Fb"), Cell::new(&formatted_value),
117 ]));
118 }
119
120 table.printstd();
122 }
123
124 if let Some(status) = execution.status() {
126 if let Some(state) = status.state() {
127 if state.as_str() == "SUCCEEDED" {
128 if !quiet_mode {
129 if let Some(Ok(results)) = results_result {
131 if let Some(result_set) = results.result_set {
132 if let Some(ref rows) = result_set.rows {
133 if rows.len() > 1 {
134 println!("\n{}", "Query Result Sample (first 10 rows):".bold());
136
137 let column_names: Vec<String> =
139 if let Some(first_row) = rows.first() {
140 if let Some(data) = &first_row.data {
141 data.iter()
142 .map(|d| {
143 d.var_char_value
144 .as_deref()
145 .unwrap_or("")
146 .to_string()
147 })
148 .collect()
149 } else {
150 Vec::new()
151 }
152 } else {
153 Vec::new()
154 };
155
156 if !column_names.is_empty() {
157 let mut all_columns: Vec<Vec<String>> =
159 vec![Vec::new(); column_names.len()];
160
161 for row in rows.iter().skip(1) {
163 if let Some(data) = &row.data {
164 for (i, datum) in data.iter().enumerate() {
165 if i < all_columns.len() {
166 all_columns[i].push(
167 datum
168 .var_char_value
169 .as_deref()
170 .unwrap_or("")
171 .to_string(),
172 );
173 }
174 }
175 }
176 }
177
178 let series: Vec<Column> = all_columns
180 .iter()
181 .zip(column_names.iter())
182 .map(|(col, name)| {
183 Series::new(name.into(), col).into_column()
184 })
185 .collect();
186
187 match DataFrame::new(series) {
188 Ok(df) => println!("{}", df),
189 Err(e) => println!("Error creating DataFrame: {}", e),
190 }
191 }
192 }
193 }
194 }
195 }
196 }
197
198 if let Some(output_dir) = &args.output {
200 let s3_output_location = execution
201 .result_configuration()
202 .and_then(|c| c.output_location())
203 .ok_or_else(|| {
204 anyhow::anyhow!("No output location found for query: {}", query_id)
205 })?;
206
207 if !quiet_mode {
208 println!("\n{}", "S3 Output Location:".bold());
209 println!("📂 {}", s3_output_location.bright_blue());
210 println!("\n{}", "Downloading Results...".bold());
211 }
212
213 let s3_client = aws_sdk_s3::Client::new(ctx.aws_config());
214
215 match download_from_s3(&s3_client, s3_output_location, output_dir, &query_id)
216 .await
217 {
218 Ok(file_path) => {
219 if quiet_mode {
220 println!("{}", file_path.display());
221 } else {
222 println!(
223 "✅ Downloaded to: {}",
224 file_path.display().to_string().bright_green()
225 )
226 }
227 }
228 Err(e) => {
229 if quiet_mode {
230 return Err(e);
231 } else {
232 println!("❌ Error: {}", e.to_string().bright_red())
233 }
234 }
235 }
236 }
237 } else if !quiet_mode {
238 println!("\n{}", "Cannot display results:".bold());
239 println!("❌ Query status is {}", state.as_str().bright_red());
240 }
241 }
242 }
243
244 if !quiet_mode {
245 println!(); }
247 Ok(())
248}