o
    ?Zc$                     @   s   d Z ddlZddlmZ ddlmZ ddlmZ ddlmZ dZ	d	Z
G d
d dejZG dd dejejZG dd dejZG dd dejZG dd deZdS )a  pyamqp transport module for Kombu.

Pure-Python amqp transport using py-amqp library.

Features
========
* Type: Native
* Supports Direct: Yes
* Supports Topic: Yes
* Supports Fanout: Yes
* Supports Priority: Yes
* Supports TTL: Yes

Connection String
=================
Connection string can have the following formats:

.. code-block::

    amqp://[USER:PASSWORD@]BROKER_ADDRESS[:PORT][/VIRTUALHOST]
    [USER:PASSWORD@]BROKER_ADDRESS[:PORT][/VIRTUALHOST]
    amqp://

For TLS encryption use:

.. code-block::

    amqps://[USER:PASSWORD@]BROKER_ADDRESS[:PORT][/VIRTUALHOST]

Transport Options
=================
Transport Options are passed to constructor of underlying py-amqp
:class:`~kombu.connection.Connection` class.

Using TLS
=========
Transport over TLS can be enabled by ``ssl`` parameter of
:class:`~kombu.Connection` class. By setting ``ssl=True``, TLS transport is
used::

    conn = Connect('amqp://', ssl=True)

This is equivalent to ``amqps://`` transport URI::

    conn = Connect('amqps://')

For adding additional parameters to underlying TLS, ``ssl`` parameter should
be set with dict instead of True::

    conn = Connect('amqp://broker.example.com', ssl={
            'keyfile': '/path/to/keyfile'
            'certfile': '/path/to/certfile',
            'ca_certs': '/path/to/ca_certfile'
        }
    )

All parameters are passed to ``ssl`` parameter of
:class:`amqp.connection.Connection` class.

SSL option ``server_hostname`` can be set to ``None`` which is causing using
hostname from broker URL. This is usefull when failover is used to fill
``server_hostname`` with currently used broker::

    conn = Connect('amqp://broker1.example.com;broker2.example.com', ssl={
            'server_hostname': None
        }
    )
    N)get_manager)version_string_as_tuple   )baseto_rabbitmq_queue_argumentsi(  i'  c                       s"   e Zd ZdZd fdd	Z  ZS )MessagezAMQP Message.Nc                    sL   |j }t jd|j||j|d|d|j|j |dpi d| d S )Ncontent_typecontent_encodingapplication_headers)bodychanneldelivery_tagr	   r
   delivery_info
propertiesheaders )r   super__init__r   r   getr   )selfmsgr   kwargsprops	__class__r   L/var/www/chikooza/env/lib/python3.10/site-packages/kombu/transport/pyamqp.pyr   V   s   	
zMessage.__init__N__name__
__module____qualname____doc__r   __classcell__r   r   r   r   r   S   s    r   c                   @   s<   e Zd ZdZeZdddddejfddZdd Zdd ZdS )	ChannelzAMQP Channel.Nc                 C   s   ||f||||d|pi S )z<Prepare message so that it can be sent using this transport.)priorityr	   r
   r   r   )r   r   r%   r	   r
   r   r   _Messager   r   r   prepare_messagei   s   zChannel.prepare_messagec                 K   s   t |fi |S r   r   )r   	argumentsr   r   r   r   prepare_queue_argumentsv      zChannel.prepare_queue_argumentsc                 C   s   | j || dS )z4Convert encoded message body back to a Python value.r   )r   )r   raw_messager   r   r   message_to_pythony   s   zChannel.message_to_python)	r   r    r!   r"   r   amqpr'   r)   r-   r   r   r   r   r$   d   s    
r$   c                   @   s   e Zd ZdZeZdS )
ConnectionzAMQP Connection.N)r   r    r!   r"   r$   r   r   r   r   r/   ~   s    r/   c                   @   s   e Zd ZdZeZeZeZe	jj
Z
e	jjZe	jjZe	jjZdZdZejjjdddZ	d$ddZd	d
 Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zd%ddZdd Ze d d! Z!d"d# Z"dS )&	TransportzAMQP Transport.zpy-amqpr.   T)asynchronous
heartbeatsNc                 K   s"   || _ |p| j| _|p| j| _d S r   )clientdefault_portdefault_ssl_port)r   r3   r4   r5   r   r   r   r   r      s   zTransport.__init__c                 C   s   t jS r   )r.   __version__r   r   r   r   driver_version      zTransport.driver_versionc                 C   s   |  S r   r+   r   
connectionr   r   r   create_channel   s   zTransport.create_channelc                 K   s   |j di |S )Nr   )drain_events)r   r;   r   r   r   r   r=      r*   zTransport.drain_eventsc                 C   s   |d ur
|   d S d S r   )collectr:   r   r   r   _collect   s   zTransport._collectc                 C   s   | j }| j D ]\}}t||dst||| q|jdkr!d|_t|jtr9d|jv r9|jd du r9|j|jd< t|j	|j
|j|j|j|j|j|j|jd	fi |jpTi }| jdi |}| j |_ |  |S )z(Establish connection to the AMQP broker.N	localhostz	127.0.0.1server_hostname)	hostuseridpasswordlogin_methodvirtual_hostinsistsslconnect_timeout	heartbeatr   )r3   default_connection_paramsitemsgetattrsetattrhostname
isinstancerH   dictrB   rC   rD   rE   rF   rG   rI   rJ   transport_optionsr/   connect)r   conninfonamedefault_valueoptsconnr   r   r   establish_connection   s<   

zTransport.establish_connectionc                 C      |j S r   )	connectedr:   r   r   r   verify_connection   r9   zTransport.verify_connectionc                 C   s   d|_ |  dS )z!Close the AMQP broker connection.N)r3   closer:   r   r   r   close_connection   s   zTransport.close_connectionc                 C   rZ   r   )rJ   r:   r   r   r   get_heartbeat_interval   r9   z Transport.get_heartbeat_intervalc                 C   s    d|j _||j| j|| d S NT)	transportraise_on_initial_eintr
add_readersockon_readable)r   r;   loopr   r   r   register_with_event_loop   s   z"Transport.register_with_event_loop   c                 C   s   |j |dS )N)rate)heartbeat_tick)r   r;   ri   r   r   r   heartbeat_check   s   zTransport.heartbeat_checkc                 C   s(   |j }|ddkrt|d dk S dS )NproductRabbitMQversion)   ro   T)server_propertiesr   r   )r   r;   r   r   r   r   qos_semantics_matches_spec   s   z$Transport.qos_semantics_matches_specc                 C   s    dd| j jr	| jn| jdddS )Nguestr@   PLAIN)rC   rD   portrO   rE   )r3   rH   r5   r4   r7   r   r   r   rK      s   z#Transport.default_connection_paramsc                 O   s   t | jg|R i |S r   )r   r3   r   argsr   r   r   r   r      s   zTransport.get_manager)NN)rh   )#r   r    r!   r"   r/   DEFAULT_PORTr4   DEFAULT_SSL_PORTr5   r.   connection_errorschannel_errorsrecoverable_connection_errorsrecoverable_channel_errorsdriver_namedriver_typer   r0   
implementsextendr   r8   r<   r=   r?   rY   r\   r^   r_   rg   rk   rq   propertyrK   r   r   r   r   r   r0      s@    



r0   c                       s    e Zd ZdZ fddZ  ZS )SSLTransportzAMQP SSL Transport.c                    s*   t  j|i | | jjsd| j_d S d S r`   )r   r   r3   rH   ru   r   r   r   r      s   zSSLTransport.__init__r   r   r   r   r   r      s    r   )r"   r.   kombu.utils.amq_managerr   kombu.utils.textr    r   r   rw   rx   r   r$   
StdChannelr/   r0   r   r   r   r   r   <module>   s    Fo