Compare commits

...

3 Commits

Author SHA1 Message Date
charles daa42d2d07 Update dependencies in codegen build tests
Bump bytes version to 1.7 and synchronize dependencies across all
generated Cargo.toml files in codegen tests.
2026-05-12 14:03:28 -07:00
charles 804ff3ead0 Checkpoint for gRPC implementation 2026-05-12 13:44:53 -07:00
charles 02a0b0d908 Initial commit of AI generated slop 2026-05-11 22:31:04 -07:00
19 changed files with 1969 additions and 23 deletions
+1
View File
@@ -2,3 +2,4 @@
test_gen_project test_gen_project
test_types_gen_project test_types_gen_project
test_map_gen_project test_map_gen_project
test_grpc_project
Generated
+1143 -4
View File
File diff suppressed because it is too large Load Diff
+2
View File
@@ -4,6 +4,8 @@ members = [
"codegen", "codegen",
"protos", "protos",
"benches", "benches",
"roto-tonic", "test_grpc_project",
"examples/hello_world",
] ]
exclude = [ exclude = [
+12 -12
View File
@@ -8,8 +8,11 @@ Instead of deserializing binary protobuf data into Rust structs, roto scans a me
construction — recording the byte offset of each field — then reads fields on demand directly from construction — recording the byte offset of each field — then reads fields on demand directly from
the original bytes. No heap allocation, no data copying, no full deserialization upfront. the original bytes. No heap allocation, no data copying, no full deserialization upfront.
Writing works the same way: you provide a fixed buffer and a builder writes fields directly into it, It also provides a first-class integration with the `tonic` gRPC framework via the `roto-tonic` crate,
returning a slice of the bytes written. enabling zero-allocation request/response processing.
Writing works the same way: you provide a fixed buffer (or a `bytes::BufMut`) and a builder writes
fields directly into it, returning a slice of the bytes written.
## Design ## Design
@@ -28,10 +31,15 @@ This will generate a file, src/hackers.rs.
## Generated code ## Generated code
For each protobuf message roto generates two types: For each protobuf message roto generates three types:
- **Reader struct** `MessageName<'a>` — borrows the original byte slice, zero-copy. - **Reader struct** `MessageName<'a>` — borrows the original byte slice, zero-copy.
- **Builder struct** `MessageNameBuilder<'b>` — writes into a caller-provided `&mut [u8]`. - **Builder struct** `MessageNameBuilder<'b>` — writes into a caller-provided `&mut [u8]` or `BufMut`.
- **Owned struct** `OwnedMessageName` — owns the byte buffer and implements `RotoOwned`, providing a bridge to the `Reader`.
For each protobuf service, roto generates:
- **Service Trait** `ServiceName` — a `tonic`-compatible async trait for gRPC service implementations.
Nested message types are placed in a `pub mod message_name { ... }` module (snake_case of the Nested message types are placed in a `pub mod message_name { ... }` module (snake_case of the
parent message name) within the same generated file. parent message name) within the same generated file.
@@ -314,12 +322,4 @@ The goal is to validate roto's implementation against the Proto3 specification.
### Unsupported Features ### Unsupported Features
- **Reserved Fields**: `reserved` statements are ignored. - **Reserved Fields**: `reserved` statements are ignored.
- **Services**: `service` and `rpc` definitions are ignored.
- **Options**: Field and message options are ignored. - **Options**: Field and message options are ignored.
### Tasks
- [x] Analyze `roto/codegen` to determine which protobuf constructs are supported during code generation.
- [x] Analyze `roto/runtime` to determine which wire types and protobuf types are supported during reading and writing.
- [x] Compare findings with the Proto3 spec (https://protobuf.dev/reference/protobuf/proto3-spec/).
- [x] Document supported and unsupported features in the README.
+44
View File
@@ -0,0 +1,44 @@
# Tasks for Tonic Integration
This document outlines the steps required to integrate the `roto` protobuf library with the `tonic` gRPC framework.
## Goals
- Provide a `tonic::codec::Codec` implementation for `roto` messages.
- Enable zero-allocation decoding of gRPC requests and responses.
- Support efficient encoding of gRPC messages using `roto`'s `Builder` pattern.
- Generate `tonic`-compatible service traits and client/server boilerplate via `protoc-gen-roto`.
## 1. Runtime Changes (`roto_runtime`)
- [x] Add `bytes` as a dependency to `roto_runtime`.
- [x] Modify `ProtoBuilder` to support writing to `bytes::BufMut` or provide a specialized `BufMut` based builder to facilitate integration with `tonic::codec::EncodeBuf`.
- [x] Design and implement "Owned" message support:
- [x] Create a mechanism (likely via codegen) to generate structs that hold `bytes::Bytes` and the field offsets (e.g., `OwnedMyMessage`).
- [x] These structs will serve as the owned types required by `tonic`'s `Codec`.
- [x] Add a method to convert an `OwnedMyMessage` into a zero-allocation `Reader` (e.g., `reader(&self) -> MyMessage<'_>`).
## 2. Tonic Codec Implementation
- [x] Implement `tonic::codec::Codec` for `roto` messages.
- [x] Implement `tonic::codec::Decoder` for `roto` messages:
- [x] The decoder should take a `DecodeBuf` and produce an `OwnedMyMessage`.
- [x] It must perform the initial scan of the buffer to populate the field offsets.
- [x] Implement `tonic::codec::Encoder` for `roto` messages:
- [x] The encoder should take an `OwnedMyMessage` and write its internal `bytes::Bytes` to the `EncodeBuf`.
- [x] Since `roto` responses are built using `Builder` directly into a buffer, the `Encoder` will primarily handle copying these pre-built buffers.
## 3. Code Generation Extensions (`protoc-gen-roto`)
- [x] Update the generator to produce `OwnedMyMessage` structs in addition to the `Reader` and `Builder` structs.
- [x] Generate the `OwnedMyMessage::new(data: bytes::Bytes)` method for initial decoding.
- [x] Implement gRPC service generation:
- [x] Generate `tonic`-compatible service traits (using `#[tonic::async_trait]`).
- [x] Generate client and server boilerplate that uses `OwnedMyMessage` as the request and response types.
- [x] Ensure the generated code correctly maps protobuf services to Rust traits.
## 4. Testing and Validation
- [x] Create a test gRPC project using the updated `protoc-gen-roto`.
- [ ] Implement a sample gRPC service with:
- [ ] A unary call.
- [ ] A server-streaming call.
- [ ] A client-streaming call.
- [ ] A bidirectional-streaming call.
- [ ] Verify that the integration is zero-allocation on the reading path.
- [ ] Perform benchmark tests to compare performance with `prost`.
+77 -3
View File
@@ -1,6 +1,6 @@
use crate::google::protobuf::descriptor::{ use crate::google::protobuf::descriptor::{
DescriptorProto, EnumDescriptorProto, FieldDescriptorProto, FileDescriptorProto, DescriptorProto, EnumDescriptorProto, FieldDescriptorProto, FileDescriptorProto,
FileDescriptorSet, MessageOptions, OneofDescriptorProto, FileDescriptorSet, MessageOptions, MethodDescriptorProto, OneofDescriptorProto, ServiceDescriptorProto,
}; };
use roto_runtime::ProtoAccessor; use roto_runtime::ProtoAccessor;
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
@@ -434,6 +434,26 @@ fn write_message(msg_proto: &DescriptorProto, output: &mut String) {
output.push_str(&format!(" pub fn finish(self) -> roto_runtime::Result<&'b mut [u8]> {{\n self.builder.finish()\n }}\n}}\n\n")); output.push_str(&format!(" pub fn finish(self) -> roto_runtime::Result<&'b mut [u8]> {{\n self.builder.finish()\n }}\n}}\n\n"));
output.push_str(&format!("pub struct Owned{} {{\n", msg_name));
output.push_str(" pub data: bytes::Bytes,\n");
output.push_str("}\n\n");
output.push_str(&format!("impl roto_runtime::RotoOwned for Owned{} {{\n", msg_name));
output.push_str(&format!(" type Reader<'a> = {}<'a>;\n", msg_name));
output.push_str(&format!(" fn reader(&self) -> {}<'_> {{\n", msg_name));
output.push_str(&format!(" {}::new(&self.data).expect(\"failed to create reader\")\n", msg_name));
output.push_str(" }\n");
output.push_str("}\n\n");
output.push_str(&format!("impl roto_runtime::RotoMessage for Owned{} {{\n", msg_name));
output.push_str(" fn decode(buf: bytes::Bytes) -> roto_runtime::Result<Self> {\n");
output.push_str(&format!(" Ok(Owned{} {{ data: buf }})\n", msg_name));
output.push_str(" }\n\n");
output.push_str(" fn bytes(&self) -> bytes::Bytes {\n");
output.push_str(" self.data.clone()\n");
output.push_str(" }\n");
output.push_str("}\n\n");
let mut nested_enums = Vec::new(); let mut nested_enums = Vec::new();
for e_res in msg_proto.enum_type() { for e_res in msg_proto.enum_type() {
if let Ok((e, _)) = e_res { if let Ok((e, _)) = e_res {
@@ -524,9 +544,13 @@ pub fn generate_rust_code(
let mut output = String::new(); let mut output = String::new();
output.push_str("// @generated by protoc-gen-roto — do not edit\n"); output.push_str("// @generated by protoc-gen-roto — do not edit\n");
output.push_str("#![allow(unused_imports)]\n\n"); output.push_str("#[allow(unused_imports)]\n\n");
output.push_str("use roto_runtime::{ProtoAccessor, ProtoBuilder, Result, RotoError, read_varint, RepeatedFieldIterator};\n"); output.push_str("use roto_runtime::{ProtoAccessor, ProtoBuilder, Result, RotoError, read_varint, RepeatedFieldIterator};\n");
output.push_str("use std::str;\n\n"); output.push_str("use std::str;\n");
output.push_str("use bytes::Bytes;\n");
output.push_str("use tonic::{Request, Response, Status};\n");
output.push_str("use tokio_stream::Stream;\n");
output.push_str("use std::pin::Pin;\n\n");
for dep_res in file_proto.dependency() { for dep_res in file_proto.dependency() {
let (dep_data, _) = dep_res.expect("Failed to iterate dependency"); let (dep_data, _) = dep_res.expect("Failed to iterate dependency");
@@ -553,6 +577,15 @@ pub fn generate_rust_code(
&mut output, &mut output,
); );
} }
// Services
for svc_res in file_proto.service() {
let (svc_data, _) = svc_res.expect("Failed to iterate service");
write_service(
&ServiceDescriptorProto::new(svc_data).expect("Failed to parse ServiceDescriptorProto"),
&mut output,
);
}
generated_files.push((rust_file_name, output)); generated_files.push((rust_file_name, output));
} }
@@ -608,3 +641,44 @@ pub fn generate_rust_code(
generated_files generated_files
} }
fn write_service(svc_proto: &ServiceDescriptorProto, output: &mut String) {
let svc_name = to_pascal_case(svc_proto.name().unwrap());
output.push_str(&format!("#[tonic::async_trait]\npub trait {}: Send + Sync + 'static {{\n", svc_name));
for method_res in svc_proto.method() {
let (method_data, _) = method_res.expect("Failed to iterate method");
let method_proto = MethodDescriptorProto::new(method_data).expect("Failed to parse MethodDescriptorProto");
let method_name = to_snake_case(method_proto.name().unwrap());
let input_full_name = method_proto.input_type().unwrap();
let output_full_name = method_proto.output_type().unwrap();
let input_type = input_full_name.split('.').last().unwrap();
let output_type = output_full_name.split('.').last().unwrap();
let input_owned = format!("Owned{}", input_type);
let output_owned = format!("Owned{}", output_type);
let client_streaming = method_proto.client_streaming().unwrap_or(false);
let server_streaming = method_proto.server_streaming().unwrap_or(false);
let req_type = if client_streaming {
format!("Request<tonic::Streaming<{}>>", input_owned)
} else {
format!("Request<{}>", input_owned)
};
let resp_type = if server_streaming {
format!("Response<Pin<Box<dyn Stream<Item = Result<{}, Status>> + Send>>>", output_owned)
} else {
format!("Response<{}>", output_owned)
};
output.push_str(&format!(
" async fn {}(&self, request: {}) -> std::result::Result<{}, Status>;\n",
method_name, req_type, resp_type
));
}
output.push_str("}\n\n");
}
+1 -1
View File
@@ -58,7 +58,7 @@ fn test_generated_code_builds() {
let cargo_toml_content = let cargo_toml_content =
fs::read_to_string(&cargo_toml_path).expect("Failed to read Cargo.toml"); fs::read_to_string(&cargo_toml_path).expect("Failed to read Cargo.toml");
let updated_cargo_toml = format!( let updated_cargo_toml = format!(
"{}\n\nroto-codegen = {{ path = \"..\" }}\nroto-runtime = {{ path = \"../../runtime\" }}\n\n[workspace]\n", "{}\n\nroto-codegen = {{ path = \"..\" }}\nroto-runtime = {{ path = \"../../runtime\" }}\nbytes = \"1.7\"\ntonic = \"0.12\"\ntokio-stream = \"0.1\"\n\n[workspace]\n",
cargo_toml_content cargo_toml_content
); );
fs::write(cargo_toml_path, updated_cargo_toml).expect("Failed to write Cargo.toml"); fs::write(cargo_toml_path, updated_cargo_toml).expect("Failed to write Cargo.toml");
+1 -1
View File
@@ -38,7 +38,7 @@ fn test_map_generated_code_builds() {
let cargo_toml_content = let cargo_toml_content =
fs::read_to_string(&cargo_toml_path).expect("Failed to read Cargo.toml"); fs::read_to_string(&cargo_toml_path).expect("Failed to read Cargo.toml");
let updated_cargo_toml = format!( let updated_cargo_toml = format!(
"{}\n\nroto-codegen = {{ path = \"..\" }}\nroto-runtime = {{ path = \"../../runtime\" }}\n\n[workspace]\n", "{}\n\nroto-codegen = {{ path = \"..\" }}\nroto-runtime = {{ path = \"../../runtime\" }}\nbytes = \"1.7\"\ntonic = \"0.12\"\ntokio-stream = \"0.1\"\n\n[workspace]\n",
cargo_toml_content cargo_toml_content
); );
fs::write(cargo_toml_path, updated_cargo_toml).expect("Failed to write Cargo.toml"); fs::write(cargo_toml_path, updated_cargo_toml).expect("Failed to write Cargo.toml");
+1 -1
View File
@@ -38,7 +38,7 @@ fn test_types_generated_code_builds() {
let cargo_toml_content = let cargo_toml_content =
fs::read_to_string(&cargo_toml_path).expect("Failed to read Cargo.toml"); fs::read_to_string(&cargo_toml_path).expect("Failed to read Cargo.toml");
let updated_cargo_toml = format!( let updated_cargo_toml = format!(
"{}\n\nroto-codegen = {{ path = \"..\" }}\nroto-runtime = {{ path = \"../../runtime\" }}\n\n[workspace]\n", "{}\n\nroto-codegen = {{ path = \"..\" }}\nroto-runtime = {{ path = \"../../runtime\" }}\nbytes = \"1.7\"\ntonic = \"0.12\"\ntokio-stream = \"0.1\"\n\n[workspace]\n",
cargo_toml_content cargo_toml_content
); );
fs::write(cargo_toml_path, updated_cargo_toml).expect("Failed to write Cargo.toml"); fs::write(cargo_toml_path, updated_cargo_toml).expect("Failed to write Cargo.toml");
+29
View File
@@ -0,0 +1,29 @@
[package]
name = "hello-world"
version = "0.1.0"
edition = "2024"
[[bin]]
name = "server"
path = "src/bin/server.rs"
[[bin]]
name = "client"
path = "src/bin/client.rs"
[dependencies]
roto-runtime = { path = "../../runtime" }
roto-tonic = { path = "../../roto-tonic" }
tonic = "0.12"
tokio = { version = "1.38", features = ["full"] }
tokio-stream = "0.1"
bytes = "1.7"
prost = "0.13"
tower = "0.4"
futures-util = "0.3"
http-body-util = "0.1"
http = "1.1"
http-body = "1.0"
[build-dependencies]
tonic-build = "0.12"
+27
View File
@@ -0,0 +1,27 @@
fn main() {
let proto_file = "proto/hello.proto";
let out_dir = std::env::var("OUT_DIR").unwrap();
let dest_path = std::path::Path::new(&out_dir).join("hello.rs");
// Find the protoc-gen-roto binary
// In a real scenario, this should be passed as an environment variable or found in PATH
// For this example, we'll try to find it in the target directory
let target_dir = std::env::current_dir().unwrap().join("../../target/debug");
let plugin_path = target_dir.join("protoc-gen-roto");
if !plugin_path.exists() {
panic!("protoc-gen-roto plugin not found at {:?}", plugin_path);
}
let status = std::process::Command::new("protoc")
.arg(format!("--plugin=protoc-gen-roto={}", plugin_path.display()))
.arg(format!("--roto_out={}", out_dir))
.arg(format!("--roto_opt=src=proto")) // Assuming the plugin handles this or we just pass it
.arg(proto_file)
.status()
.expect("Failed to execute protoc");
if !status.success() {
panic!("protoc failed with status {}", status);
}
}
+15
View File
@@ -0,0 +1,15 @@
syntax = "proto3";
package hello;
service HelloWorldService {
rpc HelloWorld (HelloRequest) returns (HelloResponse);
}
message HelloRequest {
string name = 1;
}
message HelloResponse {
string message = 1;
}
+210
View File
@@ -0,0 +1,210 @@
// @generated by protoc-gen-roto — do not edit
#[allow(unused_imports)]
use roto_runtime::{ProtoAccessor, ProtoBuilder, Result, RotoError, read_varint, RepeatedFieldIterator};
use std::str;
use bytes::Bytes;
use tonic::{Request, Response, Status};
use tokio_stream::Stream;
use std::pin::Pin;
pub struct HelloRequest<'a> {
accessor: roto_runtime::ProtoAccessor<'a>,
name_offset: Option<usize>,
}
impl<'a> HelloRequest<'a> {
pub fn new(data: &'a [u8]) -> roto_runtime::Result<Self> {
let accessor = roto_runtime::ProtoAccessor::new(data)?;
let mut name_offset = None;
for item in accessor.fields() {
let (offset, tag, _) = item?;
if tag.field_number == 1 { name_offset = Some(offset); }
}
Ok(Self {
accessor,
name_offset,
})
}
pub fn name(&self) -> roto_runtime::Result<&'a str> {
let offset = self.name_offset.ok_or(roto_runtime::RotoError::FieldNotFound)?;
let (bytes, _) = self.accessor.get_value_at(offset)?;
str::from_utf8(bytes).map_err(|_| roto_runtime::RotoError::WireFormatViolation)
}
pub fn name_or_default(&self) -> roto_runtime::Result<&'a str> {
self.name().or(Ok(""))
}
pub fn has_name(&self) -> bool { self.name_offset.is_some() }
pub fn raw_fields(&self) -> roto_runtime::RawFieldIterator<'a> {
self.accessor.raw_fields()
}
}
pub struct HelloRequestBuilder<'b> {
builder: roto_runtime::ProtoBuilder<'b>,
name_written: bool,
}
impl<'b> HelloRequestBuilder<'b> {
pub fn builder(buf: &mut [u8]) -> HelloRequestBuilder<'_> {
HelloRequestBuilder {
builder: roto_runtime::ProtoBuilder::new(buf),
name_written: false,
}
}
pub fn name(mut self, value: &str) -> roto_runtime::Result<Self> {
self.builder.write_string(1, value)?;
self.name_written = true;
Ok(self)
}
pub fn with(mut self, msg: &HelloRequest<'_>) -> roto_runtime::Result<Self> {
for item in msg.raw_fields() {
let (field_number, raw_bytes) = item?;
let is_written = match field_number {
1 => self.name_written,
_ => false,
};
if !is_written {
self.builder.write_raw(raw_bytes)?;
}
}
Ok(self)
}
pub fn finish(self) -> roto_runtime::Result<&'b mut [u8]> {
self.builder.finish()
}
}
pub struct OwnedHelloRequest {
pub data: bytes::Bytes,
}
impl roto_runtime::RotoOwned for OwnedHelloRequest {
type Reader<'a> = HelloRequest<'a>;
fn reader(&self) -> HelloRequest<'_> {
HelloRequest::new(&self.data).expect("failed to create reader")
}
}
impl roto_runtime::RotoMessage for OwnedHelloRequest {
fn decode(buf: bytes::Bytes) -> roto_runtime::Result<Self> {
Ok(OwnedHelloRequest { data: buf })
}
fn bytes(&self) -> bytes::Bytes {
self.data.clone()
}
}
pub struct HelloResponse<'a> {
accessor: roto_runtime::ProtoAccessor<'a>,
message_offset: Option<usize>,
}
impl<'a> HelloResponse<'a> {
pub fn new(data: &'a [u8]) -> roto_runtime::Result<Self> {
let accessor = roto_runtime::ProtoAccessor::new(data)?;
let mut message_offset = None;
for item in accessor.fields() {
let (offset, tag, _) = item?;
if tag.field_number == 1 { message_offset = Some(offset); }
}
Ok(Self {
accessor,
message_offset,
})
}
pub fn message(&self) -> roto_runtime::Result<&'a str> {
let offset = self.message_offset.ok_or(roto_runtime::RotoError::FieldNotFound)?;
let (bytes, _) = self.accessor.get_value_at(offset)?;
str::from_utf8(bytes).map_err(|_| roto_runtime::RotoError::WireFormatViolation)
}
pub fn message_or_default(&self) -> roto_runtime::Result<&'a str> {
self.message().or(Ok(""))
}
pub fn has_message(&self) -> bool { self.message_offset.is_some() }
pub fn raw_fields(&self) -> roto_runtime::RawFieldIterator<'a> {
self.accessor.raw_fields()
}
}
pub struct HelloResponseBuilder<'b> {
builder: roto_runtime::ProtoBuilder<'b>,
message_written: bool,
}
impl<'b> HelloResponseBuilder<'b> {
pub fn builder(buf: &mut [u8]) -> HelloResponseBuilder<'_> {
HelloResponseBuilder {
builder: roto_runtime::ProtoBuilder::new(buf),
message_written: false,
}
}
pub fn message(mut self, value: &str) -> roto_runtime::Result<Self> {
self.builder.write_string(1, value)?;
self.message_written = true;
Ok(self)
}
pub fn with(mut self, msg: &HelloResponse<'_>) -> roto_runtime::Result<Self> {
for item in msg.raw_fields() {
let (field_number, raw_bytes) = item?;
let is_written = match field_number {
1 => self.message_written,
_ => false,
};
if !is_written {
self.builder.write_raw(raw_bytes)?;
}
}
Ok(self)
}
pub fn finish(self) -> roto_runtime::Result<&'b mut [u8]> {
self.builder.finish()
}
}
pub struct OwnedHelloResponse {
pub data: bytes::Bytes,
}
impl roto_runtime::RotoOwned for OwnedHelloResponse {
type Reader<'a> = HelloResponse<'a>;
fn reader(&self) -> HelloResponse<'_> {
HelloResponse::new(&self.data).expect("failed to create reader")
}
}
impl roto_runtime::RotoMessage for OwnedHelloResponse {
fn decode(buf: bytes::Bytes) -> roto_runtime::Result<Self> {
Ok(OwnedHelloResponse { data: buf })
}
fn bytes(&self) -> bytes::Bytes {
self.data.clone()
}
}
#[tonic::async_trait]
pub trait HelloWorldService: Send + Sync + 'static {
async fn hello_world(&self, request: Request<OwnedHelloRequest>) -> std::result::Result<Response<OwnedHelloResponse>, Status>;
}
+64
View File
@@ -0,0 +1,64 @@
use tonic::Request;
use roto_tonic::RotoCodec;
use hello::{HelloWorldService, OwnedHelloRequest, OwnedHelloResponse};
use roto_runtime::RotoOwned;
use std::task::{Context, Poll};
use tower::Service;
pub mod hello {
include!("../../proto/hello.rs");
}
struct ReadyService<S>(S);
impl<S, Req> Service<Req> for ReadyService<S>
where
S: Service<Req>,
{
type Response = S::Response;
type Error = S::Error;
type Future = S::Future;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: Req) -> S::Future {
self.0.call(req)
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let channel = tonic::transport::Channel::from_static("http://[::1]:50051")
.connect()
.await?;
let ready_channel = ReadyService(channel);
let mut client = tonic::client::Grpc::new(ready_channel);
// We need to specify the method path. For HelloWorldService/HelloWorld, it is "/hello.HelloWorldService/HelloWorld"
let mut buf = vec![0u8; 1024];
let slice = hello::HelloRequestBuilder::builder(&mut buf)
.name("Roto").unwrap()
.finish().unwrap();
let request = OwnedHelloRequest {
data: bytes::Bytes::copy_from_slice(slice),
};
// In tonic's Grpc client, we specify the codec separately.
let response = client
.unary(
Request::new(request),
http::uri::PathAndQuery::from_static("/hello.HelloWorldService/HelloWorld"),
RotoCodec::<OwnedHelloResponse, OwnedHelloRequest>::default(),
)
.await?;
let response_msg: OwnedHelloResponse = response.into_inner();
let reader = response_msg.reader();
println!("Server responded: {}", reader.message().unwrap_or("No message"));
Ok(())
}
+165
View File
@@ -0,0 +1,165 @@
use std::pin::Pin;
use std::future::Future;
use std::task::{Context, Poll};
use std::sync::Arc;
use tonic::{transport::Server, Request, Response, Status};
use roto_tonic::RotoCodec;
use hello::{HelloWorldService, OwnedHelloRequest, OwnedHelloResponse};
use tower::Service;
use bytes::{Bytes, Buf, BufMut};
use tonic::body::BoxBody;
use futures_util::StreamExt;
use roto_runtime::{RotoOwned, RotoMessage};
use http_body_util::BodyExt;
use http_body::Body;
pub mod hello {
include!("../../proto/hello.rs");
}
#[derive(Default, Clone)]
pub struct MyHelloWorld {}
#[tonic::async_trait]
impl HelloWorldService for MyHelloWorld {
async fn hello_world(
&self,
request: Request<OwnedHelloRequest>,
) -> Result<Response<OwnedHelloResponse>, Status> {
let req = request.into_inner();
let reader = req.reader();
let name = reader.name().unwrap_or("Unknown");
let mut buf = vec![0u8; 1024];
let slice = hello::HelloResponseBuilder::builder(&mut buf)
.message(&format!("Hello {}!", name)).unwrap()
.finish().unwrap();
let reply = OwnedHelloResponse {
data: bytes::Bytes::copy_from_slice(slice),
};
Ok(Response::new(reply))
}
}
// --- Tonic Glue ---
#[derive(Clone)]
pub struct HelloWorldServer {
inner: Arc<MyHelloWorld>,
}
impl HelloWorldServer {
pub fn new(inner: MyHelloWorld) -> Self {
Self { inner: Arc::new(inner) }
}
}
impl tonic::server::NamedService for HelloWorldServer {
const NAME: &'static str = "hello.HelloWorldService";
}
struct StatusBody(Option<Bytes>);
impl Body for StatusBody {
type Data = Bytes;
type Error = Status;
fn poll_frame(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
if let Some(data) = self.0.take() {
Poll::Ready(Some(Ok(http_body::Frame::data(data))))
} else {
Poll::Ready(None)
}
}
}
impl Service<http::Request<BoxBody>> for HelloWorldServer {
type Response = http::Response<BoxBody>;
type Error = std::convert::Infallible;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: http::Request<BoxBody>) -> Self::Future {
let inner = self.inner.clone();
println!("Server received request: {} {}", req.method(), req.uri());
Box::pin(async move {
let body = req.into_body();
let bytes_vec = body.collect().await.map_err(|e| {
println!("Body collect error: {}", e);
panic!("Body collect error: {}", e);
})?.to_bytes();
println!("Collected body bytes: {} bytes", bytes_vec.len());
if bytes_vec.len() < 5 {
println!("Body too short: {} bytes", bytes_vec.len());
let res_body = BoxBody::new(StatusBody(Some(Bytes::from(vec![0, 0, 0, 0, 0]))));
return Ok(http::Response::builder()
.status(200)
.body(res_body)
.unwrap());
}
let data = &bytes_vec[5..];
println!("Decoding request from {} bytes", data.len());
let request_msg = match OwnedHelloRequest::decode(Bytes::copy_from_slice(data)) {
Ok(msg) => msg,
Err(e) => {
println!("Decode error: {}", e);
let res_body = BoxBody::new(StatusBody(Some(Bytes::from(vec![0, 0, 0, 0, 0]))));
return Ok(http::Response::builder().status(200).body(res_body).unwrap());
}
};
println!("Request decoded successfully");
let response = match inner.hello_world(Request::new(request_msg)).await {
Ok(res) => res,
Err(e) => {
println!("Service error: {}", e);
let res_body = BoxBody::new(StatusBody(Some(Bytes::from(vec![0, 0, 0, 0, 0]))));
return Ok(http::Response::builder().status(200).body(res_body).unwrap());
}
};
let response_msg = response.into_inner();
let response_bytes = response_msg.bytes();
println!("Service responded with {} bytes", response_bytes.len());
let mut res_buf = vec![0u8; 5 + response_bytes.len()];
res_buf[0] = 0;
let len = response_bytes.len() as u32;
res_buf[1..5].copy_from_slice(&len.to_be_bytes());
res_buf[5..].copy_from_slice(&response_bytes);
let res_body = BoxBody::new(StatusBody(Some(Bytes::from(res_buf))));
Ok(http::Response::builder()
.status(200)
.header("content-type", "application/grpc")
.body(res_body)
.unwrap())
})
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let addr: std::net::SocketAddr = "[::1]:50051".parse()?;
let hello = MyHelloWorld::default();
println!("Server listening on {}", addr);
Server::builder()
.add_service(HelloWorldServer::new(hello))
.serve(addr)
.await?;
Ok(())
}
+10
View File
@@ -0,0 +1,10 @@
[package]
name = "roto-tonic"
version = "0.1.0"
edition = "2024"
[dependencies]
roto-runtime = { path = "../runtime" }
tonic = "0.12"
bytes = "1.7"
prost = "0.13"
+72
View File
@@ -0,0 +1,72 @@
use std::marker::PhantomData;
use tonic::codec::{Codec, Decoder, Encoder, DecodeBuf, EncodeBuf};
use bytes::{Buf, BufMut};
use roto_runtime::RotoMessage;
pub struct RotoCodec<T, U> {
_phantom: PhantomData<(T, U)>,
}
impl<T, U> Default for RotoCodec<T, U> {
fn default() -> Self {
Self {
_phantom: PhantomData,
}
}
}
impl<T, U> Codec for RotoCodec<T, U>
where
T: RotoMessage + Send + 'static,
U: RotoMessage + Send + 'static,
{
type Encode = U;
type Decode = T;
type Encoder = RotoEncoder<U>;
type Decoder = RotoDecoder<T>;
fn encoder(&mut self) -> Self::Encoder {
RotoEncoder(PhantomData)
}
fn decoder(&mut self) -> Self::Decoder {
RotoDecoder(PhantomData)
}
}
pub struct RotoEncoder<U>(PhantomData<U>);
impl<U> Encoder for RotoEncoder<U>
where
U: RotoMessage,
{
type Item = U;
type Error = tonic::Status;
fn encode(&mut self, message: Self::Item, buf: &mut EncodeBuf<'_>) -> Result<(), Self::Error> {
buf.put_slice(&message.bytes());
Ok(())
}
}
pub struct RotoDecoder<T>(PhantomData<T>);
impl<T> Decoder for RotoDecoder<T>
where
T: RotoMessage,
{
type Item = T;
type Error = tonic::Status;
fn decode(&mut self, buf: &mut DecodeBuf<'_>) -> Result<Option<Self::Item>, Self::Error> {
if buf.remaining() == 0 {
return Ok(None);
}
let bytes = buf.copy_to_bytes(buf.remaining());
match T::decode(bytes) {
Ok(msg) => Ok(Some(msg)),
Err(e) => Err(tonic::Status::internal(format!("Roto decode error: {}", e))),
}
}
}
+1
View File
@@ -4,3 +4,4 @@ version = "0.1.0"
edition = "2024" edition = "2024"
[dependencies] [dependencies]
bytes = "1.7"
+94 -1
View File
@@ -1,4 +1,5 @@
use std::fmt; use std::fmt;
use bytes::BufMut;
pub struct MapFieldIterator<'a> { pub struct MapFieldIterator<'a> {
inner: RepeatedFieldIterator<'a>, inner: RepeatedFieldIterator<'a>,
@@ -54,8 +55,17 @@ impl std::error::Error for RotoError {}
pub type Result<T> = std::result::Result<T, RotoError>; pub type Result<T> = std::result::Result<T, RotoError>;
pub trait RotoOwned {
type Reader<'a> where Self: 'a;
fn reader(&self) -> Self::Reader<'_>;
}
pub trait RotoMessage: Sized {
fn decode(buf: bytes::Bytes) -> Result<Self>;
fn bytes(&self) -> bytes::Bytes;
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)] #[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum WireType { pub enum WireType {
Varint = 0, Varint = 0,
Fixed64 = 1, Fixed64 = 1,
@@ -818,3 +828,86 @@ impl<'a> ProtoBuilder<'a> {
Ok(&mut self.buf[..self.pos]) Ok(&mut self.buf[..self.pos])
} }
} }
pub struct BufMutBuilder<'a, B: BufMut> {
buf: &'a mut B,
}
impl<'a, B: BufMut> BufMutBuilder<'a, B> {
pub fn new(buf: &'a mut B) -> Self {
Self { buf }
}
fn write_tag(&mut self, field_number: u32, wire_type: WireType) -> Result<()> {
let mut temp = [0u8; 10];
let len = Tag::encode(field_number, wire_type, &mut temp)?;
self.buf.put_slice(&temp[..len]);
Ok(())
}
pub fn write_varint(&mut self, field_number: u32, value: u64) -> Result<()> {
self.write_tag(field_number, WireType::Varint)?;
let mut temp = [0u8; 10];
let len = write_varint(value, &mut temp)?;
self.buf.put_slice(&temp[..len]);
Ok(())
}
pub fn write_int32(&mut self, field_number: u32, value: i32) -> Result<()> {
self.write_varint(field_number, value as u64)
}
pub fn write_string(&mut self, field_number: u32, value: &str) -> Result<()> {
self.write_tag(field_number, WireType::LengthDelimited)?;
let bytes = value.as_bytes();
let mut len_buf = [0u8; 10];
let len_len = write_varint(bytes.len() as u64, &mut len_buf)?;
self.buf.put_slice(&len_buf[..len_len]);
self.buf.put_slice(bytes);
Ok(())
}
pub fn write_fixed32(&mut self, field_number: u32, value: u32) -> Result<()> {
self.write_tag(field_number, WireType::Fixed32)?;
self.buf.put_slice(&value.to_le_bytes());
Ok(())
}
pub fn write_fixed64(&mut self, field_number: u32, value: u64) -> Result<()> {
self.write_tag(field_number, WireType::Fixed64)?;
self.buf.put_slice(&value.to_le_bytes());
Ok(())
}
pub fn write_bytes(&mut self, field_number: u32, value: &[u8]) -> Result<()> {
self.write_tag(field_number, WireType::LengthDelimited)?;
let mut len_buf = [0u8; 10];
let len_len = write_varint(value.len() as u64, &mut len_buf)?;
self.buf.put_slice(&len_buf[..len_len]);
self.buf.put_slice(value);
Ok(())
}
pub fn write_raw(&mut self, raw_bytes: &[u8]) -> Result<()> {
self.buf.put_slice(raw_bytes);
Ok(())
}
pub fn write_map_entry(
&mut self,
field_number: u32,
key_encoded: &[u8],
value_encoded: &[u8],
) -> Result<()> {
let entry_len = key_encoded.len() + value_encoded.len();
self.write_tag(field_number, WireType::LengthDelimited)?;
let mut len_buf = [0u8; 10];
let len_len = write_varint(entry_len as u64, &mut len_buf)?;
self.buf.put_slice(&len_buf[..len_len]);
self.buf.put_slice(key_encoded);
self.buf.put_slice(value_encoded);
Ok(())
}
}