Рабочая очередь в Go с RabbitMQ


Мы создадим систему рабочих очередей, в которой будут появляться новые задания. Обработчики (workers) будут следить за очередью и выполнять работу по мере поступления. Система рабочих очередей идеально подходит для фоновых заданий, которые длятся дольше, чем обычный http-запрос. Один из примеров рабочих очередей: приложение обрабатывает загруженные пользователем фото, создает несколько версий и делится ими в различных социальных сетях. Изменение размера фото и загрузка их на другие сайты занимает время. Стоит ли выполнять эту работу внутри обработчика http? Скорее всего, нет. После загрузки фото сохраните его в удобном месте и передайте детали фоновым обработчикам.

Система сообщений

Для выполнения этой задачи мы создадим систему, работающую в качестве очереди сообщений. Для этого нужна распределенная система рабочих очередей, в которой обработчики и поставщики задач находятся на разных серверах. Для создания такой распределенной системы нужна централизованная очередь сообщений/брокер сообщений и система для передачи сообщений, которая будет доставлять эти сообщения к обработчикам.

Существует несколько инструментов для решения этой задачи — Redis, Kafka, RabbitMQ, ZeroMQ, IronMQ, AWS SQS и т.д. В данном примере мы будем использовать RabbitMQ и Go, чтобы создать простую систему рабочих очередей.

Концепции RabbitMQ

Поставщики (Producers) и подписчики (Consumers): поставщик создает новые сообщения/задачи, а подписчики их принимают. В данном примере после загрузки файла http-обработчик создает сообщение для обработчиков. http-обработчик/веб-приложение — это поставщик, а фоновые обработчики — подписчики.

Точка обмена (Exchange) и очереди (Queues): Точки обмена получают сообщения от поставщиков и доставляют их в очереди. Подписчики принимают сообщения из очередей. RabbitMQ предоставляет мощные функции маршрутизации сообщений. Доставку сообщений в очереди можно настроить разными способами.

Пример командной строки

В этой статье мы не будем создавать полноценное веб-приложение с загрузкой файлов, поэтому все действия будут выполняться в командной строке. Мы создадим инструмент для публикации командной строки, который будет публиковать/создавать сообщения, а также подписчика, который будет принимать сообщения. Затем мы запустим несколько экземпляров подписчиков параллельно, чтобы продемонстрировать возможности масштабирования этой системы с добавлением большего количества обработчиков.

Мы создадим калькулятор, который может принимать два числа и выводить их сумму, и сделаем его масштабируемым с помощью Go и RabbitMQ.

Подготовка

Прежде чем приступить к созданию калькулятора, нужно установить RabbitMQ. Для нас, разработчиков, идеальным вариантом является localhost. Устанавливаем RabbitMQ на локальный компьютер и запускаем его.

Установка RabbitMQ варьируется в зависимости от платформы. На MacBook его можно установить с помощью Homebrew. В дистрибутиве Linux он, вероятно, доступен из менеджера пакетов. Для Windows должны быть устанавливаемые пакеты.

Теперь нужно установить пакет Go AMQP. Для этого я использую модули Go. Можно воспользоваться go get или системой управления зависимостями.

go get github.com/streadway/amqp

Создание подписчика

Создаем каталог consumer, внутри которого мы разместим приложение consumer. Приложение consumer должно подключиться к RabbitMQ, объявить очередь, которую оно хочет прослушать, а затем начать принимать сообщения.

Для начала создадим функцию обработки ошибок:

Если ошибка не nil, то будет напечатано сообщение и детали ошибки, а затем работа будет прекращена. Так работает вышеуказанная функция.

Теперь переходим к установке соединения с RabbitMQ.

Попробуем подключиться к RabbitMQ и завершить работу в случае сбоя. URL соединения хранится в файле shared.go верхнего уровня. Значение установлено в: amqp://guest:[email protected]:5672/.

При успешном соединении нужно установить канал. Не путайте его с каналом Go. У RabbitMQ есть собственная концепция каналов. Соединение — это соединение TCP от клиента к серверу, создание которого — дорогостоящая операция. Канал служит протоколом связи по соединению и не занимает много ресурсов. Следует стремиться к ограничению количества подключений.

Теперь можно приступить к общению с RabbitMQ. Нужно сообщить серверу об интересующей нас очереди:

Далее создаем очередь с названием add. Документацию с указанием аргументов функции можно найти здесь.

RabbitMQ начинает доставлять сообщения подписчикам в режиме круговой системы. Таким образом, он в равной степени распределяет работу для всех обработчиков. Если некоторые задания занимают больше времени, а другие завершаются в первую очередь, у одного обработчика будет много накопленных заданий, а другой останется без дела. Для устранения подобных сценариев просим RabbitMQ доставлять новые сообщения, только когда работник подтвердит предыдущее. В документации по функции Qos можно найти более подробную информацию.

Переходим к обработке сообщений:

Просмотреть аргументы можно здесь. На этот раз получаем канал Go. Используем функцию range для этого канала, чтобы получить сообщения.

Мы будем отправлять сообщения в формате JSON. Чтобы представить задачу для операции add, нужно снова определить тип в файле shared.go:

Используем функцию range для messageChannel, чтобы декодировать тело сообщения в экземпляры AddTask и затем суммировать Number1 и Number2 для получения результатов.

Запускаем горутину с вызовом go func(). Она работает в фоновом режиме, поэтому нам нужен способ, гарантирующий, что основной интерфейс командной строки (обработчик), работающий на переднем плане, не завершит работу. Можно использовать канал и прослушивать его, чтобы продлить ожидание.

Тем временем мы просматриваем сообщения в горутине, обрабатываем тело сообщения и ошибки, а затем подтверждаем их. Во время вызова метода Consume устанавливаем значение false для autoAck. Таким образом, подтверждение обработанного сообщения выполняется вручную. Если сообщение не подтверждено и обработчик теряет соединение, RabbitMQ передает это сообщение другим обработчикам. Благодаря этому можно повторять сообщения, даже когда обработчик выходит из строя.

Также не стоит забывать о подтверждении сообщений вручную при автоматическом отключении. В противном случае RabbitMQ не удалит сообщения (они не подтверждены — соответственно еще не выполнены), и они будут заполнять память.

Если мы выполним go build внутри директории подписчиков и запустим ./cosumer, то подписчик также должен запуститься (если все сделано правильно).

➜ consumer git:(master) ✗ ./consumer 2019/02/23 20:54:55 Consumer ready, PID: 36361

Подписчик выглядит следующим образом:

package main import ( "encoding/json" "log" "os" gopher_and_rabbit "github.com/masnun/gopher-and-rabbit" "github.com/streadway/amqp" ) func handleError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) } } func main() { conn, err := amqp.Dial(gopher_and_rabbit.Config.AMQPConnectionURL) handleError(err, "Can't connect to AMQP") defer conn.Close() amqpChannel, err := conn.Channel() handleError(err, "Can't create a amqpChannel") defer amqpChannel.Close() queue, err := amqpChannel.QueueDeclare("add", true, false, false, false, nil) handleError(err, "Could not declare `add` queue") err = amqpChannel.Qos(1, 0, false) handleError(err, "Could not configure QoS") messageChannel, err := amqpChannel.Consume( queue.Name, "", false, false, false, false, nil, ) handleError(err, "Could not register consumer") stopChan := make(chan bool) go func() { log.Printf("Consumer ready, PID: %d", os.Getpid()) for d := range messageChannel { log.Printf("Received a message: %s", d.Body) addTask := &gopher_and_rabbit.AddTask{} err := json.Unmarshal(d.Body, addTask) if err != nil { log.Printf("Error decoding JSON: %s", err) } log.Printf("Result of %d + %d is : %d", addTask.Number1, addTask.Number2, addTask.Number1+addTask.Number2) if err := d.Ack(false); err != nil { log.Printf("Error acknowledging message : %s", err) } else { log.Printf("Acknowledged message") } } }() // Остановка для завершения программы <-stopChan }

Создание поставщика

Переходим к созданию поставщика, который генерирует случайные числа и отправляет их в очередь add. Для него также необходимо повторить объявление соединения, канала и очереди. Пропускаем эти действия и переходим к интересным частям.

Необходимо объявить очередь как для подписчика, так и для поставщика, поскольку неизвестно, кто из них приступит к выполнению первым. Поэтому нужно убедиться в постоянном наличии очереди до начала принятия/публикации.

Ранее мы видели тип AddTask, теперь сгенерируем два случайных числа и создадим экземпляр. Затем перекодируем его в JSON для публикации в точке обмена.

И опубликуем его:

На данный момент код выглядит так:

package main import ( "encoding/json" "github.com/masnun/gopher-and-rabbit" "github.com/streadway/amqp" "log" "math/rand" "time" ) func handleError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) } } func main() { conn, err := amqp.Dial(gopher_and_rabbit.Config.AMQPConnectionURL) handleError(err, "Can't connect to AMQP") defer conn.Close() amqpChannel, err := conn.Channel() handleError(err, "Can't create a amqpChannel") defer amqpChannel.Close() queue, err := amqpChannel.QueueDeclare("add", true, false, false, false, nil) handleError(err, "Could not declare `add` queue") rand.Seed(time.Now().UnixNano()) addTask := gopher_and_rabbit.AddTask{Number1: rand.Intn(999), Number2: rand.Intn(999)} body, err := json.Marshal(addTask) if err != nil { handleError(err, "Error encoding JSON") } err = amqpChannel.Publish("", queue.Name, false, false, amqp.Publishing{ DeliveryMode: amqp.Persistent, ContentType: "text/plain", Body: body, }) if err != nil { log.Fatalf("Error publishing message: %s", err) } log.Printf("AddTask: %d+%d", addTask.Number1, addTask.Number2) }

После сборки этого кода и запуска ./publisher он ставит задачу в очередь. При работе одного или нескольких подписчиков можно увидеть результаты.

➜ publisher git:(master) ✗ ./publisher 2019/02/23 21:09:59 AddTask: 221+345

Окно подписчика:

➜ consumer git:(master) ✗ ./consumer 2019/02/23 20:54:55 Consumer ready, PID: 36361 2019/02/23 21:09:59 Received a message: {"Number1":221,"Number2":345} 2019/02/23 21:09:59 Result of 221 + 345 is : 566 2019/02/23 21:09:59 Acknowledged message

Пустое имя точки обмена и очереди

Сообщения публикуются в точки обмена и принимаются из очередей. В примере публикатора не указано название точки обмена. Если ее название является пустой строкой, RabbitMQ отправляет сообщение прямо в очередь, переданную как название очереди.

Продвинутое использование

В данном случае использовалась простая именованная очередь. Точки обмена RabbitMQ могут выполнять намного больше действий. Существует несколько типов точек обмена сообщениями, с помощью которых можно разветвлять сообщения (доставлять одно и то же сообщение через несколько очередей) или выполнять сопоставление на основе тем при доставке сообщений в очереди. Точки обмена и очереди могут выполнять интеллектуальную настраиваемую маршрутизацию сообщений для обслуживания сложных случаев использования и создания сложных распределенных систем.

Код из этой статьи доступен на Github.


Перевод статьи Abu Ashraf Masnun: Work queue with Go and RabbitMQ


Поделиться статьей:


Вернуться к статьям

Комментарии

    Ничего не найдено.