Streams in gRPC
"It's streamin' time."
-- Morbius, probably
Thus far, we have treated gRPC as a purely request-response protocol. However, as well as that paradigm serves many applications, it is not always exactly what is needed.
Sometimes we want to establish a recurring communication channel with a client, and it would be great to do this with gRPC itself, sparing ourselves from introducing yet another technology into our stack for developers to learn and maintain.
Furthermore, some things are not well suited to being sent in a single go: files, for example, or data that you can start processing before all of it has arrived. Having to wait for the whole payload would be inefficient and cumbersome.
For these reasons, gRPC introduces the concept of streams.
We can liken streams to Rust iterators, or, more precisely, to async streams in
general, as seen for example in the futures crate.
This similarity is quite handy because Tonic implements the Stream
trait for its representation of gRPC streams, so we can ergonomically
handle them as such.
You can see that this implementation exists, and what else is available, here:
https://docs.rs/tonic/latest/tonic/struct.Streaming.html
Declaring a stream in our .proto files is quite easy: we use the
stream keyword.
Streams may be present on both the client side and the server side. The terminology refers to the originator of the stream.
Server-side streaming
We talk about server-side streaming when a client sends a single request and gets back a stream from which to read a sequence of messages. The client reads from the returned stream until there are no more messages.
You can specify a server-side streaming RPC by placing the stream
keyword before the response type:
```proto
// Obtains the Features available within the given Rectangle. Results are
// streamed rather than returned at once (e.g. in a response message with a
// repeated field), as the rectangle may cover a large area and contain a
// huge number of features.
rpc ListFeatures(Rectangle) returns (stream Feature) {}
```
Here is a full example from the Rust side of things:
```rust
pub mod pb {
    tonic::include_proto!("grpc.examples.echo");
}

use futures::Stream;
use std::{error::Error, io::ErrorKind, net::ToSocketAddrs, pin::Pin, time::Duration};
use tokio::sync::mpsc;
use tokio_stream::{wrappers::ReceiverStream, StreamExt};
use tonic::{transport::Server, Request, Response, Status, Streaming};

use pb::{EchoRequest, EchoResponse};

type EchoResult<T> = Result<Response<T>, Status>;
type ResponseStream = Pin<Box<dyn Stream<Item = Result<EchoResponse, Status>> + Send>>;

fn match_for_io_error(err_status: &Status) -> Option<&std::io::Error> {
    let mut err: &(dyn Error + 'static) = err_status;

    loop {
        if let Some(io_err) = err.downcast_ref::<std::io::Error>() {
            return Some(io_err);
        }

        // h2::Error does not expose std::io::Error via `source()`
        // https://github.com/hyperium/h2/pull/462
        if let Some(h2_err) = err.downcast_ref::<h2::Error>() {
            if let Some(io_err) = h2_err.get_io() {
                return Some(io_err);
            }
        }

        err = match err.source() {
            Some(err) => err,
            None => return None,
        };
    }
}

#[derive(Debug)]
pub struct EchoServer {}

#[tonic::async_trait]
impl pb::echo_server::Echo for EchoServer {
    async fn unary_echo(&self, _: Request<EchoRequest>) -> EchoResult<EchoResponse> {
        Err(Status::unimplemented("not implemented"))
    }

    type ServerStreamingEchoStream = ResponseStream;

    async fn server_streaming_echo(
        &self,
        req: Request<EchoRequest>,
    ) -> EchoResult<Self::ServerStreamingEchoStream> {
        println!("EchoServer::server_streaming_echo");
        println!("\tclient connected from: {:?}", req.remote_addr());

        // Create an infinite stream repeating the requested message.
        let repeat = std::iter::repeat(EchoResponse {
            message: req.into_inner().message,
        });
        let mut stream = Box::pin(tokio_stream::iter(repeat).throttle(Duration::from_millis(200)));

        // The spawned task and channel are required if you want to handle
        // "disconnect": `out_stream` will not be polled after the client
        // disconnects.
        let (tx, rx) = mpsc::channel(128);
        tokio::spawn(async move {
            while let Some(item) = stream.next().await {
                match tx.send(Result::<_, Status>::Ok(item)).await {
                    Ok(_) => {
                        // The item (server response) was queued to be sent to the client.
                    }
                    Err(_item) => {
                        // `output_stream` was built from `rx`; both have been dropped.
                        break;
                    }
                }
            }
            println!("\tclient disconnected");
        });

        let output_stream = ReceiverStream::new(rx);
        Ok(Response::new(
            Box::pin(output_stream) as Self::ServerStreamingEchoStream
        ))
    }

    async fn client_streaming_echo(
        &self,
        _: Request<Streaming<EchoRequest>>,
    ) -> EchoResult<EchoResponse> {
        Err(Status::unimplemented("not implemented"))
    }

    type BidirectionalStreamingEchoStream = ResponseStream;

    async fn bidirectional_streaming_echo(
        &self,
        req: Request<Streaming<EchoRequest>>,
    ) -> EchoResult<Self::BidirectionalStreamingEchoStream> {
        println!("EchoServer::bidirectional_streaming_echo");

        let mut in_stream = req.into_inner();
        let (tx, rx) = mpsc::channel(128);

        // This spawn is required if you want to handle connection errors.
        // If we just mapped `in_stream` and wrote it back as `out_stream`,
        // `out_stream` would be dropped when a connection error occurs, and
        // the error would never be propagated to the mapped version of
        // `in_stream`.
        tokio::spawn(async move {
            while let Some(result) = in_stream.next().await {
                match result {
                    Ok(v) => tx
                        .send(Ok(EchoResponse { message: v.message }))
                        .await
                        .expect("working rx"),
                    Err(err) => {
                        if let Some(io_err) = match_for_io_error(&err) {
                            if io_err.kind() == ErrorKind::BrokenPipe {
                                // Here you can handle the special case of the
                                // client disconnecting unexpectedly.
                                eprintln!("\tclient disconnected: broken pipe");
                                break;
                            }
                        }

                        match tx.send(Err(err)).await {
                            Ok(_) => (),
                            Err(_err) => break, // response stream was dropped
                        }
                    }
                }
            }
            println!("\tstream ended");
        });

        // Echo: write back the same data that was received.
        let out_stream = ReceiverStream::new(rx);
        Ok(Response::new(
            Box::pin(out_stream) as Self::BidirectionalStreamingEchoStream
        ))
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let server = EchoServer {};
    Server::builder()
        .add_service(pb::echo_server::EchoServer::new(server))
        .serve("[::1]:50051".to_socket_addrs().unwrap().next().unwrap())
        .await
        .unwrap();
    Ok(())
}
```
Client-side streaming
A client-streaming RPC is one where the client writes a sequence of messages and sends them to the server. Once the client is done writing all of the messages, it waits for the server to read them and return its response.
Similarly to the previous case, you use the stream keyword, but this time
for the request type:
```proto
// Accepts a stream of Points on a route being traversed, returning a
// RouteSummary when traversal is completed.
rpc RecordRoute(stream Point) returns (RouteSummary) {}
```
On the Rust side, this corresponds to the client_streaming_echo method already present in the server example above: it receives a Request<Streaming<EchoRequest>>, and the handler is expected to drain that stream and return a single EchoResponse (the example stubs it out with Status::unimplemented).
Bi-directional streaming
Streaming can also be done bi-directionally, where both the input and the output of the RPC are streams. The two streams operate independently, so clients and servers can write messages in whatever order they prefer.
This means that you can either "chat" over the RPC call, or you might in fact have a use case where you send a sequence of messages and only once you are done sending do you read the response messages.
Bi-directional streams are great in cases where you can design your application with stream-oriented thinking in mind.
Of course, bi-directional streams use the stream keyword on both sides of the
exchange:
```proto
// Accepts a stream of RouteNotes sent while a route is being traversed,
// while receiving other RouteNotes (e.g. from other users).
rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
```
Load balancing
gRPC does not work well with a number of load balancers. Layer 4 (transport) load balancers distribute TCP connections across endpoints. That is fine for an HTTP/1.1 API, but not for gRPC over HTTP/2, which multiplexes calls on a single TCP connection. All gRPC calls then go to one endpoint, and there is effectively no load balancing.
Also keep in mind that load balancing happens per call, not per message: once a streaming call is established, all messages sent over that stream go to the same endpoint.
If you need to balance gRPC, you have essentially two options:
- client-side load balancing, where you implement it yourself
- proxy load balancing, where you use a layer 7 (application) proxy
The proxy you use must understand HTTP/2 and be able to distribute gRPC calls multiplexed on one HTTP/2 connection across multiple endpoints. Some suitable proxies are:
- Envoy
- Linkerd
- YARP (Yet Another Reverse Proxy)
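To make the proxy approach concrete, the key requirement is that the proxy speaks HTTP/2 to the upstream gRPC servers so that individual calls, not TCP connections, get balanced. Below is an illustrative sketch of the relevant part of an Envoy v3-style cluster definition; the cluster name and the backend1/backend2 addresses are made up for the example, and the exact field names should be checked against the Envoy documentation for your version.

```yaml
clusters:
  - name: grpc_backends
    type: STRICT_DNS
    lb_policy: ROUND_ROBIN
    # Tell Envoy to use HTTP/2 toward the upstream gRPC servers.
    typed_extension_protocol_options:
      envoy.extensions.upstreams.http.v3.HttpProtocolOptions:
        "@type": type.googleapis.com/envoy.extensions.upstreams.http.v3.HttpProtocolOptions
        explicit_http_config:
          http2_protocol_options: {}
    load_assignment:
      cluster_name: grpc_backends
      endpoints:
        - lb_endpoints:
            - endpoint:
                address:
                  socket_address: { address: backend1, port_value: 50051 }
            - endpoint:
                address:
                  socket_address: { address: backend2, port_value: 50051 }
```

With a configuration along these lines, each gRPC call arriving on the proxy's single downstream connection can be routed to a different backend.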
Using streams for performance
In cases where you make a lot of unary calls and are encountering a performance bottleneck, a common optimization is to use a bi-directional stream instead, with messages sent back and forth filling the role of the previous unary calls.
This is because all streamed messages travel over a single HTTP/2 stream, which eliminates the overhead of setting up a new HTTP/2 stream, with its headers, for every call.
What to watch out for with streams
Streams are not completely magic. They may be interrupted by errors in the service or in the connection. You need to handle that eventuality and restart the stream if necessary. You also need to design your application in such a way that a stream being interrupted is not catastrophic, and optionally, in a way such that restarting the stream does not require redoing all of the work you have already done.
Writing into streams may not be thread-safe in some implementations. In Rust this is mostly handled by the compiler, which will generally reject non-thread-safe use at compile time, but that may not be the case in other languages. Keep this in mind in, for example, Python applications, and ensure that you only use thread-safe utilities in a multi-threaded context. Corruption from data races may also surface as gRPC errors, so you should handle that eventuality too, especially if you do not trust the other side you are talking to.
Lastly, a gRPC streaming method is limited to a single request message type and a single response message type. If you expect to send many different kinds of data, and do not expect this particular use case to be a bottleneck (in fact, it is better to verify that a bottleneck exists than to assume one, so as to avoid over-optimization with dubious benefits), consider splitting it into multiple calls. This will also keep your API clearer in case it would otherwise grow too complicated.