Kafka: распределённый брокер сообщений

Концепции

Producer → Topic (partitions) → Consumer Group (consumers)
              ↓
         Broker Cluster (хранение на диске, репликация)

Topic: именованный канал сообщений. Аналог таблицы в БД. Сообщения в топике хранятся на диске, не удаляются при чтении.

Partition: физическое разделение топика. Каждая партиция представляет собой упорядоченный, неизменяемый лог (append-only). Внутри партиции сообщения имеют уникальный offset (порядковый номер). Партиции распределяются по брокерам.

Offset: порядковый номер сообщения внутри партиции. Монотонно растёт. Consumer запоминает свой offset, то есть «где остановился». Offset 0 = начало, offset -1 = конец.

Broker: один сервер Kafka. Хранит партиции, обслуживает клиентов. Кластер = несколько брокеров.

Producer: публикует сообщения в топик. Выбирает партицию по ключу сообщения (или round-robin).

Consumer: читает сообщения из топика. Хранит свой offset (позицию чтения).

Consumer Group: группа consumer-ов, читающих один топик совместно. Каждая партиция назначается ровно одному consumer-у в группе. Если consumer-ов больше, чем партиций, лишние простаивают.

Replication Factor: количество копий каждой партиции. RF=3 → данные на 3 брокерах. Leader обслуживает чтение/запись, follower-ы реплицируют.

ISR (In-Sync Replicas): реплики, не отстающие от лидера. Сообщение считается committed, когда записано во все ISR.

Topic: orders (3 партиции, RF=2)
──────────────────────────────────────
Partition 0: [msg0, msg1, msg2, msg3, ...]  → Broker 1 (leader), Broker 2 (follower)
Partition 1: [msg0, msg1, msg2, ...]        → Broker 2 (leader), Broker 3 (follower)
Partition 2: [msg0, msg1, ...]              → Broker 3 (leader), Broker 1 (follower)

Consumer Group: order-service (3 consumer-а)
  consumer-1 ← Partition 0
  consumer-2 ← Partition 1
  consumer-3 ← Partition 2

Гарантии доставки

At-most-once   — сообщение может быть потеряно, но не дублировано
                 (commit offset до обработки)
At-least-once  — сообщение доставлено хотя бы раз, возможны дубликаты (по умолчанию)
                 (commit offset после обработки)
Exactly-once   — ровно один раз (idempotent producer + transactions)
                 (самый медленный, включается явно)

Kafka vs очереди (RabbitMQ, Redis)

Традиционная очередь              Kafka
────────────────────────          ────────────────────────
Сообщение удаляется при чтении    Сообщение хранится до retention
Один потребитель на сообщение     Множество consumer group читают независимо
Push-модель (брокер → consumer)   Pull-модель (consumer → брокер)
Сложная маршрутизация             Простая модель (topic + partition)
Малые объёмы, low latency         Огромные объёмы, высокий throughput

Установка

Docker Compose (KRaft, без ZooKeeper)

# compose.yaml
services:
  kafka:
    image: apache/kafka:3.9
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,CONTROLLER://0.0.0.0:29093,EXTERNAL://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,EXTERNAL://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:29093
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_LOG_RETENTION_HOURS: 168
      KAFKA_LOG_RETENTION_BYTES: 1073741824
    volumes:
      - kafka_data:/var/lib/kafka/data

  kafka-ui:
    image: provectuslabs/kafka-ui
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
    depends_on:
      - kafka

volumes:
  kafka_data:

Кластер из 3 брокеров

# compose.yaml
services:
  kafka-1:
    image: apache/kafka:3.9
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,CONTROLLER://0.0.0.0:29093,EXTERNAL://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1:29092,EXTERNAL://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-1:29093,2@kafka-2:29093,3@kafka-3:29093
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_DEFAULT_REPLICATION_FACTOR: 3
      KAFKA_MIN_INSYNC_REPLICAS: 2
    ports:
      - "9092:9092"
    volumes:
      - kafka1_data:/var/lib/kafka/data

  kafka-2:
    image: apache/kafka:3.9
    environment:
      KAFKA_NODE_ID: 2
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,CONTROLLER://0.0.0.0:29093,EXTERNAL://0.0.0.0:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-2:29092,EXTERNAL://localhost:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-1:29093,2@kafka-2:29093,3@kafka-3:29093
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
    ports:
      - "9093:9093"
    volumes:
      - kafka2_data:/var/lib/kafka/data

  kafka-3:
    image: apache/kafka:3.9
    environment:
      KAFKA_NODE_ID: 3
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,CONTROLLER://0.0.0.0:29093,EXTERNAL://0.0.0.0:9094
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-3:29092,EXTERNAL://localhost:9094
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-1:29093,2@kafka-2:29093,3@kafka-3:29093
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
    ports:
      - "9094:9094"
    volumes:
      - kafka3_data:/var/lib/kafka/data

volumes:
  kafka1_data:
  kafka2_data:
  kafka3_data:

CLI: kafka-* утилиты

# Если из Docker:
docker exec -it kafka bash
# Утилиты в /opt/kafka/bin/

# Или через kcat (kafkacat) — удобнее для разработки
# Arch: sudo pacman -S kcat
# macOS: brew install kcat
# Ubuntu: sudo apt install kafkacat

Топики

# Создать топик
kafka-topics.sh --bootstrap-server localhost:9092 \
  --create --topic orders \
  --partitions 6 \
  --replication-factor 3

# Список топиков
kafka-topics.sh --bootstrap-server localhost:9092 --list

# Описание топика
kafka-topics.sh --bootstrap-server localhost:9092 \
  --describe --topic orders

# Увеличить партиции (уменьшить нельзя!)
kafka-topics.sh --bootstrap-server localhost:9092 \
  --alter --topic orders --partitions 12

# Удалить топик
kafka-topics.sh --bootstrap-server localhost:9092 \
  --delete --topic orders

# Изменить конфигурацию топика
kafka-configs.sh --bootstrap-server localhost:9092 \
  --alter --entity-type topics --entity-name orders \
  --add-config retention.ms=604800000        # retention 7 дней

# Посмотреть конфигурацию
kafka-configs.sh --bootstrap-server localhost:9092 \
  --describe --entity-type topics --entity-name orders

Producer (CLI)

# Простая отправка (stdin → Kafka)
kafka-console-producer.sh --bootstrap-server localhost:9092 \
  --topic orders
> {"order_id": 1, "amount": 99.99}
> {"order_id": 2, "amount": 49.99}
> ^C

# С ключом (key:value)
kafka-console-producer.sh --bootstrap-server localhost:9092 \
  --topic orders \
  --property parse.key=true \
  --property key.separator=:
> user42:{"order_id": 1, "amount": 99.99}
> user43:{"order_id": 2, "amount": 49.99}

# Из файла
kafka-console-producer.sh --bootstrap-server localhost:9092 \
  --topic orders < messages.txt

# kcat (проще)
echo '{"order_id": 1}' | kcat -b localhost:9092 -t orders -P
kcat -b localhost:9092 -t orders -P -K: < messages.txt   # key:value

Consumer (CLI)

# Читать новые сообщения
kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic orders

# С начала
kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic orders --from-beginning

# С ключами и метаданными
kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic orders --from-beginning \
  --property print.key=true \
  --property print.timestamp=true \
  --property print.partition=true \
  --property print.offset=true

# В составе consumer group
kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic orders --group order-processor

# kcat (проще)
kcat -b localhost:9092 -t orders -C                       # новые
kcat -b localhost:9092 -t orders -C -o beginning          # с начала
kcat -b localhost:9092 -t orders -C -o end                # только новые
kcat -b localhost:9092 -t orders -C -o -10                # последние 10
kcat -b localhost:9092 -t orders -C -p 0 -o 100           # partition 0, offset 100
kcat -b localhost:9092 -t orders -C -f '%T %k %s\n'       # формат: timestamp key value

# Метаданные кластера
kcat -b localhost:9092 -L                                 # список топиков и брокеров
kcat -b localhost:9092 -L -t orders                       # детали по топику

Consumer Groups

# Список consumer groups
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

# Описание группы (offset, lag)
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --describe --group order-processor

# Вывод:
# GROUP         TOPIC    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
# order-proc    orders   0          1000            1050            50
# order-proc    orders   1          2000            2000            0
#                                                             LAG = отставание

# Сбросить offset (все consumer-ы в группе должны быть остановлены!)
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group order-processor --topic orders \
  --reset-offsets --to-earliest --execute

# Сброс на конкретный offset
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group order-processor --topic orders \
  --reset-offsets --to-offset 1000 --execute

# Сброс по времени
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group order-processor --topic orders \
  --reset-offsets --to-datetime 2025-03-15T00:00:00.000 --execute

# Сдвиг offset
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group order-processor --topic orders \
  --reset-offsets --shift-by -100 --execute

# Удалить группу
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --delete --group order-processor

Конфигурация

Брокер (server.properties / переменные окружения)

# Идентификация
node.id=1
process.roles=broker,controller

# Сеть
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://kafka-host:9092

# Логи (хранилище сообщений)
log.dirs=/var/lib/kafka/data
log.retention.hours=168                  # хранить 7 дней (по умолчанию)
log.retention.bytes=-1                   # без лимита по размеру (-1)
log.segment.bytes=1073741824             # размер сегмента 1GB
log.cleanup.policy=delete                # delete или compact

# Репликация
default.replication.factor=3
min.insync.replicas=2                    # минимум ISR для acks=all
unclean.leader.election.enable=false     # не выбирать не-ISR реплику лидером

# Топики
num.partitions=6                         # партиции по умолчанию
auto.create.topics.enable=false          # не создавать топики автоматически

# Производительность
num.io.threads=8
num.network.threads=3
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400

Конфигурация топика

# При создании
kafka-topics.sh --create --topic events \
  --partitions 12 \
  --replication-factor 3 \
  --config retention.ms=604800000 \
  --config cleanup.policy=delete \
  --config max.message.bytes=10485760

# Изменение
kafka-configs.sh --alter --entity-type topics --entity-name events \
  --add-config retention.ms=259200000    # 3 дня
ПараметрОписаниеПо умолчанию
retention.msВремя хранения сообщений604800000 (7 дней)
retention.bytesЛимит размера партиции-1 (без лимита)
cleanup.policydelete или compactdelete
max.message.bytesМаксимальный размер сообщения1048576 (1MB)
min.insync.replicasМинимум ISR для acks=all1
segment.bytesРазмер лог-сегмента1073741824 (1GB)
compression.typeСжатие: none, gzip, snappy, lz4, zstdproducer

Cleanup policies

delete  — удалять сегменты старше retention.ms или при превышении retention.bytes
compact — оставлять только последнее сообщение для каждого ключа (log compaction)

Log compaction: для топиков-снимков (состояние, конфигурация). Сохраняет последнее значение для каждого ключа, удаляя старые. Сообщение с value=null (tombstone) удаляет ключ.

kafka-topics.sh --create --topic user-profiles \
  --partitions 6 \
  --config cleanup.policy=compact \
  --config min.cleanable.dirty.ratio=0.5 \
  --config delete.retention.ms=86400000

Producer

Конфигурация

ПараметрОписаниеРекомендация
bootstrap.serversАдреса брокеровМинимум 2-3 для отказоустойчивости
acksПодтверждение записиall для надёжности
retriesПовторы при ошибке2147483647 (бесконечно)
delivery.timeout.msОбщий таймаут доставки120000 (2 мин)
linger.msЗадержка для батчинга5-100 (больше = больший batch)
batch.sizeРазмер batch в байтах16384-65536
compression.typeСжатиеlz4 или zstd
enable.idempotenceИдемпотентность (дедупликация)true
max.in.flight.requests.per.connectionПараллельные запросы5 (с идемпотентностью)
key.serializerСериализатор ключейStringSerializer / Avro / Protobuf
value.serializerСериализатор значенийStringSerializer / Avro / Protobuf

acks: уровни подтверждения

acks=0    — не ждать подтверждения (максимальная скорость, возможна потеря)
acks=1    — ждать подтверждения от leader-а (компромисс)
acks=all  — ждать подтверждения от всех ISR (максимальная надёжность)
            Работает в связке с min.insync.replicas

Для продакшена: acks=all + min.insync.replicas=2 + replication.factor=3.

Partitioning (выбор партиции)

1. Явная партиция   — producer указывает номер (редко)
2. По ключу          — hash(key) % num_partitions (основной способ)
3. Round-robin       — без ключа, равномерное распределение
4. Custom partitioner — своя логика

Сообщения с одинаковым ключом всегда попадают в одну партицию → гарантирован порядок для одного ключа.

Key = user_id → все события одного пользователя в одной партиции → упорядочены
Key = null    → round-robin → порядок не гарантирован

Идемпотентный producer

enable.idempotence=true (по умолчанию с Kafka 3.0)

Каждый producer получает уникальный PID (producer ID).
Каждое сообщение — sequence number.
Брокер отклоняет дубликаты по (PID, sequence).
→ Exactly-once на уровне producer → partition.

Транзакции (Exactly-Once Semantics)

transactional.id=my-producer-1

Producer:
  beginTransaction()
  send(topic1, message1)
  send(topic2, message2)
  sendOffsetsToTransaction(offsets, consumerGroupId)  // для consume-transform-produce
  commitTransaction()   // или abortTransaction()

→ Все сообщения атомарно: либо все видны consumer-ам, либо ни одного.

Consumer

Конфигурация

ПараметрОписаниеРекомендация
bootstrap.serversАдреса брокеровМинимум 2-3
group.idConsumer groupОбязателен для групповой работы
auto.offset.resetСтратегия при отсутствии offsetearliest или latest
enable.auto.commitАвтоматический commit offsetfalse для at-least-once
auto.commit.interval.msИнтервал автокоммита5000
max.poll.recordsЗаписей за один poll500
max.poll.interval.msМаксимум между poll-ами300000 (5 мин)
session.timeout.msТаймаут сессии (heartbeat)45000
heartbeat.interval.msИнтервал heartbeat3000
fetch.min.bytesМинимум данных для ответа1
fetch.max.wait.msМаксимальное ожидание данных500
isolation.levelread_committed / read_uncommittedread_committed для EOS

auto.offset.reset

earliest  — начать с начала топика (если нет сохранённого offset)
latest    — начать с конца (только новые сообщения)
none      — ошибка, если нет сохранённого offset

Commit стратегии

Auto-commit (enable.auto.commit=true):
  Фоновый коммит каждые auto.commit.interval.ms.
  Просто, но может привести к потере сообщений (commit до обработки)
  или дубликатам (crash до commit).

Manual sync commit:
  consumer.commitSync() — после успешной обработки batch-а.
  At-least-once: при crash повторная обработка от последнего commit.

Manual async commit:
  consumer.commitAsync() — не блокирует.
  Callback для обработки ошибок.

Commit per-partition:
  commitSync(Map<TopicPartition, OffsetAndMetadata>) — точный контроль.

Rebalancing

Когда consumer присоединяется/покидает группу, партиции перераспределяются:

Eager (по умолчанию до Kafka 2.4):
  Отзывает все партиции → перераспределяет.
  Пауза в обработке на время ребалансировки.

Cooperative/Incremental (рекомендуемый):
  partition.assignment.strategy=
    org.apache.kafka.clients.consumer.CooperativeStickyAssignor
  Перераспределяет только изменившиеся партиции.
  Минимальная пауза.

Клиенты

Go (segmentio/kafka-go)

package main

import (
    "context"
    "fmt"
    "github.com/segmentio/kafka-go"
    "time"
)

// --- Producer ---
func produce() {
    writer := &kafka.Writer{
        Addr:         kafka.TCP("localhost:9092"),
        Topic:        "orders",
        Balancer:     &kafka.Hash{},        // по ключу
        BatchTimeout: 10 * time.Millisecond,
        BatchSize:    100,
        Compression:  kafka.Lz4,
        RequiredAcks: kafka.RequireAll,
    }
    defer writer.Close()

    err := writer.WriteMessages(context.Background(),
        kafka.Message{
            Key:   []byte("user-42"),
            Value: []byte(`{"order_id": 1, "amount": 99.99}`),
            Headers: []kafka.Header{
                {Key: "source", Value: []byte("api")},
            },
        },
    )
    if err != nil {
        panic(err)
    }
}

// --- Consumer ---
func consume() {
    reader := kafka.NewReader(kafka.ReaderConfig{
        Brokers:        []string{"localhost:9092"},
        Topic:          "orders",
        GroupID:        "order-processor",
        MinBytes:       1,
        MaxBytes:       10e6,               // 10MB
        CommitInterval: 0,                  // manual commit
        StartOffset:    kafka.FirstOffset,  // earliest
    })
    defer reader.Close()

    for {
        msg, err := reader.FetchMessage(context.Background())
        if err != nil {
            panic(err)
        }

        fmt.Printf("partition=%d offset=%d key=%s value=%s\n",
            msg.Partition, msg.Offset, msg.Key, msg.Value)

        // Обработка сообщения...

        // Manual commit (at-least-once)
        if err := reader.CommitMessages(context.Background(), msg); err != nil {
            panic(err)
        }
    }
}

Go (IBM/sarama)

package main

import (
    "fmt"
    "github.com/IBM/sarama"
)

// --- Producer ---
func produce() {
    config := sarama.NewConfig()
    config.Producer.RequiredAcks = sarama.WaitForAll
    config.Producer.Idempotent = true
    config.Producer.Return.Successes = true
    config.Producer.Compression = sarama.CompressionLZ4
    config.Net.MaxOpenRequests = 1

    producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
    if err != nil {
        panic(err)
    }
    defer producer.Close()

    msg := &sarama.ProducerMessage{
        Topic: "orders",
        Key:   sarama.StringEncoder("user-42"),
        Value: sarama.StringEncoder(`{"order_id": 1, "amount": 99.99}`),
    }

    partition, offset, err := producer.SendMessage(msg)
    fmt.Printf("partition=%d offset=%d\n", partition, offset)
}

// --- Consumer Group ---
type handler struct{}

func (h handler) Setup(_ sarama.ConsumerGroupSession) error   { return nil }
func (h handler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }

func (h handler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for msg := range claim.Messages() {
        fmt.Printf("partition=%d offset=%d key=%s value=%s\n",
            msg.Partition, msg.Offset, msg.Key, msg.Value)

        // Обработка...

        session.MarkMessage(msg, "")     // пометить как обработанное
    }
    return nil
}

Python (confluent-kafka)

from confluent_kafka import Producer, Consumer, KafkaError
import json

# --- Producer ---
producer = Producer({
    'bootstrap.servers': 'localhost:9092',
    'acks': 'all',
    'enable.idempotence': True,
    'compression.type': 'lz4',
    'linger.ms': 10,
    'batch.size': 65536,
})

def delivery_report(err, msg):
    if err:
        print(f'Delivery failed: {err}')
    else:
        print(f'Delivered to {msg.topic()} [{msg.partition()}] @ {msg.offset()}')

producer.produce(
    'orders',
    key='user-42',
    value=json.dumps({'order_id': 1, 'amount': 99.99}),
    callback=delivery_report,
)
producer.flush()

# --- Consumer ---
consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'order-processor',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,
})
consumer.subscribe(['orders'])

try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                continue
            raise Exception(msg.error())

        print(f'partition={msg.partition()} offset={msg.offset()} '
              f'key={msg.key()} value={msg.value()}')

        # Обработка...

        consumer.commit(message=msg)     # manual commit
finally:
    consumer.close()

Node.js (kafkajs)

import { Kafka, CompressionTypes } from 'kafkajs';

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['localhost:9092'],
});

// --- Producer ---
const producer = kafka.producer({
  idempotent: true,
  maxInFlightRequests: 5,
});
await producer.connect();

await producer.send({
  topic: 'orders',
  compression: CompressionTypes.LZ4,
  messages: [
    {
      key: 'user-42',
      value: JSON.stringify({ order_id: 1, amount: 99.99 }),
      headers: { source: 'api' },
    },
  ],
});

await producer.disconnect();

// --- Consumer ---
const consumer = kafka.consumer({ groupId: 'order-processor' });
await consumer.connect();
await consumer.subscribe({ topic: 'orders', fromBeginning: true });

await consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    console.log({
      partition,
      offset: message.offset,
      key: message.key?.toString(),
      value: message.value?.toString(),
    });
    // Обработка...
    // auto-commit по умолчанию (после eachMessage)
  },
});

Java (Spring Kafka)

// application.yml
// spring:
//   kafka:
//     bootstrap-servers: localhost:9092
//     producer:
//       acks: all
//       key-serializer: org.apache.kafka.common.serialization.StringSerializer
//       value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
//     consumer:
//       group-id: order-processor
//       auto-offset-reset: earliest
//       enable-auto-commit: false
//       key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
//       value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer

// Producer
@Service
public class OrderProducer {
    private final KafkaTemplate<String, OrderEvent> kafkaTemplate;

    public void send(String userId, OrderEvent event) {
        kafkaTemplate.send("orders", userId, event);
    }
}

// Consumer
@Service
public class OrderConsumer {
    @KafkaListener(topics = "orders", groupId = "order-processor")
    public void consume(@Payload OrderEvent event,
                        @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
                        @Header(KafkaHeaders.OFFSET) long offset,
                        Acknowledgment ack) {
        // Обработка...
        ack.acknowledge();  // manual commit
    }
}

Schema Registry

Централизованное хранилище схем (Avro, Protobuf, JSON Schema). Гарантирует совместимость producer/consumer.

# compose.yaml (добавить к кластеру)
services:
  schema-registry:
    image: confluentinc/cp-schema-registry:7.7
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:29092
# Зарегистрировать схему
curl -X POST http://localhost:8081/subjects/orders-value/versions \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{
    "schemaType": "AVRO",
    "schema": "{\"type\":\"record\",\"name\":\"Order\",\"fields\":[{\"name\":\"order_id\",\"type\":\"long\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"user_id\",\"type\":\"string\"}]}"
  }'

# Список subjects
curl http://localhost:8081/subjects

# Версии схемы
curl http://localhost:8081/subjects/orders-value/versions

# Проверить совместимость
curl -X POST http://localhost:8081/compatibility/subjects/orders-value/versions/latest \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"schemaType": "AVRO", "schema": "..."}'

# Уровни совместимости
# BACKWARD  — новый consumer может читать старые данные (по умолчанию)
# FORWARD   — старый consumer может читать новые данные
# FULL      — оба направления
# NONE      — без проверки
curl -X PUT http://localhost:8081/config/orders-value \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"compatibility": "FULL"}'

Kafka Connect

Интеграция Kafka с внешними системами без кода:

# compose.yaml
services:
  connect:
    image: confluentinc/cp-kafka-connect:7.7
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: kafka:29092
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: connect-cluster
      CONNECT_CONFIG_STORAGE_TOPIC: connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: connect-status
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
# Список доступных коннекторов
curl http://localhost:8083/connector-plugins

# Создать Source-коннектор (PostgreSQL → Kafka)
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "pg-source",
    "config": {
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
      "database.hostname": "postgres",
      "database.port": "5432",
      "database.user": "replicator",
      "database.password": "secret",
      "database.dbname": "myapp",
      "topic.prefix": "cdc",
      "table.include.list": "public.orders,public.users",
      "plugin.name": "pgoutput"
    }
  }'
# → топики: cdc.public.orders, cdc.public.users (CDC — Change Data Capture)

# Создать Sink-коннектор (Kafka → Elasticsearch)
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "es-sink",
    "config": {
      "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
      "topics": "orders",
      "connection.url": "http://elasticsearch:9200",
      "key.ignore": "false",
      "schema.ignore": "true"
    }
  }'

# Статус коннекторов
curl http://localhost:8083/connectors
curl http://localhost:8083/connectors/pg-source/status

# Перезапустить
curl -X POST http://localhost:8083/connectors/pg-source/restart

# Удалить
curl -X DELETE http://localhost:8083/connectors/pg-source

Мониторинг

Ключевые метрики

Брокер:
  UnderReplicatedPartitions    — партиции с отстающими репликами (должно быть 0)
  ActiveControllerCount        — 1 controller в кластере (ровно 1)
  OfflinePartitionsCount       — недоступные партиции (должно быть 0)
  BytesInPerSec / BytesOutPerSec — throughput
  RequestsPerSec               — запросы к брокеру
  LogFlushRateAndTimeMs        — скорость записи на диск

Consumer:
  records-lag-max              — максимальное отставание (lag)
  records-consumed-rate        — скорость потребления
  commit-latency-avg           — задержка commit

Producer:
  record-send-rate             — скорость отправки
  record-error-rate            — ошибки отправки
  request-latency-avg          — задержка запроса
  batch-size-avg               — средний размер batch

Проверка через CLI

# Consumer lag (основная метрика здоровья)
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --describe --group order-processor

# Replica status
kafka-topics.sh --bootstrap-server localhost:9092 \
  --describe --under-replicated-partitions

# Log dirs (диск)
kafka-log-dirs.sh --bootstrap-server localhost:9092 \
  --describe --topic-list orders

Kafka UI

Визуальный интерфейс, проще всего для мониторинга:

# compose.yaml
services:
  kafka-ui:
    image: provectuslabs/kafka-ui
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: production
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
      KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schema-registry:8081
      KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: connect
      KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://connect:8083

Паттерны

Event Sourcing

Хранить не текущее состояние, а все события. Текущее состояние = replay событий.

Topic: account-events (compacted=false, retention=forever)
  key=account-42  value={"type":"created","balance":0}
  key=account-42  value={"type":"deposited","amount":100}
  key=account-42  value={"type":"withdrawn","amount":30}
  → текущий баланс = 0 + 100 - 30 = 70

Materialized view (consumer):
  Слушает account-events → обновляет состояние в PostgreSQL/Redis

CQRS (Command Query Responsibility Segregation)

Write path: API → Command → Kafka topic → DB
Read path:  API → Read model (materialized view, другая БД)

Topic: orders (source of truth)
Consumer 1: → PostgreSQL (для OLTP-запросов)
Consumer 2: → Elasticsearch (для поиска)
Consumer 3: → ClickHouse (для аналитики)
Consumer 4: → Redis (для кеша)

Saga (распределённые транзакции)

Оркестрация через топики:

Topic: order-commands     → Order Service → Topic: order-events
Topic: payment-commands   → Payment Service → Topic: payment-events
Topic: shipping-commands  → Shipping Service → Topic: shipping-events

Saga orchestrator:
  1. Send "create-order" → order-commands
  2. Listen order-events → "order-created"
  3. Send "process-payment" → payment-commands
  4. Listen payment-events → "payment-succeeded" или "payment-failed"
  5. Если failed → Send "cancel-order" → order-commands (компенсация)
  6. Если succeeded → Send "ship-order" → shipping-commands

Dead Letter Queue (DLQ)

Consumer не может обработать сообщение → не терять, а отправить в DLQ.

Topic: orders → Consumer → обработка
                  ↓ (ошибка после N попыток)
              Topic: orders.dlq → ручной разбор или автоповтор
# Пример: retry с DLQ
MAX_RETRIES = 3

for msg in consumer:
    retries = int(msg.headers.get('retries', 0))
    try:
        process(msg)
        consumer.commit(msg)
    except Exception as e:
        if retries >= MAX_RETRIES:
            producer.produce('orders.dlq', key=msg.key, value=msg.value,
                           headers={'error': str(e), 'original_topic': 'orders'})
        else:
            producer.produce('orders.retry', key=msg.key, value=msg.value,
                           headers={'retries': str(retries + 1)})
        consumer.commit(msg)

Outbox Pattern

Проблема: нужно атомарно записать в БД И отправить в Kafka.
Решение: писать событие в outbox-таблицу в той же транзакции.

1. BEGIN
   INSERT INTO orders (...) VALUES (...);
   INSERT INTO outbox (topic, key, value) VALUES ('orders', 'user-42', '{"order_id":1}');
   COMMIT;

2. Outbox publisher (или Debezium CDC) читает outbox → публикует в Kafka → помечает отправленным.

Consume-Transform-Produce

Прочитать из одного топика → трансформировать → записать в другой.
С Exactly-Once Semantics:

consumer.poll() → transform(message) → producer.send(output_topic)
                                       → producer.sendOffsetsToTransaction(consumer_offsets)
                                       → producer.commitTransaction()

Все три операции в одной Kafka-транзакции.

Выбор количества партиций

Правило: partitions >= max(expected_consumers)

Факторы:
  Throughput:   больше партиций → больше параллелизм → выше throughput
  Ordering:     порядок гарантирован ТОЛЬКО внутри одной партиции
  Consumers:    consumer-ов в группе не может быть больше, чем партиций
  Overhead:     каждая партиция = файлы на диске, память, открытые дескрипторы
  Rebalancing:  больше партиций → дольше rebalancing

Рекомендации:
  Маленький проект: 3-6 партиций
  Средний: 6-12 партиций
  Высокая нагрузка: 12-50 партиций
  Экстремальная: 50-200 (редко нужно)

Партиции можно увеличить, но НЕ уменьшить!
При увеличении — ключи перераспределяются (hash(key) % new_partitions).

Частые проблемы

Consumer lag растёт:

# Проверить lag
kafka-consumer-groups.sh --describe --group my-group \
  --bootstrap-server localhost:9092

Решения:

  • Увеличить количество consumer-ов (до числа партиций)
  • Увеличить max.poll.records
  • Оптимизировать обработку сообщений (async, batch)
  • Увеличить количество партиций + consumer-ов
  • Профилировать consumer, найти bottleneck

Rebalancing слишком частый:

Симптомы: consumer-ы постоянно теряют/получают партиции.

Причины:
  1. max.poll.interval.ms слишком маленький — обработка не успевает
  2. session.timeout.ms слишком маленький — heartbeat не доходит
  3. Consumer-ы часто падают/перезапускаются

Решения:
  max.poll.interval.ms=600000             # 10 минут
  session.timeout.ms=45000                # 45 секунд
  heartbeat.interval.ms=3000              # 3 секунды
  partition.assignment.strategy=CooperativeStickyAssignor

Сообщения теряются:

Проверить:
  Producer: acks=all? retries > 0? delivery.timeout.ms достаточный?
  Broker: min.insync.replicas=2? replication.factor=3?
  Consumer: enable.auto.commit=false? commit после обработки?
  Retention: retention.ms не слишком маленький?

Сообщения дублируются:

Причина: consumer упал после обработки, но до commit offset.
Решения:
  1. Идемпотентная обработка (INSERT ON CONFLICT, upsert)
  2. Дедупликация по уникальному ID сообщения
  3. Exactly-once semantics (транзакции)
  4. Хранить offset в той же БД что и результат (atomic commit)

Порядок сообщений нарушен:

Порядок гарантирован ТОЛЬКО внутри одной партиции.

Решения:
  1. Использовать ключ сообщения — все сообщения одного entity в одной партиции
  2. max.in.flight.requests.per.connection=1 (без идемпотентности)
     или =5 с enable.idempotence=true
  3. Одна партиция (жертвуем throughput)

Диск заканчивается:

# Проверить размер топиков
kafka-log-dirs.sh --bootstrap-server localhost:9092 --describe

# Уменьшить retention
kafka-configs.sh --alter --entity-type topics --entity-name big-topic \
  --add-config retention.ms=86400000     # 1 день

# Удалить старые сегменты
kafka-configs.sh --alter --entity-type topics --entity-name big-topic \
  --add-config retention.bytes=10737418240  # 10GB на партицию

# Включить сжатие (если не включено)
kafka-configs.sh --alter --entity-type topics --entity-name big-topic \
  --add-config compression.type=zstd

UnderReplicatedPartitions > 0:

Реплики отстают от лидера.

Причины:
  1. Брокер перегружен (диск, сеть, CPU)
  2. Брокер недоступен
  3. Сеть между брокерами

Проверить:
  kafka-topics.sh --describe --under-replicated-partitions
  # Посмотреть, какой брокер — follower для проблемных партиций