Kafka: распределённый брокер сообщений
Концепции
Producer → Topic (partitions) → Consumer Group (consumers)
↓
Broker Cluster (хранение на диске, репликация)Topic: именованный канал сообщений. Аналог таблицы в БД. Сообщения в топике хранятся на диске, не удаляются при чтении.
Partition: физическое разделение топика. Каждая партиция представляет собой упорядоченный, неизменяемый лог (append-only). Внутри партиции сообщения имеют уникальный offset (порядковый номер). Партиции распределяются по брокерам.
Offset: порядковый номер сообщения внутри партиции. Монотонно растёт. Consumer запоминает свой offset, то есть «где остановился». Offset 0 = начало, offset -1 = конец.
Broker: один сервер Kafka. Хранит партиции, обслуживает клиентов. Кластер = несколько брокеров.
Producer: публикует сообщения в топик. Выбирает партицию по ключу сообщения (или round-robin).
Consumer: читает сообщения из топика. Хранит свой offset (позицию чтения).
Consumer Group: группа consumer-ов, читающих один топик совместно. Каждая партиция назначается ровно одному consumer-у в группе. Если consumer-ов больше, чем партиций, лишние простаивают.
Replication Factor: количество копий каждой партиции. RF=3 → данные на 3 брокерах. Leader обслуживает чтение/запись, follower-ы реплицируют.
ISR (In-Sync Replicas): реплики, не отстающие от лидера. Сообщение считается committed, когда записано во все ISR.
Topic: orders (3 партиции, RF=2)
──────────────────────────────────────
Partition 0: [msg0, msg1, msg2, msg3, ...] → Broker 1 (leader), Broker 2 (follower)
Partition 1: [msg0, msg1, msg2, ...] → Broker 2 (leader), Broker 3 (follower)
Partition 2: [msg0, msg1, ...] → Broker 3 (leader), Broker 1 (follower)
Consumer Group: order-service (3 consumer-а)
consumer-1 ← Partition 0
consumer-2 ← Partition 1
consumer-3 ← Partition 2Гарантии доставки
At-most-once — сообщение может быть потеряно, но не дублировано
(commit offset до обработки)
At-least-once — сообщение доставлено хотя бы раз, возможны дубликаты (по умолчанию)
(commit offset после обработки)
Exactly-once — ровно один раз (idempotent producer + transactions)
(самый медленный, включается явно)Kafka vs очереди (RabbitMQ, Redis)
Традиционная очередь Kafka
──────────────────────── ────────────────────────
Сообщение удаляется при чтении Сообщение хранится до retention
Один потребитель на сообщение Множество consumer group читают независимо
Push-модель (брокер → consumer) Pull-модель (consumer → брокер)
Сложная маршрутизация Простая модель (topic + partition)
Малые объёмы, low latency Огромные объёмы, высокий throughputУстановка
Docker Compose (KRaft, без ZooKeeper)
# compose.yaml
services:
kafka:
image: apache/kafka:3.9
ports:
- "9092:9092"
environment:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,CONTROLLER://0.0.0.0:29093,EXTERNAL://0.0.0.0:9092
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,EXTERNAL://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:29093
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_LOG_RETENTION_HOURS: 168
KAFKA_LOG_RETENTION_BYTES: 1073741824
volumes:
- kafka_data:/var/lib/kafka/data
kafka-ui:
image: provectuslabs/kafka-ui
ports:
- "8080:8080"
environment:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
depends_on:
- kafka
volumes:
kafka_data:Кластер из 3 брокеров
# compose.yaml
services:
kafka-1:
image: apache/kafka:3.9
environment:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,CONTROLLER://0.0.0.0:29093,EXTERNAL://0.0.0.0:9092
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1:29092,EXTERNAL://localhost:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-1:29093,2@kafka-2:29093,3@kafka-3:29093
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
KAFKA_DEFAULT_REPLICATION_FACTOR: 3
KAFKA_MIN_INSYNC_REPLICAS: 2
ports:
- "9092:9092"
volumes:
- kafka1_data:/var/lib/kafka/data
kafka-2:
image: apache/kafka:3.9
environment:
KAFKA_NODE_ID: 2
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,CONTROLLER://0.0.0.0:29093,EXTERNAL://0.0.0.0:9093
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-2:29092,EXTERNAL://localhost:9093
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-1:29093,2@kafka-2:29093,3@kafka-3:29093
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
ports:
- "9093:9093"
volumes:
- kafka2_data:/var/lib/kafka/data
kafka-3:
image: apache/kafka:3.9
environment:
KAFKA_NODE_ID: 3
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,CONTROLLER://0.0.0.0:29093,EXTERNAL://0.0.0.0:9094
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-3:29092,EXTERNAL://localhost:9094
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-1:29093,2@kafka-2:29093,3@kafka-3:29093
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
ports:
- "9094:9094"
volumes:
- kafka3_data:/var/lib/kafka/data
volumes:
kafka1_data:
kafka2_data:
kafka3_data:CLI: kafka-* утилиты
# Если из Docker:
docker exec -it kafka bash
# Утилиты в /opt/kafka/bin/
# Или через kcat (kafkacat) — удобнее для разработки
# Arch: sudo pacman -S kcat
# macOS: brew install kcat
# Ubuntu: sudo apt install kafkacatТопики
# Создать топик
kafka-topics.sh --bootstrap-server localhost:9092 \
--create --topic orders \
--partitions 6 \
--replication-factor 3
# Список топиков
kafka-topics.sh --bootstrap-server localhost:9092 --list
# Описание топика
kafka-topics.sh --bootstrap-server localhost:9092 \
--describe --topic orders
# Увеличить партиции (уменьшить нельзя!)
kafka-topics.sh --bootstrap-server localhost:9092 \
--alter --topic orders --partitions 12
# Удалить топик
kafka-topics.sh --bootstrap-server localhost:9092 \
--delete --topic orders
# Изменить конфигурацию топика
kafka-configs.sh --bootstrap-server localhost:9092 \
--alter --entity-type topics --entity-name orders \
--add-config retention.ms=604800000 # retention 7 дней
# Посмотреть конфигурацию
kafka-configs.sh --bootstrap-server localhost:9092 \
--describe --entity-type topics --entity-name ordersProducer (CLI)
# Простая отправка (stdin → Kafka)
kafka-console-producer.sh --bootstrap-server localhost:9092 \
--topic orders
> {"order_id": 1, "amount": 99.99}
> {"order_id": 2, "amount": 49.99}
> ^C
# С ключом (key:value)
kafka-console-producer.sh --bootstrap-server localhost:9092 \
--topic orders \
--property parse.key=true \
--property key.separator=:
> user42:{"order_id": 1, "amount": 99.99}
> user43:{"order_id": 2, "amount": 49.99}
# Из файла
kafka-console-producer.sh --bootstrap-server localhost:9092 \
--topic orders < messages.txt
# kcat (проще)
echo '{"order_id": 1}' | kcat -b localhost:9092 -t orders -P
kcat -b localhost:9092 -t orders -P -K: < messages.txt # key:valueConsumer (CLI)
# Читать новые сообщения
kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic orders
# С начала
kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic orders --from-beginning
# С ключами и метаданными
kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic orders --from-beginning \
--property print.key=true \
--property print.timestamp=true \
--property print.partition=true \
--property print.offset=true
# В составе consumer group
kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic orders --group order-processor
# kcat (проще)
kcat -b localhost:9092 -t orders -C # новые
kcat -b localhost:9092 -t orders -C -o beginning # с начала
kcat -b localhost:9092 -t orders -C -o end # только новые
kcat -b localhost:9092 -t orders -C -o -10 # последние 10
kcat -b localhost:9092 -t orders -C -p 0 -o 100 # partition 0, offset 100
kcat -b localhost:9092 -t orders -C -f '%T %k %s\n' # формат: timestamp key value
# Метаданные кластера
kcat -b localhost:9092 -L # список топиков и брокеров
kcat -b localhost:9092 -L -t orders # детали по топикуConsumer Groups
# Список consumer groups
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
# Описание группы (offset, lag)
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--describe --group order-processor
# Вывод:
# GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
# order-proc orders 0 1000 1050 50
# order-proc orders 1 2000 2000 0
# LAG = отставание
# Сбросить offset (все consumer-ы в группе должны быть остановлены!)
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--group order-processor --topic orders \
--reset-offsets --to-earliest --execute
# Сброс на конкретный offset
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--group order-processor --topic orders \
--reset-offsets --to-offset 1000 --execute
# Сброс по времени
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--group order-processor --topic orders \
--reset-offsets --to-datetime 2025-03-15T00:00:00.000 --execute
# Сдвиг offset
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--group order-processor --topic orders \
--reset-offsets --shift-by -100 --execute
# Удалить группу
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--delete --group order-processorКонфигурация
Брокер (server.properties / переменные окружения)
# Идентификация
node.id=1
process.roles=broker,controller
# Сеть
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://kafka-host:9092
# Логи (хранилище сообщений)
log.dirs=/var/lib/kafka/data
log.retention.hours=168 # хранить 7 дней (по умолчанию)
log.retention.bytes=-1 # без лимита по размеру (-1)
log.segment.bytes=1073741824 # размер сегмента 1GB
log.cleanup.policy=delete # delete или compact
# Репликация
default.replication.factor=3
min.insync.replicas=2 # минимум ISR для acks=all
unclean.leader.election.enable=false # не выбирать не-ISR реплику лидером
# Топики
num.partitions=6 # партиции по умолчанию
auto.create.topics.enable=false # не создавать топики автоматически
# Производительность
num.io.threads=8
num.network.threads=3
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400Конфигурация топика
# При создании
kafka-topics.sh --create --topic events \
--partitions 12 \
--replication-factor 3 \
--config retention.ms=604800000 \
--config cleanup.policy=delete \
--config max.message.bytes=10485760
# Изменение
kafka-configs.sh --alter --entity-type topics --entity-name events \
--add-config retention.ms=259200000 # 3 дня| Параметр | Описание | По умолчанию |
|---|---|---|
retention.ms | Время хранения сообщений | 604800000 (7 дней) |
retention.bytes | Лимит размера партиции | -1 (без лимита) |
cleanup.policy | delete или compact | delete |
max.message.bytes | Максимальный размер сообщения | 1048576 (1MB) |
min.insync.replicas | Минимум ISR для acks=all | 1 |
segment.bytes | Размер лог-сегмента | 1073741824 (1GB) |
compression.type | Сжатие: none, gzip, snappy, lz4, zstd | producer |
Cleanup policies
delete — удалять сегменты старше retention.ms или при превышении retention.bytes
compact — оставлять только последнее сообщение для каждого ключа (log compaction)Log compaction: для топиков-снимков (состояние, конфигурация). Сохраняет последнее значение для каждого ключа, удаляя старые. Сообщение с value=null (tombstone) удаляет ключ.
kafka-topics.sh --create --topic user-profiles \
--partitions 6 \
--config cleanup.policy=compact \
--config min.cleanable.dirty.ratio=0.5 \
--config delete.retention.ms=86400000Producer
Конфигурация
| Параметр | Описание | Рекомендация |
|---|---|---|
bootstrap.servers | Адреса брокеров | Минимум 2-3 для отказоустойчивости |
acks | Подтверждение записи | all для надёжности |
retries | Повторы при ошибке | 2147483647 (бесконечно) |
delivery.timeout.ms | Общий таймаут доставки | 120000 (2 мин) |
linger.ms | Задержка для батчинга | 5-100 (больше = больший batch) |
batch.size | Размер batch в байтах | 16384-65536 |
compression.type | Сжатие | lz4 или zstd |
enable.idempotence | Идемпотентность (дедупликация) | true |
max.in.flight.requests.per.connection | Параллельные запросы | 5 (с идемпотентностью) |
key.serializer | Сериализатор ключей | StringSerializer / Avro / Protobuf |
value.serializer | Сериализатор значений | StringSerializer / Avro / Protobuf |
acks: уровни подтверждения
acks=0 — не ждать подтверждения (максимальная скорость, возможна потеря)
acks=1 — ждать подтверждения от leader-а (компромисс)
acks=all — ждать подтверждения от всех ISR (максимальная надёжность)
Работает в связке с min.insync.replicasДля продакшена: acks=all + min.insync.replicas=2 + replication.factor=3.
Partitioning (выбор партиции)
1. Явная партиция — producer указывает номер (редко)
2. По ключу — hash(key) % num_partitions (основной способ)
3. Round-robin — без ключа, равномерное распределение
4. Custom partitioner — своя логикаСообщения с одинаковым ключом всегда попадают в одну партицию → гарантирован порядок для одного ключа.
Key = user_id → все события одного пользователя в одной партиции → упорядочены
Key = null → round-robin → порядок не гарантированИдемпотентный producer
enable.idempotence=true (по умолчанию с Kafka 3.0)
Каждый producer получает уникальный PID (producer ID).
Каждое сообщение — sequence number.
Брокер отклоняет дубликаты по (PID, sequence).
→ Exactly-once на уровне producer → partition.Транзакции (Exactly-Once Semantics)
transactional.id=my-producer-1
Producer:
beginTransaction()
send(topic1, message1)
send(topic2, message2)
sendOffsetsToTransaction(offsets, consumerGroupId) // для consume-transform-produce
commitTransaction() // или abortTransaction()
→ Все сообщения атомарно: либо все видны consumer-ам, либо ни одного.Consumer
Конфигурация
| Параметр | Описание | Рекомендация |
|---|---|---|
bootstrap.servers | Адреса брокеров | Минимум 2-3 |
group.id | Consumer group | Обязателен для групповой работы |
auto.offset.reset | Стратегия при отсутствии offset | earliest или latest |
enable.auto.commit | Автоматический commit offset | false для at-least-once |
auto.commit.interval.ms | Интервал автокоммита | 5000 |
max.poll.records | Записей за один poll | 500 |
max.poll.interval.ms | Максимум между poll-ами | 300000 (5 мин) |
session.timeout.ms | Таймаут сессии (heartbeat) | 45000 |
heartbeat.interval.ms | Интервал heartbeat | 3000 |
fetch.min.bytes | Минимум данных для ответа | 1 |
fetch.max.wait.ms | Максимальное ожидание данных | 500 |
isolation.level | read_committed / read_uncommitted | read_committed для EOS |
auto.offset.reset
earliest — начать с начала топика (если нет сохранённого offset)
latest — начать с конца (только новые сообщения)
none — ошибка, если нет сохранённого offsetCommit стратегии
Auto-commit (enable.auto.commit=true):
Фоновый коммит каждые auto.commit.interval.ms.
Просто, но может привести к потере сообщений (commit до обработки)
или дубликатам (crash до commit).
Manual sync commit:
consumer.commitSync() — после успешной обработки batch-а.
At-least-once: при crash повторная обработка от последнего commit.
Manual async commit:
consumer.commitAsync() — не блокирует.
Callback для обработки ошибок.
Commit per-partition:
commitSync(Map<TopicPartition, OffsetAndMetadata>) — точный контроль.Rebalancing
Когда consumer присоединяется/покидает группу, партиции перераспределяются:
Eager (по умолчанию до Kafka 2.4):
Отзывает все партиции → перераспределяет.
Пауза в обработке на время ребалансировки.
Cooperative/Incremental (рекомендуемый):
partition.assignment.strategy=
org.apache.kafka.clients.consumer.CooperativeStickyAssignor
Перераспределяет только изменившиеся партиции.
Минимальная пауза.Клиенты
Go (segmentio/kafka-go)
package main
import (
"context"
"fmt"
"github.com/segmentio/kafka-go"
"time"
)
// --- Producer ---
func produce() {
writer := &kafka.Writer{
Addr: kafka.TCP("localhost:9092"),
Topic: "orders",
Balancer: &kafka.Hash{}, // по ключу
BatchTimeout: 10 * time.Millisecond,
BatchSize: 100,
Compression: kafka.Lz4,
RequiredAcks: kafka.RequireAll,
}
defer writer.Close()
err := writer.WriteMessages(context.Background(),
kafka.Message{
Key: []byte("user-42"),
Value: []byte(`{"order_id": 1, "amount": 99.99}`),
Headers: []kafka.Header{
{Key: "source", Value: []byte("api")},
},
},
)
if err != nil {
panic(err)
}
}
// --- Consumer ---
func consume() {
reader := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092"},
Topic: "orders",
GroupID: "order-processor",
MinBytes: 1,
MaxBytes: 10e6, // 10MB
CommitInterval: 0, // manual commit
StartOffset: kafka.FirstOffset, // earliest
})
defer reader.Close()
for {
msg, err := reader.FetchMessage(context.Background())
if err != nil {
panic(err)
}
fmt.Printf("partition=%d offset=%d key=%s value=%s\n",
msg.Partition, msg.Offset, msg.Key, msg.Value)
// Обработка сообщения...
// Manual commit (at-least-once)
if err := reader.CommitMessages(context.Background(), msg); err != nil {
panic(err)
}
}
}Go (IBM/sarama)
package main
import (
"fmt"
"github.com/IBM/sarama"
)
// --- Producer ---
func produce() {
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Idempotent = true
config.Producer.Return.Successes = true
config.Producer.Compression = sarama.CompressionLZ4
config.Net.MaxOpenRequests = 1
producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
if err != nil {
panic(err)
}
defer producer.Close()
msg := &sarama.ProducerMessage{
Topic: "orders",
Key: sarama.StringEncoder("user-42"),
Value: sarama.StringEncoder(`{"order_id": 1, "amount": 99.99}`),
}
partition, offset, err := producer.SendMessage(msg)
fmt.Printf("partition=%d offset=%d\n", partition, offset)
}
// --- Consumer Group ---
type handler struct{}
func (h handler) Setup(_ sarama.ConsumerGroupSession) error { return nil }
func (h handler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }
func (h handler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for msg := range claim.Messages() {
fmt.Printf("partition=%d offset=%d key=%s value=%s\n",
msg.Partition, msg.Offset, msg.Key, msg.Value)
// Обработка...
session.MarkMessage(msg, "") // пометить как обработанное
}
return nil
}Python (confluent-kafka)
from confluent_kafka import Producer, Consumer, KafkaError
import json
# --- Producer ---
producer = Producer({
'bootstrap.servers': 'localhost:9092',
'acks': 'all',
'enable.idempotence': True,
'compression.type': 'lz4',
'linger.ms': 10,
'batch.size': 65536,
})
def delivery_report(err, msg):
if err:
print(f'Delivery failed: {err}')
else:
print(f'Delivered to {msg.topic()} [{msg.partition()}] @ {msg.offset()}')
producer.produce(
'orders',
key='user-42',
value=json.dumps({'order_id': 1, 'amount': 99.99}),
callback=delivery_report,
)
producer.flush()
# --- Consumer ---
consumer = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'order-processor',
'auto.offset.reset': 'earliest',
'enable.auto.commit': False,
})
consumer.subscribe(['orders'])
try:
while True:
msg = consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
continue
raise Exception(msg.error())
print(f'partition={msg.partition()} offset={msg.offset()} '
f'key={msg.key()} value={msg.value()}')
# Обработка...
consumer.commit(message=msg) # manual commit
finally:
consumer.close()Node.js (kafkajs)
import { Kafka, CompressionTypes } from 'kafkajs';
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['localhost:9092'],
});
// --- Producer ---
const producer = kafka.producer({
idempotent: true,
maxInFlightRequests: 5,
});
await producer.connect();
await producer.send({
topic: 'orders',
compression: CompressionTypes.LZ4,
messages: [
{
key: 'user-42',
value: JSON.stringify({ order_id: 1, amount: 99.99 }),
headers: { source: 'api' },
},
],
});
await producer.disconnect();
// --- Consumer ---
const consumer = kafka.consumer({ groupId: 'order-processor' });
await consumer.connect();
await consumer.subscribe({ topic: 'orders', fromBeginning: true });
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
console.log({
partition,
offset: message.offset,
key: message.key?.toString(),
value: message.value?.toString(),
});
// Обработка...
// auto-commit по умолчанию (после eachMessage)
},
});Java (Spring Kafka)
// application.yml
// spring:
// kafka:
// bootstrap-servers: localhost:9092
// producer:
// acks: all
// key-serializer: org.apache.kafka.common.serialization.StringSerializer
// value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
// consumer:
// group-id: order-processor
// auto-offset-reset: earliest
// enable-auto-commit: false
// key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
// value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
// Producer
@Service
public class OrderProducer {
private final KafkaTemplate<String, OrderEvent> kafkaTemplate;
public void send(String userId, OrderEvent event) {
kafkaTemplate.send("orders", userId, event);
}
}
// Consumer
@Service
public class OrderConsumer {
@KafkaListener(topics = "orders", groupId = "order-processor")
public void consume(@Payload OrderEvent event,
@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
@Header(KafkaHeaders.OFFSET) long offset,
Acknowledgment ack) {
// Обработка...
ack.acknowledge(); // manual commit
}
}Schema Registry
Централизованное хранилище схем (Avro, Protobuf, JSON Schema). Гарантирует совместимость producer/consumer.
# compose.yaml (добавить к кластеру)
services:
schema-registry:
image: confluentinc/cp-schema-registry:7.7
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:29092# Зарегистрировать схему
curl -X POST http://localhost:8081/subjects/orders-value/versions \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{
"schemaType": "AVRO",
"schema": "{\"type\":\"record\",\"name\":\"Order\",\"fields\":[{\"name\":\"order_id\",\"type\":\"long\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"user_id\",\"type\":\"string\"}]}"
}'
# Список subjects
curl http://localhost:8081/subjects
# Версии схемы
curl http://localhost:8081/subjects/orders-value/versions
# Проверить совместимость
curl -X POST http://localhost:8081/compatibility/subjects/orders-value/versions/latest \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{"schemaType": "AVRO", "schema": "..."}'
# Уровни совместимости
# BACKWARD — новый consumer может читать старые данные (по умолчанию)
# FORWARD — старый consumer может читать новые данные
# FULL — оба направления
# NONE — без проверки
curl -X PUT http://localhost:8081/config/orders-value \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{"compatibility": "FULL"}'Kafka Connect
Интеграция Kafka с внешними системами без кода:
# compose.yaml
services:
connect:
image: confluentinc/cp-kafka-connect:7.7
ports:
- "8083:8083"
environment:
CONNECT_BOOTSTRAP_SERVERS: kafka:29092
CONNECT_REST_PORT: 8083
CONNECT_GROUP_ID: connect-cluster
CONNECT_CONFIG_STORAGE_TOPIC: connect-configs
CONNECT_OFFSET_STORAGE_TOPIC: connect-offsets
CONNECT_STATUS_STORAGE_TOPIC: connect-status
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter# Список доступных коннекторов
curl http://localhost:8083/connector-plugins
# Создать Source-коннектор (PostgreSQL → Kafka)
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d '{
"name": "pg-source",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "replicator",
"database.password": "secret",
"database.dbname": "myapp",
"topic.prefix": "cdc",
"table.include.list": "public.orders,public.users",
"plugin.name": "pgoutput"
}
}'
# → топики: cdc.public.orders, cdc.public.users (CDC — Change Data Capture)
# Создать Sink-коннектор (Kafka → Elasticsearch)
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d '{
"name": "es-sink",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"topics": "orders",
"connection.url": "http://elasticsearch:9200",
"key.ignore": "false",
"schema.ignore": "true"
}
}'
# Статус коннекторов
curl http://localhost:8083/connectors
curl http://localhost:8083/connectors/pg-source/status
# Перезапустить
curl -X POST http://localhost:8083/connectors/pg-source/restart
# Удалить
curl -X DELETE http://localhost:8083/connectors/pg-sourceМониторинг
Ключевые метрики
Брокер:
UnderReplicatedPartitions — партиции с отстающими репликами (должно быть 0)
ActiveControllerCount — 1 controller в кластере (ровно 1)
OfflinePartitionsCount — недоступные партиции (должно быть 0)
BytesInPerSec / BytesOutPerSec — throughput
RequestsPerSec — запросы к брокеру
LogFlushRateAndTimeMs — скорость записи на диск
Consumer:
records-lag-max — максимальное отставание (lag)
records-consumed-rate — скорость потребления
commit-latency-avg — задержка commit
Producer:
record-send-rate — скорость отправки
record-error-rate — ошибки отправки
request-latency-avg — задержка запроса
batch-size-avg — средний размер batchПроверка через CLI
# Consumer lag (основная метрика здоровья)
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--describe --group order-processor
# Replica status
kafka-topics.sh --bootstrap-server localhost:9092 \
--describe --under-replicated-partitions
# Log dirs (диск)
kafka-log-dirs.sh --bootstrap-server localhost:9092 \
--describe --topic-list ordersKafka UI
Визуальный интерфейс, проще всего для мониторинга:
# compose.yaml
services:
kafka-ui:
image: provectuslabs/kafka-ui
ports:
- "8080:8080"
environment:
KAFKA_CLUSTERS_0_NAME: production
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schema-registry:8081
KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: connect
KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://connect:8083Паттерны
Event Sourcing
Хранить не текущее состояние, а все события. Текущее состояние = replay событий.
Topic: account-events (compacted=false, retention=forever)
key=account-42 value={"type":"created","balance":0}
key=account-42 value={"type":"deposited","amount":100}
key=account-42 value={"type":"withdrawn","amount":30}
→ текущий баланс = 0 + 100 - 30 = 70
Materialized view (consumer):
Слушает account-events → обновляет состояние в PostgreSQL/RedisCQRS (Command Query Responsibility Segregation)
Write path: API → Command → Kafka topic → DB
Read path: API → Read model (materialized view, другая БД)
Topic: orders (source of truth)
Consumer 1: → PostgreSQL (для OLTP-запросов)
Consumer 2: → Elasticsearch (для поиска)
Consumer 3: → ClickHouse (для аналитики)
Consumer 4: → Redis (для кеша)Saga (распределённые транзакции)
Оркестрация через топики:
Topic: order-commands → Order Service → Topic: order-events
Topic: payment-commands → Payment Service → Topic: payment-events
Topic: shipping-commands → Shipping Service → Topic: shipping-events
Saga orchestrator:
1. Send "create-order" → order-commands
2. Listen order-events → "order-created"
3. Send "process-payment" → payment-commands
4. Listen payment-events → "payment-succeeded" или "payment-failed"
5. Если failed → Send "cancel-order" → order-commands (компенсация)
6. Если succeeded → Send "ship-order" → shipping-commandsDead Letter Queue (DLQ)
Consumer не может обработать сообщение → не терять, а отправить в DLQ.
Topic: orders → Consumer → обработка
↓ (ошибка после N попыток)
Topic: orders.dlq → ручной разбор или автоповтор# Пример: retry с DLQ
MAX_RETRIES = 3
for msg in consumer:
retries = int(msg.headers.get('retries', 0))
try:
process(msg)
consumer.commit(msg)
except Exception as e:
if retries >= MAX_RETRIES:
producer.produce('orders.dlq', key=msg.key, value=msg.value,
headers={'error': str(e), 'original_topic': 'orders'})
else:
producer.produce('orders.retry', key=msg.key, value=msg.value,
headers={'retries': str(retries + 1)})
consumer.commit(msg)Outbox Pattern
Проблема: нужно атомарно записать в БД И отправить в Kafka.
Решение: писать событие в outbox-таблицу в той же транзакции.
1. BEGIN
INSERT INTO orders (...) VALUES (...);
INSERT INTO outbox (topic, key, value) VALUES ('orders', 'user-42', '{"order_id":1}');
COMMIT;
2. Outbox publisher (или Debezium CDC) читает outbox → публикует в Kafka → помечает отправленным.Consume-Transform-Produce
Прочитать из одного топика → трансформировать → записать в другой.
С Exactly-Once Semantics:
consumer.poll() → transform(message) → producer.send(output_topic)
→ producer.sendOffsetsToTransaction(consumer_offsets)
→ producer.commitTransaction()
Все три операции в одной Kafka-транзакции.Выбор количества партиций
Правило: partitions >= max(expected_consumers)
Факторы:
Throughput: больше партиций → больше параллелизм → выше throughput
Ordering: порядок гарантирован ТОЛЬКО внутри одной партиции
Consumers: consumer-ов в группе не может быть больше, чем партиций
Overhead: каждая партиция = файлы на диске, память, открытые дескрипторы
Rebalancing: больше партиций → дольше rebalancing
Рекомендации:
Маленький проект: 3-6 партиций
Средний: 6-12 партиций
Высокая нагрузка: 12-50 партиций
Экстремальная: 50-200 (редко нужно)
Партиции можно увеличить, но НЕ уменьшить!
При увеличении — ключи перераспределяются (hash(key) % new_partitions).Частые проблемы
Consumer lag растёт:
# Проверить lag
kafka-consumer-groups.sh --describe --group my-group \
--bootstrap-server localhost:9092Решения:
- Увеличить количество consumer-ов (до числа партиций)
- Увеличить
max.poll.records - Оптимизировать обработку сообщений (async, batch)
- Увеличить количество партиций + consumer-ов
- Профилировать consumer, найти bottleneck
Rebalancing слишком частый:
Симптомы: consumer-ы постоянно теряют/получают партиции.
Причины:
1. max.poll.interval.ms слишком маленький — обработка не успевает
2. session.timeout.ms слишком маленький — heartbeat не доходит
3. Consumer-ы часто падают/перезапускаются
Решения:
max.poll.interval.ms=600000 # 10 минут
session.timeout.ms=45000 # 45 секунд
heartbeat.interval.ms=3000 # 3 секунды
partition.assignment.strategy=CooperativeStickyAssignorСообщения теряются:
Проверить:
Producer: acks=all? retries > 0? delivery.timeout.ms достаточный?
Broker: min.insync.replicas=2? replication.factor=3?
Consumer: enable.auto.commit=false? commit после обработки?
Retention: retention.ms не слишком маленький?Сообщения дублируются:
Причина: consumer упал после обработки, но до commit offset.
Решения:
1. Идемпотентная обработка (INSERT ON CONFLICT, upsert)
2. Дедупликация по уникальному ID сообщения
3. Exactly-once semantics (транзакции)
4. Хранить offset в той же БД что и результат (atomic commit)Порядок сообщений нарушен:
Порядок гарантирован ТОЛЬКО внутри одной партиции.
Решения:
1. Использовать ключ сообщения — все сообщения одного entity в одной партиции
2. max.in.flight.requests.per.connection=1 (без идемпотентности)
или =5 с enable.idempotence=true
3. Одна партиция (жертвуем throughput)Диск заканчивается:
# Проверить размер топиков
kafka-log-dirs.sh --bootstrap-server localhost:9092 --describe
# Уменьшить retention
kafka-configs.sh --alter --entity-type topics --entity-name big-topic \
--add-config retention.ms=86400000 # 1 день
# Удалить старые сегменты
kafka-configs.sh --alter --entity-type topics --entity-name big-topic \
--add-config retention.bytes=10737418240 # 10GB на партицию
# Включить сжатие (если не включено)
kafka-configs.sh --alter --entity-type topics --entity-name big-topic \
--add-config compression.type=zstdUnderReplicatedPartitions > 0:
Реплики отстают от лидера.
Причины:
1. Брокер перегружен (диск, сеть, CPU)
2. Брокер недоступен
3. Сеть между брокерами
Проверить:
kafka-topics.sh --describe --under-replicated-partitions
# Посмотреть, какой брокер — follower для проблемных партиций