o
    ?Zc߃                     @   s  d Z ddlZddlZddlZddlZddlmZ ddlmZmZm	Z	 ddl
mZ ddlmZ ddlmZ ddlmZmZ dd	lmZ dd
lmZmZ ddlmZ ddlmZ ddlmZ ddlmZm Z  ddl!m"Z" ddl#m$Z$ ddl%m&Z& dZ'dZ(dZ)dZ*dZ+dZ,ee-Z.e	ddZ/e	ddZ0G dd dZ1G dd  d e2Z3G d!d" d"e4Z5G d#d$ d$Z6G d%d& d&Z7G d'd( d(ej8Z8G d)d* d*Z9G d+d, d,e9ej:Z;G d-d. d.ej<Z<G d/d0 d0ej=Z=dS )1zPVirtual transport implementation.

Emulates the AMQ API for non-AMQ transports.
    N)array)OrderedDictdefaultdict
namedtuple)count)Finalize)Empty)	monotonicsleep)queue_declare_ok_t)ChannelErrorResourceError)
get_logger)base)emergency_dump_state)bytes_to_strstr_to_bytes)	FairCycleuuid   )STANDARD_EXCHANGE_TYPESHzlMessage could not be delivered: No queues bound to exchange {exchange!r} using binding key {routing_key!r}.
zkCannot redeclare exchange {0!r} in vhost {1!r} with different type, durable, autodelete or arguments value.z;Requeuing undeliverable message for queue %r: No consumers.z)Restoring {0!r} unacknowledged message(s)z#UNABLE TO RESTORE {0} MESSAGES: {1}binding_key_t)queueexchangerouting_keyqueue_binding_t)r   r   	argumentsc                   @   s    e Zd ZdZdd Zdd ZdS )Base64zBase64 codec.c                 C   s   t tt|S N)r   base64	b64encoder   selfs r&   R/var/www/chikooza/env/lib/python3.10/site-packages/kombu/transport/virtual/base.pyencode@   s   zBase64.encodec                 C   s   t t|S r    )r!   	b64decoder   r#   r&   r&   r'   decodeC      zBase64.decodeN)__name__
__module____qualname____doc__r(   r*   r&   r&   r&   r'   r   =   s    r   c                   @      e Zd ZdZdS )NotEquivalentErrorzAEntity declaration is not equivalent to the previous declaration.Nr,   r-   r.   r/   r&   r&   r&   r'   r1   G       r1   c                   @   r0   )UndeliverableWarningz.The message could not be delivered to a queue.Nr2   r&   r&   r&   r'   r4   K   r3   r4   c                   @   sV   e Zd ZdZdZdZdZdddZdd Zdd Z	d	d
 Z
dd Zdd Zdd ZdS )BrokerStatez2Broker state holds exchanges, queues and bindings.Nc                 C   s&   |d u ri n|| _ i | _tt| _d S r    )	exchangesbindingsr   setqueue_index)r$   r6   r&   r&   r'   __init__l   s   zBrokerState.__init__c                 C   s"   | j   | j  | j  d S r    )r6   clearr7   r9   r$   r&   r&   r'   r;   q   s   

zBrokerState.clearc                 C   s   |||f| j v S r    )r7   )r$   r   r   r   r&   r&   r'   has_bindingv   s   zBrokerState.has_bindingc                 C   s.   t |||}| j|| | j| | d S r    )r   r7   
setdefaultr9   add)r$   r   r   r   r   keyr&   r&   r'   binding_declarey   s   zBrokerState.binding_declarec                 C   sB   t |||}z| j|= W n
 ty   Y d S w | j| | d S r    )r   r7   KeyErrorr9   remove)r$   r   r   r   r@   r&   r&   r'   binding_delete~   s   zBrokerState.binding_deletec                    s<   z j |}W n
 ty   Y d S w  fdd|D  d S )Nc                    s   g | ]	} j |d qS r    )r7   pop).0bindingr<   r&   r'   
<listcomp>   s    z5BrokerState.queue_bindings_delete.<locals>.<listcomp>)r9   rE   rB   )r$   r   r7   r&   r<   r'   queue_bindings_delete   s   z!BrokerState.queue_bindings_deletec                    s    fdd j | D S )Nc                 3   s&    | ]}t |j|j j| V  qd S r    )r   r   r   r7   )rF   r@   r<   r&   r'   	<genexpr>   s
    
z-BrokerState.queue_bindings.<locals>.<genexpr>)r9   r$   r   r&   r<   r'   queue_bindings   s   
zBrokerState.queue_bindingsr    )r,   r-   r.   r/   r6   r7   r9   r:   r;   r=   rA   rD   rI   rL   r&   r&   r&   r'   r5   O   s    

	r5   c                   @   s~   e Zd ZdZdZdZdZdZdddZdd Z	d	d
 Z
dd Zdd Zdd Zdd ZdddZdd ZdddZdd ZdS )QoSzQuality of Service guarantees.

    Only supports `prefetch_count` at this point.

    Arguments:
        channel (ChannelT): Connection channel.
        prefetch_count (int): Initial prefetch count (defaults to 0).
    r   NTc                 C   sR   || _ |pd| _t | _d| j_t | _| jj| _| jj	| _
t| | jdd| _d S )Nr   Fr   )exitpriority)channelprefetch_countr   
_deliveredrestoredr8   _dirtyr?   
_quick_ack__setitem___quick_appendr   restore_unacked_once_on_collect)r$   rO   rP   r&   r&   r'   r:      s   


zQoS.__init__c                 C   s$   | j }| pt| jt| j |k S )zReturn true if the channel can be consumed from.

        Used to ensure the client adhers to currently active
        prefetch limits.
        )rP   lenrQ   rS   r$   pcountr&   r&   r'   can_consume   s   zQoS.can_consumec                 C   s,   | j }|rt|t| jt| j  dS dS )a  Return the maximum number of messages allowed to be returned.

        Returns an estimated number of messages that a consumer may be allowed
        to consume at once from the broker.  This is used for services where
        bulk 'get message' calls are preferred to many individual 'get message'
        calls - like SQS.

        Returns:
            int: greater than zero.
        r   N)rP   maxrY   rQ   rS   rZ   r&   r&   r'   can_consume_max_estimate   s   zQoS.can_consume_max_estimatec                 C   s   | j r|   | || dS )z&Append message to transactional state.N)rS   _flushrV   )r$   messagedelivery_tagr&   r&   r'   append   s   z
QoS.appendc                 C   s
   | j | S r    )rQ   r$   ra   r&   r&   r'   get      
zQoS.getc                 C   s>   | j }| j}	 z| }W n
 ty   Y dS w ||d q)z'Flush dirty (acked/rejected) tags from.r   N)rS   rQ   rE   rB   )r$   dirty	delivered	dirty_tagr&   r&   r'   r_      s   z
QoS._flushc                 C      |  | dS )z8Acknowledge message and remove from transactional state.N)rT   rc   r&   r&   r'   ack   s   zQoS.ackFc                 C   s$   |r| j | j|  | | dS )z4Remove from transactional state and requeue message.N)rO   _restore_at_beginningrQ   rT   r$   ra   requeuer&   r&   r'   reject   s   z
QoS.rejectc              
   C   s   |    | j}g }| jj}|j}|rEz| \}}W n	 ty"   Y n#w z|| W n tyB } z|||f W Y d}~nd}~ww |s|  |S )z$Restore all unacknowledged messages.N)	r_   rQ   rO   _restorepopitemrB   BaseExceptionrb   r;   )r$   rg   errorsrestorepop_message_r`   excr&   r&   r'   restore_unacked   s(   
zQoS.restore_unackedc                 C   s   | j   |   |du rtjn|}| j}| jr| jjsdS t	|ddr*|r(J dS z@|r_t
tt| j|d |  }|rett| \}}t
tt|||d t||d W d|_dS W d|_dS W d|_dS d|_w )zRestore all unacknowledged messages at shutdown/gc collect.

        Note:
            Can only be called once for each instance, subsequent
            calls will be ignored.
        NrR   )file)stderrT)rX   cancelr_   sysry   rQ   restore_at_shutdownrO   
do_restoregetattrprintRESTORING_FMTformatrY   rw   listzipRESTORE_PANIC_FMTr   rR   )r$   ry   state
unrestoredrr   messagesr&   r&   r'   rW   
  s4   


zQoS.restore_unacked_oncec                 O      dS )zRestore any pending unackwnowledged messages.

        To be filled in for visibility_timeout style implementations.

        Note:
            This is implementation optional, and currently only
            used by the Redis transport.
        Nr&   )r$   argskwargsr&   r&   r'   restore_visible)      zQoS.restore_visible)r   Fr    )r,   r-   r.   r/   rP   rQ   rS   r|   r:   r\   r^   rb   rd   r_   rj   rn   rw   rW   r   r&   r&   r&   r'   rM      s"    

	

rM   c                       s*   e Zd ZdZd fdd	Zdd Z  ZS )MessagezMessage object.Nc                    st   || _ |d }|d}|r|||d}t jd|||d |d|d|d||dd	d
	| d S )N
propertiesbodybody_encodingra   content-typecontent-encodingheadersdelivery_infozutf-8)	r   rO   ra   content_typecontent_encodingr   r   r   
postencoder&   )_rawrd   decode_bodysuperr:   )r$   payloadrO   r   r   r   	__class__r&   r'   r:   7  s$   


zMessage.__init__c                 C   sJ   | j }| j| j|d\}}t| j}|dd  ||| j| j	|dS )Nr   compression)r   r   r   r   r   )
r   rO   encode_bodyr   rd   dictr   rE   r   r   )r$   propsr   ru   r   r&   r&   r'   serializableI  s   

zMessage.serializabler    )r,   r-   r.   r/   r:   r   __classcell__r&   r&   r   r'   r   4  s    r   c                   @   s\   e Zd ZdZdddZdd Zdd Zd	d
 Zdd Zdd Z	dd Z
dddZdd ZdS )AbstractChannelzAbstract channel interface.

    This is an abstract class defining the channel methods
    you'd usually want to implement in a virtual channel.

    Note:
        Do not subclass directly, but rather inherit
        from :class:`Channel`.
    Nc                 C      t d)zGet next message from `queue`.z$Virtual channels must implement _getNotImplementedError)r$   r   timeoutr&   r&   r'   _getd     zAbstractChannel._getc                 C   r   )zPut `message` onto `queue`.z$Virtual channels must implement _putr   )r$   r   r`   r&   r&   r'   _puth  r   zAbstractChannel._putc                 C   r   )z!Remove all messages from `queue`.z&Virtual channels must implement _purger   rK   r&   r&   r'   _purgel  r   zAbstractChannel._purgec                 C   r   )z<Return the number of messages in `queue` as an :class:`int`.r   r&   rK   r&   r&   r'   _sizep  s   zAbstractChannel._sizec                 O   ri   )zDelete `queue`.

        Note:
            This just purges the queue, if you need to do more you can
            override this method.
        Nr   )r$   r   r   r   r&   r&   r'   _deletet  s   zAbstractChannel._deletec                 K   r   )zCreate new queue.

        Note:
            Your transport can override this method if it needs
            to do something whenever a new queue is declared.
        Nr&   r$   r   r   r&   r&   r'   
_new_queue}  r   zAbstractChannel._new_queuec                 K   r   )zVerify that queue exists.

        Returns:
            bool: Should return :const:`True` if the queue exists
                or :const:`False` otherwise.
        Tr&   r   r&   r&   r'   
_has_queue  s   zAbstractChannel._has_queuec                 C   s
   | |S )z-Poll a list of queues for available messages.)rd   )r$   cyclecallbackr   r&   r&   r'   _poll     
zAbstractChannel._pollc                 C   s   |  |}||| d S r    )r   )r$   r   r   r`   r&   r&   r'   _get_and_deliver  s   
z AbstractChannel._get_and_deliverr    )r,   r-   r.   r/   r   r   r   r   r   r   r   r   r   r&   r&   r&   r'   r   Y  s    

	
	r   c                   @   s  e Zd ZdZeZeZdZeeZ	dZ
de iZdZedZdZdZdZdZd	Zd
d Z			dbddZdcddZddddZdcddZdd Z		deddZ		deddZ		dfddZ		dfddZd d! Zd"d# Z d$d% Z!d&d' Z"d(d) Z#d*d+ Z$d,d- Z%dgd.d/Z&dgd0d1Z'dgd2d3Z(dgd4d5Z)		dhd6d7Z*d8d9 Z+d:d; Z,did<d=Z-djd>d?Z.d@dA Z/dBdC Z0dkdDdEZ1dFdG Z2		dldHdIZ3dmdJdKZ4dLdM Z5djdNdOZ6djdPdQZ7dRdS Z8dTdU Z9dVdW Z:e;dXdY Z<e;dZd[ Z=e;d\d] Z>dgd^d_Z?d`da Z@dS )nChannelzVirtual channel.

    Arguments:
        connection (ConnectionT): The transport instance this
            channel is part of.
    TFr!   r   N)r   deadletter_queuer   	   c              	      s   | _ t  _d  _i  _g  _d  _d _ fdd j	 D  _ 
  _ j jj} jD ]}z
t |||  W q0 tyE   Y q0w d S )NFc                    s   i | ]	\}}|| qS r&   r&   )rF   typclsr<   r&   r'   
<dictcomp>  s    z$Channel.__init__.<locals>.<dictcomp>)
connectionr8   
_consumers_cycle_tag_to_queue_active_queues_qosclosedexchange_typesitems_get_free_channel_id
channel_idclienttransport_optionsfrom_transport_optionssetattrrB   )r$   r   r   toptsopt_namer&   r<   r'   r:     s&   



zChannel.__init__directc           	   	   C   s   |pd}|p	d| }|r$|| j jvr"td|| jjjpdddddS z#| j j| }| |||||||sEt	t
|| jjjpBdW dS  ty_   ||||pTi g d	| j j|< Y dS w )
zDeclare exchange.r   zamq.%sz*NOT_FOUND - no exchange {!r} in vhost {!r}/2   
   Channel.exchange_declare404N)typedurableauto_deleter   table)r   r6   r   r   r   r   virtual_hosttypeof
equivalentr1   NOT_EQUIVALENT_FMTrB   )	r$   r   r   r   r   r   nowaitpassiveprevr&   r&   r'   exchange_declare  s:   r   c                 C   s:   |  |D ]\}}}| j|ddd q| jj|d dS )z'Delete `exchange` and all its bindings.T)	if_unusedif_emptyN)	get_tablequeue_deleter   r6   rE   )r$   r   r   r   rkeyru   r   r&   r&   r'   exchange_delete  s   zChannel.exchange_deletec                 K   sh   |pdt   }|r"| j|fi |s"td|| jjjpdddd| j|fi | t|| 	|dS )zDeclare queue.z
amq.gen-%sz'NOT_FOUND - no queue {!r} in vhost {!r}r   r   Channel.queue_declarer   r   )
r   r   r   r   r   r   r   r   r   r   )r$   r   r   r   r&   r&   r'   queue_declare   s   r   c           	      K   sj   |r	|  |r	dS | j|D ]\}}}| |||||}| j||g|R i | q| j| dS )zDelete queue.N)r   r   rL   r   prepare_bindr   rI   )	r$   r   r   r   r   r   r   r   metar&   r&   r'   r     s   
zChannel.queue_deletec                 C   s   |  | d S r    )r   rK   r&   r&   r'   after_reply_message_received  r+   z$Channel.after_reply_message_received c                 C   r   )Nz(transport does not support exchange_bindr   r$   destinationsourcer   r   r   r&   r&   r'   exchange_bind  r   zChannel.exchange_bindc                 C   r   )Nz*transport does not support exchange_unbindr   r   r&   r&   r'   exchange_unbind  r   zChannel.exchange_unbindc                 K   s   |pd}| j |||rdS | j |||| | j j| dg }| |||||}|| | jr?| j	|g|R   dS dS )z.Bind `queue` to `exchange` with `routing key`.z
amq.directNr   )
r   r=   rA   r6   r>   r   r   rb   supports_fanout_queue_bind)r$   r   r   r   r   r   r   r   r&   r&   r'   
queue_bind#  s   

zChannel.queue_bindc                    sh   | j ||| z| |}W n
 ty   Y d S w | |||||  fdd|D |d d < d S )Nc                    s   g | ]}| kr|qS r&   r&   )rF   r   binding_metar&   r'   rH   A  s    z(Channel.queue_unbind.<locals>.<listcomp>)r   rD   r   rB   r   r   )r$   r   r   r   r   r   r   r&   r   r'   queue_unbind4  s   
zChannel.queue_unbindc                    s    fdd j jD S )Nc                 3   s0    | ]}  |D ]\}}}|||fV  q	qd S r    )r   )rF   r   r   patternr   r<   r&   r'   rJ   D  s    z(Channel.list_bindings.<locals>.<genexpr>r   r6   r<   r&   r<   r'   list_bindingsC  s   
zChannel.list_bindingsc                 K   
   |  |S )z%Remove all ready messages from queue.r   r   r&   r&   r'   queue_purgeH  r   zChannel.queue_purgec                 C   s   t  S r    r   r<   r&   r&   r'   _next_delivery_tagL  s   zChannel._next_delivery_tagc                 K   sB   |  ||| |r| |j|||fi |S | j||fi |S )zPublish message.)_inplace_augment_messager   deliverr   )r$   r`   r   r   r   r&   r&   r'   basic_publishO  s   
zChannel.basic_publishc                 C   sJ   |  |d | j\|d< }|d }|j||  d |d j||d d S )Nr   r   )r   ra   r   r   r   )r   r   updater   )r$   r`   r   r   r   r   r&   r&   r'   r   Y  s   

z Channel._inplace_augment_messagec                    sJ   |j |< j|  fdd}|jj|< j|   dS )zConsume from `queue`.c                    s*   j | d}sj||j  |S )NrO   )r   qosrb   ra   )raw_messager`   r   no_ackr$   r&   r'   	_callbackl  s   z(Channel.basic_consume.<locals>._callbackN)r   r   rb   r   
_callbacksr   r?   _reset_cycle)r$   r   r	  r   consumer_tagr   r
  r&   r  r'   basic_consumeg  s   
zChannel.basic_consumec                 C   sh   || j v r2| j | |   | j|d}z| j| W n	 ty'   Y nw | jj|d dS dS )z Cancel consumer by consumer tag.N)	r   rC   r  r   rE   r   
ValueErrorr   r  )r$   r  r   r&   r&   r'   basic_cancelw  s   
zChannel.basic_cancelc                 K   sD   z| j | || d}|s| j||j |W S  ty!   Y dS w )z+Get message by direct access (synchronous).r  N)r   r   r  rb   ra   r   )r$   r   r	  r   r`   r&   r&   r'   	basic_get  s   zChannel.basic_getc                 C   s   | j | dS )zAcknowledge message.N)r  rj   )r$   ra   multipler&   r&   r'   	basic_ack     zChannel.basic_ackc                 C   s   |r| j  S td)zRecover unacked messages.z'Does not support recover(requeue=False))r  rw   r   )r$   rm   r&   r&   r'   basic_recover  s   
zChannel.basic_recoverc                 C   s   | j j||d dS )zReject message.rm   N)r  rn   rl   r&   r&   r'   basic_reject  s   zChannel.basic_rejectc                 C   s   || j _dS )zmChange QoS settings for this channel.

        Note:
            Only `prefetch_count` is supported.
        N)r  rP   )r$   prefetch_sizerP   apply_globalr&   r&   r'   	basic_qos  s   zChannel.basic_qosc                 C   s   t | jjS r    )r   r   r6   r<   r&   r&   r'   get_exchanges     zChannel.get_exchangesc                 C   s   | j j| d S )z%Get table of bindings for `exchange`.r   r   )r$   r   r&   r&   r'   r     r  zChannel.get_tablec                 C   s6   z
| j j| d }W n ty   |}Y nw | j| S )z.Get the exchange type instance for `exchange`.r   )r   r6   rB   r   )r$   r   defaultr   r&   r&   r'   r     s   
zChannel.typeofc                 C   s   |du r| j }|s|p|gS z| || ||||}W n ty)   g }Y nw |sD|durDtttj	||d | 
| |g}|S )zFind all queues matching `routing_key` for the given `exchange`.

        Returns:
            str: queue name -- must return the string `default`
                if no queues matched.
        Nr  )r   r   lookupr   rB   warningswarnr4   UNDELIVERABLE_FMTr   r   )r$   r   r   r  Rr&   r&   r'   _lookup  s&   




zChannel._lookupc                 C   s@   |j }| }d|d< | |d |d D ]}| || qdS )z.Redeliver message to its original destination.Tredeliveredr   r   N)r   r   r#  r   )r$   r`   r   r   r&   r&   r'   ro     s   zChannel._restorec                 C   r   r    )ro   )r$   r`   r&   r&   r'   rk     re   zChannel._restore_at_beginningc                 C   sN   |p| j j}| jr$| j r$t| dr| j| j|dS | j| j	||dS t
 )N	_get_manyr   )r   _deliverr   r  r\   hasattrr%  r   r   r   r   )r$   r   r   r&   r&   r'   drain_events  s   
zChannel.drain_eventsc                 C   s   t || js| j|| dS |S )z1Convert raw message to :class:`Message` instance.)r   rO   )
isinstancer   )r$   r  r&   r&   r'   message_to_python  s   zChannel.message_to_pythonc                 C   s>   |pi }| di  | d|p| j ||||pi |pi dS )zPrepare message data.r   priority)r   r   r   r   r   )r>   default_priority)r$   r   r,  r   r   r   r   r&   r&   r'   prepare_message  s   zChannel.prepare_messagec                 C   r   )zEnable/disable message flow.

        Raises:
            NotImplementedError: as flow
                is not implemented by the base virtual implementation.
        z%virtual channels do not support flow.r   )r$   activer&   r&   r'   flow  s   zChannel.flowc                 C   sp   | j s3d| _ t| jD ]}| | q| jr| j  | jdur(| j  d| _| jdur3| j	|  d| _
dS )zTClose channel.

        Cancel all consumers, and requeue unacked messages.
        TN)r   r   r   r  r   rW   r   closer   close_channelr   )r$   consumerr&   r&   r'   r1    s   




zChannel.closec                 C   s"   |r| j |||fS ||fS r    )codecsrd   r(   r$   r   encodingr&   r&   r'   r     s   zChannel.encode_bodyc                 C   s   |r| j ||S |S r    )r4  rd   r*   r5  r&   r&   r'   r     s   zChannel.decode_bodyc                 C   s   t | j| jt| _d S r    )r   r   r   r   r   r<   r&   r&   r'   r    s   

zChannel._reset_cyclec                 C   s   | S r    r&   r<   r&   r&   r'   	__enter__  s   zChannel.__enter__c                 G   s   |    d S r    )r1  )r$   exc_infor&   r&   r'   __exit__"  r  zChannel.__exit__c                 C   s   | j jS )z/Broker state containing exchanges and bindings.)r   r   r<   r&   r&   r'   r   %  s   zChannel.statec                 C   s   | j du r| | | _ | j S )z&:class:`QoS` manager for this channel.N)r   rM   r<   r&   r&   r'   r  *  s   
zChannel.qosc                 C   s   | j d u r	|   | j S r    )r   r  r<   r&   r&   r'   r   1  s   
zChannel.cyclec              
   C   sV   zt tt|d d | j| j}W n tttfy!   | j}Y nw |r)| j| S |S )zGet priority from message.

        The value is limited to within a boundary of 0 to 9.

        Note:
            Higher value has more priority.
        r   r,  )	r]   minintmax_prioritymin_priority	TypeErrorr  rB   r-  )r$   r`   reverser,  r&   r&   r'   _get_message_priority7  s   
zChannel._get_message_priorityc                 C   s`   t | jj}td| jjd D ]}||vr | jj| |  S qtdt| jj	| jjd)Nr   z/No free channel ids, current={}, channel_max={})   r   )
r8   r   _used_channel_idsrangechannel_maxrb   r   r   rY   channels)r$   used_channel_idsr   r&   r&   r'   r   J  s   
zChannel._get_free_channel_id)Nr   FFNFF)FF)NF)r   r   FN)Nr   Nr   )r   r   F)r   r    )NN)NNNNN)T)Ar,   r-   r.   r/   r   rM   r}   r   r   r   r   r   r4  r   r   _delivery_tagsr   r   r-  r=  r<  r:   r   r   r   r   r   r   r   r   r   r   r   r   r  r   r  r  r  r  r  r  r  r  r   r   r#  ro   rk   r)  r+  r.  r0  r1  r   r   r  r7  r9  propertyr   r  r   r@  r   r&   r&   r&   r'   r     s    















	

	


	





r   c                       s0   e Zd ZdZ fddZdd Zdd Z  ZS )
Managementz'Base class for the AMQP management API.c                    s   t  | |j | _d S r    )r   r:   r   rO   )r$   	transportr   r&   r'   r:   ^  s   zManagement.__init__c                 C   s   dd | j  D S )Nc                 S   s   g | ]\}}}|||d qS ))r   r   r   r&   )rF   qerr&   r&   r'   rH   c  s    z+Management.get_bindings.<locals>.<listcomp>)rO   r   r<   r&   r&   r'   get_bindingsb  s   zManagement.get_bindingsc                 C   s   | j   d S r    )rO   r1  r<   r&   r&   r'   r1  f  r+   zManagement.close)r,   r-   r.   r/   r:   rN  r1  r   r&   r&   r   r'   rI  [  s
    rI  c                   @   s   e Zd ZdZeZeZeZdZdZ	dZ
dZdZdZejjjdeddgddZd	d
 Zdd Zdd Zdd Zdd ZdddZdd Zdd Zdd ZdddZedd ZdS ) 	TransportznVirtual transport.

    Arguments:
        client (kombu.Connection): The client this is a transport for.
    Ng      ?i  Fr   topic)asynchronousexchange_type
heartbeatsc                 K   s\   || _ t | _g | _g | _i | _| | j| jt| _	|j
d}|d ur'|| _tt| _d S )Npolling_interval)r   r5   r   rE  _avail_channelsr  Cycle_drain_channelr   r   r   rd   rT  r   ARRAY_TYPE_HrB  )r$   r   r   rT  r&   r&   r'   r:     s   zTransport.__init__c                 C   s:   z| j  W S  ty   | |}| j| | Y S w r    )rU  rE   
IndexErrorr   rE  rb   )r$   r   rO   r&   r&   r'   create_channel  s   
zTransport.create_channelc                 C   sl   z1z	| j |j W n	 ty   Y nw z| j| W n	 ty%   Y nw W d |_d S W d |_d S d |_w r    )rB  rC   r   r  rE  r   )r$   rO   r&   r&   r'   r2    s   
zTransport.close_channelc                 C   s   | j | |  | S r    )rU  rb   rZ  r<   r&   r&   r'   establish_connection  s   zTransport.establish_connectionc              	   C   sP   | j   | j| jfD ]}|r%z| }W n	 ty   Y nw |  |sqd S r    )r   r1  rU  rE  rE   LookupError)r$   r   	chan_listrO   r&   r&   r'   close_connection  s   
zTransport.close_connectionc                 C   s   t  }| jj}| j}|r|r||kr|}	 z
|| j|d W d S  ty?   |d ur5t  | |kr5t |d ur=t| Y nw q)Nr   r&  )	r	   r   rd   rT  r'  r   socketr   r
   )r$   r   r   
time_startrd   rT  r&   r&   r'   r)    s"   zTransport.drain_eventsc                 C   sX   |s	t d|z| j| }W n t y%   tt| | | Y d S w || d S )Nz.Received message without destination queue: {})rB   r   r  loggerwarningW_NO_CONSUMERS_reject_inbound_message)r$   r`   r   r   r&   r&   r'   r'    s   zTransport._deliverc                 C   sH   | j D ]}|r!|j||d}|j||j |j|jdd  d S qd S )Nr  Tr  )rE  r   r  rb   ra   r  )r$   r  rO   r`   r&   r&   r'   rd    s   
z!Transport._reject_inbound_messagec                 C   s0   |r|| j vrtd||| j | | d S )Nz,Message for queue {!r} without consumers: {})r  rB   r   )r$   rO   r`   r   r&   r&   r'   on_message_ready  s   zTransport.on_message_readyc                 C   s   |j ||dS )N)r   r   )r)  )r$   rO   r   r   r&   r&   r'   rW    r+   zTransport._drain_channelc                 C   s   | j ddS )N	localhost)porthostname)default_portr<   r&   r&   r'   default_connection_params  s   z#Transport.default_connection_paramsr    )r,   r-   r.   r/   r   r   rV  rI  r   ri  rE  r  rT  rD  r   rO  
implementsextend	frozensetr:   rZ  r2  r[  r^  r)  r'  rd  re  rW  rH  rj  r&   r&   r&   r'   rO  j  s8    


rO  )>r/   r!   r_  r{   r  r   collectionsr   r   r   	itertoolsr   multiprocessing.utilr   r   r   timer	   r
   amqp.protocolr   kombu.exceptionsr   r   	kombu.logr   kombu.transportr   kombu.utils.divr   kombu.utils.encodingr   r   kombu.utils.schedulingr   kombu.utils.uuidr   r   r   rX  r!  r   rc  r   r   r,   ra  r   r   r   	Exceptionr1   UserWarningr4   r5   rM   r   r   
StdChannelr   rI  rO  r&   r&   r&   r'   <module>   sV    


G %>   G