Looking at Kafka from the perspective of storing data
Look at this, Willem, he admits he doesn’t know the law and at the same time insists he’s innocent.
-- Franz Kafka, Der Prozess
It is now time to introduce Apache Kafka properly. To quote its website:
Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.
And that describes it pretty well in the most succinct terms.
Kafka allows us to send messages (i.e., events) into logs. These logs store messages sequentially and can be iterated over efficiently.
To bring our terminology in line with what is used when discussing Kafka specifically, we need to introduce a few new terms.
Message
A message is a piece of data that you add to a topic (think of a topic as a log for the next few lines, until we get to it). A message has two parts:
- a key
- a value
Unlike in the KV stores we discussed in the previous chapters, the key may be null (i.e., missing), and several messages may share the same key. Message keys serve as a means of picking out particular groups of data from the log.
For example, imagine the following situation: you run a weather station with a number of sensors that all report the same type of data, and you want to tell which sensor is responsible for which message. In Kafka, the usual way to do this is to use some sort of unique sensor identifier as the key.
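To make the key/value idea concrete, here is a minimal sketch of the weather-station example in plain Rust, without Kafka. The `Message` struct, the `readings_for` helper, and the sensor names are hypothetical and exist only for illustration; a real client library has its own message types.

```rust
/// A simplified stand-in for a Kafka message: an optional key plus a value.
#[derive(Debug, Clone, PartialEq)]
struct Message {
    key: Option<String>, // None models a null (missing) key
    value: String,
}

/// Returns the values of all messages whose key equals `sensor_id`.
fn readings_for<'a>(log: &'a [Message], sensor_id: &str) -> Vec<&'a str> {
    log.iter()
        .filter(|m| m.key.as_deref() == Some(sensor_id))
        .map(|m| m.value.as_str())
        .collect()
}

fn main() {
    let log = vec![
        Message { key: Some("sensor-1".into()), value: "21.5C".into() },
        Message { key: Some("sensor-2".into()), value: "19.0C".into() },
        Message { key: None, value: "station restarted".into() },
        // Duplicate keys are allowed: sensor-1 reports a second time.
        Message { key: Some("sensor-1".into()), value: "21.7C".into() },
    ];
    println!("{:?}", readings_for(&log, "sensor-1"));
}
```

Note that nothing stops two messages from carrying the same key, and the keyless message is simply skipped by the filter.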
Message content - Avro example
The value in Kafka can be essentially anything. A format that is commonly used with Kafka is Apache Avro. Avro is a bit similar to JSON, but its data is always written according to a schema.
A schema for Person from the previous chapter might look something like this:
{
  "namespace": "example",
  "type": "record",
  "name": "Person",
  "fields": [
    { "name": "id", "type": "int" },
    { "name": "name", "type": "string" },
    { "name": "age", "type": "int" },
    { "name": "nationality", "type": "string" }
  ]
}
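Although this looks JSONy, that is only how the schema is written down; the data itself is stored in a compact binary format. The benefit of Avro is that the schema travels with the data: an Avro data file embeds the schema it was written with, so you can parse it without having to learn the schema beforehand from a different source. (With Kafka, setups often keep the schema in a shared registry and have each message refer to it, rather than embedding the full schema in every message.)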
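To give a feeling for how compact the binary format is, the following sketch hand-encodes a Person the way the Avro specification describes for `int` and `string` fields: integers use zig-zag variable-length encoding, strings are a length followed by UTF-8 bytes, and a record is just its fields in schema order. The `Person` struct, the helper names, and the sample values are assumptions made for this illustration; in real code you would use an Avro library rather than encoding by hand.

```rust
/// Zig-zag plus variable-length encoding, used by Avro for `int` and `long`.
fn write_long(n: i64, out: &mut Vec<u8>) {
    // Zig-zag maps small negative and positive numbers to small unsigned ones.
    let mut z = ((n << 1) ^ (n >> 63)) as u64;
    while z >= 0x80 {
        out.push((z as u8 & 0x7f) | 0x80); // low 7 bits, continuation bit set
        z >>= 7;
    }
    out.push(z as u8);
}

/// Avro strings are a length (encoded as a long) followed by UTF-8 bytes.
fn write_string(s: &str, out: &mut Vec<u8>) {
    write_long(s.len() as i64, out);
    out.extend_from_slice(s.as_bytes());
}

struct Person {
    id: i32,
    name: String,
    age: i32,
    nationality: String,
}

/// A record is its fields in schema order; no field names are written.
fn encode_person(p: &Person) -> Vec<u8> {
    let mut out = Vec::new();
    write_long(p.id as i64, &mut out);
    write_string(&p.name, &mut out);
    write_long(p.age as i64, &mut out);
    write_string(&p.nationality, &mut out);
    out
}

fn main() {
    // Made-up sample values.
    let p = Person { id: 1, name: "Ann".into(), age: 64, nationality: "CZ".into() };
    let bytes = encode_person(&p);
    println!("{} bytes: {:02x?}", bytes.len(), bytes);
}
```

Because field names and types live only in the schema, the encoded record contains nothing but the values themselves, which is also why a reader cannot make sense of the bytes without the schema.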
Topic
To make good on my promise from a couple of lines above, the place messages are written to is actually called a topic, whereas a log is the name we use for the logical collection of a topic's data segments present on the disk. In other words, the topic is the concept and the log is its realization.
Logs are further split into segments. Segments are generally invisible to regular users of Kafka and have to do with things like persistence and efficient storage.
Some topics in Kafka might have compacted logs. Log compaction changes how the key part of the message is treated: for each key, only the newest message is preserved.
This is useful in cases where you care about the current state rather than the history, and it more closely resembles what we are used to from databases such as key-value and wide-column stores. It saves a significant amount of space, and also time in cases where we need to go through the entire topic to get to the bottom of things.
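The effect of compaction can be sketched in a few lines of plain Rust. This is only a model of the end result, under the assumption that every message has a key; the `compact` function is hypothetical, and in Kafka itself compaction runs in the background, so older duplicates can remain visible for a while.

```rust
use std::collections::HashMap;

/// Keeps only the newest value per key, preserving the order of the survivors.
fn compact(log: &[(String, String)]) -> Vec<(String, String)> {
    // Remember the position of the last occurrence of each key.
    let mut last: HashMap<&str, usize> = HashMap::new();
    for (i, (k, _)) in log.iter().enumerate() {
        last.insert(k.as_str(), i);
    }
    // Keep a message only if it is the last one written for its key.
    log.iter()
        .enumerate()
        .filter(|(i, (k, _))| last[k.as_str()] == *i)
        .map(|(_, kv)| kv.clone())
        .collect()
}

fn main() {
    let log: Vec<(String, String)> = [
        ("sensor-1", "21.5"),
        ("sensor-2", "19.0"),
        ("sensor-1", "21.7"),
    ]
    .iter()
    .map(|(k, v)| (k.to_string(), v.to_string()))
    .collect();

    for (k, v) in compact(&log) {
        println!("{} = {}", k, v);
    }
}
```

After compaction, the first reading of sensor-1 is gone, and what remains reads like the current state of a key-value store.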
Offset
Messages written to a topic are identified by a number called the offset. By default, offsets start at 0 and only ever increase. Since topics are logs, we need to keep track of the offset to be able to figure out where we want to read from. (Strictly speaking, offsets are counted per partition, a concept we get to in a moment.)
If you start reading from offset 0, you will read all the messages currently stored in a topic.
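A toy in-memory log shows how offsets behave. This is a sketch under the assumption that the offset is simply the index of the message; the `Log` type and its methods are hypothetical and are not part of any Kafka client.

```rust
/// A toy append-only log: the offset of a message is its index.
struct Log {
    messages: Vec<String>,
}

impl Log {
    fn new() -> Self {
        Log { messages: Vec::new() }
    }

    /// Appends a message and returns the offset assigned to it.
    fn append(&mut self, msg: &str) -> u64 {
        self.messages.push(msg.to_string());
        (self.messages.len() - 1) as u64
    }

    /// Returns every message at or after `offset`.
    fn read_from(&self, offset: u64) -> &[String] {
        let start = (offset as usize).min(self.messages.len());
        &self.messages[start..]
    }
}

fn main() {
    let mut log = Log::new();
    for msg in ["first", "second", "third"] {
        let offset = log.append(msg);
        println!("wrote {:?} at offset {}", msg, offset);
    }
    // Reading from offset 0 replays everything; a later offset skips ahead.
    println!("from 0: {:?}", log.read_from(0));
    println!("from 2: {:?}", log.read_from(2));
}
```

A reader that remembers the last offset it has processed can resume from the next one without rereading the whole log.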
Partitions
Kafka's topics are divided into partitions. While a topic is the concept under which messages are stored, a partition is the smallest storage unit, holding a subset of the records owned by a topic. Every partition is a single log, to which records are written, generally in an append-only fashion.
Partitions serve the important function of both distributing data and providing redundancy for it. That is because the partitions of one topic may be spread, and replicated, across several brokers.
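One common way for a record to end up in a particular partition is by hashing its key, so that all records with the same key land in the same partition. The sketch below illustrates the idea only: Kafka clients use their own hash functions, and `DefaultHasher` from the Rust standard library is a stand-in chosen for this example, as are the `partition_for` helper and the sensor names.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Picks a partition for a key by hashing it.
/// DefaultHasher is only a stand-in; real clients use other hash functions.
/// Panics if `num_partitions` is 0.
fn partition_for(key: &str, num_partitions: u32) -> u32 {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    (hasher.finish() % num_partitions as u64) as u32
}

fn main() {
    // The same key always maps to the same partition within one run.
    for key in ["sensor-1", "sensor-2", "sensor-3", "sensor-1"] {
        println!("{} -> partition {}", key, partition_for(key, 3));
    }
}
```

Because the result depends on the number of partitions, changing that number later generally changes which partition a given key maps to.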
Brokers
When talking about horizontal scaling, which is something Kafka is particularly known to be good at, a broker represents a node. A Kafka cluster is composed of machines running Kafka, called brokers. Each broker hosts a number of partitions.
There are a number of partitioning strategies, and you are free to configure them so that they both suit your needs and match the hardware dedicated to the brokers. You can also repartition a topic later, for example by adding partitions.
Brokers also serve as so-called bootstrap servers, and all brokers are bootstrap servers. In distributed systems, a bootstrap server is one you connect to in order to discover the other nodes, so that you can connect to them. Typically, when connecting to a Kafka cluster, you specify at least two brokers in case one becomes unavailable. This makes sure that your service still starts successfully even if there is a dead node.
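The reason for listing more than one bootstrap server can be sketched without any networking. In the example below, the reachability check is passed in as a closure so that a dead node can be simulated; the helper functions and the host names are hypothetical, and a real client performs this discovery for you.

```rust
/// Splits a comma-separated list of bootstrap servers into single addresses.
fn parse_bootstrap(servers: &str) -> Vec<&str> {
    servers
        .split(',')
        .map(str::trim)
        .filter(|s| !s.is_empty())
        .collect()
}

/// Returns the first address for which `is_reachable` returns true.
fn first_reachable<'a>(
    servers: &'a str,
    is_reachable: impl Fn(&str) -> bool,
) -> Option<&'a str> {
    parse_bootstrap(servers)
        .into_iter()
        .find(|&addr| is_reachable(addr))
}

fn main() {
    // Pretend the first broker is down (hypothetical host names).
    let up = |addr: &str| addr != "broker-1:9092";
    let chosen = first_reachable("broker-1:9092, broker-2:9092", up);
    println!("{:?}", chosen);
}
```

With only `broker-1:9092` in the list, the same call would return `None`, which corresponds to a service that cannot start while that node is dead.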
Consumers and producers
The Kafka architecture is asymmetrical when we discuss the terminology of clients. Here, clients are not universal, but divided into two groups, each performing a particular function.
Producers are responsible for creating messages and storing them in topics. Here is a simple example of a Kafka producer, written in Rust:
use std::time::Duration;

use log::info;
use rdkafka::config::ClientConfig;
use rdkafka::message::{Header, OwnedHeaders};
use rdkafka::producer::{FutureProducer, FutureRecord};

async fn produce(brokers: &str, topic_name: &str) {
    let producer: &FutureProducer = &ClientConfig::new()
        .set("bootstrap.servers", brokers)
        .set("message.timeout.ms", "5000")
        .create()
        .expect("Producer creation error");

    // Build one future per message. Rust futures are lazy, so nothing is
    // sent until each future is awaited in the loop below.
    let futures = (0..5)
        .map(|i| async move {
            // The send operation on the topic returns a future, which will be
            // completed once the result or failure from Kafka is received.
            let delivery_status = producer
                .send(
                    FutureRecord::to(topic_name)
                        .payload(&format!("Message {}", i))
                        .key(&format!("Key {}", i))
                        .headers(OwnedHeaders::new().insert(Header {
                            key: "header_key",
                            value: Some("header_value"),
                        })),
                    Duration::from_secs(0),
                )
                .await;

            // This will be executed when the result is received.
            info!("Delivery status for message {} received", i);
            delivery_status
        })
        .collect::<Vec<_>>();

    // Await the futures one by one; each message is sent and its delivery
    // status received before the next future is polled.
    for future in futures {
        info!("Future completed. Result: {:?}", future.await);
    }
}
(This uses the rdkafka crate, which we will discuss later.)
Consumers, on the other hand, are the reading counterpart of producers. They read messages, optionally process them, and send the results elsewhere.
Here is an example of a consumer:
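use log::{info, warn};
use rdkafka::client::ClientContext;
use rdkafka::config::{ClientConfig, RDKafkaLogLevel};
use rdkafka::consumer::{CommitMode, Consumer, ConsumerContext, StreamConsumer};
use rdkafka::message::{Headers, Message};

// CustomContext and LoggingConsumer are not defined in the original snippet.
// These minimal definitions are an assumption; they rely on the default
// methods of the two context traits.
struct CustomContext;
impl ClientContext for CustomContext {}
impl ConsumerContext for CustomContext {}

type LoggingConsumer = StreamConsumer<CustomContext>;

async fn consume_and_print(brokers: &str, group_id: &str, topics: &[&str]) {
    let context = CustomContext;

    let consumer: LoggingConsumer = ClientConfig::new()
        .set("group.id", group_id)
        .set("bootstrap.servers", brokers)
        .set("enable.partition.eof", "false")
        .set("session.timeout.ms", "6000")
        .set("enable.auto.commit", "true")
        .set_log_level(RDKafkaLogLevel::Debug)
        .create_with_context(context)
        .expect("Consumer creation failed");

    consumer
        .subscribe(topics)
        .expect("Can't subscribe to specified topics");

    // Consumers typically run forever, handling messages as they arrive.
    loop {
        match consumer.recv().await {
            Err(e) => warn!("Kafka error: {}", e),
            Ok(m) => {
                // Interpret the payload as UTF-8; fall back to an empty string.
                let payload = match m.payload_view::<str>() {
                    None => "",
                    Some(Ok(s)) => s,
                    Some(Err(e)) => {
                        warn!("Error while deserializing message payload: {:?}", e);
                        ""
                    }
                };
                info!(
                    "key: '{:?}', payload: '{}', topic: {}, partition: {}, offset: {}, timestamp: {:?}",
                    m.key(), payload, m.topic(), m.partition(), m.offset(), m.timestamp()
                );
                if let Some(headers) = m.headers() {
                    for header in headers.iter() {
                        info!("  Header {:#?}: {:?}", header.key, header.value);
                    }
                }
                // Record that this message has been processed.
                consumer.commit_message(&m, CommitMode::Async).unwrap();
            }
        };
    }
}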
The producer and consumer we created process string messages with string keys, which is the simplest case. You can also see an architectural difference between the two: consumers typically run in an endless loop, but producers do not have to.
We can look at all the messages in a topic from the command line by using the kcat tool:
kcat -C -b localhost:9092 -t <topicname>
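Beware that doing this for a topic that has a lot of messages in it might take some time, so in that case consider limiting which messages you view, for example with kcat's -c <count> option to stop after a number of messages, or -o -<n> to start n messages before the end.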
Task
For starters, the task is quite easy: start Kafka on your machine and try to get the two examples above running :)
You can see how to start Kafka on your computer by visiting Appendix: Kafka setup.
I recommend also installing kcat, see this link: https://github.com/edenhill/kcat
You need the librdkafka library installed on your system to be able to build and use kcat.