Asyncworker — Microframework para consumers assíncronos em Python
Na Sieve, as formas mais comuns de integração entre serviços são através de chamadas síncronas via requisições HTTP, ou de chamadas assíncronas a workers AMQP. Para HTTP, clientes ou servidores, utilizamos aiohttp para tudo. Atualmente, utilizamos o asyncworker na construção de novos workers e no processo de modernização de aplicações existentes.
Nesse post, vou tentar mostrar através de exemplos práticos por que criamos e utilizamos o asyncworker. Pra isso, vamos considerar como exemplo uma aplicação que consome mensagens de uma fila no rabbitmq e indexa essas mensagens em um Elasticsearch.
Primeiro, nós precisamos de uma instância do rabbitmq rodando. Vamos fazer isso com docker:
docker run -d -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_VHOST=/ --name rabbitmq rabbitmq:3.7.8-management-alpine
No rabbit, a porta 5672 é utilizada para AMQP e a 15672 para interface de gerenciamento HTTP. Essa imagem tem usuário e senha padrão como guest/guest
. Você consegue acompanhar o processo de start da aplicação através dos logs:
docker logs -f rabbitmq
Producer
Como queremos escrever um consumer com o asyncworker, agora que já temos uma instância do rabbitmq rodando, precisamos criar e alimentar uma fila, escrevendo um producer para dar algum trabalho para o nosso consumer.
Nosso producer vai publicar uma mensagem nesta fila, contendo uma palavra a ser indexada no Elasticsearch pelo nosso consumer. Como input, vamos utilizar uma lista de palavras: https://s3.amazonaws.com/diogo.martins/public/portuguese-brazil.txt
Esse arquivo contém uma palavra por linha, ~1 milhão de palavras e 11,4mb. Como não queremos ler todo o arquivo de uma vez e nem precisamos de todas as linhas pra esse exemplo, vamos fazer o streaming da parte do corpo que precisamos e vamos usar aiohttp pra isso.
pip install aiohttp==3.4.4
Agora que já conseguimos ler linha a linha a lista de palavras, tudo que precisamos é nos conectar ao rabbitmq e publicar cada uma das mensagens na fila words_to_index.
Após rodarmos o script, podemos observar que 10000 mensagens foram produzidas.
Consumer
Vamos começar instalando as dependências:
pip install async-worker==0.5.0 aioelasticsearch==0.5.2
Agora vamos subir uma instância de Elasticsearch, para que o nosso consumer consiga indexar as mensagens:
docker run -d -p 9200:9200 -p 9300:9300 -e “discovery.type=single-node” --name elasticsearch docker.elastic.co/elasticsearch/elasticsearch:6.4.2
O código é bem simples, mas tem bastante coisa acontecendo por debaixo dos panos:
- ack / reject automático de mensagens. Nesse caso, se o handler rodar com sucesso, todas as mensagem sofrerão
ACK
. Caso uma exceção não tratada seja capturada pelo asyncworker, todas as mensagens sofrerãoREJECT
e voltarão para a fila; - Reconexão automática em caso de perda de conexão;
- Um consumer é criado para cada rota do handler e mensagens são empurradas pelo rabbitmq, neste caso, com prefetch de 512 mensagens;
- Recebemos dados em lote. Configuramos nossa rota com a opção de bulk de 256 mensagens, ou seja, o nosso handler será chamado com uma lista de 256 mensagens, o que é perfeito para o nosso caso em que queremos fazer um insert em bulk no elasticsearch.
Neste exemplo, estamos consumindo mensagens somente da fila words_to_index, mas existem casos onde queremos tratar mensagens de filas diferentes da mesma forma. Pra isso, basta adicionarmos mais elementos à lista de rotas deste handler:
@app.route(["queue_name1", "queue_name2", "queue_name3"], vhost="/")
Já utilizarmos o asyncworker de forma estável em diversos serviços em produção e, comprovadamente, atende o seu principal objetivo: Ser simples e fazer com que o desenvolvedor se foque nas regras de negócio da aplicação e abstraia as peculiaridades de comunicação.
O que mostrei aqui é só uma parte do que é possível fazer com o asyncworker hoje, e ainda vamos adicionar muitas funcionalidades ao projeto, então recomendo à todos que acompanhem o projeto. Se você gostou, nos ajude colaborando no projeto ou dando uma estrela no github =)