Braiins Kafka

We have now finally built up enough theoretical background to get down to the nitty-gritty of why we are even concerning ourselves with Kafka here. The simple answer is that we use Kafka at Braiins.

The Kafka message streaming platform forms a key part of the Dynamo project, where it serves as a broker between services. We do not use Kafka as a final destination for data, that is, as an absolute source of truth; the data from mining-view ultimately ends up in a PostgreSQL database.

rdkafka

There are two main implementations of Kafka clients (for producers and consumers) in Rust. One is pure Rust, aptly called just kafka; the other, called rdkafka, binds to librdkafka, a C library for communicating with Kafka.

The second one is far more mature and stable, and so it is the one we elected to use. We have seen examples with rdkafka in the previous chapters.

The main library can be found here:

https://crates.io/crates/rdkafka

We pin the particular version we use in the monorepo. As you can see from the crate's documentation, the rdkafka client has some nice features:

  • Support for all Kafka versions since 0.8.x. For more information about broker compatibility options, check the librdkafka documentation.
  • Consume from single or multiple topics.
  • Automatic consumer rebalancing.
  • Customizable rebalance, with pre and post rebalance callbacks.
  • Synchronous or asynchronous message production.
  • Customizable offset commit.
  • Create and delete topics and add and edit partitions.
  • Alter broker and topic configurations.
  • Access to cluster metadata (list of topic-partitions, replicas, active brokers etc).
  • Access to group metadata (list groups, list members of groups, hostnames, etc.).
  • Access to producer and consumer metrics, errors and callbacks.
  • Exactly-once semantics (EOS) via idempotent and transactional producers and read-committed consumers.

The benchmark shows that it is also quite performant.

However, the issue with rdkafka is that it is quite low-level, so we needed to write a higher-level interface on top of it.
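
For a sense of what "low-level" means here, this is roughly what producing a single message straight through rdkafka looks like. It is only a sketch: the broker address, topic name, and payload are placeholders, and everything beyond handing raw bytes to the broker (serialization, topic management, offset bookkeeping) is left to the caller.

use std::time::Duration;

use rdkafka::config::ClientConfig;
use rdkafka::producer::{FutureProducer, FutureRecord};

#[tokio::main]
async fn main() {
    // Raw rdkafka producer; configuration is plain librdkafka key/value strings.
    let producer: FutureProducer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .create()
        .expect("failed to create producer");

    // The payload is just bytes; picking a serialization format is our own problem.
    let delivery = producer
        .send(
            FutureRecord::to("test_topic")
                .key("some_key")
                .payload("some payload"),
            Duration::from_secs(5),
        )
        .await;

    println!("delivery result: {:?}", delivery);
}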

twitch

Our interface has ended up in a library we call twitch. Twitch provides a safe way to stream arbitrary data in and out of a Kafka cluster, handling storage formats and other necessities for us.

Twitch is found in the monorepo under lib-rs/twitch. The library is quite transparent and clearly written, so you are encouraged to look at its source code to familiarize yourself with it.

Here is an example of a producer written with twitch:

use std::env;
use std::time::Duration;
use tracing::*;
use tracing_subscriber::{self, EnvFilter, FmtSubscriber};
use twitch::warden::{KafkaCluster, TopicConfiguration};
use twitch::{Record, TopicPartition};
pub use rand::RngCore;
use rand::SeedableRng;
use rand_xorshift::XorShiftRng;
use serde::{Deserialize, Serialize};

pub const TOPIC: &'static str = "test_topic";
pub const TEST_SIZE: usize = 30_000;
pub const PAYLOAD_SIZE: usize = 320;
pub const TIMEOUT: Duration = Duration::from_secs(15);

#[derive(Serialize, Deserialize, Debug)]
pub enum TestMessage {
    Reset,
    Data(Box<[u8]>),
}

pub fn init_test_logging() {
    let env_filter = if env::var_os(EnvFilter::DEFAULT_ENV).is_some() {
        EnvFilter::from_default_env()
    } else {
        EnvFilter::new("debug")
    };

    let builder = FmtSubscriber::builder();
    builder.with_env_filter(env_filter).init();
}

pub fn make_rng() -> impl RngCore {
    XorShiftRng::seed_from_u64(0)
}

#[tokio::main]
async fn main() {
    init_test_logging();

    let brokers = env::var("KAFKA_BROKERS").unwrap_or("localhost:9092".into());
    let tp = TopicPartition::from_str(TOPIC, 0);
    let kafka_cluster = KafkaCluster::new(brokers.clone(), "example".into());

    let _ = kafka_cluster
        .wait_for_brokers()
        .await
        .expect("BUG: Failed to connect to brokers");
    info!("Broker seems to be up & running...");

    kafka_cluster
        .create_topics(&[TopicConfiguration::new_with_default(TOPIC)])
        .await
        .expect("BUG: Failed to create topic");
    info!("Test topic created/exists...");
    let mut writer = kafka_cluster
        .get_writer_for_partition(tp)
        .expect("BUG: cannot get writer");

    let reset_msg = bincode::serialize(&TestMessage::Reset).expect("BUG: Serde problem");
    writer
        .produce_and_flush(Record::new(reset_msg))
        .await
        .ok()
        .expect("BUG: failed to send Reset message to kafka");

    let mut rng = make_rng();
    let mut buffer: Vec<u8> = vec![0; PAYLOAD_SIZE];

    for i in 0..TEST_SIZE {
        rng.fill_bytes(buffer.as_mut());

        let msg = TestMessage::Data(buffer.clone().into_boxed_slice());
        let msg = bincode::serialize(&msg).expect("BUG: Could not serialize TestMessage");
        let record = Record::new(msg);
        writer.produce(record).await.expect("BUG: Send error");

        if (i + 1) % 100 == 0 {
            info!("Write: queued {} messages...", i + 1);
        }
    }

    info!("Write: queued {} messages...", TEST_SIZE);

    let _offset = writer.flush().expect("BUG: flush failed");
}

Try to get it working :)

Why we chose Kafka

For us, the main reason to choose Kafka was its persistence features. We want to keep the messages we receive, and we expect the volume of messages to be very large.

This makes Redis unsuitable for us because RAM would be too much of a constraint, and it makes RabbitMQ unsuitable because it is not really built for retaining messages.

Therefore, the choice wasn't all that complicated: we had to go with Kafka.

The task:

You have tried using Kafka directly through rdkafka two chapters back.

Visit the repository with twitch and look at the examples to see how to write a consumer as well.

Now, let's design an application that can leverage Kafka.

Make sure your Kafka deployment has a test topic ready with three partitions.
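
If you prefer to create the topic programmatically rather than with the Kafka CLI tools, one possible way is through rdkafka's admin API, sketched below. The broker address and topic name are placeholders; adjust them to match your deployment.

use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication};
use rdkafka::client::DefaultClientContext;
use rdkafka::config::ClientConfig;

#[tokio::main]
async fn main() {
    // Admin client pointed at the local broker (address is a placeholder).
    let admin: AdminClient<DefaultClientContext> = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .create()
        .expect("failed to create admin client");

    // Three partitions, replication factor 1 (enough for a local test cluster).
    let topic = NewTopic::new("test_topic", 3, TopicReplication::Fixed(1));

    let results = admin
        .create_topics(&[topic], &AdminOptions::new())
        .await
        .expect("failed to create topics");

    // Per-topic results; an "already exists" error here is usually fine.
    println!("{:?}", results);
}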

  • Write a temp_gen crate: a producer that produces a random temperature. You can use rng.gen() - rng.gen() (methods on ThreadRng from the rand crate) to introduce variance to the temperature.
  • Write an average_temp crate: a consumer that calculates an incremental average and prints it to stdout. A rough sketch of such a consumer follows below.
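
To make the incremental average concrete, here is a rough sketch of what average_temp could look like, written directly against rdkafka (using twitch instead is left as part of the exercise). The broker address, topic, and group id are placeholders, and it assumes temp_gen publishes each temperature as the eight little-endian bytes of an f64 (i.e. temp.to_le_bytes()).

use rdkafka::config::ClientConfig;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::Message;

#[tokio::main]
async fn main() {
    // Consumer configuration; broker address and group id are placeholders.
    let consumer: StreamConsumer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .set("group.id", "average_temp")
        .set("auto.offset.reset", "earliest")
        .create()
        .expect("failed to create consumer");

    consumer
        .subscribe(&["test_topic"])
        .expect("failed to subscribe to topic");

    let mut count: u64 = 0;
    let mut average: f64 = 0.0;

    loop {
        let msg = consumer.recv().await.expect("failed to receive message");

        // Assumes the producer wrote the temperature as 8 little-endian bytes of an f64.
        if let Some(payload) = msg.payload() {
            if let Ok(bytes) = <[u8; 8]>::try_from(payload) {
                let temp = f64::from_le_bytes(bytes);

                // Incremental average: avg_n = avg_{n-1} + (x_n - avg_{n-1}) / n
                count += 1;
                average += (temp - average) / count as f64;

                println!("samples: {}, average temperature: {:.2}", count, average);
            }
        }
    }
}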

First, try running it 1-to-1 (one producer, one consumer), then try running it 1-to-3. Then run it 1-to-3 again with all three consumers sharing the same consumer group.