Kafka, как вычитать большое количество записей, обработать их и только потом пометить их все как прочитанные?
Вот пример на Go с использованием библиотеки Sarama, где происходит вычитывание большого количества записей (например, 10,000), их обработка и ручной коммит оффсетов после успешной обработки.
Пример кода: Чтение, обработка и коммит оффсетов
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 |
package main import ( "fmt" "log" "os" "os/signal" "github.com/Shopify/sarama" ) func main() { // Конфигурация Sarama config := sarama.NewConfig() config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange config.Consumer.Offsets.Initial = sarama.OffsetOldest config.Version = sarama.V2_6_0_0 // Укажите версию Kafka, соответствующую вашей среде // Ручной коммит оффсетов config.Consumer.Offsets.AutoCommit.Enable = false // Адреса брокеров и имя группы brokers := []string{"localhost:9092"} groupID := "example-group" topic := "example-topic" // Создаем consumer group consumerGroup, err := sarama.NewConsumerGroup(brokers, groupID, config) if err != nil { log.Fatalf("Failed to create consumer group: %v", err) } defer consumerGroup.Close() // Обработка сигналов завершения sigterm := make(chan os.Signal, 1) signal.Notify(sigterm, os.Interrupt) // Инициализируем ConsumerHandler handler := &BatchConsumerHandler{ messages: make([]*sarama.ConsumerMessage, 0), batchSize: 10000, // Размер пакета для обработки } // Основной цикл обработки go func() { for { if err := consumerGroup.Consume(nil, []string{topic}, handler); err != nil { log.Printf("Error from consumer: %v", err) } } }() fmt.Println("Consumer is running...") select { case <-sigterm: fmt.Println("Terminating...") } } // BatchConsumerHandler обрабатывает сообщения пакетами type BatchConsumerHandler struct { messages []*sarama.ConsumerMessage // Хранилище для сообщений batchSize int // Размер пакета для обработки } // Setup вызывается при инициализации consumer group func (h *BatchConsumerHandler) Setup(sarama.ConsumerGroupSession) error { fmt.Println("Consumer group session setup") return nil } // Cleanup вызывается при завершении consumer group func (h *BatchConsumerHandler) Cleanup(sarama.ConsumerGroupSession) error { fmt.Println("Consumer group session cleanup") return nil } // ConsumeClaim читает сообщения из Kafka func (h *BatchConsumerHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { for message := range claim.Messages() { h.messages = append(h.messages, message) // Если собрали достаточно сообщений для пакета, обрабатываем их if len(h.messages) >= h.batchSize { h.processBatch(session) } } // Обработка оставшихся сообщений (если количество меньше batchSize) if len(h.messages) > 0 { h.processBatch(session) } return nil } // processBatch обрабатывает пакет сообщений и коммитит оффсеты func (h *BatchConsumerHandler) processBatch(session sarama.ConsumerGroupSession) { fmt.Printf("Processing batch of %d messages\n", len(h.messages)) // Пример обработки сообщений for _, msg := range h.messages { fmt.Printf("Processing message: topic=%s partition=%d offset=%d value=%s\n", *msg.Topic, msg.Partition, msg.Offset, string(msg.Value)) } // Коммитим оффсеты после успешной обработки for _, msg := range h.messages { session.MarkMessage(msg, "") } session.Commit() fmt.Println("Batch committed") // Очищаем список сообщений h.messages = h.messages[:0] } |
Объяснение кода
- Ручное управление оффсетами:
- Отключен автокоммит (
config.Consumer.Offsets.AutoCommit.Enable = false
). - После успешной обработки всех сообщений в пакете вызывается
session.MarkMessage
и затемsession.Commit
для фиксации оффсетов.
- Отключен автокоммит (
- Сбор сообщений в пакеты:
- Сообщения накапливаются в массив
h.messages
. - Когда количество сообщений достигает
batchSize
, вызывается функцияprocessBatch
.
- Сообщения накапливаются в массив
- Обработка пакета:
- Сообщения из пакета проходят обработку (в данном примере просто выводятся в консоль).
- После успешной обработки вызывается коммит оффсетов для всех сообщений в пакете.
- Завершение работы:
- Обрабатываются оставшиеся сообщения, если их количество меньше
batchSize
.
- Обрабатываются оставшиеся сообщения, если их количество меньше
- Работа с сигналами:
- Программа корректно завершает работу при получении сигнала (например,
Ctrl+C
).
- Программа корректно завершает работу при получении сигнала (например,
Примечания
- Производительность:
- Увеличьте
batchSize
, чтобы уменьшить частоту коммитов оффсетов и снизить нагрузку на Kafka. - Используйте асинхронные вызовы или параллельную обработку, если требуется высокая скорость обработки.
- Увеличьте
- Устойчивость к сбоям:
- Если программа упадет до вызова
session.Commit
, Kafka отправит сообщения заново, так как их оффсеты не были зафиксированы.
- Если программа упадет до вызова
- Версия Kafka:
- Убедитесь, что версия Kafka совместима с указанной в
config.Version
.
- Убедитесь, что версия Kafka совместима с указанной в
Этот код позволяет эффективно обрабатывать большое количество сообщений из Kafka, собирая их в пакеты и фиксируя оффсеты только после успешной обработки.
Recommended Posts
clickhouse-go лучшие практики
16.04.2024