Kafka

What is Apache Kafka?

  • Events/Messages Streaming Platform
  • Critical piece of the Big Data puzzle
  • Open-source with commercial options
  • Arguably the most popular messaging platform in the world
  • Producers (Publishers) push messages to Kafka
  • Consumers (Subscribers) listen and receive messages

Kafka Data Functions:

  • Collection
  • Storage
  • Transport
  • Distribution
  • Tracking

Benefits of Kafka

  • High throughput
  • Low latency
  • Fault tolerance
  • decoupling
  • Back pressure handling
  • Horizontal scalability
  • Streaming & Batching

Kafka Use Cases:

  • Asynchronous Messaging
  • Realtime Stream Processing
  • Logging & Monitoring
  • Event Sourcing
  • Realtime Analytics

Kafka Messages

  • Event
  • Unit of Data
    • Row, record, Map, Blob
  • Byte Array
    • Structure imposed by publisher, Understood by Consumer
  • Size limits exist in Kafka
  • Can be Batched for efficiency

Message content

  • Key
    • Need not be unique
    • Used for partitioning
  • Value
    • Content of the message
    • User defined
  • Timestamp
    • Automatically timestamped
    • Event Time vs Ingestion Time

Topics in Kafka

  • Holder of messages (similar in context)
    • Files with papers
    • Database Table with records
  • Queue for similar messages
    • Sales Transactions, Audit Logs, Video files
  • Multiple topics per Kafka instance (based on use case)
  • Support multiple Producers & Consumers
  • Multiple partitions per topic

Brokers in Kafka

  • A running Kafka instance
  • Listens on a specific port
  • Receives messages & stores
  • Subscription mgmt
  • Topics, partitions & logs mgmt
  • Clustering capabilities
  • Replication

Logs in Kafka

  • Physical files for storing data
  • Managed by Kafka brokers
  • Multiple files (by broker / topic / partition)
  • Rolling files
  • Pruned periodically
  • Physical space management
  • $KAFKA_ROOT/config/server.properties: log.dirs

Producers

  • Client that publishes data to Kafka
  • Client libraries for programming languages
  • Multiple concurrent producers per topic
  • Message key identification
  • Message serialization to bytes
  • Sync/Async options

Consumers

  • Consume messages from Kafka
  • Consume anytime (Streaming/batching)
  • Client libraries by programming language
  • Multiple concurrent consumers per topic
  • Workload scaling with consumer groups
  • Deserialize bytes to data structures
  • Offset management

Role of ZooKeeper

  • Every Kafka instance/cluster needs ZooKeeper
  • Central, realtime information store for Kafka
  • Broker management
    • Broker registry
    • Active controller
    • Broker failure management
  • Topic management
    • Topic registry
    • Partition leading management
$ cd /opt/bitnami/zookeeper/bin
$ ./zkCli.sh
[zk: localhost:2181(CONNECTED) 3] ls /
[admin, brokers, cluster, config, consumers, controller, controller_epoch, feature, isr_change_notification, latest_producer_id_block, log_dir_event_notification, zookeeper]
[zk: localhost:2181(CONNECTED) 4] ls /brokers/ids
[1001]
[zk: localhost:2181(CONNECTED) 8] get /brokers/ids/1001
{"listener_security_protocol_map":{"INTERNAL":"PLAINTEXT","EXTERNAL":"PLAINTEXT"},"endpoints":["INTERNAL://kafka:29092","EXTERNAL://localhost:9092"],"jmx_port":-1,"features":{},"host":"kafka","timestamp":"1647435656926","port":29092,"version":5}
[zk: localhost:2181(CONNECTED) 9] ls /brokers/topics
[__consumer_offsets, kafka.learning.alerts, kafka.learning.tweets]
[zk: localhost:2181(CONNECTED) 10] get /brokers/topics/kafka.learning.tweets
{"removing_replicas":{},"partitions":{"0":[1001]},"topic_id":"h0vbdhd1SL-N5Fs13j7t4w","adding_replicas":{},"version":3}
[zk: localhost:2181(CONNECTED) 11] quit

Partitions in Kafka

  • Each topic can have 1-n partitions
  • Partitions allow Kafka to scale
  • Partitions have separate log files
  • Each Partition has a Leader broker
  • Enables consumers to share workloads through consumer groups
  • Partitions can be replicated
  • Each message goes to only 1 partition
  • Message ordering guaranteed within a partiton only
  • Same Message Key = Same partition
  • Partition count cannot be changed after topic creation

Consumer Groups

  • A group of consumers who share a topic workload
  • Each message goes to only one consumer in a group
  • Consumers split workload through partitions
    • No. of partitions >= no. of consumers
  • Multiple consumer groups - each group gets all messages
  • Brokers track and rebalance consumers

Offsets in Kafka

  • Number to track message consumption by consumer and partition
  • Broker keeps track of what is sent and acknowledged
  • Current offset: last message sent to a given consumer
  • Committed offset: last message acknowledged by consumer
  • Broker resends uncommitted messages in case of failure/timeout
  • Ensures at least one delivery
  • New consumer: can request from start/new/from-offset

Kafka Cluster

  • Group of Kafka brokers working together
  • Each broker has an ID
  • Share work by partitions
  • SYnc through Zookeeper
  • /brokers/ids: ephemeral nodes
    • Delete on loss of connection

Kafka Cluster Controller

  • One broker becauses the controller
  • A controller is also a broker
  • First broker to register with/controller in ZooKeeper
  • Controller functions
    • Monitor other brokers
    • Partition leader election
    • Replica management

Controller Resiliency

  • All brokers subscribe for controller state changes in ZooKeeper
  • Controller goes down
  • Ephemeral node removed; other brokers notified
  • Other brokers try to become controller; the first one wins
  • New controller takes over

Kafka Replication

  • Providers resiliency against individual broker failures
  • Maintains multiple copies of partition logs
  • Unit of replication = Partition
  • Each partition has multiple replicas (replication-factor)
  • One leader replica and other follower replicas
  • Leader broker manages writes, reads, and storage

Replica Distribution

  • Controller distributes replicas to brokers
  • Leader replicas of partitions are distributed across leader brokers for workload balancing
  • Follower replicas are distributed to brokers which are not leaders for the same partition
  • Replication factor <= Number of brokers

Kafka Partition Leaders

  • Broker owning the leader replica is the partition leader
  • Controller assigns partition leaders during topic creation
  • All read/writes go to the partition leader
  • Follower brokers
    • Own follower replicas
    • Subscribe to the lead broker
    • Gets new data and update local copy
  • Partition leadership is evenly distributed

Partition Leader Resilency

  • Partition leader goes down
  • Controller notified of lost broker
  • Controller chooses new partition leader
    • Based on available in-sync replicas
  • New leaders are notified by controller
  • New leader takes over read/writes
  • Partition followers now follow the new leader

Kafka Mirroring

  • Enables geo-replication (between data centers)
  • Provides resiliency over data center outages
  • Done by MirrorMaker tool between two clusters
  • Replicates
    • Topics (new topics detected)
    • Partitions
    • Data
    • Consumer groups and offsets

Kafka Security

  • Client authentication using SSL/SASL
    • Producers, consumers, brokers, ZooKeeper
  • Authorization of read/write operations
    • By topic and group
  • In-flight encryption using SSL
  • Storage encryption - encrypted disks

Kafka Security Limitations

  • Expected to be used within a trusted network with producers and consumers
  • Not recommended as public API
  • Multitenancy is hard to implement

Kafka Producer

  • Embedded within the client (as a library)
  • On creating a new producer
    • Contact the bootstrap server
    • Get info on other Kafka brokers
    • Get info on topics, partitions, and partition leaders
  • Keep track of metadata of changes

Publishing Messages

  • Clients sends a ProducerRecord using the send() method
  • Producer will:
    • Serialize the message
    • Choose a partition for the message
    • Add the message to a local batch for this partition
  • A separate thread pushes batch to partition leader
  • Acknowledgment received from partition leader

Producer: Publishing Modes

Synchronous Mode

  • client code sends message to local producer
  • Client code waits; is blocked until
    • Kafka broker confirms successful receipt
    • Failure to send to Kafka broker
  • Returns a RecordMetaData object
  • Guaranteed message delivery
  • Slow, does not exploit the producer’s batch capability

Asynchronous Mode with No-check

  • Client code sends message to local producer
  • Client code is not blocked; moves on
  • Local producer caches in batches
  • A separate thread publishes to Kafka
  • Low latency and scaling, as host does not wait
  • Failures are not tracked, so messages may be missed

Asynchronous Mode with Callback

  • Client code sends message to local producer
  • Client provides a callback function to process results
  • Client code is not blocked; moves on
  • Local producer caches, separate thread publishes data
  • Callback function called with RecordMetaData/exceptions
  • Client can republish if desired
  • Low latency, but error handling can be complex

Producer: Acknowledgment

  • A client may need to know if the message has been received and saved by the broker
  • Behavior can be controlled by the acks parameter
  • Set in the broker configuration for the producer
  • Determines number of replicas that must receive the message before the send is declared successful
  • Delivery guarantees vs. latency

Producer: Acknowledgment Values

Value Behavior
0 No acknowledgment needed High throughput
1 Default
Leader replica should receive the message
Throughput impact if synchronous mode used
all All in-sync replicas should receive the message
Throughput impact if synchronous mode used

Producer: Other Parameters

Parameters values Behavior
BUFFER.MEMORY Memory in bytes Total memory allowed for buffering records
COMPRESSION.TYPE None, gzip, snappy, lz4, zstd Compress data for lower payload sizes
BATCH.SIZE Size in bytes Payload sent to broker in batches
LINGER.MS Time in milliseconds Time to wait for more messages, before sending to broker

Kafka Consumer

Consumer: Group Coordinator

  • Consumers are typically part of consumer groups
  • Each consumer group has a group coordinator
  • Group coordinator
    • A Kafka broker in the cluster
    • Keeps track of active consumers
    • Receives heartbeats from consumers
    • Triggers rebalancing if heartbeats stop

Consumer: Group Leader

  • Each consumer group has a group leader
  • Group leader
    • A consumer in the consumer group
    • Typically, the first consumer to join the group
    • Receives info on all consumers from group coordinator
    • Allocates partitions to consumers
    • Works with coordinator for rebalancing

Consumer: Batching Messages

Consumer polling and Batching

  • Consumers periodically poll Kafka for new messages
  • Connect to partition leaders for their assigned partitions
  • Batching helps optimize network round-trips
  • Batching impacts latency
  • Choose right batch sizes to balance round-trips and latency

Batching: Parameters

Parameters Values Behavior
Poll Interval Milliseconds Time interval between consecutive polls from consumer to broker
FETCH.MIN.BYTES Bytes Minimum batch size. If not enough data, request will block. Default 1 byte
FETCH.MAX.WAIT.MS Milliseconds Maximum time to wait for minimum batch size, before returning data. Default 500 ms
MAX.PARTITION.FETCH.BYTES Bytes Maximum data per partition returned in one request. Default 1 MB

Consumer: Committing Messages

  • By default, consumers will auto commit on receipt
    • enable.auto.commit (true)
    • auto.commit.interval.ms (5000)
  • Auto commit does not guarantee reliable processing of messages
  • Determine auto vs. manual based on use case

Consumer: Manual Commit

  • Commit synchronously
    • Calling commitSync() method on consumer
    • Commit once every batch
    • Blocks until commit is done
    • Custom advanced commit options available
  • Commit asynchronously
    • Call commitAsync() method on consumer

      Managing Partition

Partition Sizing

  • Less partitions per topic results in:
    • Producer lag
    • Consumer lag
    • Starved or overloaded brokers and consumers
  • More partitions per broker results in:
    • More broker resources - file handles and memory
    • Replication load

Recommendations

  • Plan partition counts before creating topics
  • Partition count >= Number of consumers in a group
  • Use as many brokers as possible (including replicas)
  • One broker < 1,000 replicas
  • Number of unique key values > Number of partitions
  • Load testing to measure right size partition based on:
    • Overall peek load
    • Partition lag

Managing Messages

  • Keep message sizes small
  • Message formats
    • Choose binary standard formats like Avro
    • Use a schema repository to share message schema
  • Message keys
    • More unique values = Better partitioning
    • Even distribution of values = Better partitioning
    • Use keys only if message ordering is needed

Managing Consumer Settings

  • Choose consumer group size to match processing loads
  • Choose batching parameters based on use case
    • Use batching to reduce network round-trips
    • Use smaller batch sizes and polling intervals for low latency
  • Use manual commits to ensure processing reliability
  • Use non-blocking processing for higher throughput
  • Test failure conditions to verify desired results

Managing Resiliency

  • Use replication to safeguard against broker failures
  • Distribute brokers across different racks
  • Use acknowledgments in producer for guaranteed delivery
  • Use manual commits in consumers
  • Use ZooKeeper clusters for high availability
  • Use mirroring if geo-redundancy is required
  • Test resiliency use cases
Written on March 16, 2022