Use various rabbimq features with pika in python

Introduction

In the previous Handling rabbimq with python, I exchanged simple messages with RabbitMq, but since RabbitMq has various functions, this time it is various. Features and usage are summarized.

environment

Queue confirmation settings

With the standard pika settings, if the queue you want to send and receive does not exist in RabbitMq, it will be created automatically, but you may want to make an error without creating the queue automatically. In that case, giving True to the passive argument of the `queue_declare ()` function of the channel will result in an error if there is no queue.

Situation example

For example, when you want to manage all queues on the producer side and allow pure connections on the consumer side.

Producer with passive set to true

I'm just adding `passive = True` to the producer `channel.queue_declare ()` created as the previous example. If the queue exists, you can connect as it is, but if the queue does not exist, `pika.exceptions.ChannelClosedByBroker` will come up as an exception.

client_main.py



import pika

pika_param = pika.ConnectionParameters('localhost')
connection = pika.BlockingConnection(pika_param)
channel = connection.channel()

try:
    channel.queue_declare(queue='hello', passive=True)
except pika.exceptions.ChannelClosedByBroker as ex:
    print(ex)
    exit(1)

channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')

connection.close()

Queue check

Now that the source is ready, let's run it. An exception has occurred. If you look at the exception that occurred, you can see that there is no hello queue.


PS C:\Users\xxxx\program\python\pika> python .\client_main.py
(404, "NOT_FOUND - no queue 'hello' in vhost '/'")

Exclusive connection settings

The standard pika setting accepts connections unconditionally, but exclusive control can be applied so that other connections are not accepted. In that case, if True is given to the exclusive argument of the `queue_declare ()` function of the channel, an error will occur when another connection is connected, so it can be used for checking the exclusion. If you close the connection, you can connect another connection.

Situation example

For example, when you do not want to accept messages from other consumers until you prepare the message on the consumer side and send it to RabbitMq.

Producer with exclusive connection settings enabled

channel.queue_declare()Exclusive to=I'm just adding True. This time, after connecting once, I make a new connection and connect without closing the connection. For connections that come later, pika.exceptions.ChannelClosedByBroker comes up as an exception.





#### **` client_main.py`**
```py


import pika

pika_param = pika.ConnectionParameters('localhost')
connection = pika.BlockingConnection(pika_param)
channel = connection.channel()
channel.queue_declare(queue='hello', exclusive=True)
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
channel.close()

connection = pika.BlockingConnection(pika_param)
channel = connection.channel()
try:
    channel.queue_declare(queue='hello')
except pika.exceptions.ChannelClosedByBroker as ex:
    print('other connection access fail')
    exit(1)
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
channel.close()

channel.close()Under connection.close()If you enter, it will end normally.



## Execution of exclusive connection
 Now that the source is ready, let's run it. An exception occurred when connecting to the second queue. If you view the exception that occurred, you can see that hello has an access lock.

PS C:\Users\xxxx\program\python\pika> python .\client_main.py other connection access fail (405, "RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'hello' in vhost '/'. It could be originally declared on another connection or the exclusive property value does not match that of the original declaration.")



# Channel limit setting
 The standard pika settings allow you to create channels unconditionally, but you can set the upper limit of the channel. In that case, you can set the upper limit by giving an upper limit to the channel_max argument of the `` `pika.ConnectionParameters ()` `` function of pika. An error will occur if you try to create a channel that exceeds the limit.

## Situation example
 Since the server of RabbitMq is not abundant, when adjusting the number of channels generated, etc.

## Producer with channel caps enabled

#### **` pika.ConnectionParameters()Channel_max=I'm just adding 2. This time, the upper limit is set to 2, so 3 channels are created meaninglessly. As a result, pika could not be created for the third channel.exceptions.ConnectionClosedByBroker comes up as an exception.`**

client_main.py



import pika

pika_param = pika.ConnectionParameters('localhost', channel_max=2)
connection = pika.BlockingConnection(pika_param)

channel = connection.channel(1)
channel = connection.channel(2)
try:
    channel = connection.channel(3)
except pika.exceptions.ConnectionClosedByBroker as ex:
    print('channel crate error')
    print(ex)

Execution of channel cap

Now that the source is ready, let's run it. An exception occurred when creating the third channel. If you display the exception that occurred, you can see that the channel limit is 2.


PS C:\Users\xxxx\program\python\pika> python .\client_main.py
channel crate error
(530, 'NOT_ALLOWED - number of channels opened (2) has reached the negotiated channel_max (2)')

Retry settings

You can set the number of retries when the pika connection fails. In that case, you can set the number by giving the number of retries to the connection_attempts argument of the `pika.ConnectionParameters ()` function of pika.

Situation example

When the network to the RabbitMq server is unstable, etc.

Producer with retry settings enabled

pika.ConnectionParameters()Connect to_attempts=I'm just adding 2. This time, RabbitMq is dropped by making a log of pika so that you can see that it is retrying. Top logger.The xxx system is a setting for outputting the log in pika.





#### **` client_main.py`**
```py


import pika
import datetime
import logging 

logger = logging.getLogger('pika')
logger.setLevel(logging.ERROR)
logger.addHandler(logging.StreamHandler())
pika_param = pika.ConnectionParameters('localhost', connection_attempts=2)

try:
    print('start connect {}'.format(datetime.datetime.now()))
    connection = pika.BlockingConnection(pika_param)
except pika.exceptions.AMQPConnectionError as ex:
    print('connect error {}'.format(datetime.datetime.now()))
    print(ex)

Execute retry

Now that the source is ready, let's run it. It is difficult to understand because a large amount of logs are output, but similar errors have occurred 4 times (2 errors x set value retry (2 times)).


PS C:\Users\xxxx\program\python\pika> python .\client_main.py
start connect 2020-03-01 21:46:18.549268
Socket failed to connect: <socket.socket fd=936, family=AddressFamily.AF_INET6, type=SocketKind.SOCK_STREAM, proto=6, laddr=('::', 38520, 0, 0)>; error=10061 (Unknown error)
TCP Connection attempt failed: ConnectionRefusedError(10061, 'Unknown error'); dest=(<AddressFamily.AF_INET6: 23>, <SocketKind.SOCK_STREAM: 1>, 6, '', ('::1', 5672, 0, 0))
AMQPConnector - reporting failure: AMQPConnectorSocketConnectError: ConnectionRefusedError(10061, 'Unknown error')
Socket failed to connect: <socket.socket fd=936, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('0.0.0.0', 38524)>; error=10061 (Unknown error)
TCP Connection attempt failed: ConnectionRefusedError(10061, 'Unknown error'); dest=(<AddressFamily.AF_INET: 2>, <SocketKind.SOCK_STREAM: 1>, 6, '', ('127.0.0.1', 5672))
AMQPConnector - reporting failure: AMQPConnectorSocketConnectError: ConnectionRefusedError(10061, 'Unknown error')
Socket failed to connect: <socket.socket fd=812, family=AddressFamily.AF_INET6, type=SocketKind.SOCK_STREAM, proto=6, laddr=('::', 38533, 0, 0)>; error=10061 (Unknown error)
TCP Connection attempt failed: ConnectionRefusedError(10061, 'Unknown error'); dest=(<AddressFamily.AF_INET6: 23>, <SocketKind.SOCK_STREAM: 1>, 6, '', ('::1', 5672, 0, 0))
AMQPConnector - reporting failure: AMQPConnectorSocketConnectError: ConnectionRefusedError(10061, 'Unknown error')
Socket failed to connect: <socket.socket fd=812, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('0.0.0.0', 38535)>; error=10061 (Unknown error)
TCP Connection attempt failed: ConnectionRefusedError(10061, 'Unknown error'); dest=(<AddressFamily.AF_INET: 2>, <SocketKind.SOCK_STREAM: 1>, 6, '', ('127.0.0.1', 5672))
AMQPConnector - reporting failure: AMQPConnectorSocketConnectError: ConnectionRefusedError(10061, 'Unknown error')
AMQP connection workflow failed: AMQPConnectionWorkflowFailed: 4 exceptions in all; last exception - AMQPConnectorSocketConnectError: ConnectionRefusedError(10061, 'Unknown error'); first exception - AMQPConnectorSocketConnectError: ConnectionRefusedError(10061, 'Unknown error').
AMQPConnectionWorkflow - reporting failure: AMQPConnectionWorkflowFailed: 4 exceptions in all; last exception - AMQPConnectorSocketConnectError: ConnectionRefusedError(10061, 'Unknown error'); first exception - AMQPConnectorSocketConnectError: ConnectionRefusedError(10061, 'Unknown error')
Connection workflow failed: AMQPConnectionWorkflowFailed: 4 exceptions in all; last exception - AMQPConnectorSocketConnectError: 
ConnectionRefusedError(10061, 'Unknown error'); first exception - AMQPConnectorSocketConnectError: ConnectionRefusedError(10061, 
'Unknown error')
Error in _create_connection().
Traceback (most recent call last):
  File "C:\Users\minkl\AppData\Local\Programs\Python\Python36-32\lib\site-packages\pika\adapters\blocking_connection.py", line 450, in _create_connection
    raise self._reap_last_connection_workflow_error(error)
pika.exceptions.AMQPConnectionError
connect error 2020-03-01 21:46:28.665843

in conclusion

I tried using the functions of RabbitMq that seems to be used often. I think it has all the features I need, but I felt confused if I didn't understand and use which settings were queues, connections, or channels.

Recommended Posts

Use various rabbimq features with pika in python
Use rospy with virtualenv in Python3
Use Python in pyenv with NeoVim
Use OpenCV with Python 3 in Window
Use config.ini in Python
[Python] Use JSON with Python
Use dates in Python
Use Valgrind in Python
Use mecab with Python3
Use DynamoDB with Python
Use Python 3.8 with Anaconda
Handle rabbimq with python
Use python with docker
How to use tkinter with python in pyenv
Use Python in Anaconda environment with VS Code
Use profiler in Python
Use Trello API with python
Use "$ in" operator with mongo-go-driver
Use let expression in Python
Scraping with selenium in Python
Use Measurement Protocol in Python
Working with LibreOffice in Python
Scraping with chromedriver in python
Use callback function in Python
Use Twitter API with Python
Use parameter store in Python
Debugging with pdb in Python
Use HTTP cache in Python
Use TUN / TAP with Python
Use MongoDB ODM in Python
Working with sounds in Python
Use list-keyed dict in Python
Scraping with Selenium in Python
Use Random Forest in Python
Scraping with Tor in Python
Use Spyder in Python IDE
Tweet with image in Python
Manipulate various databases with Python
Combined with permutations in Python
Use subsonic API with python3
Number recognition in images with Python
Use PointGrey camera with Python (PyCapture2)
Testing with random numbers in Python
Use vl53l0x with Raspberry Pi (python)
GOTO in Python with Sublime Text 3
Working with LibreOffice in Python: import
Use fabric as is in python (fabric3)
CSS parsing with cssutils in Python
How to use SQLite in Python
[Python] Use Basic/Digest authentication with Flask
Numer0n with items made in Python
Use NAIF SPICE TOOLKIT with Python
Open UTF-8 with BOM in Python
How to use Mysql in python
How to use ChemSpider in Python
How to use FTP with Python
Use Windows 10 speech synthesis with Python
Heatmap with Dendrogram in Python + matplotlib
How to use PubChem in Python
Read files in parallel with Python
Password generation in texto with python