An asynchronous RabbitMQ client in Python

Feng Gao
Sep 12, 2020

1 Background

RabbitMQ is high-performance message-queueing software written in Erlang. It dispatches the messages it receives. A client that connects and sends messages to it is called a producer, and a client that waits for messages is called a consumer. The basic skeleton of RabbitMQ is as follows:

[Figure: RabbitMQ]

RabbitMQ is widely used in large systems, where it helps with load balancing. Time-consuming processes make for a worse user experience. For example, what happens in the backend when you buy something online with a credit card? Your order information is split into various messages, which are sent to RabbitMQ or some other message queue system. The repository client consumes its message and prepares the goods for you. The payment client consumes its message and emails the bill to you. As you can see, all the work is done by different worker clients. You do not have to wait for any of these jobs; all you have to do is one click.

If thousands of clients send messages to RabbitMQ simultaneously, what happens?

In a traditional RabbitMQ architecture, the publishers, the consumers and the RabbitMQ server are distributed across different hosts. Publishers and consumers connect to the server over TCP. As we all know, blocking IO hurts system performance because every operation is a system call that the caller has to wait for. A multithreaded solution looks attractive, but it turns into a disaster once you take the C10K problem into consideration.

Is there a strategy we can adopt? The answer is yes: use an asynchronous, non-blocking framework.

2 Pika RabbitMQ Driver

Pika is a RabbitMQ driver for Python.

2.1 Synchronous Producer

Let’s take a first look at a synchronous producer.
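A minimal sketch of what such a producer might look like, assuming pika's BlockingConnection and a topic exchange. The connection_factory argument is a hypothetical hook added here so the class can be tried without a broker; it is not part of pika.

```python
class SyncAMQProducer:
    """Blocking publisher for one exchange, usable as a context manager."""

    def __init__(self, host, exchange, connection_factory=None):
        self._host = host
        self._exchange = exchange
        # Hypothetical hook: lets a test pass in a fake connection.
        self._connection_factory = connection_factory or self._pika_connection
        self._connection = None
        self._channel = None

    def _pika_connection(self):
        import pika  # imported lazily so the class loads without pika installed
        return pika.BlockingConnection(pika.ConnectionParameters(host=self._host))

    def __enter__(self):
        # Blocks the caller until the connection handshake has finished.
        self._connection = self._connection_factory()
        return self

    def publish(self, routing_key, message):
        # Open the channel on first use, then reuse it for later messages.
        if self._channel is None or not self._channel.is_open:
            self._channel = self._connection.channel()
            self._channel.exchange_declare(
                exchange=self._exchange, exchange_type="topic")
        self._channel.basic_publish(
            exchange=self._exchange, routing_key=routing_key, body=message)

    def __exit__(self, exc_type, exc, tb):
        # Closing the connection also closes its channels.
        if self._connection is not None and self._connection.is_open:
            self._connection.close()
        return False
```

Every call in this class waits for the broker before returning, which is exactly what makes it synchronous.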

How can we use this publisher client?

with SyncAMQProducer("127.0.0.1", "exchange_name") as p:
    p.publish("topic.one", "message to send")

When we create an instance of SyncAMQProducer, it establishes a TCP connection to the RabbitMQ server using BlockingConnection, which means it blocks the process. When the publish method is invoked, it checks whether the channel has been opened and then publishes the message.

2.2 Drawbacks

The handshake process for a RabbitMQ connection is actually quite involved and requires at least 7 TCP packets (more if TLS is used). Each publisher establishes a connection, creates a new channel, sends its messages, and then closes the channel and the connection. Most of the time is wasted on system IO. How can we solve this problem? The answer is TornadoConnection, which is provided by the pika library. You can use the Tornado IO loop to make the connection asynchronous.

3 Asynchronous Client

  • Each RabbitMQ client has two connections, one for publishing messages and the other for consuming messages.
  • A new channel is created each time the client publishes a message.
  • Aside from acting as publisher and consumer, the client also has to play the role of an RPC endpoint.
  • The client must be robust, which means you have to handle errors as much as possible.

3.1 Asynchronous Connection

Here in line 28, we create a new instance of TornadoConnection, but its initialization is asynchronous: control is yielded back to the IO loop. Once the connection is open, open_callback (line 15) is called, and we set the connection as the result of the Future. In line 2, the connect method waits for both connections to be established before it returns. Keep in mind that the connect method itself is also asynchronous.
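The line numbers above refer to the original listing. As a separate, simplified illustration of the same idea, here is a sketch that turns open callbacks into Futures and waits for two connections. It uses asyncio from the standard library instead of Tornado, and FakeConnection is a made-up stand-in for a callback-based connection class. Its on_open_callback and on_open_error_callback keywords mirror the names pika uses, but its first argument is only a label, not real connection parameters.

```python
import asyncio


class FakeConnection:
    """Stand-in for a connection that reports success or failure via callbacks."""

    def __init__(self, name, on_open_callback, on_open_error_callback, fail=False):
        self.name = name
        loop = asyncio.get_running_loop()
        # Simulate the handshake finishing on a later loop iteration.
        if fail:
            loop.call_soon(on_open_error_callback, self, ConnectionError(name))
        else:
            loop.call_soon(on_open_callback, self)


def open_connection(name, connection_cls=FakeConnection, **kwargs):
    """Start one connection and return a Future resolved by its callbacks."""
    future = asyncio.get_running_loop().create_future()

    def open_callback(connection):
        if not future.done():
            future.set_result(connection)

    def open_error_callback(connection, error):
        if not future.done():
            future.set_exception(error)

    connection_cls(name, on_open_callback=open_callback,
                   on_open_error_callback=open_error_callback, **kwargs)
    return future


async def connect():
    """Return only after both the publish and the consume connection are open."""
    publish_conn, consume_conn = await asyncio.gather(
        open_connection("publish"), open_connection("consume"))
    return publish_conn, consume_conn
```

The constructor returns immediately, and the caller only resumes once the IO loop has run the open callback. The error callback matters for robustness: without it, a failed handshake would leave the Future pending forever.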

3.2 Create new Channel

RabbitMQ connections are a lot more resource-heavy than channels. Connections should be long-lived, while channels can be opened and closed frequently. So our strategy is to create a new channel for each publish and each consume. Once the message has been published, the channel is of no further use, so we close it as soon as possible.

I guess you already know how the code works. The call to the channel() method in line 16 does not create the channel right away. Once the channel is open, open_callback (line 11) is invoked.
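The "one channel per operation" strategy can be sketched as an async context manager. This is an illustration only, not the original listing: it uses asyncio rather than Tornado, and FakeConnection and FakeChannel are made-up stand-ins. The on_open_callback keyword mirrors the one pika's asynchronous connection.channel() accepts.

```python
import asyncio
from contextlib import asynccontextmanager


class FakeChannel:
    def __init__(self):
        self.is_open = True

    def close(self):
        self.is_open = False


class FakeConnection:
    """Stand-in for a long-lived connection that opens channels asynchronously."""

    def __init__(self):
        self.opened = 0

    def channel(self, on_open_callback):
        self.opened += 1
        # The channel is handed over later, from the IO loop.
        asyncio.get_running_loop().call_soon(on_open_callback, FakeChannel())


@asynccontextmanager
async def short_lived_channel(connection):
    """Open a channel, lend it to the caller, and close it right afterwards."""
    future = asyncio.get_running_loop().create_future()
    connection.channel(on_open_callback=future.set_result)
    channel = await future
    try:
        yield channel
    finally:
        if channel.is_open:
            channel.close()


async def demo():
    connection = FakeConnection()  # one connection, reused for every channel
    used = []
    for _ in range(3):
        async with short_lived_channel(connection) as channel:
            used.append(channel)  # a real client would publish here
    return connection.opened, [c.is_open for c in used]
```

Because the channel is closed in a finally block, it is released even if the publish inside the with block raises.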

3.3 Exchange, Queue and Routing Key

The exchange, queue and routing key are basic concepts in RabbitMQ. All of them can be declared or bound in an asynchronous way.

Those three methods share the same idea as the previous asynchronous code.
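Since the three steps follow the same callback-to-Future pattern, one small helper can cover all of them. The sketch below assumes, as in pika 1.x, that the asynchronous exchange_declare, queue_declare and queue_bind methods take a callback keyword. The names as_future and declare_and_bind are hypothetical, FakeChannel is a stand-in with simulated replies, and asyncio is used in place of Tornado.

```python
import asyncio


def as_future(method, *args, **kwargs):
    """Call a callback-style method and return a Future for its reply."""
    future = asyncio.get_running_loop().create_future()
    method(*args, callback=future.set_result, **kwargs)
    return future


class FakeChannel:
    """Stand-in that mimics the callback keyword of the channel methods."""

    def __init__(self):
        self.log = []

    def _reply(self, name, callback):
        self.log.append(name)
        # The broker's confirmation arrives later, from the IO loop.
        asyncio.get_running_loop().call_soon(callback, name + "-ok")

    def exchange_declare(self, exchange, exchange_type, callback):
        self._reply("exchange_declare", callback)

    def queue_declare(self, queue, callback):
        self._reply("queue_declare", callback)

    def queue_bind(self, queue, exchange, routing_key, callback):
        self._reply("queue_bind", callback)


async def declare_and_bind(channel, exchange, queue, routing_key):
    # Each step waits for its confirmation before the next one starts.
    await as_future(channel.exchange_declare,
                    exchange=exchange, exchange_type="topic")
    await as_future(channel.queue_declare, queue=queue)
    await as_future(channel.queue_bind, queue=queue, exchange=exchange,
                    routing_key=routing_key)
```

The order matters: a queue can only be bound to an exchange after both have been declared.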

3.4 Publish

The code is self-explanatory.

3.5 Receive

As mentioned above, we have to take RPC into consideration. So after the handler has consumed the message, we check whether the message properties are None. If they are not None, we create a channel and publish the result back to RabbitMQ. Here is the code:

Unlike publish, receive declares the exchange and the queue and binds the routing key ahead of time.

3.6 RPC

RabbitMQ helps us implement a small Remote Procedure Call (RPC) framework. We send a message to the queue, and the consumer receives and handles it, then sends the result back through RabbitMQ. That completes one RPC round trip.

To avoid declaring the same exchange and queue repeatedly, we keep track of what has already been declared, using the exchange as the key. The callback queue is created with a random name chosen by RabbitMQ. Before we send the message to RabbitMQ, we start consuming on the callback queue to wait for the result from the `server`. In the _call method, we use uuid to generate a unique key for the call and a Future to wait for the result asynchronously. Apart from the basic features, the RPC call provides a timeout to guard against unexpected conditions on the server.
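The correlation and timeout logic can be sketched without a real broker. In the illustration below, FakeBroker is an in-memory stand-in for RabbitMQ that routes by queue name only, so the exchange and routing-key layer is left out. RpcClient, serve and call are hypothetical names, and asyncio is used instead of Tornado.

```python
import asyncio
import uuid


class FakeBroker:
    """In-memory stand-in: delivers each message to the queue's consumer."""

    def __init__(self):
        self._consumers = {}

    def consume(self, queue, callback):
        self._consumers[queue] = callback

    def publish(self, queue, body, reply_to=None, correlation_id=None):
        callback = self._consumers.get(queue)
        if callback is not None:  # messages for unconsumed queues are dropped here
            asyncio.get_running_loop().call_soon(
                callback, body, reply_to, correlation_id)


def serve(broker, queue, handler):
    """Consumer side: handle the request and reply only if a reply was asked for."""
    def on_request(body, reply_to, correlation_id):
        result = handler(body)
        if reply_to is not None:
            broker.publish(reply_to, result, correlation_id=correlation_id)
    broker.consume(queue, on_request)


class RpcClient:
    def __init__(self, broker):
        self._broker = broker
        self._pending = {}  # correlation id -> Future waiting for the reply
        # RabbitMQ would pick this name; a random one is made up here.
        self._callback_queue = "reply-" + uuid.uuid4().hex
        # Start consuming replies before any request is sent.
        broker.consume(self._callback_queue, self._on_reply)

    def _on_reply(self, body, reply_to, correlation_id):
        future = self._pending.pop(correlation_id, None)
        if future is not None and not future.done():
            future.set_result(body)

    async def call(self, queue, body, timeout=1.0):
        correlation_id = str(uuid.uuid4())
        future = asyncio.get_running_loop().create_future()
        self._pending[correlation_id] = future
        self._broker.publish(queue, body, reply_to=self._callback_queue,
                             correlation_id=correlation_id)
        try:
            return await asyncio.wait_for(future, timeout)
        finally:
            self._pending.pop(correlation_id, None)  # forget calls that timed out
```

The correlation id is what lets several calls share one callback queue, and the timeout keeps a caller from waiting forever when no server answers.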


Feng Gao

A software developer at Microsoft in Suzhou. Most of my articles are written in Chinese. I will try writing in English when I’m ready.