Golang Sarama пример отправки сообщения в топик Kafka через асинхронный продюсер
Вот пример отправки сообщения в Kafka с использованием асинхронного продюсера из библиотеки Sarama:
Пример: Асинхронный Kafka продюсер как объект
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 |
package main import ( "log" "os" "os/signal" "sync" "syscall" "github.com/Shopify/sarama" ) // KafkaProducer - структура для асинхронного продюсера type KafkaProducer struct { producer sarama.AsyncProducer topic string stop chan os.Signal wg sync.WaitGroup } // NewKafkaProducer - конструктор для создания нового KafkaProducer func NewKafkaProducer(brokers []string, topic string) (*KafkaProducer, error) { // Конфигурация продюсера config := sarama.NewConfig() config.Producer.RequiredAcks = sarama.WaitForLocal // Дождаться подтверждения от лидера config.Producer.Retry.Max = 5 // Количество попыток повторной отправки config.Producer.Return.Successes = true // Возвращать успешные отправки config.Producer.Return.Errors = true // Возвращать ошибки отправки config.Producer.Partitioner = sarama.NewRandomPartitioner // Случайное распределение по партициям // Создание асинхронного продюсера producer, err := sarama.NewAsyncProducer(brokers, config) if err != nil { return nil, err } return &KafkaProducer{ producer: producer, topic: topic, stop: make(chan os.Signal, 1), }, nil } // Start - метод для запуска продюсера func (k *KafkaProducer) Start() { // Обработка сигналов для завершения работы signal.Notify(k.stop, os.Interrupt, syscall.SIGTERM) // Горутина для обработки успешных сообщений k.wg.Add(1) go func() { defer k.wg.Done() for success := range k.producer.Successes() { log.Printf("Сообщение успешно отправлено в партицию %d, с оффсетом %d", success.Partition, success.Offset) } }() // Горутина для обработки ошибок k.wg.Add(1) go func() { defer k.wg.Done() for err := range k.producer.Errors() { log.Printf("Ошибка отправки сообщения: %v", err.Err) } }() } // SendMessage - метод для отправки сообщения в Kafka func (k *KafkaProducer) SendMessage(key string, value string) { message := &sarama.ProducerMessage{ Topic: k.topic, Key: sarama.StringEncoder(key), // Ключ сообщения Value: sarama.StringEncoder(value), } // Отправка сообщения k.producer.Input() <- message log.Printf("Сообщение отправлено в очередь на отправку: %s", value) } // Close - метод для закрытия продюсера func (k *KafkaProducer) Close() { close(k.producer.Input()) // Закрытие канала для отправки сообщений k.wg.Wait() // Ожидание завершения горутин if err := k.producer.Close(); err != nil { log.Fatalf("Ошибка закрытия продюсера: %v", err) } log.Println("Программа завершена") } func main() { // Адреса Kafka брокеров brokers := []string{"localhost:9092"} // Название топика topic := "example-topic" // Создание KafkaProducer producer, err := NewKafkaProducer(brokers, topic) if err != nil { log.Fatalf("Ошибка создания продюсера: %v", err) } defer producer.Close() // Запуск продюсера producer.Start() // Отправка сообщений for i := 0; i < 10; i++ { producer.SendMessage("example-key", "Hello, Kafka! Message number: "+string(i)) } // Ожидание завершения работы программы <-producer.stop } |
Что изменено?
- KafkaProducer:
- Мы обернули логику асинхронного Kafka-продюсера в структуру
KafkaProducer
, которая инкапсулирует всю работу с Kafka. - Эта структура включает:
producer
: сам асинхронный продюсер (sarama.AsyncProducer
).topic
: имя топика, в который будут отправляться сообщения.stop
: канал для обработки сигналов завершения.wg
:sync.WaitGroup
для ожидания завершения работы горутин.
- Мы обернули логику асинхронного Kafka-продюсера в структуру
- Методы:
- NewKafkaProducer: Конструктор для создания нового продюсера.
- Start: Запуск асинхронных горутин для обработки успешных сообщений и ошибок.
- SendMessage: Метод для отправки сообщений в Kafka.
- Close: Метод для закрытия продюсера и корректного завершения работы.
- Основной код:
- В основном коде создается объект
KafkaProducer
, который управляет отправкой сообщений и обработкой завершения работы.
- В основном коде создается объект
Как это работает:
- Создание и запуск:
- Создается объект
KafkaProducer
с использованиемNewKafkaProducer
. - Метод
Start
запускает горутины для обработки успешных сообщений и ошибок.
- Создается объект
- Отправка сообщений:
- Метод
SendMessage
отправляет сообщения в Kafka через асинхронный каналproducer.Input()
.
- Метод
- Ожидание завершения:
- Программа ожидает завершения работы с Kafka через канал
stop
, который перехватывает системные сигналы (например,os.Interrupt
илиsyscall.SIGTERM
). - После этого вызывается метод
Close
, который закрывает канал и ожидает завершения горутин с помощьюsync.WaitGroup
.
- Программа ожидает завершения работы с Kafka через канал
Преимущества такой структуры:
- Чистота кода: Логика работы с Kafka инкапсулирована в одном объекте.
- Модульность: Легко управлять отправкой сообщений и их обработкой.
- Гибкость: Легко менять параметры, такие как топик, брокеры, или методы обработки сообщений.
Recommended Posts
Golang Sarama: настройка Partitioner
20.03.2024