Compare commits
3 Commits
17ab0d1670
...
daa42d2d07
| Author | SHA1 | Date | |
|---|---|---|---|
| daa42d2d07 | |||
| 804ff3ead0 | |||
| 02a0b0d908 |
@@ -2,3 +2,4 @@
|
||||
test_gen_project
|
||||
test_types_gen_project
|
||||
test_map_gen_project
|
||||
test_grpc_project
|
||||
|
||||
Generated
+1143
-4
File diff suppressed because it is too large
Load Diff
@@ -4,6 +4,8 @@ members = [
|
||||
"codegen",
|
||||
"protos",
|
||||
"benches",
|
||||
"roto-tonic", "test_grpc_project",
|
||||
"examples/hello_world",
|
||||
]
|
||||
|
||||
exclude = [
|
||||
|
||||
@@ -8,8 +8,11 @@ Instead of deserializing binary protobuf data into Rust structs, roto scans a me
|
||||
construction — recording the byte offset of each field — then reads fields on demand directly from
|
||||
the original bytes. No heap allocation, no data copying, no full deserialization upfront.
|
||||
|
||||
Writing works the same way: you provide a fixed buffer and a builder writes fields directly into it,
|
||||
returning a slice of the bytes written.
|
||||
It also provides a first-class integration with the `tonic` gRPC framework via the `roto-tonic` crate,
|
||||
enabling zero-allocation request/response processing.
|
||||
|
||||
Writing works the same way: you provide a fixed buffer (or a `bytes::BufMut`) and a builder writes
|
||||
fields directly into it, returning a slice of the bytes written.
|
||||
|
||||
## Design
|
||||
|
||||
@@ -28,10 +31,15 @@ This will generate a file, src/hackers.rs.
|
||||
|
||||
## Generated code
|
||||
|
||||
For each protobuf message roto generates two types:
|
||||
For each protobuf message roto generates three types:
|
||||
|
||||
- **Reader struct** `MessageName<'a>` — borrows the original byte slice, zero-copy.
|
||||
- **Builder struct** `MessageNameBuilder<'b>` — writes into a caller-provided `&mut [u8]`.
|
||||
- **Builder struct** `MessageNameBuilder<'b>` — writes into a caller-provided `&mut [u8]` or `BufMut`.
|
||||
- **Owned struct** `OwnedMessageName` — owns the byte buffer and implements `RotoOwned`, providing a bridge to the `Reader`.
|
||||
|
||||
For each protobuf service, roto generates:
|
||||
|
||||
- **Service Trait** `ServiceName` — a `tonic`-compatible async trait for gRPC service implementations.
|
||||
|
||||
Nested message types are placed in a `pub mod message_name { ... }` module (snake_case of the
|
||||
parent message name) within the same generated file.
|
||||
@@ -314,12 +322,4 @@ The goal is to validate roto's implementation against the Proto3 specification.
|
||||
### Unsupported Features
|
||||
|
||||
- **Reserved Fields**: `reserved` statements are ignored.
|
||||
- **Services**: `service` and `rpc` definitions are ignored.
|
||||
- **Options**: Field and message options are ignored.
|
||||
|
||||
### Tasks
|
||||
|
||||
- [x] Analyze `roto/codegen` to determine which protobuf constructs are supported during code generation.
|
||||
- [x] Analyze `roto/runtime` to determine which wire types and protobuf types are supported during reading and writing.
|
||||
- [x] Compare findings with the Proto3 spec (https://protobuf.dev/reference/protobuf/proto3-spec/).
|
||||
- [x] Document supported and unsupported features in the README.
|
||||
|
||||
@@ -0,0 +1,44 @@
|
||||
# Tasks for Tonic Integration
|
||||
|
||||
This document outlines the steps required to integrate the `roto` protobuf library with the `tonic` gRPC framework.
|
||||
|
||||
## Goals
|
||||
- Provide a `tonic::codec::Codec` implementation for `roto` messages.
|
||||
- Enable zero-allocation decoding of gRPC requests and responses.
|
||||
- Support efficient encoding of gRPC messages using `roto`'s `Builder` pattern.
|
||||
- Generate `tonic`-compatible service traits and client/server boilerplate via `protoc-gen-roto`.
|
||||
|
||||
## 1. Runtime Changes (`roto_runtime`)
|
||||
- [x] Add `bytes` as a dependency to `roto_runtime`.
|
||||
- [x] Modify `ProtoBuilder` to support writing to `bytes::BufMut` or provide a specialized `BufMut` based builder to facilitate integration with `tonic::codec::EncodeBuf`.
|
||||
- [x] Design and implement "Owned" message support:
|
||||
- [x] Create a mechanism (likely via codegen) to generate structs that hold `bytes::Bytes` and the field offsets (e.g., `OwnedMyMessage`).
|
||||
- [x] These structs will serve as the owned types required by `tonic`'s `Codec`.
|
||||
- [x] Add a method to convert an `OwnedMyMessage` into a zero-allocation `Reader` (e.g., `reader(&self) -> MyMessage<'_>`).
|
||||
|
||||
## 2. Tonic Codec Implementation
|
||||
- [x] Implement `tonic::codec::Codec` for `roto` messages.
|
||||
- [x] Implement `tonic::codec::Decoder` for `roto` messages:
|
||||
- [x] The decoder should take a `DecodeBuf` and produce an `OwnedMyMessage`.
|
||||
- [x] It must perform the initial scan of the buffer to populate the field offsets.
|
||||
- [x] Implement `tonic::codec::Encoder` for `roto` messages:
|
||||
- [x] The encoder should take an `OwnedMyMessage` and write its internal `bytes::Bytes` to the `EncodeBuf`.
|
||||
- [x] Since `roto` responses are built using `Builder` directly into a buffer, the `Encoder` will primarily handle copying these pre-built buffers.
|
||||
|
||||
## 3. Code Generation Extensions (`protoc-gen-roto`)
|
||||
- [x] Update the generator to produce `OwnedMyMessage` structs in addition to the `Reader` and `Builder` structs.
|
||||
- [x] Generate the `OwnedMyMessage::new(data: bytes::Bytes)` method for initial decoding.
|
||||
- [x] Implement gRPC service generation:
|
||||
- [x] Generate `tonic`-compatible service traits (using `#[tonic::async_trait]`).
|
||||
- [x] Generate client and server boilerplate that uses `OwnedMyMessage` as the request and response types.
|
||||
- [x] Ensure the generated code correctly maps protobuf services to Rust traits.
|
||||
|
||||
## 4. Testing and Validation
|
||||
- [x] Create a test gRPC project using the updated `protoc-gen-roto`.
|
||||
- [ ] Implement a sample gRPC service with:
|
||||
- [ ] A unary call.
|
||||
- [ ] A server-streaming call.
|
||||
- [ ] A client-streaming call.
|
||||
- [ ] A bidirectional-streaming call.
|
||||
- [ ] Verify that the integration is zero-allocation on the reading path.
|
||||
- [ ] Perform benchmark tests to compare performance with `prost`.
|
||||
@@ -1,6 +1,6 @@
|
||||
use crate::google::protobuf::descriptor::{
|
||||
DescriptorProto, EnumDescriptorProto, FieldDescriptorProto, FileDescriptorProto,
|
||||
FileDescriptorSet, MessageOptions, OneofDescriptorProto,
|
||||
FileDescriptorSet, MessageOptions, MethodDescriptorProto, OneofDescriptorProto, ServiceDescriptorProto,
|
||||
};
|
||||
use roto_runtime::ProtoAccessor;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
@@ -434,6 +434,26 @@ fn write_message(msg_proto: &DescriptorProto, output: &mut String) {
|
||||
|
||||
output.push_str(&format!(" pub fn finish(self) -> roto_runtime::Result<&'b mut [u8]> {{\n self.builder.finish()\n }}\n}}\n\n"));
|
||||
|
||||
output.push_str(&format!("pub struct Owned{} {{\n", msg_name));
|
||||
output.push_str(" pub data: bytes::Bytes,\n");
|
||||
output.push_str("}\n\n");
|
||||
|
||||
output.push_str(&format!("impl roto_runtime::RotoOwned for Owned{} {{\n", msg_name));
|
||||
output.push_str(&format!(" type Reader<'a> = {}<'a>;\n", msg_name));
|
||||
output.push_str(&format!(" fn reader(&self) -> {}<'_> {{\n", msg_name));
|
||||
output.push_str(&format!(" {}::new(&self.data).expect(\"failed to create reader\")\n", msg_name));
|
||||
output.push_str(" }\n");
|
||||
output.push_str("}\n\n");
|
||||
|
||||
output.push_str(&format!("impl roto_runtime::RotoMessage for Owned{} {{\n", msg_name));
|
||||
output.push_str(" fn decode(buf: bytes::Bytes) -> roto_runtime::Result<Self> {\n");
|
||||
output.push_str(&format!(" Ok(Owned{} {{ data: buf }})\n", msg_name));
|
||||
output.push_str(" }\n\n");
|
||||
output.push_str(" fn bytes(&self) -> bytes::Bytes {\n");
|
||||
output.push_str(" self.data.clone()\n");
|
||||
output.push_str(" }\n");
|
||||
output.push_str("}\n\n");
|
||||
|
||||
let mut nested_enums = Vec::new();
|
||||
for e_res in msg_proto.enum_type() {
|
||||
if let Ok((e, _)) = e_res {
|
||||
@@ -524,9 +544,13 @@ pub fn generate_rust_code(
|
||||
|
||||
let mut output = String::new();
|
||||
output.push_str("// @generated by protoc-gen-roto — do not edit\n");
|
||||
output.push_str("#![allow(unused_imports)]\n\n");
|
||||
output.push_str("#[allow(unused_imports)]\n\n");
|
||||
output.push_str("use roto_runtime::{ProtoAccessor, ProtoBuilder, Result, RotoError, read_varint, RepeatedFieldIterator};\n");
|
||||
output.push_str("use std::str;\n\n");
|
||||
output.push_str("use std::str;\n");
|
||||
output.push_str("use bytes::Bytes;\n");
|
||||
output.push_str("use tonic::{Request, Response, Status};\n");
|
||||
output.push_str("use tokio_stream::Stream;\n");
|
||||
output.push_str("use std::pin::Pin;\n\n");
|
||||
|
||||
for dep_res in file_proto.dependency() {
|
||||
let (dep_data, _) = dep_res.expect("Failed to iterate dependency");
|
||||
@@ -553,6 +577,15 @@ pub fn generate_rust_code(
|
||||
&mut output,
|
||||
);
|
||||
}
|
||||
|
||||
// Services
|
||||
for svc_res in file_proto.service() {
|
||||
let (svc_data, _) = svc_res.expect("Failed to iterate service");
|
||||
write_service(
|
||||
&ServiceDescriptorProto::new(svc_data).expect("Failed to parse ServiceDescriptorProto"),
|
||||
&mut output,
|
||||
);
|
||||
}
|
||||
generated_files.push((rust_file_name, output));
|
||||
}
|
||||
|
||||
@@ -608,3 +641,44 @@ pub fn generate_rust_code(
|
||||
|
||||
generated_files
|
||||
}
|
||||
|
||||
fn write_service(svc_proto: &ServiceDescriptorProto, output: &mut String) {
|
||||
let svc_name = to_pascal_case(svc_proto.name().unwrap());
|
||||
output.push_str(&format!("#[tonic::async_trait]\npub trait {}: Send + Sync + 'static {{\n", svc_name));
|
||||
|
||||
for method_res in svc_proto.method() {
|
||||
let (method_data, _) = method_res.expect("Failed to iterate method");
|
||||
let method_proto = MethodDescriptorProto::new(method_data).expect("Failed to parse MethodDescriptorProto");
|
||||
let method_name = to_snake_case(method_proto.name().unwrap());
|
||||
|
||||
let input_full_name = method_proto.input_type().unwrap();
|
||||
let output_full_name = method_proto.output_type().unwrap();
|
||||
|
||||
let input_type = input_full_name.split('.').last().unwrap();
|
||||
let output_type = output_full_name.split('.').last().unwrap();
|
||||
|
||||
let input_owned = format!("Owned{}", input_type);
|
||||
let output_owned = format!("Owned{}", output_type);
|
||||
|
||||
let client_streaming = method_proto.client_streaming().unwrap_or(false);
|
||||
let server_streaming = method_proto.server_streaming().unwrap_or(false);
|
||||
|
||||
let req_type = if client_streaming {
|
||||
format!("Request<tonic::Streaming<{}>>", input_owned)
|
||||
} else {
|
||||
format!("Request<{}>", input_owned)
|
||||
};
|
||||
|
||||
let resp_type = if server_streaming {
|
||||
format!("Response<Pin<Box<dyn Stream<Item = Result<{}, Status>> + Send>>>", output_owned)
|
||||
} else {
|
||||
format!("Response<{}>", output_owned)
|
||||
};
|
||||
|
||||
output.push_str(&format!(
|
||||
" async fn {}(&self, request: {}) -> std::result::Result<{}, Status>;\n",
|
||||
method_name, req_type, resp_type
|
||||
));
|
||||
}
|
||||
output.push_str("}\n\n");
|
||||
}
|
||||
|
||||
@@ -58,7 +58,7 @@ fn test_generated_code_builds() {
|
||||
let cargo_toml_content =
|
||||
fs::read_to_string(&cargo_toml_path).expect("Failed to read Cargo.toml");
|
||||
let updated_cargo_toml = format!(
|
||||
"{}\n\nroto-codegen = {{ path = \"..\" }}\nroto-runtime = {{ path = \"../../runtime\" }}\n\n[workspace]\n",
|
||||
"{}\n\nroto-codegen = {{ path = \"..\" }}\nroto-runtime = {{ path = \"../../runtime\" }}\nbytes = \"1.7\"\ntonic = \"0.12\"\ntokio-stream = \"0.1\"\n\n[workspace]\n",
|
||||
cargo_toml_content
|
||||
);
|
||||
fs::write(cargo_toml_path, updated_cargo_toml).expect("Failed to write Cargo.toml");
|
||||
|
||||
@@ -38,7 +38,7 @@ fn test_map_generated_code_builds() {
|
||||
let cargo_toml_content =
|
||||
fs::read_to_string(&cargo_toml_path).expect("Failed to read Cargo.toml");
|
||||
let updated_cargo_toml = format!(
|
||||
"{}\n\nroto-codegen = {{ path = \"..\" }}\nroto-runtime = {{ path = \"../../runtime\" }}\n\n[workspace]\n",
|
||||
"{}\n\nroto-codegen = {{ path = \"..\" }}\nroto-runtime = {{ path = \"../../runtime\" }}\nbytes = \"1.7\"\ntonic = \"0.12\"\ntokio-stream = \"0.1\"\n\n[workspace]\n",
|
||||
cargo_toml_content
|
||||
);
|
||||
fs::write(cargo_toml_path, updated_cargo_toml).expect("Failed to write Cargo.toml");
|
||||
|
||||
@@ -38,7 +38,7 @@ fn test_types_generated_code_builds() {
|
||||
let cargo_toml_content =
|
||||
fs::read_to_string(&cargo_toml_path).expect("Failed to read Cargo.toml");
|
||||
let updated_cargo_toml = format!(
|
||||
"{}\n\nroto-codegen = {{ path = \"..\" }}\nroto-runtime = {{ path = \"../../runtime\" }}\n\n[workspace]\n",
|
||||
"{}\n\nroto-codegen = {{ path = \"..\" }}\nroto-runtime = {{ path = \"../../runtime\" }}\nbytes = \"1.7\"\ntonic = \"0.12\"\ntokio-stream = \"0.1\"\n\n[workspace]\n",
|
||||
cargo_toml_content
|
||||
);
|
||||
fs::write(cargo_toml_path, updated_cargo_toml).expect("Failed to write Cargo.toml");
|
||||
|
||||
@@ -0,0 +1,29 @@
|
||||
[package]
|
||||
name = "hello-world"
|
||||
version = "0.1.0"
|
||||
edition = "2024"
|
||||
|
||||
[[bin]]
|
||||
name = "server"
|
||||
path = "src/bin/server.rs"
|
||||
|
||||
[[bin]]
|
||||
name = "client"
|
||||
path = "src/bin/client.rs"
|
||||
|
||||
[dependencies]
|
||||
roto-runtime = { path = "../../runtime" }
|
||||
roto-tonic = { path = "../../roto-tonic" }
|
||||
tonic = "0.12"
|
||||
tokio = { version = "1.38", features = ["full"] }
|
||||
tokio-stream = "0.1"
|
||||
bytes = "1.7"
|
||||
prost = "0.13"
|
||||
tower = "0.4"
|
||||
futures-util = "0.3"
|
||||
http-body-util = "0.1"
|
||||
http = "1.1"
|
||||
http-body = "1.0"
|
||||
|
||||
[build-dependencies]
|
||||
tonic-build = "0.12"
|
||||
@@ -0,0 +1,27 @@
|
||||
fn main() {
|
||||
let proto_file = "proto/hello.proto";
|
||||
let out_dir = std::env::var("OUT_DIR").unwrap();
|
||||
let dest_path = std::path::Path::new(&out_dir).join("hello.rs");
|
||||
|
||||
// Find the protoc-gen-roto binary
|
||||
// In a real scenario, this should be passed as an environment variable or found in PATH
|
||||
// For this example, we'll try to find it in the target directory
|
||||
let target_dir = std::env::current_dir().unwrap().join("../../target/debug");
|
||||
let plugin_path = target_dir.join("protoc-gen-roto");
|
||||
|
||||
if !plugin_path.exists() {
|
||||
panic!("protoc-gen-roto plugin not found at {:?}", plugin_path);
|
||||
}
|
||||
|
||||
let status = std::process::Command::new("protoc")
|
||||
.arg(format!("--plugin=protoc-gen-roto={}", plugin_path.display()))
|
||||
.arg(format!("--roto_out={}", out_dir))
|
||||
.arg(format!("--roto_opt=src=proto")) // Assuming the plugin handles this or we just pass it
|
||||
.arg(proto_file)
|
||||
.status()
|
||||
.expect("Failed to execute protoc");
|
||||
|
||||
if !status.success() {
|
||||
panic!("protoc failed with status {}", status);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,15 @@
|
||||
syntax = "proto3";
|
||||
|
||||
package hello;
|
||||
|
||||
service HelloWorldService {
|
||||
rpc HelloWorld (HelloRequest) returns (HelloResponse);
|
||||
}
|
||||
|
||||
message HelloRequest {
|
||||
string name = 1;
|
||||
}
|
||||
|
||||
message HelloResponse {
|
||||
string message = 1;
|
||||
}
|
||||
@@ -0,0 +1,210 @@
|
||||
// @generated by protoc-gen-roto — do not edit
|
||||
#[allow(unused_imports)]
|
||||
|
||||
use roto_runtime::{ProtoAccessor, ProtoBuilder, Result, RotoError, read_varint, RepeatedFieldIterator};
|
||||
use std::str;
|
||||
use bytes::Bytes;
|
||||
use tonic::{Request, Response, Status};
|
||||
use tokio_stream::Stream;
|
||||
use std::pin::Pin;
|
||||
|
||||
|
||||
pub struct HelloRequest<'a> {
|
||||
accessor: roto_runtime::ProtoAccessor<'a>,
|
||||
name_offset: Option<usize>,
|
||||
}
|
||||
|
||||
impl<'a> HelloRequest<'a> {
|
||||
pub fn new(data: &'a [u8]) -> roto_runtime::Result<Self> {
|
||||
let accessor = roto_runtime::ProtoAccessor::new(data)?;
|
||||
let mut name_offset = None;
|
||||
for item in accessor.fields() {
|
||||
let (offset, tag, _) = item?;
|
||||
if tag.field_number == 1 { name_offset = Some(offset); }
|
||||
}
|
||||
|
||||
Ok(Self {
|
||||
accessor,
|
||||
name_offset,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn name(&self) -> roto_runtime::Result<&'a str> {
|
||||
let offset = self.name_offset.ok_or(roto_runtime::RotoError::FieldNotFound)?;
|
||||
let (bytes, _) = self.accessor.get_value_at(offset)?;
|
||||
str::from_utf8(bytes).map_err(|_| roto_runtime::RotoError::WireFormatViolation)
|
||||
}
|
||||
|
||||
pub fn name_or_default(&self) -> roto_runtime::Result<&'a str> {
|
||||
self.name().or(Ok(""))
|
||||
}
|
||||
|
||||
pub fn has_name(&self) -> bool { self.name_offset.is_some() }
|
||||
|
||||
pub fn raw_fields(&self) -> roto_runtime::RawFieldIterator<'a> {
|
||||
self.accessor.raw_fields()
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
pub struct HelloRequestBuilder<'b> {
|
||||
builder: roto_runtime::ProtoBuilder<'b>,
|
||||
name_written: bool,
|
||||
}
|
||||
|
||||
impl<'b> HelloRequestBuilder<'b> {
|
||||
pub fn builder(buf: &mut [u8]) -> HelloRequestBuilder<'_> {
|
||||
HelloRequestBuilder {
|
||||
builder: roto_runtime::ProtoBuilder::new(buf),
|
||||
name_written: false,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn name(mut self, value: &str) -> roto_runtime::Result<Self> {
|
||||
self.builder.write_string(1, value)?;
|
||||
self.name_written = true;
|
||||
Ok(self)
|
||||
}
|
||||
|
||||
pub fn with(mut self, msg: &HelloRequest<'_>) -> roto_runtime::Result<Self> {
|
||||
for item in msg.raw_fields() {
|
||||
let (field_number, raw_bytes) = item?;
|
||||
let is_written = match field_number {
|
||||
1 => self.name_written,
|
||||
_ => false,
|
||||
};
|
||||
if !is_written {
|
||||
self.builder.write_raw(raw_bytes)?;
|
||||
}
|
||||
}
|
||||
Ok(self)
|
||||
}
|
||||
|
||||
pub fn finish(self) -> roto_runtime::Result<&'b mut [u8]> {
|
||||
self.builder.finish()
|
||||
}
|
||||
}
|
||||
|
||||
pub struct OwnedHelloRequest {
|
||||
pub data: bytes::Bytes,
|
||||
}
|
||||
|
||||
impl roto_runtime::RotoOwned for OwnedHelloRequest {
|
||||
type Reader<'a> = HelloRequest<'a>;
|
||||
fn reader(&self) -> HelloRequest<'_> {
|
||||
HelloRequest::new(&self.data).expect("failed to create reader")
|
||||
}
|
||||
}
|
||||
|
||||
impl roto_runtime::RotoMessage for OwnedHelloRequest {
|
||||
fn decode(buf: bytes::Bytes) -> roto_runtime::Result<Self> {
|
||||
Ok(OwnedHelloRequest { data: buf })
|
||||
}
|
||||
|
||||
fn bytes(&self) -> bytes::Bytes {
|
||||
self.data.clone()
|
||||
}
|
||||
}
|
||||
|
||||
pub struct HelloResponse<'a> {
|
||||
accessor: roto_runtime::ProtoAccessor<'a>,
|
||||
message_offset: Option<usize>,
|
||||
}
|
||||
|
||||
impl<'a> HelloResponse<'a> {
|
||||
pub fn new(data: &'a [u8]) -> roto_runtime::Result<Self> {
|
||||
let accessor = roto_runtime::ProtoAccessor::new(data)?;
|
||||
let mut message_offset = None;
|
||||
for item in accessor.fields() {
|
||||
let (offset, tag, _) = item?;
|
||||
if tag.field_number == 1 { message_offset = Some(offset); }
|
||||
}
|
||||
|
||||
Ok(Self {
|
||||
accessor,
|
||||
message_offset,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn message(&self) -> roto_runtime::Result<&'a str> {
|
||||
let offset = self.message_offset.ok_or(roto_runtime::RotoError::FieldNotFound)?;
|
||||
let (bytes, _) = self.accessor.get_value_at(offset)?;
|
||||
str::from_utf8(bytes).map_err(|_| roto_runtime::RotoError::WireFormatViolation)
|
||||
}
|
||||
|
||||
pub fn message_or_default(&self) -> roto_runtime::Result<&'a str> {
|
||||
self.message().or(Ok(""))
|
||||
}
|
||||
|
||||
pub fn has_message(&self) -> bool { self.message_offset.is_some() }
|
||||
|
||||
pub fn raw_fields(&self) -> roto_runtime::RawFieldIterator<'a> {
|
||||
self.accessor.raw_fields()
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
pub struct HelloResponseBuilder<'b> {
|
||||
builder: roto_runtime::ProtoBuilder<'b>,
|
||||
message_written: bool,
|
||||
}
|
||||
|
||||
impl<'b> HelloResponseBuilder<'b> {
|
||||
pub fn builder(buf: &mut [u8]) -> HelloResponseBuilder<'_> {
|
||||
HelloResponseBuilder {
|
||||
builder: roto_runtime::ProtoBuilder::new(buf),
|
||||
message_written: false,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn message(mut self, value: &str) -> roto_runtime::Result<Self> {
|
||||
self.builder.write_string(1, value)?;
|
||||
self.message_written = true;
|
||||
Ok(self)
|
||||
}
|
||||
|
||||
pub fn with(mut self, msg: &HelloResponse<'_>) -> roto_runtime::Result<Self> {
|
||||
for item in msg.raw_fields() {
|
||||
let (field_number, raw_bytes) = item?;
|
||||
let is_written = match field_number {
|
||||
1 => self.message_written,
|
||||
_ => false,
|
||||
};
|
||||
if !is_written {
|
||||
self.builder.write_raw(raw_bytes)?;
|
||||
}
|
||||
}
|
||||
Ok(self)
|
||||
}
|
||||
|
||||
pub fn finish(self) -> roto_runtime::Result<&'b mut [u8]> {
|
||||
self.builder.finish()
|
||||
}
|
||||
}
|
||||
|
||||
pub struct OwnedHelloResponse {
|
||||
pub data: bytes::Bytes,
|
||||
}
|
||||
|
||||
impl roto_runtime::RotoOwned for OwnedHelloResponse {
|
||||
type Reader<'a> = HelloResponse<'a>;
|
||||
fn reader(&self) -> HelloResponse<'_> {
|
||||
HelloResponse::new(&self.data).expect("failed to create reader")
|
||||
}
|
||||
}
|
||||
|
||||
impl roto_runtime::RotoMessage for OwnedHelloResponse {
|
||||
fn decode(buf: bytes::Bytes) -> roto_runtime::Result<Self> {
|
||||
Ok(OwnedHelloResponse { data: buf })
|
||||
}
|
||||
|
||||
fn bytes(&self) -> bytes::Bytes {
|
||||
self.data.clone()
|
||||
}
|
||||
}
|
||||
|
||||
#[tonic::async_trait]
|
||||
pub trait HelloWorldService: Send + Sync + 'static {
|
||||
async fn hello_world(&self, request: Request<OwnedHelloRequest>) -> std::result::Result<Response<OwnedHelloResponse>, Status>;
|
||||
}
|
||||
|
||||
@@ -0,0 +1,64 @@
|
||||
use tonic::Request;
|
||||
use roto_tonic::RotoCodec;
|
||||
use hello::{HelloWorldService, OwnedHelloRequest, OwnedHelloResponse};
|
||||
use roto_runtime::RotoOwned;
|
||||
use std::task::{Context, Poll};
|
||||
use tower::Service;
|
||||
|
||||
pub mod hello {
|
||||
include!("../../proto/hello.rs");
|
||||
}
|
||||
|
||||
struct ReadyService<S>(S);
|
||||
|
||||
impl<S, Req> Service<Req> for ReadyService<S>
|
||||
where
|
||||
S: Service<Req>,
|
||||
{
|
||||
type Response = S::Response;
|
||||
type Error = S::Error;
|
||||
type Future = S::Future;
|
||||
|
||||
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
|
||||
fn call(&mut self, req: Req) -> S::Future {
|
||||
self.0.call(req)
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
let channel = tonic::transport::Channel::from_static("http://[::1]:50051")
|
||||
.connect()
|
||||
.await?;
|
||||
|
||||
let ready_channel = ReadyService(channel);
|
||||
let mut client = tonic::client::Grpc::new(ready_channel);
|
||||
|
||||
// We need to specify the method path. For HelloWorldService/HelloWorld, it is "/hello.HelloWorldService/HelloWorld"
|
||||
let mut buf = vec![0u8; 1024];
|
||||
let slice = hello::HelloRequestBuilder::builder(&mut buf)
|
||||
.name("Roto").unwrap()
|
||||
.finish().unwrap();
|
||||
|
||||
let request = OwnedHelloRequest {
|
||||
data: bytes::Bytes::copy_from_slice(slice),
|
||||
};
|
||||
|
||||
// In tonic's Grpc client, we specify the codec separately.
|
||||
let response = client
|
||||
.unary(
|
||||
Request::new(request),
|
||||
http::uri::PathAndQuery::from_static("/hello.HelloWorldService/HelloWorld"),
|
||||
RotoCodec::<OwnedHelloResponse, OwnedHelloRequest>::default(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
let response_msg: OwnedHelloResponse = response.into_inner();
|
||||
let reader = response_msg.reader();
|
||||
println!("Server responded: {}", reader.message().unwrap_or("No message"));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -0,0 +1,165 @@
|
||||
use std::pin::Pin;
|
||||
use std::future::Future;
|
||||
use std::task::{Context, Poll};
|
||||
use std::sync::Arc;
|
||||
use tonic::{transport::Server, Request, Response, Status};
|
||||
use roto_tonic::RotoCodec;
|
||||
use hello::{HelloWorldService, OwnedHelloRequest, OwnedHelloResponse};
|
||||
use tower::Service;
|
||||
use bytes::{Bytes, Buf, BufMut};
|
||||
use tonic::body::BoxBody;
|
||||
use futures_util::StreamExt;
|
||||
use roto_runtime::{RotoOwned, RotoMessage};
|
||||
use http_body_util::BodyExt;
|
||||
use http_body::Body;
|
||||
|
||||
pub mod hello {
|
||||
include!("../../proto/hello.rs");
|
||||
}
|
||||
|
||||
#[derive(Default, Clone)]
|
||||
pub struct MyHelloWorld {}
|
||||
|
||||
#[tonic::async_trait]
|
||||
impl HelloWorldService for MyHelloWorld {
|
||||
async fn hello_world(
|
||||
&self,
|
||||
request: Request<OwnedHelloRequest>,
|
||||
) -> Result<Response<OwnedHelloResponse>, Status> {
|
||||
let req = request.into_inner();
|
||||
let reader = req.reader();
|
||||
let name = reader.name().unwrap_or("Unknown");
|
||||
|
||||
let mut buf = vec![0u8; 1024];
|
||||
let slice = hello::HelloResponseBuilder::builder(&mut buf)
|
||||
.message(&format!("Hello {}!", name)).unwrap()
|
||||
.finish().unwrap();
|
||||
|
||||
let reply = OwnedHelloResponse {
|
||||
data: bytes::Bytes::copy_from_slice(slice),
|
||||
};
|
||||
|
||||
Ok(Response::new(reply))
|
||||
}
|
||||
}
|
||||
|
||||
// --- Tonic Glue ---
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct HelloWorldServer {
|
||||
inner: Arc<MyHelloWorld>,
|
||||
}
|
||||
|
||||
impl HelloWorldServer {
|
||||
pub fn new(inner: MyHelloWorld) -> Self {
|
||||
Self { inner: Arc::new(inner) }
|
||||
}
|
||||
}
|
||||
|
||||
impl tonic::server::NamedService for HelloWorldServer {
|
||||
const NAME: &'static str = "hello.HelloWorldService";
|
||||
}
|
||||
|
||||
struct StatusBody(Option<Bytes>);
|
||||
|
||||
impl Body for StatusBody {
|
||||
type Data = Bytes;
|
||||
type Error = Status;
|
||||
|
||||
fn poll_frame(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
|
||||
if let Some(data) = self.0.take() {
|
||||
Poll::Ready(Some(Ok(http_body::Frame::data(data))))
|
||||
} else {
|
||||
Poll::Ready(None)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Service<http::Request<BoxBody>> for HelloWorldServer {
|
||||
type Response = http::Response<BoxBody>;
|
||||
type Error = std::convert::Infallible;
|
||||
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
|
||||
|
||||
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
|
||||
fn call(&mut self, req: http::Request<BoxBody>) -> Self::Future {
|
||||
let inner = self.inner.clone();
|
||||
println!("Server received request: {} {}", req.method(), req.uri());
|
||||
|
||||
Box::pin(async move {
|
||||
let body = req.into_body();
|
||||
let bytes_vec = body.collect().await.map_err(|e| {
|
||||
println!("Body collect error: {}", e);
|
||||
panic!("Body collect error: {}", e);
|
||||
})?.to_bytes();
|
||||
println!("Collected body bytes: {} bytes", bytes_vec.len());
|
||||
|
||||
if bytes_vec.len() < 5 {
|
||||
println!("Body too short: {} bytes", bytes_vec.len());
|
||||
let res_body = BoxBody::new(StatusBody(Some(Bytes::from(vec![0, 0, 0, 0, 0]))));
|
||||
return Ok(http::Response::builder()
|
||||
.status(200)
|
||||
.body(res_body)
|
||||
.unwrap());
|
||||
}
|
||||
|
||||
let data = &bytes_vec[5..];
|
||||
println!("Decoding request from {} bytes", data.len());
|
||||
let request_msg = match OwnedHelloRequest::decode(Bytes::copy_from_slice(data)) {
|
||||
Ok(msg) => msg,
|
||||
Err(e) => {
|
||||
println!("Decode error: {}", e);
|
||||
let res_body = BoxBody::new(StatusBody(Some(Bytes::from(vec![0, 0, 0, 0, 0]))));
|
||||
return Ok(http::Response::builder().status(200).body(res_body).unwrap());
|
||||
}
|
||||
};
|
||||
|
||||
println!("Request decoded successfully");
|
||||
let response = match inner.hello_world(Request::new(request_msg)).await {
|
||||
Ok(res) => res,
|
||||
Err(e) => {
|
||||
println!("Service error: {}", e);
|
||||
let res_body = BoxBody::new(StatusBody(Some(Bytes::from(vec![0, 0, 0, 0, 0]))));
|
||||
return Ok(http::Response::builder().status(200).body(res_body).unwrap());
|
||||
}
|
||||
};
|
||||
|
||||
let response_msg = response.into_inner();
|
||||
let response_bytes = response_msg.bytes();
|
||||
println!("Service responded with {} bytes", response_bytes.len());
|
||||
|
||||
let mut res_buf = vec![0u8; 5 + response_bytes.len()];
|
||||
res_buf[0] = 0;
|
||||
let len = response_bytes.len() as u32;
|
||||
res_buf[1..5].copy_from_slice(&len.to_be_bytes());
|
||||
res_buf[5..].copy_from_slice(&response_bytes);
|
||||
|
||||
let res_body = BoxBody::new(StatusBody(Some(Bytes::from(res_buf))));
|
||||
Ok(http::Response::builder()
|
||||
.status(200)
|
||||
.header("content-type", "application/grpc")
|
||||
.body(res_body)
|
||||
.unwrap())
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
let addr: std::net::SocketAddr = "[::1]:50051".parse()?;
|
||||
let hello = MyHelloWorld::default();
|
||||
|
||||
println!("Server listening on {}", addr);
|
||||
|
||||
Server::builder()
|
||||
.add_service(HelloWorldServer::new(hello))
|
||||
.serve(addr)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -0,0 +1,10 @@
|
||||
[package]
|
||||
name = "roto-tonic"
|
||||
version = "0.1.0"
|
||||
edition = "2024"
|
||||
|
||||
[dependencies]
|
||||
roto-runtime = { path = "../runtime" }
|
||||
tonic = "0.12"
|
||||
bytes = "1.7"
|
||||
prost = "0.13"
|
||||
@@ -0,0 +1,72 @@
|
||||
use std::marker::PhantomData;
|
||||
use tonic::codec::{Codec, Decoder, Encoder, DecodeBuf, EncodeBuf};
|
||||
use bytes::{Buf, BufMut};
|
||||
use roto_runtime::RotoMessage;
|
||||
|
||||
pub struct RotoCodec<T, U> {
|
||||
_phantom: PhantomData<(T, U)>,
|
||||
}
|
||||
|
||||
impl<T, U> Default for RotoCodec<T, U> {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
_phantom: PhantomData,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, U> Codec for RotoCodec<T, U>
|
||||
where
|
||||
T: RotoMessage + Send + 'static,
|
||||
U: RotoMessage + Send + 'static,
|
||||
{
|
||||
type Encode = U;
|
||||
type Decode = T;
|
||||
type Encoder = RotoEncoder<U>;
|
||||
type Decoder = RotoDecoder<T>;
|
||||
|
||||
fn encoder(&mut self) -> Self::Encoder {
|
||||
RotoEncoder(PhantomData)
|
||||
}
|
||||
|
||||
fn decoder(&mut self) -> Self::Decoder {
|
||||
RotoDecoder(PhantomData)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct RotoEncoder<U>(PhantomData<U>);
|
||||
|
||||
impl<U> Encoder for RotoEncoder<U>
|
||||
where
|
||||
U: RotoMessage,
|
||||
{
|
||||
type Item = U;
|
||||
type Error = tonic::Status;
|
||||
|
||||
fn encode(&mut self, message: Self::Item, buf: &mut EncodeBuf<'_>) -> Result<(), Self::Error> {
|
||||
buf.put_slice(&message.bytes());
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub struct RotoDecoder<T>(PhantomData<T>);
|
||||
|
||||
impl<T> Decoder for RotoDecoder<T>
|
||||
where
|
||||
T: RotoMessage,
|
||||
{
|
||||
type Item = T;
|
||||
type Error = tonic::Status;
|
||||
|
||||
fn decode(&mut self, buf: &mut DecodeBuf<'_>) -> Result<Option<Self::Item>, Self::Error> {
|
||||
if buf.remaining() == 0 {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let bytes = buf.copy_to_bytes(buf.remaining());
|
||||
match T::decode(bytes) {
|
||||
Ok(msg) => Ok(Some(msg)),
|
||||
Err(e) => Err(tonic::Status::internal(format!("Roto decode error: {}", e))),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -4,3 +4,4 @@ version = "0.1.0"
|
||||
edition = "2024"
|
||||
|
||||
[dependencies]
|
||||
bytes = "1.7"
|
||||
|
||||
+94
-1
@@ -1,4 +1,5 @@
|
||||
use std::fmt;
|
||||
use bytes::BufMut;
|
||||
|
||||
pub struct MapFieldIterator<'a> {
|
||||
inner: RepeatedFieldIterator<'a>,
|
||||
@@ -54,8 +55,17 @@ impl std::error::Error for RotoError {}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, RotoError>;
|
||||
|
||||
pub trait RotoOwned {
|
||||
type Reader<'a> where Self: 'a;
|
||||
fn reader(&self) -> Self::Reader<'_>;
|
||||
}
|
||||
|
||||
pub trait RotoMessage: Sized {
|
||||
fn decode(buf: bytes::Bytes) -> Result<Self>;
|
||||
fn bytes(&self) -> bytes::Bytes;
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
#[repr(u8)]
|
||||
pub enum WireType {
|
||||
Varint = 0,
|
||||
Fixed64 = 1,
|
||||
@@ -818,3 +828,86 @@ impl<'a> ProtoBuilder<'a> {
|
||||
Ok(&mut self.buf[..self.pos])
|
||||
}
|
||||
}
|
||||
|
||||
pub struct BufMutBuilder<'a, B: BufMut> {
|
||||
buf: &'a mut B,
|
||||
}
|
||||
|
||||
impl<'a, B: BufMut> BufMutBuilder<'a, B> {
|
||||
pub fn new(buf: &'a mut B) -> Self {
|
||||
Self { buf }
|
||||
}
|
||||
|
||||
fn write_tag(&mut self, field_number: u32, wire_type: WireType) -> Result<()> {
|
||||
let mut temp = [0u8; 10];
|
||||
let len = Tag::encode(field_number, wire_type, &mut temp)?;
|
||||
self.buf.put_slice(&temp[..len]);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn write_varint(&mut self, field_number: u32, value: u64) -> Result<()> {
|
||||
self.write_tag(field_number, WireType::Varint)?;
|
||||
let mut temp = [0u8; 10];
|
||||
let len = write_varint(value, &mut temp)?;
|
||||
self.buf.put_slice(&temp[..len]);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn write_int32(&mut self, field_number: u32, value: i32) -> Result<()> {
|
||||
self.write_varint(field_number, value as u64)
|
||||
}
|
||||
|
||||
pub fn write_string(&mut self, field_number: u32, value: &str) -> Result<()> {
|
||||
self.write_tag(field_number, WireType::LengthDelimited)?;
|
||||
let bytes = value.as_bytes();
|
||||
let mut len_buf = [0u8; 10];
|
||||
let len_len = write_varint(bytes.len() as u64, &mut len_buf)?;
|
||||
self.buf.put_slice(&len_buf[..len_len]);
|
||||
self.buf.put_slice(bytes);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn write_fixed32(&mut self, field_number: u32, value: u32) -> Result<()> {
|
||||
self.write_tag(field_number, WireType::Fixed32)?;
|
||||
self.buf.put_slice(&value.to_le_bytes());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn write_fixed64(&mut self, field_number: u32, value: u64) -> Result<()> {
|
||||
self.write_tag(field_number, WireType::Fixed64)?;
|
||||
self.buf.put_slice(&value.to_le_bytes());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn write_bytes(&mut self, field_number: u32, value: &[u8]) -> Result<()> {
|
||||
self.write_tag(field_number, WireType::LengthDelimited)?;
|
||||
let mut len_buf = [0u8; 10];
|
||||
let len_len = write_varint(value.len() as u64, &mut len_buf)?;
|
||||
self.buf.put_slice(&len_buf[..len_len]);
|
||||
self.buf.put_slice(value);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn write_raw(&mut self, raw_bytes: &[u8]) -> Result<()> {
|
||||
self.buf.put_slice(raw_bytes);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn write_map_entry(
|
||||
&mut self,
|
||||
field_number: u32,
|
||||
key_encoded: &[u8],
|
||||
value_encoded: &[u8],
|
||||
) -> Result<()> {
|
||||
let entry_len = key_encoded.len() + value_encoded.len();
|
||||
self.write_tag(field_number, WireType::LengthDelimited)?;
|
||||
|
||||
let mut len_buf = [0u8; 10];
|
||||
let len_len = write_varint(entry_len as u64, &mut len_buf)?;
|
||||
self.buf.put_slice(&len_buf[..len_len]);
|
||||
|
||||
self.buf.put_slice(key_encoded);
|
||||
self.buf.put_slice(value_encoded);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user