Fix stuff
This commit is contained in:
+115
-2
@@ -547,10 +547,19 @@ pub fn generate_rust_code(
|
||||
output.push_str("#[allow(unused_imports)]\n\n");
|
||||
output.push_str("use roto_runtime::{ProtoAccessor, ProtoBuilder, Result, RotoError, read_varint, RepeatedFieldIterator};\n");
|
||||
output.push_str("use std::str;\n");
|
||||
output.push_str("use bytes::Bytes;\n");
|
||||
output.push_str("use bytes::{Bytes, BytesMut, Buf, BufMut};\n");
|
||||
output.push_str("use tonic::{Request, Response, Status};\n");
|
||||
output.push_str("use tokio_stream::Stream;\n");
|
||||
output.push_str("use std::pin::Pin;\n\n");
|
||||
output.push_str("use std::pin::Pin;\n");
|
||||
output.push_str("use std::sync::Arc;\n");
|
||||
output.push_str("use std::task::{Context, Poll};\n");
|
||||
output.push_str("use std::future::Future;\n");
|
||||
output.push_str("use tonic::body::BoxBody;\n");
|
||||
output.push_str("use tower::Service;\n");
|
||||
output.push_str("use futures_util::StreamExt;\n");
|
||||
output.push_str("use http_body_util::BodyExt;\n");
|
||||
output.push_str("use http_body::Body;\n");
|
||||
output.push_str("use roto_tonic::{BufferPool, StatusBody};\n\n");
|
||||
|
||||
for dep_res in file_proto.dependency() {
|
||||
let (dep_data, _) = dep_res.expect("Failed to iterate dependency");
|
||||
@@ -681,4 +690,108 @@ fn write_service(svc_proto: &ServiceDescriptorProto, output: &mut String) {
|
||||
));
|
||||
}
|
||||
output.push_str("}\n\n");
|
||||
|
||||
let server_name = format!("{}Server", svc_name);
|
||||
output.push_str(&format!("pub struct {} {{\n", server_name));
|
||||
output.push_str(&format!(" inner: Arc<dyn {}>,\n", svc_name));
|
||||
output.push_str(" pool: Arc<BufferPool>,\n");
|
||||
output.push_str("}\n\n");
|
||||
|
||||
output.push_str(&format!("impl {} {{\n", server_name));
|
||||
output.push_str(&format!(" pub fn new(inner: Arc<dyn {}>, pool: Arc<BufferPool>) -> Self {{\n", svc_name));
|
||||
output.push_str(" Self { inner, pool }\n");
|
||||
output.push_str(" }\n");
|
||||
output.push_str("}\n\n");
|
||||
|
||||
output.push_str(&format!("impl tonic::server::NamedService for {} {{\n", server_name));
|
||||
output.push_str(&format!(" const NAME: &'static str = \"{}\";\n", svc_proto.name().unwrap()));
|
||||
output.push_str("}\n\n");
|
||||
|
||||
output.push_str(&format!("impl Service<http::Request<BoxBody>> for {} {{\n", server_name));
|
||||
output.push_str(" type Response = http::Response<BoxBody>;\n");
|
||||
output.push_str(" type Error = std::convert::Infallible;\n");
|
||||
output.push_str(" type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;\n\n");
|
||||
|
||||
output.push_str(" fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {\n");
|
||||
output.push_str(" Poll::Ready(Ok(()))\n");
|
||||
output.push_str(" }\n\n");
|
||||
|
||||
output.push_str(" fn call(&mut self, req: http::Request<BoxBody>) -> Self::Future {\n");
|
||||
output.push_str(" let inner = self.inner.clone();\n");
|
||||
output.push_str(" let pool = self.pool.clone();\n");
|
||||
output.push_str(" Box::pin(async move {\n");
|
||||
output.push_str(" let body = req.into_body();\n");
|
||||
output.push_str(" let mut buf = pool.get();\n");
|
||||
output.push_str(" let mut stream = body;\n");
|
||||
output.push_str(" while let Some(frame_result) = stream.frame().await {\n");
|
||||
output.push_str(" let frame = frame_result.map_err(|e| {\n");
|
||||
output.push_str(" panic!(\"Body frame error: {}\", e);\n");
|
||||
output.push_str(" })?;\n");
|
||||
output.push_str(" if let Some(data) = frame.data_ref() {\n");
|
||||
output.push_str(" buf.put(data.clone());\n");
|
||||
output.push_str(" }\n");
|
||||
output.push_str(" }\n\n");
|
||||
|
||||
output.push_str(" let total_len = buf.len();\n");
|
||||
output.push_str(" let bytes_vec = buf.split_to(total_len).freeze();\n");
|
||||
output.push_str(" pool.put(buf);\n");
|
||||
output.push_str(" if bytes_vec.len() < 5 {\n");
|
||||
output.push_str(" let res_body = BoxBody::new(StatusBody(Some(Bytes::from_static(&[0, 0, 0, 0, 0]))));\n");
|
||||
output.push_str(" return Ok(http::Response::builder().status(200).body(res_body).unwrap());\n");
|
||||
output.push_str(" }\n\n");
|
||||
output.push_str(" let payload = bytes_vec.slice(5..);\n");
|
||||
output.push_str(" let path = req.uri().path();\n");
|
||||
output.push_str(" let mut routed = false;\n\n");
|
||||
|
||||
let mut methods = Vec::new();
|
||||
for method_res in svc_proto.method() {
|
||||
let (method_data, _) = method_res.expect("Failed to iterate method");
|
||||
let method_proto = MethodDescriptorProto::new(method_data).expect("Failed to parse MethodDescriptorProto");
|
||||
let method_name = to_snake_case(method_proto.name().unwrap());
|
||||
let input_full_name = method_proto.input_type().unwrap();
|
||||
let input_type = input_full_name.split('.').last().unwrap();
|
||||
let input_owned = format!("Owned{}", input_type);
|
||||
methods.push((method_name, input_owned));
|
||||
}
|
||||
|
||||
for (method_name, input_owned) in methods {
|
||||
output.push_str(&format!(" if path == \"/{}/{}\" {{\n", svc_proto.name().unwrap(), method_name));
|
||||
output.push_str(&format!(" let request_msg = match {}::decode(payload) {{\n", input_owned));
|
||||
output.push_str(" Ok(msg) => msg,\n");
|
||||
output.push_str(" Err(e) => {\n");
|
||||
output.push_str(" let res_body = BoxBody::new(StatusBody(Some(Bytes::from_static(&[0, 0, 0, 0, 0]))));\n");
|
||||
output.push_str(" return Ok(http::Response::builder().status(200).body(res_body).unwrap());\n");
|
||||
output.push_str(" }\n");
|
||||
output.push_str(" }};\n\n");
|
||||
output.push_str(&format!(" let response = match inner.{}(Request::new(request_msg)).await {{\n", method_name));
|
||||
output.push_str(" Ok(res) => res,\n");
|
||||
output.push_str(" Err(e) => {\n");
|
||||
output.push_str(" let res_body = BoxBody::new(StatusBody(Some(Bytes::from_static(&[0, 0, 0, 0, 0]))));\n");
|
||||
output.push_str(" return Ok(http::Response::builder().status(200).body(res_body).unwrap());\n");
|
||||
output.push_str(" }\n");
|
||||
output.push_str(" }};\n\n");
|
||||
output.push_str(" let response_msg = response.into_inner();\n");
|
||||
output.push_str(" let response_bytes = response_msg.bytes();\n");
|
||||
output.push_str(" let mut res_buf = pool.get();\n");
|
||||
output.push_str(" res_buf.put_u8(0);\n");
|
||||
output.push_str(" let len = response_bytes.len() as u32;\n");
|
||||
output.push_str(" res_buf.put_slice(&len.to_be_bytes());\n");
|
||||
output.push_str(" res_buf.put_slice(&response_bytes);\n");
|
||||
output.push_str(" let frame_len = res_buf.len();\n");
|
||||
output.push_str(" let frame = res_buf.split_to(frame_len).freeze();\n");
|
||||
output.push_str(" pool.put(res_buf);\n");
|
||||
output.push_str(" let res_body = BoxBody::new(StatusBody(Some(frame)));\n");
|
||||
output.push_str(" routed = true;\n");
|
||||
output.push_str(" return Ok(http::Response::builder().status(200).header(\"content-type\", \"application/grpc\").body(res_body).unwrap());\n");
|
||||
output.push_str(" }\n");
|
||||
}
|
||||
|
||||
output.push_str(" if !routed {\n");
|
||||
output.push_str(" let res_body = BoxBody::new(StatusBody(Some(Bytes::from_static(&[0, 0, 0, 0, 0]))));\n");
|
||||
output.push_str(" return Ok(http::Response::builder().status(200).body(res_body).unwrap());\n");
|
||||
output.push_str(" }\n");
|
||||
output.push_str(" Ok(http::Response::builder().status(200).body(BoxBody::new(StatusBody(None))).unwrap())\n");
|
||||
output.push_str(" })\n");
|
||||
output.push_str(" }\n");
|
||||
output.push_str("}\n");
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user