o
    ?Zc                     @   s   d Z ddlZddlZddlmZ ddlmZmZ ddlm	Z	 ddl
mZ ddlmZ ddlmZ d	Zd
efdefdefdefdefdZdd Zdd Zdd ZG dd dZG dd dZG dd deZedeg dddZG dd  d ZdS )!zBase transport interface.    N)RecoverableConnectionError)ChannelErrorConnectionError)Message)
dictfilter)cached_property)maybe_s_to_ms)r   
StdChannel
Management	Transportz	x-expireszx-message-ttlzx-max-lengthzx-max-length-byteszx-max-priority)expiresmessage_ttl
max_lengthmax_length_bytesmax_priorityc                 K   s2   t tdd | D }|rt| fi |S | S )a  Convert queue arguments to RabbitMQ queue arguments.

    This is the implementation for Channel.prepare_queue_arguments
    for AMQP-based transports.  It's used by both the pyamqp and librabbitmq
    transports.

    Arguments:
        arguments (Mapping):
            User-supplied arguments (``Queue.queue_arguments``).

    Keyword Arguments:
        expires (float): Queue expiry time in seconds.
            This will be converted to ``x-expires`` in int milliseconds.
        message_ttl (float): Message TTL in seconds.
            This will be converted to ``x-message-ttl`` in int milliseconds.
        max_length (int): Max queue length (in number of messages).
            This will be converted to ``x-max-length`` int.
        max_length_bytes (int): Max queue size in bytes.
            This will be converted to ``x-max-length-bytes`` int.
        max_priority (int): Max priority steps for queue.
            This will be converted to ``x-max-priority`` int.

    Returns:
        Dict: RabbitMQ compatible queue arguments.
    c                 s   s    | ]
\}}t ||V  qd S N)_to_rabbitmq_queue_argument).0keyvalue r   J/var/www/chikooza/env/lib/python3.10/site-packages/kombu/transport/base.py	<genexpr>6   s
    
z.to_rabbitmq_queue_arguments.<locals>.<genexpr>)r   dictitems)	argumentsoptionspreparedr   r   r   to_rabbitmq_queue_arguments   s   

r   c                 C   s&   t |  \}}||d ur||fS |fS r   )RABBITMQ_QUEUE_ARGUMENTS)r   r   opttypr   r   r   r   =   s   r   c                 C   s   t d| j|S )Nz<Transport {0.__module__}.{0.__name__} does not implement {1})NotImplementedErrorformat	__class__)objmethodr   r   r   
_LeftBlankC   s
   r'   c                   @   sL   e Zd ZdZdZdd Zdd Zdd Zd	d
 Zdd Z	dd Z
dd ZdS )r	   zStandard channel base class.Nc                 O   "   ddl m} || g|R i |S )Nr   )Consumer)kombu.messagingr)   )selfargskwargsr)   r   r   r   r)   N      zStdChannel.Consumerc                 O   r(   )Nr   )Producer)r*   r/   )r+   r,   r-   r/   r   r   r   r/   R   r.   zStdChannel.Producerc                 C   
   t | dNget_bindingsr'   r+   r   r   r   r2   V      
zStdChannel.get_bindingsc                 C      dS )zCallback called after RPC reply received.

        Notes:
           Reply queue semantics: can be used to delete the queue
           after transient reply message received.
        Nr   )r+   queuer   r   r   after_reply_message_receivedY   s    z'StdChannel.after_reply_message_receivedc                 K   s   |S r   r   )r+   r   r-   r   r   r   prepare_queue_argumentsa      z"StdChannel.prepare_queue_argumentsc                 C   s   | S r   r   r4   r   r   r   	__enter__d   r:   zStdChannel.__enter__c                 G   s   |    d S r   )close)r+   exc_infor   r   r   __exit__g      zStdChannel.__exit__)__name__
__module____qualname____doc__no_ack_consumersr)   r/   r2   r8   r9   r;   r>   r   r   r   r   r	   I   s    r	   c                   @   s    e Zd ZdZdd Zdd ZdS )r
   z!AMQP Management API (incomplete).c                 C   
   || _ d S r   )	transport)r+   rF   r   r   r   __init__n   r5   zManagement.__init__c                 C   r0   r1   r3   r4   r   r   r   r2   q   r5   zManagement.get_bindingsN)r@   rA   rB   rC   rG   r2   r   r   r   r   r
   k   s    r
   c                   @   s(   e Zd ZdZdd Zdd Zdd ZdS )	
Implementsz/Helper class used to define transport features.c                 C   s"   z| | W S  t y   t|w r   )KeyErrorAttributeError)r+   r   r   r   r   __getattr__x   s
   
zImplements.__getattr__c                 C   s   || |< d S r   r   )r+   r   r   r   r   r   __setattr__~   r?   zImplements.__setattr__c                 K   s   | j | fi |S r   )r$   )r+   r-   r   r   r   extend   s   zImplements.extendN)r@   rA   rB   rC   rK   rL   rM   r   r   r   r   rH   u   s
    rH   F)directtopicfanoutheaders)asynchronousexchange_type
heartbeatsc                   @   s  e Zd ZdZeZdZdZdZefZ	e
fZdZdZdZe Zdd Zdd Zd	d
 Zdd Zdd Zdd Zd3ddZdd Zdd Zdd Zdd Zdd Zejej e!j"e!j#ffddZ$d d! Z%d"d# Z&d4d%e'd&e'fd'd(Z(e)d)d* Z*d+d, Z+e,d-d. Z-e)d/d0 Z.e)d1d2 Z/dS )5r   zBase class for transports.NFN/Ac                 K   rE   r   )client)r+   rV   r-   r   r   r   rG      r5   zTransport.__init__c                 C   r0   )Nestablish_connectionr3   r4   r   r   r   rW      r5   zTransport.establish_connectionc                 C   r0   )Nclose_connectionr3   r+   
connectionr   r   r   rX      r5   zTransport.close_connectionc                 C   r0   )Ncreate_channelr3   rY   r   r   r   r[      r5   zTransport.create_channelc                 C   r0   )Nclose_channelr3   rY   r   r   r   r\      r5   zTransport.close_channelc                 K   r0   )Ndrain_eventsr3   )r+   rZ   r-   r   r   r   r]      r5   zTransport.drain_events   c                 C      d S r   r   )r+   rZ   rater   r   r   heartbeat_check   r:   zTransport.heartbeat_checkc                 C   r6   )NrU   r   r4   r   r   r   driver_version   r:   zTransport.driver_versionc                 C   r6   )Nr   r   rY   r   r   r   get_heartbeat_interval   r:   z Transport.get_heartbeat_intervalc                 C   r_   r   r   r+   rZ   loopr   r   r   register_with_event_loop   r:   z"Transport.register_with_event_loopc                 C   r_   r   r   rd   r   r   r   unregister_from_event_loop   r:   z$Transport.unregister_from_event_loopc                 C   r6   NTr   rY   r   r   r   verify_connection   r:   zTransport.verify_connectionc                    s    j  fdd  S )Nc              
      sr   j stdzdd W n" y   Y d S  y0 } z|jv r+W Y d }~d S  d }~ww |  |  d S )NzSocket was disconnectedr   )timeout)	connectedr   errno	call_soon)re   exc_read_unavailrZ   r]   errorrj   r   r   rp      s   
z%Transport._make_reader.<locals>._read)r]   )r+   rZ   rj   rr   rq   r   ro   r   _make_reader   s   zTransport._make_readerc                 C   r6   rh   r   rY   r   r   r   qos_semantics_matches_spec   r:   z$Transport.qos_semantics_matches_specc                 C   s*   | j }|d u r| | }| _ || d S r   )_Transport__readerrs   )r+   rZ   re   readerr   r   r   on_readable   s   zTransport.on_readable**urireturnc                 C   s   t  )z(Customise the display format of the URI.)r"   )r+   ry   include_passwordmaskr   r   r   as_uri   s   zTransport.as_uric                 C   s   i S r   r   r4   r   r   r   default_connection_params   s   z#Transport.default_connection_paramsc                 O   s
   |  | S r   )r
   )r+   r,   r-   r   r   r   get_manager   r5   zTransport.get_managerc                 C   s   |   S r   )r   r4   r   r   r   manager      zTransport.managerc                 C      | j jS r   )
implementsrT   r4   r   r   r   supports_heartbeats   r   zTransport.supports_heartbeatsc                 C   r   r   )r   rR   r4   r   r   r   supports_ev   r   zTransport.supports_ev)r^   )Frx   )0r@   rA   rB   rC   r
   rV   can_parse_urldefault_portr   connection_errorsr   channel_errorsdriver_typedriver_nameru   default_transport_capabilitiesrM   r   rG   rW   rX   r[   r\   r]   ra   rb   rc   rf   rg   ri   socketrj   rr   rl   EAGAINEINTRrs   rt   rw   strr}   propertyr~   r   r   r   r   r   r   r   r   r   r      sN    




r   )rC   rl   r   amqp.exceptionsr   kombu.exceptionsr   r   kombu.messager   kombu.utils.functionalr   kombu.utils.objectsr   kombu.utils.timer   __all__intr   r   r   r'   r	   r
   r   rH   	frozensetr   r   r   r   r   r   <module>   s8    	""

