Use roto_tonic types in hello_world example
Fix the client's poll_ready implementation to properly propagate readiness.
This commit is contained in:
@@ -5,6 +5,8 @@ use roto_runtime::RotoOwned;
|
||||
use std::task::{Context, Poll};
|
||||
use tower::Service;
|
||||
|
||||
pub use roto_tonic::{BufferPool, StatusBody};
|
||||
|
||||
pub mod hello {
|
||||
include!(concat!(env!("OUT_DIR"), "/hello.rs"));
|
||||
}
|
||||
@@ -19,11 +21,14 @@ where
|
||||
type Error = S::Error;
|
||||
type Future = S::Future;
|
||||
|
||||
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
Poll::Ready(Ok(()))
|
||||
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
self.0.poll_ready(cx)
|
||||
}
|
||||
|
||||
fn call(&mut self, req: Req) -> S::Future {
|
||||
let waker = futures_util::task::noop_waker();
|
||||
let mut cx = std::task::Context::from_waker(&waker);
|
||||
let _ = self.poll_ready(&mut cx);
|
||||
self.0.call(req)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -13,35 +13,12 @@ use roto_runtime::{RotoOwned, RotoMessage};
|
||||
use http_body_util::BodyExt;
|
||||
use http_body::Body;
|
||||
|
||||
pub use roto_tonic::{BufferPool, StatusBody};
|
||||
|
||||
pub mod hello {
|
||||
include!(concat!(env!("OUT_DIR"), "/hello.rs"));
|
||||
}
|
||||
|
||||
struct BufferPool {
|
||||
pool: Mutex<Vec<BytesMut>>,
|
||||
default_capacity: usize,
|
||||
}
|
||||
|
||||
impl BufferPool {
|
||||
fn new(default_capacity: usize) -> Self {
|
||||
Self {
|
||||
pool: Mutex::new(Vec::new()),
|
||||
default_capacity,
|
||||
}
|
||||
}
|
||||
|
||||
fn get(&self) -> BytesMut {
|
||||
self.pool.lock().unwrap().pop().unwrap_or_else(|| BytesMut::with_capacity(self.default_capacity))
|
||||
}
|
||||
|
||||
fn put(&self, mut buf: BytesMut) {
|
||||
buf.clear();
|
||||
if buf.capacity() >= self.default_capacity {
|
||||
self.pool.lock().unwrap().push(buf);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct MyHelloWorld {
|
||||
pool: Arc<BufferPool>,
|
||||
@@ -99,24 +76,6 @@ impl tonic::server::NamedService for HelloWorldServer {
|
||||
const NAME: &'static str = "hello.HelloWorldService";
|
||||
}
|
||||
|
||||
struct StatusBody(Option<Bytes>);
|
||||
|
||||
impl Body for StatusBody {
|
||||
type Data = Bytes;
|
||||
type Error = Status;
|
||||
|
||||
fn poll_frame(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
|
||||
if let Some(data) = self.0.take() {
|
||||
Poll::Ready(Some(Ok(http_body::Frame::data(data))))
|
||||
} else {
|
||||
Poll::Ready(None)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Service<http::Request<BoxBody>> for HelloWorldServer {
|
||||
type Response = http::Response<BoxBody>;
|
||||
type Error = std::convert::Infallible;
|
||||
@@ -152,7 +111,7 @@ impl Service<http::Request<BoxBody>> for HelloWorldServer {
|
||||
|
||||
if bytes_vec.len() < 5 {
|
||||
println!("Body too short: {} bytes", bytes_vec.len());
|
||||
let res_body = BoxBody::new(StatusBody(Some(Bytes::from_static(&[0, 0, 0, 0, 0]))));
|
||||
let res_body = BoxBody::new(StatusBody::new(Some(Bytes::from_static(&[0, 0, 0, 0, 0])), 0));
|
||||
return Ok(http::Response::builder()
|
||||
.status(200)
|
||||
.body(res_body)
|
||||
@@ -164,7 +123,7 @@ impl Service<http::Request<BoxBody>> for HelloWorldServer {
|
||||
Ok(msg) => msg,
|
||||
Err(e) => {
|
||||
println!("Decode error: {}", e);
|
||||
let res_body = BoxBody::new(StatusBody(Some(Bytes::from_static(&[0, 0, 0, 0, 0]))));
|
||||
let res_body = BoxBody::new(StatusBody::new(Some(Bytes::from_static(&[0, 0, 0, 0, 0])), 0));
|
||||
return Ok(http::Response::builder().status(200).body(res_body).unwrap());
|
||||
}
|
||||
};
|
||||
@@ -174,7 +133,7 @@ impl Service<http::Request<BoxBody>> for HelloWorldServer {
|
||||
Ok(res) => res,
|
||||
Err(e) => {
|
||||
println!("Service error: {}", e);
|
||||
let res_body = BoxBody::new(StatusBody(Some(Bytes::from_static(&[0, 0, 0, 0, 0]))));
|
||||
let res_body = BoxBody::new(StatusBody::new(Some(Bytes::from_static(&[0, 0, 0, 0, 0])), 0));
|
||||
return Ok(http::Response::builder().status(200).body(res_body).unwrap());
|
||||
}
|
||||
};
|
||||
@@ -193,7 +152,7 @@ impl Service<http::Request<BoxBody>> for HelloWorldServer {
|
||||
let frame = res_buf.split_to(frame_len).freeze();
|
||||
pool.put(res_buf);
|
||||
|
||||
let res_body = BoxBody::new(StatusBody(Some(frame)));
|
||||
let res_body = BoxBody::new(StatusBody::new(Some(frame), 0));
|
||||
Ok(http::Response::builder()
|
||||
.status(200)
|
||||
.header("content-type", "application/grpc")
|
||||
|
||||
Reference in New Issue
Block a user