Основное различие между session.MarkMessage и session.MarkOffset в библиотеке Sarama
Функция | Описание | Использование |
---|---|---|
session.MarkMessage |
Позволяет пометить конкретное сообщение как обработанное. Использует все детали сообщения (Topic , Partition , Offset ). |
Удобно, когда вы работаете с конкретными сообщениями, например, в цикле чтения. |
session.MarkOffset |
Позволяет явно указать смещение (Offset ), которое вы хотите зафиксировать как обработанное. |
Используется, когда вы хотите контролировать фиксацию смещений вручную, например, при пакетной обработке. |
Подробности:
1. session.MarkMessage
- Используется, чтобы пометить отдельное сообщение как обработанное.
- Автоматически берет информацию из объекта сообщения (
sarama.ConsumerMessage
), включаяTopic
,Partition
иOffset
. - Упрощает работу, если вы обрабатываете сообщения по одному.
Пример:
1 2 3 4 5 |
for message := range claim.Messages() { fmt.Printf("Processing message: %s\n", string(message.Value)) session.MarkMessage(message, "") // Помечаем сообщение как обработанное } |
MarkMessage
, библиотека Sarama автоматически фиксирует смещение последнего обработанного сообщения в Kafka.
2. session.MarkOffset
- Предоставляет более низкоуровневый контроль.
- Позволяет вручную указать:
- Тему (
topic
), - Раздел (
partition
), - Смещение (
offset
) следующего сообщения, которое должно быть обработано.
- Тему (
- Используется, когда вы обрабатываете сообщения пакетами и хотите зафиксировать обработку целой группы сообщений сразу.
Пример:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
batch := []*sarama.ConsumerMessage{} for message := range claim.Messages() { batch = append(batch, message) if len(batch) >= batchSize { // Обработка пакета processBatch(batch) // Явно фиксируем смещение последнего сообщения lastMessage := batch[len(batch)-1] session.MarkOffset(lastMessage.Topic, lastMessage.Partition, lastMessage.Offset+1, "") batch = nil } } |
Когда использовать какую функцию?
- Используйте
session.MarkMessage
, если:- Вы обрабатываете сообщения по одному (потоковая обработка).
- Вам не нужно вручную контролировать смещения.
- Используйте
session.MarkOffset
, если:- Вы обрабатываете сообщения группами (пакетами).
- Вам нужен более явный контроль за тем, какие смещения фиксируются в Kafka.
Пример сравнительного использования:
Если вы обрабатываете поток сообщений:
1 2 3 4 5 |
for message := range claim.Messages() { fmt.Printf("Processing message: %s\n", string(message.Value)) session.MarkMessage(message, "") // Удобно и просто } |
1 2 3 4 5 6 7 8 9 10 11 12 13 |
var batch []*sarama.ConsumerMessage batchSize := 10 for message := range claim.Messages() { batch = append(batch, message) if len(batch) >= batchSize { processBatch(batch) lastMessage := batch[len(batch)-1] session.MarkOffset(lastMessage.Topic, lastMessage.Partition, lastMessage.Offset+1, "") batch = nil } } |
Итог:
MarkMessage
— удобный способ для автоматической фиксации смещений, если вы работаете с каждым сообщением отдельно.MarkOffset
— предоставляет гибкость и контроль, особенно при пакетной обработке.
Recommended Posts
clickhouse-go лучшие практики
16.04.2024