Kafka: распределённый брокер сообщений Концепции Official Docs | Confluent Developer
Архитектура KafkaProducer → Topic (partitions) → Consumer Group (consumers)
Broker Cluster — хранение на диске, репликация
Topic : именованный канал сообщений. Аналог таблицы в БД. Сообщения в топике хранятся на диске, не удаляются при чтении.
Partition : физическое разделение топика. Каждая партиция представляет собой упорядоченный, неизменяемый лог (append-only). Внутри партиции сообщения имеют уникальный offset (порядковый номер). Партиции распределяются по брокерам.
Offset : порядковый номер сообщения внутри партиции. Монотонно растёт. Consumer запоминает свой offset, то есть «где остановился». Offset 0 = начало, offset -1 = конец.
Broker : один сервер Kafka. Хранит партиции, обслуживает клиентов. Кластер = несколько брокеров.
Producer : публикует сообщения в топик. Выбирает партицию по ключу сообщения (или round-robin).
Consumer : читает сообщения из топика. Хранит свой offset (позицию чтения).
Consumer Group : группа consumer-ов, читающих один топик совместно. Каждая партиция назначается ровно одному consumer-у в группе. Если consumer-ов больше, чем партиций, лишние простаивают.
Replication Factor : количество копий каждой партиции. RF=3 → данные на 3 брокерах. Leader обслуживает чтение/запись, follower-ы реплицируют.
ISR (In-Sync Replicas) : реплики, не отстающие от лидера. Сообщение считается committed, когда записано во все ISR.
Распределение партиций и consumer-овTopic: orders (3 партиции, RF=2)
Partition 0 : [msg0, msg1, msg2, msg3, …] → Broker 1 (leader), Broker 2 (follower)Partition 1 : [msg0, msg1, msg2, …] → Broker 2 (leader), Broker 3 (follower)Partition 2 : [msg0, msg1, …] → Broker 3 (leader), Broker 1 (follower)Consumer Group: order-service (3 consumer-а)
consumer-1 ← Partition 0 consumer-2 ← Partition 1 consumer-3 ← Partition 2 Гарантии доставки Гарантия Описание Механизм At-most-once Сообщение может быть потеряно, но не дублировано commit offset до обработки At-least-once Сообщение доставлено хотя бы раз, возможны дубликаты (по умолчанию) commit offset после обработки Exactly-once Ровно один раз (самый медленный, включается явно) idempotent producer + transactions
Kafka vs очереди (RabbitMQ, Redis) Традиционная очередь Kafka Сообщение удаляется при чтении Сообщение хранится до retention Один потребитель на сообщение Множество consumer group читают независимо Push-модель (брокер → consumer) Pull-модель (consumer → брокер) Сложная маршрутизация Простая модель (topic + partition) Малые объёмы, low latency Огромные объёмы, высокий throughput
Установка Docker Compose (KRaft, без ZooKeeper) # compose.yaml
services :
kafka :
image : apache/kafka:3.9
ports :
- "9092:9092"
environment :
KAFKA_NODE_ID : 1
KAFKA_PROCESS_ROLES : broker,controller
KAFKA_LISTENERS : PLAINTEXT://0.0.0.0:29092,CONTROLLER://0.0.0.0:29093,EXTERNAL://0.0.0.0:9092
KAFKA_ADVERTISED_LISTENERS : PLAINTEXT://kafka:29092,EXTERNAL://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP : PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_CONTROLLER_QUORUM_VOTERS : 1 @kafka:29093
KAFKA_CONTROLLER_LISTENER_NAMES : CONTROLLER
KAFKA_INTER_BROKER_LISTENER_NAME : PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR : 1
KAFKA_LOG_RETENTION_HOURS : 168
KAFKA_LOG_RETENTION_BYTES : 1073741824
volumes :
- kafka_data:/var/lib/kafka/data
kafka-ui :
image : provectuslabs/kafka-ui
ports :
- "8080:8080"
environment :
KAFKA_CLUSTERS_0_NAME : local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS : kafka:29092
depends_on :
- kafka
volumes :
kafka_data: Кластер из 3 брокеров # compose.yaml
services :
kafka-1 :
image : apache/kafka:3.9
environment :
KAFKA_NODE_ID : 1
KAFKA_PROCESS_ROLES : broker,controller
KAFKA_LISTENERS : PLAINTEXT://0.0.0.0:29092,CONTROLLER://0.0.0.0:29093,EXTERNAL://0.0.0.0:9092
KAFKA_ADVERTISED_LISTENERS : PLAINTEXT://kafka-1:29092,EXTERNAL://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP : PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_CONTROLLER_QUORUM_VOTERS : 1 @kafka-1:29093,2@kafka-2:29093,3@kafka-3:29093
KAFKA_CONTROLLER_LISTENER_NAMES : CONTROLLER
KAFKA_INTER_BROKER_LISTENER_NAME : PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR : 3
KAFKA_DEFAULT_REPLICATION_FACTOR : 3
KAFKA_MIN_INSYNC_REPLICAS : 2
ports :
- "9092:9092"
volumes :
- kafka1_data:/var/lib/kafka/data
kafka-2 :
image : apache/kafka:3.9
environment :
KAFKA_NODE_ID : 2
KAFKA_PROCESS_ROLES : broker,controller
KAFKA_LISTENERS : PLAINTEXT://0.0.0.0:29092,CONTROLLER://0.0.0.0:29093,EXTERNAL://0.0.0.0:9093
KAFKA_ADVERTISED_LISTENERS : PLAINTEXT://kafka-2:29092,EXTERNAL://localhost:9093
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP : PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_CONTROLLER_QUORUM_VOTERS : 1 @kafka-1:29093,2@kafka-2:29093,3@kafka-3:29093
KAFKA_CONTROLLER_LISTENER_NAMES : CONTROLLER
KAFKA_INTER_BROKER_LISTENER_NAME : PLAINTEXT
ports :
- "9093:9093"
volumes :
- kafka2_data:/var/lib/kafka/data
kafka-3 :
image : apache/kafka:3.9
environment :
KAFKA_NODE_ID : 3
KAFKA_PROCESS_ROLES : broker,controller
KAFKA_LISTENERS : PLAINTEXT://0.0.0.0:29092,CONTROLLER://0.0.0.0:29093,EXTERNAL://0.0.0.0:9094
KAFKA_ADVERTISED_LISTENERS : PLAINTEXT://kafka-3:29092,EXTERNAL://localhost:9094
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP : PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_CONTROLLER_QUORUM_VOTERS : 1 @kafka-1:29093,2@kafka-2:29093,3@kafka-3:29093
KAFKA_CONTROLLER_LISTENER_NAMES : CONTROLLER
KAFKA_INTER_BROKER_LISTENER_NAME : PLAINTEXT
ports :
- "9094:9094"
volumes :
- kafka3_data:/var/lib/kafka/data
volumes :
kafka1_data :
kafka2_data :
kafka3_data: CLI: kafka-* утилиты # Если из Docker:
docker exec -it kafka bash
# Утилиты в /opt/kafka/bin/
# Или через kcat (kafkacat) — удобнее для разработки
# Arch: sudo pacman -S kcat
# macOS: brew install kcat
# Ubuntu: sudo apt install kafkacat Топики # Создать топик
kafka-topics.sh --bootstrap-server localhost:9092 \
--create --topic orders \
--partitions 6 \
--replication-factor 3
# Список топиков
kafka-topics.sh --bootstrap-server localhost:9092 --list
# Описание топика
kafka-topics.sh --bootstrap-server localhost:9092 \
--describe --topic orders
# Увеличить партиции (уменьшить нельзя!)
kafka-topics.sh --bootstrap-server localhost:9092 \
--alter --topic orders --partitions 12
# Удалить топик
kafka-topics.sh --bootstrap-server localhost:9092 \
--delete --topic orders
# Изменить конфигурацию топика
kafka-configs.sh --bootstrap-server localhost:9092 \
--alter --entity-type topics --entity-name orders \
--add-config retention.ms= 604800000 # retention 7 дней
# Посмотреть конфигурацию
kafka-configs.sh --bootstrap-server localhost:9092 \
--describe --entity-type topics --entity-name orders Producer (CLI) # Простая отправка (stdin → Kafka)
kafka-console-producer.sh --bootstrap-server localhost:9092 \
--topic orders
> { "order_id" : 1, "amount" : 99.99}
> { "order_id" : 2, "amount" : 49.99}
> ^C
# С ключом (key:value)
kafka-console-producer.sh --bootstrap-server localhost:9092 \
--topic orders \
--property parse.key= true \
--property key.separator= :
> user42:{ "order_id" : 1, "amount" : 99.99}
> user43:{ "order_id" : 2, "amount" : 49.99}
# Из файла
kafka-console-producer.sh --bootstrap-server localhost:9092 \
--topic orders < messages.txt
# kcat (проще)
echo '{"order_id": 1}' | kcat -b localhost:9092 -t orders -P
kcat -b localhost:9092 -t orders -P -K: < messages.txt # key:value Consumer (CLI) # Читать новые сообщения
kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic orders
# С начала
kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic orders --from-beginning
# С ключами и метаданными
kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic orders --from-beginning \
--property print.key= true \
--property print.timestamp= true \
--property print.partition= true \
--property print.offset= true
# В составе consumer group
kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic orders --group order-processor
# kcat (проще)
kcat -b localhost:9092 -t orders -C # новые
kcat -b localhost:9092 -t orders -C -o beginning # с начала
kcat -b localhost:9092 -t orders -C -o end # только новые
kcat -b localhost:9092 -t orders -C -o -10 # последние 10
kcat -b localhost:9092 -t orders -C -p 0 -o 100 # partition 0, offset 100
kcat -b localhost:9092 -t orders -C -f '%T %k %s\n' # формат: timestamp key value
# Метаданные кластера
kcat -b localhost:9092 -L # список топиков и брокеров
kcat -b localhost:9092 -L -t orders # детали по топику Consumer Groups # Список consumer groups
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
# Описание группы (offset, lag)
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--describe --group order-processor
# Вывод:
# GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
# order-proc orders 0 1000 1050 50
# order-proc orders 1 2000 2000 0
# LAG = отставание
# Сбросить offset (все consumer-ы в группе должны быть остановлены!)
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--group order-processor --topic orders \
--reset-offsets --to-earliest --execute
# Сброс на конкретный offset
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--group order-processor --topic orders \
--reset-offsets --to-offset 1000 --execute
# Сброс по времени
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--group order-processor --topic orders \
--reset-offsets --to-datetime 2025-03-15T00:00:00.000 --execute
# Сдвиг offset
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--group order-processor --topic orders \
--reset-offsets --shift-by -100 --execute
# Удалить группу
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--delete --group order-processor Конфигурация Брокер (server.properties / переменные окружения) # Идентификация
node.id = 1
process.roles = broker,controller
# Сеть
listeners = PLAINTEXT://0.0.0.0:9092
advertised.listeners = PLAINTEXT://kafka-host:9092
# Логи (хранилище сообщений)
log.dirs = /var/lib/kafka/data
log.retention.hours = 168 # хранить 7 дней (по умолчанию)
log.retention.bytes = -1 # без лимита по размеру (-1)
log.segment.bytes = 1073741824 # размер сегмента 1GB
log.cleanup.policy = delete # delete или compact
# Репликация
default.replication.factor = 3
min.insync.replicas = 2 # минимум ISR для acks=all
unclean.leader.election.enable = false # не выбирать не-ISR реплику лидером
# Топики
num.partitions = 6 # партиции по умолчанию
auto.create.topics.enable = false # не создавать топики автоматически
# Производительность
num.io.threads = 8
num.network.threads = 3
socket.send.buffer.bytes = 102400
socket.receive.buffer.bytes = 102400 Конфигурация топика # При создании
kafka-topics.sh --create --topic events \
--partitions 12 \
--replication-factor 3 \
--config retention.ms= 604800000 \
--config cleanup.policy= delete \
--config max.message.bytes= 10485760
# Изменение
kafka-configs.sh --alter --entity-type topics --entity-name events \
--add-config retention.ms= 259200000 # 3 дня Параметр Описание По умолчанию retention.msВремя хранения сообщений 604800000 (7 дней) retention.bytesЛимит размера партиции -1 (без лимита) cleanup.policydelete или compactdelete max.message.bytesМаксимальный размер сообщения 1048576 (1MB) min.insync.replicasМинимум ISR для acks=all 1 segment.bytesРазмер лог-сегмента 1073741824 (1GB) compression.typeСжатие: none, gzip, snappy, lz4, zstd producer
Cleanup policies
Cleanup policiesdelete — удалять сегменты старше retention.ms или при превышении retention.bytescompact — оставлять только последнее сообщение для каждого ключа (log compaction)Log compaction : для топиков-снимков (состояние, конфигурация). Сохраняет последнее значение для каждого ключа, удаляя старые. Сообщение с value=null (tombstone) удаляет ключ.
kafka-topics.sh --create --topic user-profiles \
--partitions 6 \
--config cleanup.policy= compact \
--config min.cleanable.dirty.ratio= 0.5 \
--config delete.retention.ms= 86400000 Producer Конфигурация Параметр Описание Рекомендация bootstrap.serversАдреса брокеров Минимум 2-3 для отказоустойчивости acksПодтверждение записи all для надёжностиretriesПовторы при ошибке 2147483647 (бесконечно)delivery.timeout.msОбщий таймаут доставки 120000 (2 мин)linger.msЗадержка для батчинга 5-100 (больше = больший batch)batch.sizeРазмер batch в байтах 16384-65536compression.typeСжатие lz4 или zstdenable.idempotenceИдемпотентность (дедупликация) truemax.in.flight.requests.per.connectionПараллельные запросы 5 (с идемпотентностью)key.serializerСериализатор ключей StringSerializer / Avro / Protobufvalue.serializerСериализатор значений StringSerializer / Avro / Protobuf
acks: уровни подтверждения Уровень Описание acks=0Не ждать подтверждения (максимальная скорость, возможна потеря) acks=1Ждать подтверждения от leader-а (компромисс) acks=allЖдать подтверждения от всех ISR (максимальная надёжность). Работает в связке с min.insync.replicas
Для продакшена: acks=all + min.insync.replicas=2 + replication.factor=3.
Partitioning (выбор партиции)
Стратегии выбора партицииЯвная партиция — producer указывает номер (редко)По ключу — hash(key) % num_partitions (основной способ)Round-robin — без ключа, равномерное распределениеCustom partitioner — своя логикаСообщения с одинаковым ключом всегда попадают в одну партицию → гарантирован порядок для одного ключа.
Влияние ключа на порядокKey = user_id → все события одного пользователя в одной партиции → упорядоченыKey = null → round-robin → порядок не гарантированИдемпотентный producer
Идемпотентный producerenable.idempotence=true (по умолчанию с Kafka 3.0)
Каждый producer получает уникальный PID (producer ID). Каждое сообщение получает sequence number . Брокер отклоняет дубликаты по (PID, sequence).
Результат: Exactly-once на уровне producer → partition.
Транзакции (Exactly-Once Semantics)
Транзакции (Exactly-Once Semantics)transactional.id=my-producer-1
Producer:
beginTransaction()send(topic1, message1)send(topic2, message2)sendOffsetsToTransaction(offsets, consumerGroupId) — для consume-transform-producecommitTransaction() или abortTransaction()Все сообщения атомарно: либо все видны consumer-ам, либо ни одного.
Consumer Конфигурация Параметр Описание Рекомендация bootstrap.serversАдреса брокеров Минимум 2-3 group.idConsumer group Обязателен для групповой работы auto.offset.resetСтратегия при отсутствии offset earliest или latestenable.auto.commitАвтоматический commit offset false для at-least-onceauto.commit.interval.msИнтервал автокоммита 5000max.poll.recordsЗаписей за один poll 500max.poll.interval.msМаксимум между poll-ами 300000 (5 мин)session.timeout.msТаймаут сессии (heartbeat) 45000heartbeat.interval.msИнтервал heartbeat 3000fetch.min.bytesМинимум данных для ответа 1fetch.max.wait.msМаксимальное ожидание данных 500isolation.levelread_committed / read_uncommittedread_committed для EOS
auto.offset.reset Значение Описание earliestНачать с начала топика (если нет сохранённого offset) latestНачать с конца (только новые сообщения) noneОшибка, если нет сохранённого offset
Commit стратегии
Commit стратегииAuto-commit (enable.auto.commit=true):
Фоновый коммит каждые auto.commit.interval.ms. Просто, но может привести к потере сообщений (commit до обработки) или дубликатам (crash до commit).
Manual sync commit :
consumer.commitSync() — после успешной обработки batch-а. At-least-once: при crash повторная обработка от последнего commit.
Manual async commit :
consumer.commitAsync() — не блокирует. Callback для обработки ошибок.
Commit per-partition :
commitSync(Map<TopicPartition, OffsetAndMetadata>) — точный контроль.
Rebalancing Когда consumer присоединяется/покидает группу, партиции перераспределяются:
Стратегии RebalancingEager (по умолчанию до Kafka 2.4):
Отзывает все партиции → перераспределяет. Пауза в обработке на время ребалансировки.
Cooperative/Incremental (рекомендуемый):
partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor
Перераспределяет только изменившиеся партиции. Минимальная пауза.
Клиенты Go (segmentio/kafka-go) package main
import (
"context"
"fmt"
"github.com/segmentio/kafka-go"
"time"
)
// --- Producer ---
func produce () {
writer := & kafka .Writer {
Addr : kafka .TCP ("localhost:9092" ),
Topic : "orders" ,
Balancer : & kafka .Hash {}, // по ключу
BatchTimeout : 10 * time .Millisecond ,
BatchSize : 100 ,
Compression : kafka .Lz4 ,
RequiredAcks : kafka .RequireAll ,
}
defer writer .Close ()
err := writer .WriteMessages (context .Background (),
kafka .Message {
Key : []byte("user-42" ),
Value : []byte(`{"order_id": 1, "amount": 99.99}` ),
Headers : []kafka .Header {
{Key : "source" , Value : []byte("api" )},
},
},
)
if err != nil {
panic(err )
}
}
// --- Consumer ---
func consume () {
reader := kafka .NewReader (kafka .ReaderConfig {
Brokers : []string {"localhost:9092" },
Topic : "orders" ,
GroupID : "order-processor" ,
MinBytes : 1 ,
MaxBytes : 10e6 , // 10MB
CommitInterval : 0 , // manual commit
StartOffset : kafka .FirstOffset , // earliest
})
defer reader .Close ()
for {
msg , err := reader .FetchMessage (context .Background ())
if err != nil {
panic(err )
}
fmt .Printf ("partition=%d offset=%d key=%s value=%s\n" ,
msg .Partition , msg .Offset , msg .Key , msg .Value )
// Обработка сообщения...
// Manual commit (at-least-once)
if err := reader .CommitMessages (context .Background (), msg ); err != nil {
panic(err )
}
}
} Go (IBM/sarama) package main
import (
"fmt"
"github.com/IBM/sarama"
)
// --- Producer ---
func produce () {
config := sarama .NewConfig ()
config .Producer .RequiredAcks = sarama .WaitForAll
config .Producer .Idempotent = true
config .Producer .Return .Successes = true
config .Producer .Compression = sarama .CompressionLZ4
config .Net .MaxOpenRequests = 1
producer , err := sarama .NewSyncProducer ([]string {"localhost:9092" }, config )
if err != nil {
panic(err )
}
defer producer .Close ()
msg := & sarama .ProducerMessage {
Topic : "orders" ,
Key : sarama .StringEncoder ("user-42" ),
Value : sarama .StringEncoder (`{"order_id": 1, "amount": 99.99}` ),
}
partition , offset , err := producer .SendMessage (msg )
fmt .Printf ("partition=%d offset=%d\n" , partition , offset )
}
// --- Consumer Group ---
type handler struct {}
func (h handler ) Setup (_ sarama .ConsumerGroupSession ) error { return nil }
func (h handler ) Cleanup (_ sarama .ConsumerGroupSession ) error { return nil }
func (h handler ) ConsumeClaim (session sarama .ConsumerGroupSession , claim sarama .ConsumerGroupClaim ) error {
for msg := range claim .Messages () {
fmt .Printf ("partition=%d offset=%d key=%s value=%s\n" ,
msg .Partition , msg .Offset , msg .Key , msg .Value )
// Обработка...
session .MarkMessage (msg , "" ) // пометить как обработанное
}
return nil
} Python (confluent-kafka) from confluent_kafka import Producer, Consumer, KafkaError
import json
# --- Producer ---
producer = Producer({
'bootstrap.servers' : 'localhost:9092' ,
'acks' : 'all' ,
'enable.idempotence' : True ,
'compression.type' : 'lz4' ,
'linger.ms' : 10 ,
'batch.size' : 65536 ,
})
def delivery_report (err, msg):
if err:
print(f 'Delivery failed: { err} ' )
else :
print(f 'Delivered to { msg. topic()} [ { msg. partition()} ] @ { msg. offset()} ' )
producer. produce(
'orders' ,
key= 'user-42' ,
value= json. dumps({'order_id' : 1 , 'amount' : 99.99 }),
callback= delivery_report,
)
producer. flush()
# --- Consumer ---
consumer = Consumer({
'bootstrap.servers' : 'localhost:9092' ,
'group.id' : 'order-processor' ,
'auto.offset.reset' : 'earliest' ,
'enable.auto.commit' : False ,
})
consumer. subscribe(['orders' ])
try :
while True :
msg = consumer. poll(timeout= 1.0 )
if msg is None :
continue
if msg. error():
if msg. error(). code() == KafkaError. _PARTITION_EOF:
continue
raise Exception (msg. error())
print(f 'partition= { msg. partition()} offset= { msg. offset()} '
f 'key= { msg. key()} value= { msg. value()} ' )
# Обработка...
consumer. commit(message= msg) # manual commit
finally :
consumer. close() Node.js (kafkajs) import { Kafka , CompressionTypes } from 'kafkajs' ;
const kafka = new Kafka ({
clientId : 'my-app' ,
brokers : ['localhost:9092' ],
});
// --- Producer ---
const producer = kafka .producer ({
idempotent : true ,
maxInFlightRequests : 5 ,
});
await producer .connect ();
await producer .send ({
topic : 'orders' ,
compression : CompressionTypes .LZ4 ,
messages : [
{
key : 'user-42' ,
value : JSON .stringify ({ order_id : 1 , amount : 99.99 }),
headers : { source : 'api' },
},
],
});
await producer .disconnect ();
// --- Consumer ---
const consumer = kafka .consumer ({ groupId : 'order-processor' });
await consumer .connect ();
await consumer .subscribe ({ topic : 'orders' , fromBeginning : true });
await consumer .run ({
eachMessage : async ({ topic , partition , message }) => {
console .log ({
partition ,
offset : message .offset ,
key : message .key ? .toString (),
value : message .value ? .toString (),
});
// Обработка...
// auto-commit по умолчанию (после eachMessage)
},
}); Java (Spring Kafka) // application.yml
// spring:
// kafka:
// bootstrap-servers: localhost:9092
// producer:
// acks: all
// key-serializer: org.apache.kafka.common.serialization.StringSerializer
// value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
// consumer:
// group-id: order-processor
// auto-offset-reset: earliest
// enable-auto-commit: false
// key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
// value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
// Producer
@Service
public class OrderProducer {
private final KafkaTemplate< String, OrderEvent> kafkaTemplate;
public void send (String userId, OrderEvent event) {
kafkaTemplate.send ("orders" , userId, event);
}
}
// Consumer
@Service
public class OrderConsumer {
@KafkaListener (topics = "orders" , groupId = "order-processor" )
public void consume (@Payload OrderEvent event,
@Header (KafkaHeaders.RECEIVED_PARTITION ) int partition,
@Header (KafkaHeaders.OFFSET ) long offset,
Acknowledgment ack) {
// Обработка...
ack.acknowledge (); // manual commit
}
} Schema Registry Централизованное хранилище схем (Avro, Protobuf, JSON Schema). Гарантирует совместимость producer/consumer.
# compose.yaml (добавить к кластеру)
services :
schema-registry :
image : confluentinc/cp-schema-registry:7.7
ports :
- "8081:8081"
environment :
SCHEMA_REGISTRY_HOST_NAME : schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS : kafka:29092 # Зарегистрировать схему
curl -X POST http://localhost:8081/subjects/orders-value/versions \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{
"schemaType": "AVRO",
"schema": "{\"type\":\"record\",\"name\":\"Order\",\"fields\":[{\"name\":\"order_id\",\"type\":\"long\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"user_id\",\"type\":\"string\"}]}"
}'
# Список subjects
curl http://localhost:8081/subjects
# Версии схемы
curl http://localhost:8081/subjects/orders-value/versions
# Проверить совместимость
curl -X POST http://localhost:8081/compatibility/subjects/orders-value/versions/latest \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{"schemaType": "AVRO", "schema": "..."}'
# Уровни совместимости
# BACKWARD — новый consumer может читать старые данные (по умолчанию)
# FORWARD — старый consumer может читать новые данные
# FULL — оба направления
# NONE — без проверки
curl -X PUT http://localhost:8081/config/orders-value \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{"compatibility": "FULL"}' Kafka Connect Интеграция Kafka с внешними системами без кода:
# compose.yaml
services :
connect :
image : confluentinc/cp-kafka-connect:7.7
ports :
- "8083:8083"
environment :
CONNECT_BOOTSTRAP_SERVERS : kafka:29092
CONNECT_REST_PORT : 8083
CONNECT_GROUP_ID : connect-cluster
CONNECT_CONFIG_STORAGE_TOPIC : connect-configs
CONNECT_OFFSET_STORAGE_TOPIC : connect-offsets
CONNECT_STATUS_STORAGE_TOPIC : connect-status
CONNECT_KEY_CONVERTER : org.apache.kafka.connect.json.JsonConverter
CONNECT_VALUE_CONVERTER : org.apache.kafka.connect.json.JsonConverter # Список доступных коннекторов
curl http://localhost:8083/connector-plugins
# Создать Source-коннектор (PostgreSQL → Kafka)
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d '{
"name": "pg-source",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "replicator",
"database.password": "secret",
"database.dbname": "myapp",
"topic.prefix": "cdc",
"table.include.list": "public.orders,public.users",
"plugin.name": "pgoutput"
}
}'
# → топики: cdc.public.orders, cdc.public.users (CDC — Change Data Capture)
# Создать Sink-коннектор (Kafka → Elasticsearch)
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d '{
"name": "es-sink",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"topics": "orders",
"connection.url": "http://elasticsearch:9200",
"key.ignore": "false",
"schema.ignore": "true"
}
}'
# Статус коннекторов
curl http://localhost:8083/connectors
curl http://localhost:8083/connectors/pg-source/status
# Перезапустить
curl -X POST http://localhost:8083/connectors/pg-source/restart
# Удалить
curl -X DELETE http://localhost:8083/connectors/pg-source Мониторинг Ключевые метрики
Ключевые метрики KafkaБрокер:
UnderReplicatedPartitions — партиции с отстающими репликами (должно быть 0)ActiveControllerCount — 1 controller в кластере (ровно 1)OfflinePartitionsCount — недоступные партиции (должно быть 0)BytesInPerSec / BytesOutPerSec — throughputRequestsPerSec — запросы к брокеруLogFlushRateAndTimeMs — скорость записи на дискConsumer:
records-lag-max — максимальное отставание (lag)records-consumed-rate — скорость потребленияcommit-latency-avg — задержка commitProducer:
record-send-rate — скорость отправкиrecord-error-rate — ошибки отправкиrequest-latency-avg — задержка запросаbatch-size-avg — средний размер batchПроверка через CLI # Consumer lag (основная метрика здоровья)
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--describe --group order-processor
# Replica status
kafka-topics.sh --bootstrap-server localhost:9092 \
--describe --under-replicated-partitions
# Log dirs (диск)
kafka-log-dirs.sh --bootstrap-server localhost:9092 \
--describe --topic-list orders Kafka UI Визуальный интерфейс, проще всего для мониторинга:
# compose.yaml
services :
kafka-ui :
image : provectuslabs/kafka-ui
ports :
- "8080:8080"
environment :
KAFKA_CLUSTERS_0_NAME : production
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS : kafka:29092
KAFKA_CLUSTERS_0_SCHEMAREGISTRY : http://schema-registry:8081
KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME : connect
KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS : http://connect:8083 Паттерны Event Sourcing
Event SourcingХранить не текущее состояние, а все события. Текущее состояние = replay событий.
Topic: account-events (compacted=false, retention=forever)
key=account-42 value={"type":"created","balance":0} key=account-42 value={"type":"deposited","amount":100} key=account-42 value={"type":"withdrawn","amount":30} Текущий баланс = 0 + 100 - 30 = 70 Materialized view (consumer):
Слушает account-events → обновляет состояние в PostgreSQL/Redis
CQRS (Command Query Responsibility Segregation)
CQRS (Command Query Responsibility Segregation)Write path: API → Command → Kafka topic → DB
Read path: API → Read model (materialized view, другая БД)
Topic: orders (source of truth)
Consumer 1 → PostgreSQL (для OLTP-запросов) Consumer 2 → Elasticsearch (для поиска) Consumer 3 → ClickHouse (для аналитики) Consumer 4 → Redis (для кеша) Saga (распределённые транзакции)
Saga (распределённые транзакции)Оркестрация через топики:
order-commands → Order Service → order-events payment-commands → Payment Service → payment-events shipping-commands → Shipping Service → shipping-events Saga orchestrator:
Send “create-order” → order-commands Listen order-events → “order-created” Send “process-payment” → payment-commands Listen payment-events → “payment-succeeded” или “payment-failed” Если failed → Send “cancel-order” → order-commands (компенсация ) Если succeeded → Send “ship-order” → shipping-commands Dead Letter Queue (DLQ)
Dead Letter Queue (DLQ)Consumer не может обработать сообщение → не терять, а отправить в DLQ .
Topic: orders → Consumer → обработка → (ошибка после N попыток) → Topic: orders.dlq → ручной разбор или автоповтор
# Пример: retry с DLQ
MAX_RETRIES = 3
for msg in consumer:
retries = int(msg. headers. get('retries' , 0 ))
try :
process(msg)
consumer. commit(msg)
except Exception as e:
if retries >= MAX_RETRIES:
producer. produce('orders.dlq' , key= msg. key, value= msg. value,
headers= {'error' : str(e), 'original_topic' : 'orders' })
else :
producer. produce('orders.retry' , key= msg. key, value= msg. value,
headers= {'retries' : str(retries + 1 )})
consumer. commit(msg) Outbox Pattern
Outbox PatternПроблема: нужно атомарно записать в БД И отправить в Kafka.
Решение: писать событие в outbox-таблицу в той же транзакции.
BEGIN → INSERT INTO orders (...) → INSERT INTO outbox (topic, key, value) VALUES ('orders', 'user-42', '{"order_id":1}') → COMMITOutbox publisher (или Debezium CDC) читает outbox → публикует в Kafka → помечает отправленным.
Consume-Transform-ProduceПрочитать из одного топика → трансформировать → записать в другой. С Exactly-Once Semantics :
consumer.poll() → transform(message)producer.send(output_topic)producer.sendOffsetsToTransaction(consumer_offsets)producer.commitTransaction()Все операции в одной Kafka-транзакции .
Выбор количества партиций
Выбор количества партицийПравило: partitions >= max(expected_consumers)
Факторы:
Throughput — больше партиций → больше параллелизм → выше throughputOrdering — порядок гарантирован ТОЛЬКО внутри одной партицииConsumers — consumer-ов в группе не может быть больше, чем партицийOverhead — каждая партиция = файлы на диске, память, открытые дескрипторыRebalancing — больше партиций → дольше rebalancingРекомендации:
Маленький проект: 3-6 партиций Средний: 6-12 партиций Высокая нагрузка: 12-50 партиций Экстремальная: 50-200 (редко нужно) Партиции можно увеличить, но НЕ уменьшить! При увеличении — ключи перераспределяются (hash(key) % new_partitions).
Частые проблемы Consumer lag растёт:
# Проверить lag
kafka-consumer-groups.sh --describe --group my-group \
--bootstrap-server localhost:9092 Решения:
Увеличить количество consumer-ов (до числа партиций) Увеличить max.poll.records Оптимизировать обработку сообщений (async, batch) Увеличить количество партиций + consumer-ов Профилировать consumer, найти bottleneck Rebalancing слишком частый:
Частый Rebalancing: причины и решенияСимптомы: consumer-ы постоянно теряют/получают партиции.
Причины:
max.poll.interval.ms слишком маленький — обработка не успеваетsession.timeout.ms слишком маленький — heartbeat не доходитConsumer-ы часто падают/перезапускаются Решения:
max.poll.interval.ms=600000 (10 минут)session.timeout.ms=45000 (45 секунд)heartbeat.interval.ms=3000 (3 секунды)partition.assignment.strategy=CooperativeStickyAssignorСообщения теряются:
Сообщения теряются: чеклистProducer: acks=all? retries > 0? delivery.timeout.ms достаточный?Broker: min.insync.replicas=2? replication.factor=3?Consumer: enable.auto.commit=false? commit после обработки?Retention: retention.ms не слишком маленький?Сообщения дублируются:
Сообщения дублируются: причины и решенияПричина: consumer упал после обработки, но до commit offset.
Решения:
Идемпотентная обработка (INSERT ON CONFLICT, upsert)Дедупликация по уникальному ID сообщенияExactly-once semantics (транзакции)Хранить offset в той же БД что и результат (atomic commit) Порядок сообщений нарушен:
Порядок сообщений нарушенПорядок гарантирован ТОЛЬКО внутри одной партиции.
Решения:
Использовать ключ сообщения — все сообщения одного entity в одной партиции max.in.flight.requests.per.connection=1 (без идемпотентности) или =5 с enable.idempotence=trueОдна партиция (жертвуем throughput) Диск заканчивается:
# Проверить размер топиков
kafka-log-dirs.sh --bootstrap-server localhost:9092 --describe
# Уменьшить retention
kafka-configs.sh --alter --entity-type topics --entity-name big-topic \
--add-config retention.ms= 86400000 # 1 день
# Удалить старые сегменты
kafka-configs.sh --alter --entity-type topics --entity-name big-topic \
--add-config retention.bytes= 10737418240 # 10GB на партицию
# Включить сжатие (если не включено)
kafka-configs.sh --alter --entity-type topics --entity-name big-topic \
--add-config compression.type= zstd UnderReplicatedPartitions > 0:
UnderReplicatedPartitions > 0Реплики отстают от лидера.
Причины:
Брокер перегружен (диск, сеть, CPU) Брокер недоступен Сеть между брокерами Проверить:
kafka-topics.sh --describe --under-replicated-partitions — посмотреть, какой брокер является follower для проблемных партиций.