Golang Sarama и асинхронная работа с Kafka
Вот пример кода на Go, который демонстрирует использование sarama.AsyncProducer
для отправки сообщений в Kafka:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 |
package main import ( "fmt" "log" "os" "os/signal" "strings" "time" "github.com/Shopify/sarama" ) func main() { // Конфигурация Sarama config := sarama.NewConfig() config.Producer.RequiredAcks = sarama.WaitForAll // Ждать подтверждения от всех ISR config.Producer.Retry.Max = 5 // Количество попыток при ошибке config.Producer.Return.Successes = true // Возвращать успешные отправки в канал Successes config.Producer.Return.Errors = true // Возвращать ошибки в канал Errors // Адреса Kafka-брокеров brokers := []string{"localhost:9092"} // Создание AsyncProducer producer, err := sarama.NewAsyncProducer(brokers, config) if err != nil { log.Fatalf("Не удалось создать продюсера: %v", err) } defer func() { if err := producer.Close(); err != nil { log.Printf("Ошибка при закрытии продюсера: %v", err) } }() // Канал для обработки сигналов остановки signals := make(chan os.Signal, 1) signal.Notify(signals, os.Interrupt) // Топик для отправки сообщений topic := "example-topic" // Горутин для обработки результатов go func() { for { select { case success := <-producer.Successes(): fmt.Printf("Сообщение успешно отправлено в %s:%d\n", success.Topic, success.Partition) case err := <-producer.Errors(): fmt.Printf("Ошибка при отправке сообщения: %v\n", err.Err) } } }() // Отправка сообщений в Kafka for i := 0; i < 10; i++ { message := fmt.Sprintf("Сообщение %d", i+1) producer.Input() <- &sarama.ProducerMessage{ Topic: topic, Value: sarama.StringEncoder(message), } fmt.Printf("Отправлено сообщение: %s\n", message) // Искусственная задержка для наглядности time.Sleep(500 * time.Millisecond) } // Ожидание завершения или сигнала остановки fmt.Println("Нажмите Ctrl+C для выхода...") <-signals fmt.Println("Завершаем работу.") } |
Пояснение:
- Конфигурация продюсера (
sarama.NewConfig
):RequiredAcks
: определяет, сколько подтверждений нужно получить от брокеров (WaitForAll
ждет подтверждения от всех ISR).Retry.Max
: максимальное число повторных попыток при ошибке отправки.Return.Successes
иReturn.Errors
: включают каналы для обработки успешных и неуспешных отправок.
- Создание AsyncProducer: Используется
sarama.NewAsyncProducer
, который позволяет отправлять сообщения асинхронно через каналproducer.Input()
. - Обработка результатов: В отдельной горутине обрабатываются успешные отправки (
producer.Successes()
) и ошибки (producer.Errors()
). - Отправка сообщений: Сообщения отправляются через канал
producer.Input()
. Каждое сообщение включает:Topic
: название топика.Value
: данные сообщения (здесь используетсяsarama.StringEncoder
).
- Завершение работы: Обработка сигнала
os.Interrupt
позволяет завершить работу продюсера корректно.
Результат работы:
При успешной отправке сообщений в Kafka вы увидите в консоли что-то вроде:
1 2 3 4 5 6 7 |
Отправлено сообщение: Сообщение 1 Сообщение успешно отправлено в example-topic:0 Отправлено сообщение: Сообщение 2 Сообщение успешно отправлено в example-topic:0 ... Нажмите Ctrl+C для выхода... |
Если возникнут ошибки, они будут выведены в консоль.
Recommended Posts
Golang Sarama: настройка Partitioner
20.03.2024