Мы создадим систему рабочих очередей, в которой будут появляться новые задания. Обработчики (workers) будут следить за очередью и выполнять работу по мере поступления. Система рабочих очередей идеально подходит для фоновых заданий, которые длятся дольше, чем обычный http-запрос. Один из примеров рабочих очередей: приложение обрабатывает загруженные пользователем фото, создает несколько версий и делится ими в различных социальных сетях. Изменение размера фото и загрузка их на другие сайты занимает время. Стоит ли выполнять эту работу внутри обработчика http? Скорее всего, нет. После загрузки фото сохраните его в удобном месте и передайте детали фоновым обработчикам.
Для выполнения этой задачи мы создадим систему, работающую в качестве очереди сообщений. Для этого нужна распределенная система рабочих очередей, в которой обработчики и поставщики задач находятся на разных серверах. Для создания такой распределенной системы нужна централизованная очередь сообщений/брокер сообщений и система для передачи сообщений, которая будет доставлять эти сообщения к обработчикам.
Существует несколько инструментов для решения этой задачи — Redis, Kafka, RabbitMQ, ZeroMQ, IronMQ, AWS SQS и т.д. В данном примере мы будем использовать RabbitMQ и Go, чтобы создать простую систему рабочих очередей.
Поставщики (Producers) и подписчики (Consumers): поставщик создает новые сообщения/задачи, а подписчики их принимают. В данном примере после загрузки файла http-обработчик создает сообщение для обработчиков. http-обработчик/веб-приложение — это поставщик, а фоновые обработчики — подписчики.
Точка обмена (Exchange) и очереди (Queues): Точки обмена получают сообщения от поставщиков и доставляют их в очереди. Подписчики принимают сообщения из очередей. RabbitMQ предоставляет мощные функции маршрутизации сообщений. Доставку сообщений в очереди можно настроить разными способами.
В этой статье мы не будем создавать полноценное веб-приложение с загрузкой файлов, поэтому все действия будут выполняться в командной строке. Мы создадим инструмент для публикации командной строки, который будет публиковать/создавать сообщения, а также подписчика, который будет принимать сообщения. Затем мы запустим несколько экземпляров подписчиков параллельно, чтобы продемонстрировать возможности масштабирования этой системы с добавлением большего количества обработчиков.
Мы создадим калькулятор, который может принимать два числа и выводить их сумму, и сделаем его масштабируемым с помощью Go и RabbitMQ.
Прежде чем приступить к созданию калькулятора, нужно установить RabbitMQ. Для нас, разработчиков, идеальным вариантом является localhost. Устанавливаем RabbitMQ на локальный компьютер и запускаем его.
Установка RabbitMQ варьируется в зависимости от платформы. На MacBook его можно установить с помощью Homebrew. В дистрибутиве Linux он, вероятно, доступен из менеджера пакетов. Для Windows должны быть устанавливаемые пакеты.
Теперь нужно установить пакет Go AMQP. Для этого я использую модули Go. Можно воспользоваться go get
или системой управления зависимостями.
Создаем каталог consumer
, внутри которого мы разместим приложение consumer. Приложение consumer должно подключиться к RabbitMQ, объявить очередь, которую оно хочет прослушать, а затем начать принимать сообщения.
Для начала создадим функцию обработки ошибок:
Если ошибка не nil
, то будет напечатано сообщение и детали ошибки, а затем работа будет прекращена. Так работает вышеуказанная функция.
Теперь переходим к установке соединения с RabbitMQ.
Попробуем подключиться к RabbitMQ и завершить работу в случае сбоя. URL соединения хранится в файле shared.go
верхнего уровня. Значение установлено в: amqp://guest:[email protected]:5672/
.
При успешном соединении нужно установить канал. Не путайте его с каналом Go. У RabbitMQ есть собственная концепция каналов. Соединение — это соединение TCP от клиента к серверу, создание которого — дорогостоящая операция. Канал служит протоколом связи по соединению и не занимает много ресурсов. Следует стремиться к ограничению количества подключений.
Теперь можно приступить к общению с RabbitMQ. Нужно сообщить серверу об интересующей нас очереди:
Далее создаем очередь с названием add
. Документацию с указанием аргументов функции можно найти здесь.
RabbitMQ начинает доставлять сообщения подписчикам в режиме круговой системы. Таким образом, он в равной степени распределяет работу для всех обработчиков. Если некоторые задания занимают больше времени, а другие завершаются в первую очередь, у одного обработчика будет много накопленных заданий, а другой останется без дела. Для устранения подобных сценариев просим RabbitMQ доставлять новые сообщения, только когда работник подтвердит предыдущее. В документации по функции Qos
можно найти более подробную информацию.
Переходим к обработке сообщений:
Просмотреть аргументы можно здесь. На этот раз получаем канал Go. Используем функцию range
для этого канала, чтобы получить сообщения.
Мы будем отправлять сообщения в формате JSON. Чтобы представить задачу для операции add, нужно снова определить тип в файле shared.go
:
Используем функцию range
для messageChannel
, чтобы декодировать тело сообщения в экземпляры AddTask
и затем суммировать Number1
и Number2
для получения результатов.
Запускаем горутину с вызовом go func()
. Она работает в фоновом режиме, поэтому нам нужен способ, гарантирующий, что основной интерфейс командной строки (обработчик), работающий на переднем плане, не завершит работу. Можно использовать канал и прослушивать его, чтобы продлить ожидание.
Тем временем мы просматриваем сообщения в горутине, обрабатываем тело сообщения и ошибки, а затем подтверждаем их. Во время вызова метода Consume
устанавливаем значение false
для autoAck
. Таким образом, подтверждение обработанного сообщения выполняется вручную. Если сообщение не подтверждено и обработчик теряет соединение, RabbitMQ передает это сообщение другим обработчикам. Благодаря этому можно повторять сообщения, даже когда обработчик выходит из строя.
Также не стоит забывать о подтверждении сообщений вручную при автоматическом отключении. В противном случае RabbitMQ не удалит сообщения (они не подтверждены — соответственно еще не выполнены), и они будут заполнять память.
Если мы выполним go build
внутри директории подписчиков и запустим ./cosumer
, то подписчик также должен запуститься (если все сделано правильно).
➜ consumer git:(master) ✗ ./consumer
2019/02/23 20:54:55 Consumer ready, PID: 36361
Подписчик выглядит следующим образом:
package main
import (
"encoding/json"
"log"
"os"
gopher_and_rabbit "github.com/masnun/gopher-and-rabbit"
"github.com/streadway/amqp"
)
func handleError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
conn, err := amqp.Dial(gopher_and_rabbit.Config.AMQPConnectionURL)
handleError(err, "Can't connect to AMQP")
defer conn.Close()
amqpChannel, err := conn.Channel()
handleError(err, "Can't create a amqpChannel")
defer amqpChannel.Close()
queue, err := amqpChannel.QueueDeclare("add", true, false, false, false, nil)
handleError(err, "Could not declare `add` queue")
err = amqpChannel.Qos(1, 0, false)
handleError(err, "Could not configure QoS")
messageChannel, err := amqpChannel.Consume(
queue.Name,
"",
false,
false,
false,
false,
nil,
)
handleError(err, "Could not register consumer")
stopChan := make(chan bool)
go func() {
log.Printf("Consumer ready, PID: %d", os.Getpid())
for d := range messageChannel {
log.Printf("Received a message: %s", d.Body)
addTask := &gopher_and_rabbit.AddTask{}
err := json.Unmarshal(d.Body, addTask)
if err != nil {
log.Printf("Error decoding JSON: %s", err)
}
log.Printf("Result of %d + %d is : %d", addTask.Number1, addTask.Number2, addTask.Number1+addTask.Number2)
if err := d.Ack(false); err != nil {
log.Printf("Error acknowledging message : %s", err)
} else {
log.Printf("Acknowledged message")
}
}
}()
// Остановка для завершения программы
<-stopChan
}
Переходим к созданию поставщика, который генерирует случайные числа и отправляет их в очередь add
. Для него также необходимо повторить объявление соединения, канала и очереди. Пропускаем эти действия и переходим к интересным частям.
Необходимо объявить очередь как для подписчика, так и для поставщика, поскольку неизвестно, кто из них приступит к выполнению первым. Поэтому нужно убедиться в постоянном наличии очереди до начала принятия/публикации.
Ранее мы видели тип AddTask
, теперь сгенерируем два случайных числа и создадим экземпляр. Затем перекодируем его в JSON для публикации в точке обмена.
И опубликуем его:
На данный момент код выглядит так:
package main
import (
"encoding/json"
"github.com/masnun/gopher-and-rabbit"
"github.com/streadway/amqp"
"log"
"math/rand"
"time"
)
func handleError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
conn, err := amqp.Dial(gopher_and_rabbit.Config.AMQPConnectionURL)
handleError(err, "Can't connect to AMQP")
defer conn.Close()
amqpChannel, err := conn.Channel()
handleError(err, "Can't create a amqpChannel")
defer amqpChannel.Close()
queue, err := amqpChannel.QueueDeclare("add", true, false, false, false, nil)
handleError(err, "Could not declare `add` queue")
rand.Seed(time.Now().UnixNano())
addTask := gopher_and_rabbit.AddTask{Number1: rand.Intn(999), Number2: rand.Intn(999)}
body, err := json.Marshal(addTask)
if err != nil {
handleError(err, "Error encoding JSON")
}
err = amqpChannel.Publish("", queue.Name, false, false, amqp.Publishing{
DeliveryMode: amqp.Persistent,
ContentType: "text/plain",
Body: body,
})
if err != nil {
log.Fatalf("Error publishing message: %s", err)
}
log.Printf("AddTask: %d+%d", addTask.Number1, addTask.Number2)
}
После сборки этого кода и запуска ./publisher
он ставит задачу в очередь. При работе одного или нескольких подписчиков можно увидеть результаты.
➜ publisher git:(master) ✗ ./publisher
2019/02/23 21:09:59 AddTask: 221+345
Окно подписчика:
➜ consumer git:(master) ✗ ./consumer
2019/02/23 20:54:55 Consumer ready, PID: 36361
2019/02/23 21:09:59 Received a message: {"Number1":221,"Number2":345}
2019/02/23 21:09:59 Result of 221 + 345 is : 566
2019/02/23 21:09:59 Acknowledged message
Сообщения публикуются в точки обмена и принимаются из очередей. В примере публикатора не указано название точки обмена. Если ее название является пустой строкой, RabbitMQ отправляет сообщение прямо в очередь, переданную как название очереди.
В данном случае использовалась простая именованная очередь. Точки обмена RabbitMQ могут выполнять намного больше действий. Существует несколько типов точек обмена сообщениями, с помощью которых можно разветвлять сообщения (доставлять одно и то же сообщение через несколько очередей) или выполнять сопоставление на основе тем при доставке сообщений в очереди. Точки обмена и очереди могут выполнять интеллектуальную настраиваемую маршрутизацию сообщений для обслуживания сложных случаев использования и создания сложных распределенных систем.
Код из этой статьи доступен на Github.
Перевод статьи Abu Ashraf Masnun: Work queue with Go and RabbitMQ
Комментарии