gRPC & Tower & Layers

The basics provided by gRPC libraries are sometimes not enough when modeling complex applications which may bundle several services together, preprocess the data they are working with, or intend to provide additional insight into the application.

In a previous chapter, we have seen that while some manipulation may be achieved using interceptors, they are still fairly limited, the information they provide is fairly small, and you cannot mutate data freely.

We suggested using layers from the tower crate.

https://docs.rs/tower/latest/tower/

Tower

Tower is a library for building networking applications, both clients and servers. It provides abstractions for modeling networking, and allows you to build components and services on top of one another. While in tower the term "layer" is used prevalently, you may be more familiar with the term middle-ware.

In other words, tower is a framework for building modular networking applications.

The term networking is important, as tower is mostly agnostic to the underlying protocol, and in fact you can use it to expose your service over multiple different protocols.

In case you haven't heard about the framework before, fret not, it is a trustworthy and well-established project, which is used for example by these major Rust systems:

  • The Noria streaming data-flow system
  • Rust runtime for AWS Lambda
  • Linkerd2 proxy
  • Toshi
  • TiKV

The two main concepts in tower are services and layers.

Services

Services form the backbone of your application, this is where your main application logic will likely be stored.

The Service trait itself is quite simple, let's take a look

#![allow(unused)]
fn main() {
pub trait Service<Request> {
    type Response;
    type Error;
    type Future: Future
    where
        <Self::Future as Future>::Output == Result<Self::Response, Self::Error>;

    fn poll_ready(
        &mut self, 
        cx: &mut Context<'_>
    ) -> Poll<Result<(), Self::Error>>;
    fn call(&mut self, req: Request) -> Self::Future;
}
}

As you can see, what you essentially need to do is, given a request, produce a Future<Response>. This makes tower asynchronous, although the trait itself is not an async trait. At the cost of a slightly more verbose API, this gives you the flexibility to construct your service as a reactor and determine to the async executor when it is and isn't ready.

You also don't have to work with the async-trait, which might produce confusing errors.

To use this trait and breathe life into your application, what you need to do is to supply at least a unit struct, and select the contained types for request and response.

These types may be the same, that is up to you.

Here is an example kindly borrowed from the official documentation:

#![allow(unused)]
fn main() {
use http::{Request, Response, StatusCode};

struct HelloWorld;

impl Service<Request<Vec<u8>>> for HelloWorld {
    type Response = Response<Vec<u8>>;
    type Error = http::Error;
    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, req: Request<Vec<u8>>) -> Self::Future {
        // create the body
        let body: Vec<u8> = "hello, world!\n"
            .as_bytes()
            .to_owned();
        // Create the HTTP response
        let resp = Response::builder()
            .status(StatusCode::OK)
            .body(body)
            .expect("Unable to create `http::Response`");

        // create a response in a future.
        let fut = async {
            Ok(resp)
        };

        // Return the response as an immediate future
        Box::pin(fut)
    }
}
}

This simple example simply returns a HTTP response with the text "hello world", encoded as plain bytes.

Of course, you can implement the trait multiple times with different request types. This can let you, within one service conceptually, to do either different things, or the same thing while processing the data in several different formats. You can combine the two as well, but make sure that your application doesn't become confusing as a result.

To create a client, you create an instance of the type and supply the request.

Here is how a hypothetical redis client would look:

#![allow(unused)]
fn main() {
let redis_client = redis::Client::new()
    .connect("braiins-uni.mag.wiki:6379".parse().expect("BUG: Invalid URL"))
    .expect("BUG: Failed to create redis client");

let res = redis_client.call(Cmd::set("test", "test")).await?;

println!("response: {:?}", res);
}

The Cmd here would be the request type, and the best way to model it in Rust (at least if we only consider simple redis commands), would be with an enum, which might look something like this:

#![allow(unused)]
fn main() {
struct Enum<S>
    S: ToString
{
    Get(S),
    Set(S, S),
    Auth(S, S),
    Echo(S),
    Del(S),
    Exists(S),
}
}

Helper methods might have to be used to preserve ergonomics when dealing with more complex queries. You can see how this is dealt with in the redis crate, if you are curious: https://github.com/redis-rs/redis-rs

However, often there is duplication in logic and processing between the services, which is where middlewares, referred to as layers, come in.

Middlewares / Layers

Layer is functionality decoupled from the service, and can therefore be used across multiple services. Layers, as the name might imply might also be stacked on top of one another to help you transform, validate or inspect your data in the most tailored manner.

Here is the definition of the Layer trait:

#![allow(unused)]
fn main() {
pub trait Layer<S> {
    type Service;

    fn layer(&self, inner: S) -> Self::Service;
}
}

Typically, Layers are implemented to be generic over S, so that they can be used with all services (S with a trait bound, of course), for example, look at this simple logging layer:

#![allow(unused)]
fn main() {
pub struct LogLayer {
    target: &'static str,
}

impl<S> Layer<S> for LogLayer {
    type Service = LogService<S>;

    fn layer(&self, service: S) -> Self::Service {
        LogService {
            target: self.target,
            service
        }
    }
}

// This service implements the Log behavior
pub struct LogService<S> {
    target: &'static str,
    service: S,
}

impl<S, Request> Service<Request> for LogService<S>
where
    S: Service<Request>,
    Request: fmt::Debug,
{
    type Response = S::Response;
    type Error = S::Error;
    type Future = S::Future;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.service.poll_ready(cx)
    }

    fn call(&mut self, request: Request) -> Self::Future {
        // Insert log statement here or other functionality
        println!("request = {:?}, target = {:?}", request, self.target);
        self.service.call(request)
    }
}
}

As you can see, layers essentially transform one type of service into another, which combines the functionality of all services underlying.

This log implementation is completely decoupled from the underlying protocol and also from client/server concerns, meaning that the same middle were can be used in either.

We can take a look at another example:

#![allow(unused)]
fn main() {
use tower_service::Service;
use tower_layer::Layer;
use futures::FutureExt;
use std::future::Future;
use std::task::{Context, Poll};
use std::time::Duration;
use std::pin::Pin;
use std::fmt;
use std::error::Error;

// Our timeout service, which wraps another service and
// adds a timeout to its response future.
pub struct Timeout<T> {
    inner: T,
    timeout: Duration,
}

impl<T> Timeout<T> {
    pub fn new(inner: T, timeout: Duration) -> Timeout<T> {
        Timeout {
            inner,
            timeout
        }
    }
}

// The error returned if processing a request timed out
#[derive(Debug)]
pub struct Expired;

impl fmt::Display for Expired {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "expired")
    }
}

impl Error for Expired {}

// We can implement `Service` for `Timeout<T>` if `T` is a `Service`
impl<T, Request> Service<Request> for Timeout<T>
where
    T: Service<Request>,
    T::Future: 'static,
    T::Error: Into<Box<dyn Error + Send + Sync>> + 'static,
    T::Response: 'static,
{
    // `Timeout` doesn't modify the response type, so we use `T`'s response type
    type Response = T::Response;
    // Errors may be either `Expired` if the timeout expired, or the inner service's
    // `Error` type. Therefore, we return a boxed `dyn Error + Send + Sync` trait object to erase
    // the error's type.
    type Error = Box<dyn Error + Send + Sync>;
    type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // Our timeout service is ready if the inner service is ready.
        // This is how backpressure can be propagated through a tree of nested services.
       self.inner.poll_ready(cx).map_err(Into::into)
    }

    fn call(&mut self, req: Request) -> Self::Future {
        // Create a future that completes after `self.timeout`
        let timeout = tokio::time::sleep(self.timeout);

        // Call the inner service and get a future that resolves to the response
        let fut = self.inner.call(req);

        // Wrap those two futures in another future that completes when either one completes
        //
        // If the inner service is too slow the `sleep` future will complete first
        // And an error will be returned and `fut` will be dropped and not polled again
        //
        // We have to box the errors so the types match
        let f = async move {
            tokio::select! {
                res = fut => {
                    res.map_err(|err| err.into())
                },
                _ = timeout => {
                    Err(Box::new(Expired) as Box<dyn Error + Send + Sync>)
                },
            }
        };

        Box::pin(f)
    }
}

// A layer for wrapping services in `Timeout`
pub struct TimeoutLayer(Duration);

impl TimeoutLayer {
    pub fn new(delay: Duration) -> Self {
        TimeoutLayer(delay)
    }
}

impl<S> Layer<S> for TimeoutLayer {
    type Service = Timeout<S>;

    fn layer(&self, service: S) -> Timeout<S> {
        Timeout::new(service, self.0)
    }
}
}

This example, more elaborate that the previous shows how to introduce timeout functionality. Layers are extremely flexible, and can be used to greatly change the behavior of the services they are wrapping around.

To make it clearer: layers are a facility to mutate both requests and responses.

How does this mesh with gRPC?

Tower in gRPC

Well, the answer lies in the prevalent framework, tonic, which we have spoken about before in different chapters.

The tonic framework is implemented using tower. The client implements Service and other functionality is provided through layers.

As a matter of fact, we can use the aforementioned interceptors as layers too:

#![allow(unused)]
fn main() {
use tower::ServiceBuilder;
use std::time::Duration;
use tonic::{Request, Status, service::interceptor};

fn auth_interceptor(request: Request<()>) -> Result<Request<()>, Status> {
    if valid_credentials(&request) {
        Ok(request)
    } else {
        Err(Status::unauthenticated("invalid credentials"))
    }
}

fn valid_credentials(request: &Request<()>) -> bool {
    // ...
}

fn some_other_interceptor(request: Request<()>) -> Result<Request<()>, Status> {
    Ok(request)
}

let layer = ServiceBuilder::new()
    .load_shed()
    .timeout(Duration::from_secs(30))
    .layer(interceptor(auth_interceptor))
    .layer(interceptor(some_other_interceptor))
    .into_inner();

Server::builder().layer(layer);
}

We can look at the entry-point of the service implementation here:

https://docs.rs/tonic/latest/src/tonic/transport/server/mod.rs.html#692-727

If we take a closer look, we can see that there is even a BoxService layer pre-applied:

https://docs.rs/tonic/latest/src/tonic/transport/server/mod.rs.html#800-801

BoxService

The BoxService type is a fairly important utility, as it allows turning a service into a trait object, allowing the response future type to be dynamic. However, both the service and response futures must be Send:

https://docs.rs/tower/0.4.8/tower/util/struct.UnsyncBoxService.html

In places where you cannot have the future type Send, you can use the UnsyncBoxService alternative:

https://docs.rs/tower/0.4.8/tower/util/struct.BoxService.html

Other useful utilities are found in the util module of tower:

https://docs.rs/tower/latest/tower/util/index.html

Suggestions for metrics and logging

We have seen a couple paragraphs above that layers can be used to great success with logging.

In practice, you will need to select a logging library.

Our experience in Braiins has revealed to us that the framework that meshes the best with async code, especially when performance is on the line, is tracing.

In tonic, a layer for it already exists and you can very easily introduce it to your application:

#![allow(unused)]
fn main() {
use http::{Request, Response};
use hyper::Body;
use tower::{ServiceBuilder, ServiceExt, Service};
use tower_http::trace::TraceLayer;
use std::convert::Infallible;

async fn handle(request: Request<Body>) -> Result<Response<Body>, Infallible> {
    Ok(Response::new(Body::from("foo")))
}

// Setup tracing
tracing_subscriber::fmt::init();

let mut service = ServiceBuilder::new()
    .layer(TraceLayer::new_for_http())
    .service_fn(handle);

let request = Request::new(Body::from("foo"));

let response = service
    .ready()
    .await?
    .call(request)
    .await?;
}

A similar functionality is providing metrics. For metrics, the industry standard is Prometheus. This metrics system has been explored in a previous chapter:

Metrics with Prometheus and Grafana

You simply create a layer, and record the prometheus metrics in there, probably based on the data of the request.