Пакетное чтение и фиксация смещения в Kafka (golang sarama)
В Kafka при использовании потребителя нужно явно помечать сообщения как обработанные (committed). Это называется фиксацией смещения (offset commit). В контексте пакетной обработки сообщений это можно делать одним из двух способов:
1. Синхронное подтверждение (CommitSync)
Фиксирует обработанные смещения сразу после обработки всего пакета. Это безопасный, но относительно медленный подход, поскольку требует синхронного взаимодействия с Kafka.
2. Асинхронное подтверждение (CommitAsync)
Фиксирует смещения в фоновом режиме. Это быстрее, но менее надежно, так как возможна потеря смещений в случае сбоя.
Ниже показано, как можно реализовать пакетную фиксацию смещений в Kafka с использованием библиотеки Sarama.
Пример с пакетной фиксацией смещений
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 |
package main import ( "context" "fmt" "log" "os" "os/signal" "strings" "time" "github.com/Shopify/sarama" ) // ConsumerGroupHandler обрабатывает сообщения из ConsumerGroup type ConsumerGroupHandler struct{} // Setup вызывается перед началом работы с ConsumerGroup func (h *ConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error { fmt.Println("Consumer group setup completed") return nil } // Cleanup вызывается перед завершением работы с ConsumerGroup func (h *ConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { fmt.Println("Consumer group cleanup completed") return nil } // ConsumeClaim обрабатывает сообщения пакетами func (h *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { // Буфер для пакетного чтения var batch []*sarama.ConsumerMessage batchSize := 10 // Размер пакета flushInterval := 5 * time.Second // Максимальное время ожидания для заполнения пакета timer := time.NewTimer(flushInterval) // Функция для обработки пакета processBatch := func() { if len(batch) > 0 { fmt.Printf("Processing batch of %d messages\n", len(batch)) for _, message := range batch { fmt.Printf("Message topic:%s partition:%d offset:%d key:%s value:%s\n", message.Topic, message.Partition, message.Offset, string(message.Key), string(message.Value)) } // Помечаем смещения для пакета как прочитанные session.MarkOffset(batch[len(batch)-1].Topic, batch[len(batch)-1].Partition, batch[len(batch)-1].Offset+1, "") batch = nil // Очищаем пакет после обработки } timer.Reset(flushInterval) // Сбрасываем таймер } for { select { case message := <-claim.Messages(): batch = append(batch, message) if len(batch) >= batchSize { processBatch() } case <-timer.C: // Если сработал таймер, обрабатываем текущий пакет processBatch() case <-session.Context().Done(): // Завершаем работу при остановке сессии return nil } } } func main() { // Настройка Sarama config := sarama.NewConfig() config.Consumer.Return.Errors = true config.Version = sarama.V2_8_0_0 // Укажите версию Kafka config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin config.Consumer.Offsets.Initial = sarama.OffsetOldest // Начать с самого старого сообщения // Адреса брокеров Kafka brokers := []string{"localhost:9092"} group := "example-group" topics := []string{"example-topic"} // Создание ConsumerGroup consumer := ConsumerGroupHandler{} client, err := sarama.NewConsumerGroup(brokers, group, config) if err != nil { log.Fatalf("Error creating consumer group client: %v", err) } defer client.Close() // Контекст для graceful shutdown ctx, cancel := context.WithCancel(context.Background()) defer cancel() // Обработка ошибок go func() { for err := range client.Errors() { log.Printf("Error: %v", err) } }() // Обработка сигналов завершения работы go func() { signals := make(chan os.Signal, 1) signal.Notify(signals, os.Interrupt) <-signals cancel() }() // Запуск потребителя for { if err := client.Consume(ctx, topics, &consumer); err != nil { log.Printf("Error during consumption: %v", err) } // Проверяем, завершился ли контекст if ctx.Err() != nil { return } } } |
Что изменилось для пакетной фиксации:
- Буферизация сообщений:
- Сообщения собираются в массив
batch
для обработки пакетами.
- Сообщения собираются в массив
- Фиксация последнего смещения:
- После обработки пакета вызывается
session.MarkOffset
для последнего сообщения в пакете:
12session.MarkOffset(batch[len(batch)-1].Topic, batch[len(batch)-1].Partition, batch[len(batch)-1].Offset+1, "")
- После обработки пакета вызывается
- Это фиксирует смещение как обработанное.
- Таймер и размер пакета:
- Если пакет не заполняется до максимального размера (
batchSize
), он будет обработан по таймеру (flushInterval
).
- Если пакет не заполняется до максимального размера (
- Очистка пакета:
- После обработки пакет очищается.
Преимущества такого подхода:
- Повышение производительности: Смещения фиксируются пакетно, что уменьшает количество операций записи в Kafka.
- Гибкость: Вы можете настроить размер пакета и интервал времени для достижения баланса между производительностью и надежностью.
- Надежность: Используется явная фиксация смещений, что позволяет контролировать точку восстановления в случае сбоя.
Recommended Posts
clickhouse-go лучшие практики
16.04.2024