Kafka
What is Apache Kafka?
- Events/Messages Streaming Platform
- Critical piece of the Big Data puzzle
- Open-source with commercial options
- Arguably the most popular messaging platform in the world
- Producers (Publishers) push messages to Kafka
- Consumers (Subscribers) listen and receive messages
Kafka Data Functions:
- Collection
- Storage
- Transport
- Distribution
- Tracking
Benefits of Kafka
- High throughput
- Low latency
- Fault tolerance
- Decoupling
- Back pressure handling
- Horizontal scalability
- Streaming & Batching
Kafka Use Cases:
- Asynchronous Messaging
- Realtime Stream Processing
- Logging & Monitoring
- Event Sourcing
- Realtime Analytics
Kafka Messages
- Event
- Unit of Data
- Row, record, Map, Blob
- Byte Array
- Structure imposed by the producer, understood by the consumer
- Size limits exist in Kafka
- Can be Batched for efficiency
Message content
- Key
- Need not be unique
- Used for partitioning
- Value
- Content of the message
- User defined
- Timestamp
- Automatically timestamped
- Event Time vs Ingestion Time
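To make this anatomy concrete, here is a minimal sketch of these fields using the Java client's ProducerRecord (the topic name is borrowed from the ZooKeeper listing further below; key, value, and timestamp are illustrative):

```java
import org.apache.kafka.clients.producer.ProducerRecord;

// Key + value + timestamp, as described above. Passing null for the
// partition lets the partitioner choose one based on the key.
ProducerRecord<String, String> record = new ProducerRecord<>(
        "kafka.learning.tweets",    // topic
        null,                       // partition: null = decided by key
        System.currentTimeMillis(), // explicit event-time timestamp
        "user-42",                  // key: need not be unique
        "Hello Kafka!"              // value: user-defined content
);
```

Whether the event time or the broker's ingestion time ends up in the log depends on the topic's message.timestamp.type setting (CreateTime vs. LogAppendTime).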
Topics in Kafka
- Holds messages that are similar in context, analogous to
- A file folder holding papers
- A database table holding records
- A queue holding similar messages
- Examples: sales transactions, audit logs, video files
- Multiple topics per Kafka instance (based on use case)
- Support multiple Producers & Consumers
- Multiple partitions per topic
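Since the partition count is fixed at topic creation, it is specified up front. A hedged CLI sketch (topic name, host, and counts are illustrative):

```sh
$ kafka-topics.sh --create \
    --bootstrap-server localhost:9092 \
    --topic kafka.learning.orders \
    --partitions 3 \
    --replication-factor 1
```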
Brokers in Kafka
- A running Kafka instance
- Listens on a specific port
- Receives messages & stores
- Subscription management
- Topic, partition & log management
- Clustering capabilities
- Replication
Logs in Kafka
- Physical files for storing data
- Managed by Kafka brokers
- Multiple files (by broker / topic / partition)
- Rolling files
- Pruned periodically
- Physical space management
- $KAFKA_ROOT/config/server.properties: log.dirs
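A hedged sketch of the relevant broker settings (paths and values are illustrative, not defaults):

```properties
# Excerpt from $KAFKA_ROOT/config/server.properties
# Directory (or comma-separated list) holding the partition log files:
log.dirs=/bitnami/kafka/data
# Prune log segments older than 7 days:
log.retention.hours=168
# Roll to a new segment file once the current one reaches ~1 GB:
log.segment.bytes=1073741824
```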
Producers
- Client that publishes data to Kafka
- Client libraries for programming languages
- Multiple concurrent producers per topic
- Message key identification
- Message serialization to bytes
- Sync/Async options
Consumers
- Consume messages from Kafka
- Consume anytime (Streaming/batching)
- Client libraries by programming language
- Multiple concurrent consumers per topic
- Workload scaling with consumer groups
- Deserialize bytes to data structures
- Offset management
Role of ZooKeeper
- Every Kafka instance/cluster needs ZooKeeper
- Central, realtime information store for Kafka
- Broker management
- Broker registry
- Active controller
- Broker failure management
- Topic management
- Topic registry
- Partition leader management
$ cd /opt/bitnami/zookeeper/bin
$ ./zkCli.sh
[zk: localhost:2181(CONNECTED) 3] ls /
[admin, brokers, cluster, config, consumers, controller, controller_epoch, feature, isr_change_notification, latest_producer_id_block, log_dir_event_notification, zookeeper]
[zk: localhost:2181(CONNECTED) 4] ls /brokers/ids
[1001]
[zk: localhost:2181(CONNECTED) 8] get /brokers/ids/1001
{"listener_security_protocol_map":{"INTERNAL":"PLAINTEXT","EXTERNAL":"PLAINTEXT"},"endpoints":["INTERNAL://kafka:29092","EXTERNAL://localhost:9092"],"jmx_port":-1,"features":{},"host":"kafka","timestamp":"1647435656926","port":29092,"version":5}
[zk: localhost:2181(CONNECTED) 9] ls /brokers/topics
[__consumer_offsets, kafka.learning.alerts, kafka.learning.tweets]
[zk: localhost:2181(CONNECTED) 10] get /brokers/topics/kafka.learning.tweets
{"removing_replicas":{},"partitions":{"0":[1001]},"topic_id":"h0vbdhd1SL-N5Fs13j7t4w","adding_replicas":{},"version":3}
[zk: localhost:2181(CONNECTED) 11] quit
Partitions in Kafka
- Each topic can have 1..n partitions
- Partitions allow Kafka to scale
- Partitions have separate log files
- Each Partition has a Leader broker
- Enables consumers to share workloads through consumer groups
- Partitions can be replicated
- Each message goes to only 1 partition
- Message ordering is guaranteed within a partition only
- Same Message Key = Same partition
- Partition count can be increased after topic creation, but never decreased; increasing it changes the key-to-partition mapping, so plan counts up front
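The "same key = same partition" rule comes from the partitioner: for keyed messages, the default partitioner hashes the key bytes and takes the result modulo the partition count. A rough sketch of that shape (the real Java client uses murmur2 hashing; Arrays.hashCode stands in here purely for illustration):

```java
import java.util.Arrays;

// Illustrative only: shows the shape of keyed partitioning, not the
// actual hash function the client uses.
static int choosePartition(byte[] keyBytes, int numPartitions) {
    int hash = Arrays.hashCode(keyBytes);
    // Clear the sign bit so the modulo result is non-negative,
    // then map the hash onto one of the topic's partitions.
    return (hash & 0x7fffffff) % numPartitions;
}
```

This is also why changing the partition count disturbs key ordering: the modulo changes, so an existing key can land on a different partition.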
Consumer Groups
- A group of consumers who share a topic workload
- Each message goes to only one consumer in a group
- Consumers split workload through partitions
- No. of partitions >= no. of consumers
- Multiple consumer groups - each group gets all messages
- Brokers track and rebalance consumers
Offsets in Kafka
- A number that tracks message consumption, per consumer and per partition
- Broker keeps track of what is sent and acknowledged
- Current offset: last message sent to a given consumer
- Committed offset: last message acknowledged by consumer
- Broker resends uncommitted messages in case of failure/timeout
- Ensures at-least-once delivery
- New consumer: can request from start/new/from-offset
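A sketch of the client settings involved (group id is illustrative; the keys are the Java client's consumer configs):

```properties
# Consumer configuration (client side)
group.id=offsets-demo
# Where a consumer with no committed offset starts reading:
#   earliest = from the start of the partition
#   latest   = only messages that arrive after it joins
auto.offset.reset=earliest
```

Reading from a specific offset is done programmatically, e.g. with the Java client's seek() method on an assigned partition.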
Kafka Cluster
- Group of Kafka brokers working together
- Each broker has an ID
- Share work by partitions
- Sync through ZooKeeper
- /brokers/ids: ephemeral nodes
- Delete on loss of connection
Kafka Cluster Controller
- One broker becomes the controller
- A controller is also a broker
- The first broker to create the /controller node in ZooKeeper
- Controller functions
- Monitor other brokers
- Partition leader election
- Replica management
Controller Resiliency
- All brokers subscribe for controller state changes in ZooKeeper
- Controller goes down
- Ephemeral node removed; other brokers notified
- Other brokers try to become controller; the first one wins
- New controller takes over
Kafka Replication
- Provides resiliency against individual broker failures
- Maintains multiple copies of partition logs
- Unit of replication = Partition
- Each partition has multiple replicas (replication-factor)
- One leader replica and other follower replicas
- Leader broker manages writes, reads, and storage
Replica Distribution
- Controller distributes replicas to brokers
- Leader replicas are distributed across brokers to balance workload
- Follower replicas are placed on brokers that do not hold the leader replica of the same partition
- Replication factor <= Number of brokers
Kafka Partition Leaders
- Broker owning the leader replica is the partition leader
- Controller assigns partition leaders during topic creation
- All read/writes go to the partition leader
- Follower brokers
- Own follower replicas
- Subscribe to the leader broker
- Fetch new data and update their local copies
- Partition leadership is evenly distributed
Partition Leader Resiliency
- Partition leader goes down
- Controller notified of lost broker
- Controller chooses new partition leader
- Based on available in-sync replicas
- New leaders are notified by controller
- New leader takes over read/writes
- Partition followers now follow the new leader
Kafka Mirroring
- Enables geo-replication (between data centers)
- Provides resiliency over data center outages
- Done by MirrorMaker tool between two clusters
- Replicates
- Topics (new topics detected)
- Partitions
- Data
- Consumer groups and offsets
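A minimal MirrorMaker 2 sketch, assuming two clusters named primary and backup (cluster names, addresses, and the topic pattern are all illustrative):

```properties
# connect-mirror-maker.properties
clusters = primary, backup
primary.bootstrap.servers = primary-kafka:9092
backup.bootstrap.servers = backup-kafka:9092
# Mirror everything from primary to backup:
primary->backup.enabled = true
primary->backup.topics = .*
```

This is started with the connect-mirror-maker.sh script that ships with Kafka.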
Kafka Security
- Client authentication using SSL/SASL
- Producers, consumers, brokers, ZooKeeper
- Authorization of read/write operations
- By topic and group
- In-flight encryption using SSL
- Storage encryption - encrypted disks
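A hedged client-side sketch combining in-flight encryption and client authentication over SSL (paths and passwords are placeholders):

```properties
security.protocol=SSL
# Trust the broker's certificate:
ssl.truststore.location=/etc/kafka/secrets/client.truststore.jks
ssl.truststore.password=changeit
# Present a client certificate for authentication (mutual TLS):
ssl.keystore.location=/etc/kafka/secrets/client.keystore.jks
ssl.keystore.password=changeit
ssl.key.password=changeit
```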
Kafka Security Limitations
- Expected to be used within a trusted network, alongside its producers and consumers
- Not recommended as public API
- Multitenancy is hard to implement
Kafka Producer
- Embedded within the client (as a library)
- On creating a new producer
- Contact the bootstrap server
- Get info on other Kafka brokers
- Get info on topics, partitions, and partition leaders
- Keep track of metadata of changes
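A minimal producer-creation sketch with the Java client (the broker address is illustrative); the publishing snippets below build on this setup:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerSetup {
    public static void main(String[] args) {
        Properties props = new Properties();
        // The bootstrap server is only the first point of contact; brokers,
        // topics, and partition leaders are then discovered via metadata.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        // ... use the producer (see the publishing examples below), then:
        producer.close();
    }
}
```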
Publishing Messages
- Client sends a ProducerRecord using the send() method
- Producer will:
- Serialize the message
- Choose a partition for the message
- Add the message to a local batch for this partition
- A separate thread pushes batch to partition leader
- Acknowledgment received from partition leader
Producer: Publishing Modes
Synchronous Mode
- Client code sends a message to the local producer
- Client code waits; it is blocked until either
- The Kafka broker confirms successful receipt, or
- The send to the Kafka broker fails
- Returns a RecordMetadata object
- Guaranteed message delivery
- Slow, does not exploit the producer’s batch capability
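Continuing the ProducerSetup sketch above (and adding the ProducerRecord and RecordMetadata imports), a synchronous send blocks on the future returned by send(); topic, key, and value are illustrative:

```java
// Inside ProducerSetup.main(), after creating the producer:
ProducerRecord<String, String> record =
        new ProducerRecord<>("kafka.learning.tweets", "user-42", "Hello Kafka!");
try {
    // Blocks until the broker acknowledges receipt (or the send fails).
    RecordMetadata md = producer.send(record).get();
    System.out.printf("stored: partition %d, offset %d%n", md.partition(), md.offset());
} catch (Exception e) {
    // The send failed after any retries; handle or rethrow.
    e.printStackTrace();
}
```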
Asynchronous Mode with No-check
- Client code sends message to local producer
- Client code is not blocked; moves on
- Local producer caches in batches
- A separate thread publishes to Kafka
- Low latency and scaling, as host does not wait
- Failures are not tracked, so messages may be missed
Asynchronous Mode with Callback
- Client code sends message to local producer
- Client provides a callback function to process results
- Client code is not blocked; moves on
- Local producer caches, separate thread publishes data
- Callback function is called with RecordMetadata or an exception
- Client can republish if desired
- Low latency, but error handling can be complex
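A callback sketch, again reusing the record from the synchronous example (the lambda implements the Java client's Callback interface):

```java
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        // Delivery failed: log it, and optionally republish the record.
        System.err.println("send failed: " + exception.getMessage());
    } else {
        // Delivery succeeded: metadata carries partition and offset.
        System.out.printf("acked: partition %d, offset %d%n",
                metadata.partition(), metadata.offset());
    }
});
```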
Producer: Acknowledgment
- A client may need to know if the message has been received and saved by the broker
- Behavior can be controlled by the acks parameter
- Set in the producer configuration
- Determines the number of replicas that must receive the message before the send is declared successful
- Delivery guarantees vs. latency
Producer: Acknowledgment Values
Value | Behavior |
---|---|
0 | No acknowledgment needed. Highest throughput |
1 | Default. The leader replica must receive the message. Throughput impact if synchronous mode is used |
all | All in-sync replicas must receive the message. Throughput impact if synchronous mode is used |
Producer: Other Parameters
Parameter | Values | Behavior |
---|---|---|
buffer.memory | Memory in bytes | Total memory allowed for buffering records |
compression.type | none, gzip, snappy, lz4, zstd | Compress data for smaller payloads |
batch.size | Size in bytes | Payload is sent to the broker in batches of this size |
linger.ms | Time in milliseconds | Time to wait for more messages before sending a batch to the broker |
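Putting the acknowledgment and tuning parameters together, a hedged addition to the ProducerSetup properties above (values are examples, not recommendations):

```java
props.put(ProducerConfig.ACKS_CONFIG, "all");               // wait for all in-sync replicas
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432L);  // 32 MB record buffer
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);         // 16 KB per partition batch
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);             // wait up to 10 ms to fill a batch
```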
Kafka Consumer
Consumer: Group Coordinator
- Consumers are typically part of consumer groups
- Each consumer group has a group coordinator
- Group coordinator
- A Kafka broker in the cluster
- Keeps track of active consumers
- Receives heartbeats from consumers
- Triggers rebalancing if heartbeats stop
Consumer: Group Leader
- Each consumer group has a group leader
- Group leader
- A consumer in the consumer group
- Typically, the first consumer to join the group
- Receives info on all consumers from group coordinator
- Allocates partitions to consumers
- Works with coordinator for rebalancing
Consumer: Batching Messages
Consumer Polling and Batching
- Consumers periodically poll Kafka for new messages
- Connect to partition leaders for their assigned partitions
- Batching helps optimize network round-trips
- Batching impacts latency
- Choose right batch sizes to balance round-trips and latency
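A minimal polling-consumer sketch with the Java client (address, group id, and topic are illustrative); the later snippets extend this class:

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class PollingConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "tweet-readers");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("kafka.learning.tweets"));
            while (true) {
                // Each poll returns a batch; the timeout bounds how long
                // the call blocks when no data is available.
                ConsumerRecords<String, String> batch = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> rec : batch) {
                    System.out.printf("partition %d, offset %d: %s%n",
                            rec.partition(), rec.offset(), rec.value());
                }
            }
        }
    }
}
```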
Batching: Parameters
Parameter | Values | Behavior |
---|---|---|
Poll interval | Milliseconds | Time interval between consecutive polls from consumer to broker |
fetch.min.bytes | Bytes | Minimum batch size. If not enough data, the request blocks. Default 1 byte |
fetch.max.wait.ms | Milliseconds | Maximum time to wait for the minimum batch size before returning data. Default 500 ms |
max.partition.fetch.bytes | Bytes | Maximum data per partition returned in one request. Default 1 MB |
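These settings plug into the PollingConsumer sketch above, for example (values are illustrative):

```java
// Added to the PollingConsumer properties before creating the consumer:
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024);              // wait for at least 1 KB...
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);             // ...but at most 500 ms
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1048576); // cap: 1 MB per partition

// The "poll interval" is simply the timeout given to each poll() call:
ConsumerRecords<String, String> batch = consumer.poll(Duration.ofMillis(100));
```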
Consumer: Committing Messages
- By default, consumers will auto commit on receipt
- enable.auto.commit (true)
- auto.commit.interval.ms (5000)
- Auto commit does not guarantee reliable processing of messages
- Determine auto vs. manual based on use case
Consumer: Manual Commit
- Commit synchronously
- Call the commitSync() method on the consumer
- Commit once per batch
- Blocks until the commit is done
- Custom advanced commit options available
- Commit asynchronously
- Call the commitAsync() method on the consumer
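A manual-commit sketch extending the PollingConsumer loop above (process() is a hypothetical handler for a record):

```java
// Disable auto-commit before creating the consumer:
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

while (true) {
    ConsumerRecords<String, String> batch = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> rec : batch) {
        process(rec); // hypothetical: handle the record before committing
    }
    // Synchronous: blocks until this batch's offsets are committed.
    consumer.commitSync();

    // Or non-blocking, with a completion callback:
    // consumer.commitAsync((offsets, ex) -> {
    //     if (ex != null) System.err.println("commit failed: " + ex.getMessage());
    // });
}
```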
Managing Partitions
Partition Sizing
- Too few partitions per topic lead to:
- Producer lag
- Consumer lag
- Starved or overloaded brokers and consumers
- Too many partitions per broker lead to:
- More broker resources - file handles and memory
- Replication load
Recommendations
- Plan partition counts before creating topics
- Partition count >= Number of consumers in a group
- Use as many brokers as possible (including replicas)
- Keep each broker below 1,000 replicas
- Number of unique key values > Number of partitions
- Load test to find the right partition count, based on:
- Overall peak load
- Partition lag
Managing Messages
- Keep message sizes small
- Message formats
- Choose binary standard formats like Avro
- Use a schema repository to share message schema
- Message keys
- More unique values = Better partitioning
- Even distribution of values = Better partitioning
- Use keys only if message ordering is needed
Managing Consumer Settings
- Choose consumer group size to match processing loads
- Choose batching parameters based on use case
- Use batching to reduce network round-trips
- Use smaller batch sizes and polling intervals for low latency
- Use manual commits to ensure processing reliability
- Use non-blocking processing for higher throughput
- Test failure conditions to verify desired results
Managing Resiliency
- Use replication to safeguard against broker failures
- Distribute brokers across different racks
- Use acknowledgments in producer for guaranteed delivery
- Use manual commits in consumers
- Use ZooKeeper clusters for high availability
- Use mirroring if geo-redundancy is required
- Test resiliency use cases
Written on March 16, 2022