Kafka: распределённый брокер сообщений

Концепции

Official Docs | Confluent Developer

Архитектура Kafka

ProducerTopic (partitions) → Consumer Group (consumers)

Broker Cluster — хранение на диске, репликация

Topic: именованный канал сообщений. Аналог таблицы в БД. Сообщения в топике хранятся на диске, не удаляются при чтении.

Partition: физическое разделение топика. Каждая партиция представляет собой упорядоченный, неизменяемый лог (append-only). Внутри партиции сообщения имеют уникальный offset (порядковый номер). Партиции распределяются по брокерам.

Offset: порядковый номер сообщения внутри партиции. Монотонно растёт. Consumer запоминает свой offset, то есть «где остановился». Offset 0 = начало, offset -1 = конец.

Broker: один сервер Kafka. Хранит партиции, обслуживает клиентов. Кластер = несколько брокеров.

Producer: публикует сообщения в топик. Выбирает партицию по ключу сообщения (или round-robin).

Consumer: читает сообщения из топика. Хранит свой offset (позицию чтения).

Consumer Group: группа consumer-ов, читающих один топик совместно. Каждая партиция назначается ровно одному consumer-у в группе. Если consumer-ов больше, чем партиций, лишние простаивают.

Replication Factor: количество копий каждой партиции. RF=3 → данные на 3 брокерах. Leader обслуживает чтение/запись, follower-ы реплицируют.

ISR (In-Sync Replicas): реплики, не отстающие от лидера. Сообщение считается committed, когда записано во все ISR.

Распределение партиций и consumer-ов

Topic: orders (3 партиции, RF=2)

  • Partition 0: [msg0, msg1, msg2, msg3, …] → Broker 1 (leader), Broker 2 (follower)
  • Partition 1: [msg0, msg1, msg2, …] → Broker 2 (leader), Broker 3 (follower)
  • Partition 2: [msg0, msg1, …] → Broker 3 (leader), Broker 1 (follower)

Consumer Group: order-service (3 consumer-а)

  • consumer-1 ← Partition 0
  • consumer-2 ← Partition 1
  • consumer-3 ← Partition 2

Гарантии доставки

ГарантияОписаниеМеханизм
At-most-onceСообщение может быть потеряно, но не дублированоcommit offset до обработки
At-least-onceСообщение доставлено хотя бы раз, возможны дубликаты (по умолчанию)commit offset после обработки
Exactly-onceРовно один раз (самый медленный, включается явно)idempotent producer + transactions

Kafka vs очереди (RabbitMQ, Redis)

Традиционная очередьKafka
Сообщение удаляется при чтенииСообщение хранится до retention
Один потребитель на сообщениеМножество consumer group читают независимо
Push-модель (брокер → consumer)Pull-модель (consumer → брокер)
Сложная маршрутизацияПростая модель (topic + partition)
Малые объёмы, low latencyОгромные объёмы, высокий throughput

Установка

Docker Compose (KRaft, без ZooKeeper)

# compose.yaml
services:
  kafka:
    image: apache/kafka:3.9
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,CONTROLLER://0.0.0.0:29093,EXTERNAL://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,EXTERNAL://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:29093
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_LOG_RETENTION_HOURS: 168
      KAFKA_LOG_RETENTION_BYTES: 1073741824
    volumes:
      - kafka_data:/var/lib/kafka/data

  kafka-ui:
    image: provectuslabs/kafka-ui
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
    depends_on:
      - kafka

volumes:
  kafka_data:

Кластер из 3 брокеров

# compose.yaml
services:
  kafka-1:
    image: apache/kafka:3.9
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,CONTROLLER://0.0.0.0:29093,EXTERNAL://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1:29092,EXTERNAL://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-1:29093,2@kafka-2:29093,3@kafka-3:29093
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_MIN_INSYNC_REPLICAS: 2
    ports:
      - "9092:9092"
    volumes:
      - kafka1_data:/var/lib/kafka/data

  kafka-2:
    image: apache/kafka:3.9
    environment:
      KAFKA_NODE_ID: 2
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,CONTROLLER://0.0.0.0:29093,EXTERNAL://0.0.0.0:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-2:29092,EXTERNAL://localhost:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-1:29093,2@kafka-2:29093,3@kafka-3:29093
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
    ports:
      - "9093:9093"
    volumes:
      - kafka2_data:/var/lib/kafka/data

  kafka-3:
    image: apache/kafka:3.9
    environment:
      KAFKA_NODE_ID: 3
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,CONTROLLER://0.0.0.0:29093,EXTERNAL://0.0.0.0:9094
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-3:29092,EXTERNAL://localhost:9094
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-1:29093,2@kafka-2:29093,3@kafka-3:29093
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
    ports:
      - "9094:9094"
    volumes:
      - kafka3_data:/var/lib/kafka/data

volumes:
  kafka1_data:
  kafka2_data:
  kafka3_data:

CLI: kafka-* утилиты

# Если из Docker:
docker exec -it kafka bash
# Утилиты в /opt/kafka/bin/

# Или через kcat (kafkacat) — удобнее для разработки
# Arch: sudo pacman -S kcat
# macOS: brew install kcat
# Ubuntu: sudo apt install kafkacat

Топики

# Создать топик
kafka-topics.sh --bootstrap-server localhost:9092 \
  --create --topic orders \
  --partitions 6 \
  --replication-factor 3

# Список топиков
kafka-topics.sh --bootstrap-server localhost:9092 --list

# Описание топика
kafka-topics.sh --bootstrap-server localhost:9092 \
  --describe --topic orders

# Увеличить партиции (уменьшить нельзя!)
kafka-topics.sh --bootstrap-server localhost:9092 \
  --alter --topic orders --partitions 12

# Удалить топик
kafka-topics.sh --bootstrap-server localhost:9092 \
  --delete --topic orders

# Изменить конфигурацию топика
kafka-configs.sh --bootstrap-server localhost:9092 \
  --alter --entity-type topics --entity-name orders \
  --add-config retention.ms=604800000        # retention 7 дней

# Посмотреть конфигурацию
kafka-configs.sh --bootstrap-server localhost:9092 \
  --describe --entity-type topics --entity-name orders

Producer (CLI)

# Простая отправка (stdin → Kafka)
kafka-console-producer.sh --bootstrap-server localhost:9092 \
  --topic orders
> {"order_id": 1, "amount": 99.99}
> {"order_id": 2, "amount": 49.99}
> ^C

# С ключом (key:value)
kafka-console-producer.sh --bootstrap-server localhost:9092 \
  --topic orders \
  --property parse.key=true \
  --property key.separator=:
> user42:{"order_id": 1, "amount": 99.99}
> user43:{"order_id": 2, "amount": 49.99}

# Из файла
kafka-console-producer.sh --bootstrap-server localhost:9092 \
  --topic orders < messages.txt

# kcat (проще)
echo '{"order_id": 1}' | kcat -b localhost:9092 -t orders -P
kcat -b localhost:9092 -t orders -P -K: < messages.txt   # key:value

Consumer (CLI)

# Читать новые сообщения
kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic orders

# С начала
kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic orders --from-beginning

# С ключами и метаданными
kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic orders --from-beginning \
  --property print.key=true \
  --property print.timestamp=true \
  --property print.partition=true \
  --property print.offset=true

# В составе consumer group
kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic orders --group order-processor

# kcat (проще)
kcat -b localhost:9092 -t orders -C                       # новые
kcat -b localhost:9092 -t orders -C -o beginning          # с начала
kcat -b localhost:9092 -t orders -C -o end                # только новые
kcat -b localhost:9092 -t orders -C -o -10                # последние 10
kcat -b localhost:9092 -t orders -C -p 0 -o 100           # partition 0, offset 100
kcat -b localhost:9092 -t orders -C -f '%T %k %s\n'       # формат: timestamp key value

# Метаданные кластера
kcat -b localhost:9092 -L                                 # список топиков и брокеров
kcat -b localhost:9092 -L -t orders                       # детали по топику

Consumer Groups

# Список consumer groups
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

# Описание группы (offset, lag)
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --describe --group order-processor

# Вывод:
# GROUP         TOPIC    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
# order-proc    orders   0          1000            1050            50
# order-proc    orders   1          2000            2000            0
#                                                             LAG = отставание

# Сбросить offset (все consumer-ы в группе должны быть остановлены!)
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group order-processor --topic orders \
  --reset-offsets --to-earliest --execute

# Сброс на конкретный offset
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group order-processor --topic orders \
  --reset-offsets --to-offset 1000 --execute

# Сброс по времени
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group order-processor --topic orders \
  --reset-offsets --to-datetime 2025-03-15T00:00:00.000 --execute

# Сдвиг offset
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group order-processor --topic orders \
  --reset-offsets --shift-by -100 --execute

# Удалить группу
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --delete --group order-processor

Конфигурация

Брокер (server.properties / переменные окружения)

# Идентификация
node.id=1
process.roles=broker,controller

# Сеть
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://kafka-host:9092

# Логи (хранилище сообщений)
log.dirs=/var/lib/kafka/data
log.retention.hours=168                  # хранить 7 дней (по умолчанию)
log.retention.bytes=-1                   # без лимита по размеру (-1)
log.segment.bytes=1073741824             # размер сегмента 1GB
log.cleanup.policy=delete                # delete или compact

# Репликация
default.replication.factor=3
min.insync.replicas=2                    # минимум ISR для acks=all
unclean.leader.election.enable=false     # не выбирать не-ISR реплику лидером

# Топики
num.partitions=6                         # партиции по умолчанию
auto.create.topics.enable=false          # не создавать топики автоматически

# Производительность
num.io.threads=8
num.network.threads=3
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400

Конфигурация топика

# При создании
kafka-topics.sh --create --topic events \
  --partitions 12 \
  --replication-factor 3 \
  --config retention.ms=604800000 \
  --config cleanup.policy=delete \
  --config max.message.bytes=10485760

# Изменение
kafka-configs.sh --alter --entity-type topics --entity-name events \
  --add-config retention.ms=259200000    # 3 дня
ПараметрОписаниеПо умолчанию
retention.msВремя хранения сообщений604800000 (7 дней)
retention.bytesЛимит размера партиции-1 (без лимита)
cleanup.policydelete или compactdelete
max.message.bytesМаксимальный размер сообщения1048576 (1MB)
min.insync.replicasМинимум ISR для acks=all1
segment.bytesРазмер лог-сегмента1073741824 (1GB)
compression.typeСжатие: none, gzip, snappy, lz4, zstdproducer

Cleanup policies

Cleanup policies
  • delete — удалять сегменты старше retention.ms или при превышении retention.bytes
  • compact — оставлять только последнее сообщение для каждого ключа (log compaction)

Log compaction: для топиков-снимков (состояние, конфигурация). Сохраняет последнее значение для каждого ключа, удаляя старые. Сообщение с value=null (tombstone) удаляет ключ.

kafka-topics.sh --create --topic user-profiles \
  --partitions 6 \
  --config cleanup.policy=compact \
  --config min.cleanable.dirty.ratio=0.5 \
  --config delete.retention.ms=86400000

Producer

Конфигурация

ПараметрОписаниеРекомендация
bootstrap.serversАдреса брокеровМинимум 2-3 для отказоустойчивости
acksПодтверждение записиall для надёжности
retriesПовторы при ошибке2147483647 (бесконечно)
delivery.timeout.msОбщий таймаут доставки120000 (2 мин)
linger.msЗадержка для батчинга5-100 (больше = больший batch)
batch.sizeРазмер batch в байтах16384-65536
compression.typeСжатиеlz4 или zstd
enable.idempotenceИдемпотентность (дедупликация)true
max.in.flight.requests.per.connectionПараллельные запросы5 (с идемпотентностью)
key.serializerСериализатор ключейStringSerializer / Avro / Protobuf
value.serializerСериализатор значенийStringSerializer / Avro / Protobuf

acks: уровни подтверждения

УровеньОписание
acks=0Не ждать подтверждения (максимальная скорость, возможна потеря)
acks=1Ждать подтверждения от leader-а (компромисс)
acks=allЖдать подтверждения от всех ISR (максимальная надёжность). Работает в связке с min.insync.replicas

Для продакшена: acks=all + min.insync.replicas=2 + replication.factor=3.

Partitioning (выбор партиции)

Стратегии выбора партиции
  1. Явная партиция — producer указывает номер (редко)
  2. По ключу — hash(key) % num_partitions (основной способ)
  3. Round-robin — без ключа, равномерное распределение
  4. Custom partitioner — своя логика

Сообщения с одинаковым ключом всегда попадают в одну партицию → гарантирован порядок для одного ключа.

Влияние ключа на порядок
  • Key = user_id → все события одного пользователя в одной партиции → упорядочены
  • Key = null → round-robin → порядок не гарантирован

Идемпотентный producer

Идемпотентный producer

enable.idempotence=true (по умолчанию с Kafka 3.0)

Каждый producer получает уникальный PID (producer ID). Каждое сообщение получает sequence number. Брокер отклоняет дубликаты по (PID, sequence).

Результат: Exactly-once на уровне producer → partition.

Транзакции (Exactly-Once Semantics)

Транзакции (Exactly-Once Semantics)

transactional.id=my-producer-1

Producer:

  1. beginTransaction()
  2. send(topic1, message1)
  3. send(topic2, message2)
  4. sendOffsetsToTransaction(offsets, consumerGroupId) — для consume-transform-produce
  5. commitTransaction() или abortTransaction()

Все сообщения атомарно: либо все видны consumer-ам, либо ни одного.

Consumer

Конфигурация

ПараметрОписаниеРекомендация
bootstrap.serversАдреса брокеровМинимум 2-3
group.idConsumer groupОбязателен для групповой работы
auto.offset.resetСтратегия при отсутствии offsetearliest или latest
enable.auto.commitАвтоматический commit offsetfalse для at-least-once
auto.commit.interval.msИнтервал автокоммита5000
max.poll.recordsЗаписей за один poll500
max.poll.interval.msМаксимум между poll-ами300000 (5 мин)
session.timeout.msТаймаут сессии (heartbeat)45000
heartbeat.interval.msИнтервал heartbeat3000
fetch.min.bytesМинимум данных для ответа1
fetch.max.wait.msМаксимальное ожидание данных500
isolation.levelread_committed / read_uncommittedread_committed для EOS

auto.offset.reset

ЗначениеОписание
earliestНачать с начала топика (если нет сохранённого offset)
latestНачать с конца (только новые сообщения)
noneОшибка, если нет сохранённого offset

Commit стратегии

Commit стратегии

Auto-commit (enable.auto.commit=true): Фоновый коммит каждые auto.commit.interval.ms. Просто, но может привести к потере сообщений (commit до обработки) или дубликатам (crash до commit).

Manual sync commit: consumer.commitSync() — после успешной обработки batch-а. At-least-once: при crash повторная обработка от последнего commit.

Manual async commit: consumer.commitAsync() — не блокирует. Callback для обработки ошибок.

Commit per-partition: commitSync(Map<TopicPartition, OffsetAndMetadata>) — точный контроль.

Rebalancing

Когда consumer присоединяется/покидает группу, партиции перераспределяются:

Стратегии Rebalancing

Eager (по умолчанию до Kafka 2.4): Отзывает все партиции → перераспределяет. Пауза в обработке на время ребалансировки.

Cooperative/Incremental (рекомендуемый): partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor Перераспределяет только изменившиеся партиции. Минимальная пауза.

Клиенты

Go (segmentio/kafka-go)

package main

import (
    "context"
    "fmt"
    "github.com/segmentio/kafka-go"
    "time"
)

// --- Producer ---
func produce() {
    writer := &kafka.Writer{
        Addr:         kafka.TCP("localhost:9092"),
        Topic:        "orders",
        Balancer:     &kafka.Hash{},        // по ключу
        BatchTimeout: 10 * time.Millisecond,
        BatchSize:    100,
        Compression:  kafka.Lz4,
        RequiredAcks: kafka.RequireAll,
    }
    defer writer.Close()

    err := writer.WriteMessages(context.Background(),
        kafka.Message{
            Key:   []byte("user-42"),
            Value: []byte(`{"order_id": 1, "amount": 99.99}`),
            Headers: []kafka.Header{
                {Key: "source", Value: []byte("api")},
            },
        },
    )
    if err != nil {
        panic(err)
    }
}

// --- Consumer ---
func consume() {
    reader := kafka.NewReader(kafka.ReaderConfig{
        Brokers:        []string{"localhost:9092"},
        Topic:          "orders",
        GroupID:        "order-processor",
        MinBytes:       1,
        MaxBytes:       10e6,               // 10MB
        CommitInterval: 0,                  // manual commit
        StartOffset:    kafka.FirstOffset,  // earliest
    })
    defer reader.Close()

    for {
        msg, err := reader.FetchMessage(context.Background())
        if err != nil {
            panic(err)
        }

        fmt.Printf("partition=%d offset=%d key=%s value=%s\n",
            msg.Partition, msg.Offset, msg.Key, msg.Value)

        // Обработка сообщения...

        // Manual commit (at-least-once)
        if err := reader.CommitMessages(context.Background(), msg); err != nil {
            panic(err)
        }
    }
}

Go (IBM/sarama)

package main

import (
    "fmt"
    "github.com/IBM/sarama"
)

// --- Producer ---
func produce() {
    config := sarama.NewConfig()
    config.Producer.RequiredAcks = sarama.WaitForAll
    config.Producer.Idempotent = true
    config.Producer.Return.Successes = true
    config.Producer.Compression = sarama.CompressionLZ4
    config.Net.MaxOpenRequests = 1

    producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
    if err != nil {
        panic(err)
    }
    defer producer.Close()

    msg := &sarama.ProducerMessage{
        Topic: "orders",
        Key:   sarama.StringEncoder("user-42"),
        Value: sarama.StringEncoder(`{"order_id": 1, "amount": 99.99}`),
    }

    partition, offset, err := producer.SendMessage(msg)
    fmt.Printf("partition=%d offset=%d\n", partition, offset)
}

// --- Consumer Group ---
type handler struct{}

func (h handler) Setup(_ sarama.ConsumerGroupSession) error   { return nil }
func (h handler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }

func (h handler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for msg := range claim.Messages() {
        fmt.Printf("partition=%d offset=%d key=%s value=%s\n",
            msg.Partition, msg.Offset, msg.Key, msg.Value)

        // Обработка...

        session.MarkMessage(msg, "")     // пометить как обработанное
    }
    return nil
}

Python (confluent-kafka)

from confluent_kafka import Producer, Consumer, KafkaError
import json

# --- Producer ---
producer = Producer({
    'bootstrap.servers': 'localhost:9092',
    'acks': 'all',
    'enable.idempotence': True,
    'compression.type': 'lz4',
    'linger.ms': 10,
    'batch.size': 65536,
})

def delivery_report(err, msg):
    if err:
        print(f'Delivery failed: {err}')
    else:
        print(f'Delivered to {msg.topic()} [{msg.partition()}] @ {msg.offset()}')

producer.produce(
    'orders',
    key='user-42',
    value=json.dumps({'order_id': 1, 'amount': 99.99}),
    callback=delivery_report,
)
producer.flush()

# --- Consumer ---
consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'order-processor',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,
})
consumer.subscribe(['orders'])

try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                continue
            raise Exception(msg.error())

        print(f'partition={msg.partition()} offset={msg.offset()} '
              f'key={msg.key()} value={msg.value()}')

        # Обработка...

        consumer.commit(message=msg)     # manual commit
finally:
    consumer.close()

Node.js (kafkajs)

import { Kafka, CompressionTypes } from 'kafkajs';

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['localhost:9092'],
});

// --- Producer ---
const producer = kafka.producer({
  idempotent: true,
  maxInFlightRequests: 5,
});
await producer.connect();

await producer.send({
  topic: 'orders',
  compression: CompressionTypes.LZ4,
  messages: [
    {
      key: 'user-42',
      value: JSON.stringify({ order_id: 1, amount: 99.99 }),
      headers: { source: 'api' },
    },
  ],
});

await producer.disconnect();

// --- Consumer ---
const consumer = kafka.consumer({ groupId: 'order-processor' });
await consumer.connect();
await consumer.subscribe({ topic: 'orders', fromBeginning: true });

await consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    console.log({
      partition,
      offset: message.offset,
      key: message.key?.toString(),
      value: message.value?.toString(),
    });
    // Обработка...
    // auto-commit по умолчанию (после eachMessage)
  },
});

Java (Spring Kafka)

// application.yml
// spring:
//   kafka:
//     bootstrap-servers: localhost:9092
//     producer:
//       acks: all
//       key-serializer: org.apache.kafka.common.serialization.StringSerializer
//       value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
//     consumer:
//       group-id: order-processor
//       auto-offset-reset: earliest
//       enable-auto-commit: false
//       key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
//       value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer

// Producer
@Service
public class OrderProducer {
    private final KafkaTemplate<String, OrderEvent> kafkaTemplate;

    public void send(String userId, OrderEvent event) {
        kafkaTemplate.send("orders", userId, event);
    }
}

// Consumer
@Service
public class OrderConsumer {
    @KafkaListener(topics = "orders", groupId = "order-processor")
    public void consume(@Payload OrderEvent event,
                        @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
                        @Header(KafkaHeaders.OFFSET) long offset,
                        Acknowledgment ack) {
        // Обработка...
        ack.acknowledge();  // manual commit
    }
}

Schema Registry

Централизованное хранилище схем (Avro, Protobuf, JSON Schema). Гарантирует совместимость producer/consumer.

# compose.yaml (добавить к кластеру)
services:
  schema-registry:
    image: confluentinc/cp-schema-registry:7.7
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:29092
# Зарегистрировать схему
curl -X POST http://localhost:8081/subjects/orders-value/versions \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{
    "schemaType": "AVRO",
    "schema": "{\"type\":\"record\",\"name\":\"Order\",\"fields\":[{\"name\":\"order_id\",\"type\":\"long\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"user_id\",\"type\":\"string\"}]}"
  }'

# Список subjects
curl http://localhost:8081/subjects

# Версии схемы
curl http://localhost:8081/subjects/orders-value/versions

# Проверить совместимость
curl -X POST http://localhost:8081/compatibility/subjects/orders-value/versions/latest \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"schemaType": "AVRO", "schema": "..."}'

# Уровни совместимости
# BACKWARD  — новый consumer может читать старые данные (по умолчанию)
# FORWARD   — старый consumer может читать новые данные
# FULL      — оба направления
# NONE      — без проверки
curl -X PUT http://localhost:8081/config/orders-value \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"compatibility": "FULL"}'

Kafka Connect

Интеграция Kafka с внешними системами без кода:

# compose.yaml
services:
  connect:
    image: confluentinc/cp-kafka-connect:7.7
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: kafka:29092
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: connect-cluster
      CONNECT_CONFIG_STORAGE_TOPIC: connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: connect-status
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
# Список доступных коннекторов
curl http://localhost:8083/connector-plugins

# Создать Source-коннектор (PostgreSQL → Kafka)
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "pg-source",
    "config": {
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
      "database.hostname": "postgres",
      "database.port": "5432",
      "database.user": "replicator",
      "database.password": "secret",
      "database.dbname": "myapp",
      "topic.prefix": "cdc",
      "table.include.list": "public.orders,public.users",
      "plugin.name": "pgoutput"
    }
  }'
# → топики: cdc.public.orders, cdc.public.users (CDC — Change Data Capture)

# Создать Sink-коннектор (Kafka → Elasticsearch)
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "es-sink",
    "config": {
      "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
      "topics": "orders",
      "connection.url": "http://elasticsearch:9200",
      "key.ignore": "false",
      "schema.ignore": "true"
    }
  }'

# Статус коннекторов
curl http://localhost:8083/connectors
curl http://localhost:8083/connectors/pg-source/status

# Перезапустить
curl -X POST http://localhost:8083/connectors/pg-source/restart

# Удалить
curl -X DELETE http://localhost:8083/connectors/pg-source

Мониторинг

Ключевые метрики

Ключевые метрики Kafka

Брокер:

  • UnderReplicatedPartitions — партиции с отстающими репликами (должно быть 0)
  • ActiveControllerCount — 1 controller в кластере (ровно 1)
  • OfflinePartitionsCount — недоступные партиции (должно быть 0)
  • BytesInPerSec / BytesOutPerSec — throughput
  • RequestsPerSec — запросы к брокеру
  • LogFlushRateAndTimeMs — скорость записи на диск

Consumer:

  • records-lag-max — максимальное отставание (lag)
  • records-consumed-rate — скорость потребления
  • commit-latency-avg — задержка commit

Producer:

  • record-send-rate — скорость отправки
  • record-error-rate — ошибки отправки
  • request-latency-avg — задержка запроса
  • batch-size-avg — средний размер batch

Проверка через CLI

# Consumer lag (основная метрика здоровья)
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --describe --group order-processor

# Replica status
kafka-topics.sh --bootstrap-server localhost:9092 \
  --describe --under-replicated-partitions

# Log dirs (диск)
kafka-log-dirs.sh --bootstrap-server localhost:9092 \
  --describe --topic-list orders

Kafka UI

Визуальный интерфейс, проще всего для мониторинга:

# compose.yaml
services:
  kafka-ui:
    image: provectuslabs/kafka-ui
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: production
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
      KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schema-registry:8081
      KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: connect
      KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://connect:8083

Паттерны

Event Sourcing

Event Sourcing

Хранить не текущее состояние, а все события. Текущее состояние = replay событий.

Topic: account-events (compacted=false, retention=forever)

  • key=account-42 value={"type":"created","balance":0}
  • key=account-42 value={"type":"deposited","amount":100}
  • key=account-42 value={"type":"withdrawn","amount":30}
  • Текущий баланс = 0 + 100 - 30 = 70

Materialized view (consumer): Слушает account-events → обновляет состояние в PostgreSQL/Redis

CQRS (Command Query Responsibility Segregation)

CQRS (Command Query Responsibility Segregation)

Write path: API → Command → Kafka topic → DB

Read path: API → Read model (materialized view, другая БД)

Topic: orders (source of truth)

  • Consumer 1 → PostgreSQL (для OLTP-запросов)
  • Consumer 2 → Elasticsearch (для поиска)
  • Consumer 3 → ClickHouse (для аналитики)
  • Consumer 4 → Redis (для кеша)

Saga (распределённые транзакции)

Saga (распределённые транзакции)

Оркестрация через топики:

  • order-commands → Order Service → order-events
  • payment-commands → Payment Service → payment-events
  • shipping-commands → Shipping Service → shipping-events

Saga orchestrator:

  1. Send “create-order” → order-commands
  2. Listen order-events → “order-created”
  3. Send “process-payment” → payment-commands
  4. Listen payment-events → “payment-succeeded” или “payment-failed”
  5. Если failed → Send “cancel-order” → order-commands (компенсация)
  6. Если succeeded → Send “ship-order” → shipping-commands

Dead Letter Queue (DLQ)

Dead Letter Queue (DLQ)

Consumer не может обработать сообщение → не терять, а отправить в DLQ.

Topic: orders → Consumer → обработка → (ошибка после N попыток) → Topic: orders.dlq → ручной разбор или автоповтор

# Пример: retry с DLQ
MAX_RETRIES = 3

for msg in consumer:
    retries = int(msg.headers.get('retries', 0))
    try:
        process(msg)
        consumer.commit(msg)
    except Exception as e:
        if retries >= MAX_RETRIES:
            producer.produce('orders.dlq', key=msg.key, value=msg.value,
                           headers={'error': str(e), 'original_topic': 'orders'})
        else:
            producer.produce('orders.retry', key=msg.key, value=msg.value,
                           headers={'retries': str(retries + 1)})
        consumer.commit(msg)

Outbox Pattern

Outbox Pattern

Проблема: нужно атомарно записать в БД И отправить в Kafka.

Решение: писать событие в outbox-таблицу в той же транзакции.

  1. BEGININSERT INTO orders (...)INSERT INTO outbox (topic, key, value) VALUES ('orders', 'user-42', '{"order_id":1}')COMMIT
  2. Outbox publisher (или Debezium CDC) читает outbox → публикует в Kafka → помечает отправленным.

Consume-Transform-Produce

Consume-Transform-Produce

Прочитать из одного топика → трансформировать → записать в другой. С Exactly-Once Semantics:

  1. consumer.poll()transform(message)
  2. producer.send(output_topic)
  3. producer.sendOffsetsToTransaction(consumer_offsets)
  4. producer.commitTransaction()

Все операции в одной Kafka-транзакции.

Выбор количества партиций

Выбор количества партиций

Правило: partitions >= max(expected_consumers)

Факторы:

  • Throughput — больше партиций → больше параллелизм → выше throughput
  • Ordering — порядок гарантирован ТОЛЬКО внутри одной партиции
  • Consumers — consumer-ов в группе не может быть больше, чем партиций
  • Overhead — каждая партиция = файлы на диске, память, открытые дескрипторы
  • Rebalancing — больше партиций → дольше rebalancing

Рекомендации:

  • Маленький проект: 3-6 партиций
  • Средний: 6-12 партиций
  • Высокая нагрузка: 12-50 партиций
  • Экстремальная: 50-200 (редко нужно)

Партиции можно увеличить, но НЕ уменьшить! При увеличении — ключи перераспределяются (hash(key) % new_partitions).

Частые проблемы

Consumer lag растёт:

# Проверить lag
kafka-consumer-groups.sh --describe --group my-group \
  --bootstrap-server localhost:9092

Решения:

  • Увеличить количество consumer-ов (до числа партиций)
  • Увеличить max.poll.records
  • Оптимизировать обработку сообщений (async, batch)
  • Увеличить количество партиций + consumer-ов
  • Профилировать consumer, найти bottleneck

Rebalancing слишком частый:

Частый Rebalancing: причины и решения

Симптомы: consumer-ы постоянно теряют/получают партиции.

Причины:

  1. max.poll.interval.ms слишком маленький — обработка не успевает
  2. session.timeout.ms слишком маленький — heartbeat не доходит
  3. Consumer-ы часто падают/перезапускаются

Решения:

  • max.poll.interval.ms=600000 (10 минут)
  • session.timeout.ms=45000 (45 секунд)
  • heartbeat.interval.ms=3000 (3 секунды)
  • partition.assignment.strategy=CooperativeStickyAssignor

Сообщения теряются:

Сообщения теряются: чеклист
  • Producer: acks=all? retries > 0? delivery.timeout.ms достаточный?
  • Broker: min.insync.replicas=2? replication.factor=3?
  • Consumer: enable.auto.commit=false? commit после обработки?
  • Retention: retention.ms не слишком маленький?

Сообщения дублируются:

Сообщения дублируются: причины и решения

Причина: consumer упал после обработки, но до commit offset.

Решения:

  1. Идемпотентная обработка (INSERT ON CONFLICT, upsert)
  2. Дедупликация по уникальному ID сообщения
  3. Exactly-once semantics (транзакции)
  4. Хранить offset в той же БД что и результат (atomic commit)

Порядок сообщений нарушен:

Порядок сообщений нарушен

Порядок гарантирован ТОЛЬКО внутри одной партиции.

Решения:

  1. Использовать ключ сообщения — все сообщения одного entity в одной партиции
  2. max.in.flight.requests.per.connection=1 (без идемпотентности) или =5 с enable.idempotence=true
  3. Одна партиция (жертвуем throughput)

Диск заканчивается:

# Проверить размер топиков
kafka-log-dirs.sh --bootstrap-server localhost:9092 --describe

# Уменьшить retention
kafka-configs.sh --alter --entity-type topics --entity-name big-topic \
  --add-config retention.ms=86400000     # 1 день

# Удалить старые сегменты
kafka-configs.sh --alter --entity-type topics --entity-name big-topic \
  --add-config retention.bytes=10737418240  # 10GB на партицию

# Включить сжатие (если не включено)
kafka-configs.sh --alter --entity-type topics --entity-name big-topic \
  --add-config compression.type=zstd

UnderReplicatedPartitions > 0:

UnderReplicatedPartitions > 0

Реплики отстают от лидера.

Причины:

  1. Брокер перегружен (диск, сеть, CPU)
  2. Брокер недоступен
  3. Сеть между брокерами

Проверить: kafka-topics.sh --describe --under-replicated-partitions — посмотреть, какой брокер является follower для проблемных партиций.