o
    ?ZcD4                     @   s  d Z ddlZddlZddlZddlmZ ddlmZ ddlm	Z	 ddl
mZ ddlmZmZmZmZ ddlmZmZ d	d
lmZmZ d	dlmZ d	dlmZ d	dlmZ dZdZeeZ da!dd Z"dd Z#d:ddZ$G dd deZ%dd Z&d;ddZ'dd Z(d d! Z)d"d# Z*d<d$d%Z+		d<d&d'Z,d=d(d)Z-	d>d*d+Z.d,d- Z/d.d/ Z0ed0d1 Z1d?d2d3Z2d?d4d5Z3d@d6d7Z4G d8d9 d9Z5dS )AzCommon Utilities.    N)deque)contextmanager)partial)count)NAMESPACE_OIDuuid3uuid4uuid5)ChannelErrorRecoverableConnectionError   )ExchangeQueue)
get_logger)registry)uuid)		Broadcastmaybe_declarer   itermessages
send_replycollect_repliesinsureddrain_consumer	eventloopi  c                   C   s   t d u rt ja t S N)_node_idr   int r   r   B/var/www/chikooza/env/lib/python3.10/site-packages/kombu/common.pyget_node_id    s   r   c                 C   sL   d | ||t|}z
ttt|}W |S  ty%   ttt|}Y |S w )Nz{:x}-{:x}-{:x}-{:x})formatidstrr   r   
ValueErrorr	   )node_id
process_id	thread_idinstanceentretr   r   r   generate_oid'   s   r*   Tc                 C   s$   t t t |rt | S d| S Nr   )r*   r   osgetpid	threading	get_ident)r'   threadsr   r   r   oid_from1   s   
r1   c                       s8   e Zd ZdZejd Z						d fdd	Z  ZS )	r   a  Broadcast queue.

    Convenience class used to define broadcast queues.

    Every queue instance will have a unique name,
    and both the queue and exchange is configured with auto deletion.

    Arguments:
        name (str): This is used as the name of the exchange.
        queue (str): By default a unique id is used for the queue
            name for every consumer.  You can specify a custom
            queue name here.
        unique (bool): Always create a unique queue
            even if a queue name is supplied.
        **kwargs (Any): See :class:`~kombu.Queue` for a list
            of additional keyword arguments supported.
    ))queueNNFTc              
      sb   |rd |pdt }n|pdt  }t jd|p|||||d ur$|nt|ddd| d S )Nz{}.{}bcastzbcast.fanout)type)aliasr2   nameauto_deleteexchanger   )r    r   super__init__r   )selfr7   r2   uniquer8   r9   r6   kwargs	__class__r   r   r;   O   s   

zBroadcast.__init__)NNFTNN)__name__
__module____qualname____doc__r   attrsr;   __classcell__r   r   r?   r   r   :   s    
r   c                 C   s   | |j jjv S r   )
connectionclientdeclared_entities)entitychannelr   r   r   declaration_cachedf   s   rL   Fc                 K   s    |rt | |fi |S t| |S )zDeclare entity (cached).)_imaybe_declare_maybe_declare)rJ   rK   retryretry_policyr   r   r   r   j   s   
r   c                 C   s4   | j }|s|std| d|  | |} | S dS )zMake sure the channel is bound to the entity.

    :param entity: generic kombu nomenclature, generally an exchange or queue
    :param channel: channel to bind to the entity
    :return: the updated entity
    zCannot bind channel z to entity N)is_boundr
   bind)rJ   rK   rQ   r   r   r   _ensure_channel_is_boundq   s   
rS   c                 C   s   | }t | | |d u r| jstd|  d| j}d  }}|jr2| jr2|jjj}t| }||v r2dS |js9t	d| j
|d |d urJ|rJ|| |d urR| j|_dS )Nzchannel is None and entity z not bound.Fchannel disconnected)rK   T)rS   rQ   r
   rK   rG   can_cache_declarationrH   rI   hashr   declareaddr7   )rJ   rK   origdeclaredidentr   r   r   rN      s,   



rN   c                 K   s:   t | | | jjstd| jjjj| tfi || |S )NrT   )rS   rK   rG   r   rH   ensurerN   )rJ   rK   rP   r   r   r   rM      s   

rM   c              
   #   s    t    fdd}|g|pg  | _| ' t| jjj||ddD ]}z  V  W q  ty2   Y q w W d   dS 1 s>w   Y  dS )z&Drain messages from consumer instance.c                    s     | |f d S r   )append)bodymessageaccr   r   
on_message   s   z"drain_consumer.<locals>.on_messageT)limittimeoutignore_timeoutsN)r   	callbacksr   rK   rG   rH   popleft
IndexError)consumerrc   rd   rf   rb   _r   r`   r   r      s   

"r   c                 K   s$   t | jd|g|d||||dS )zIterator over messages.)queuesrK   )rc   rd   rf   Nr   )r   Consumer)connrK   r2   rc   rd   rf   r>   r   r   r   r      s   r   c              	   c   sN    |rt |p	t D ]}z	| j|dV  W q
 tjy$   |r"|s" Y q
w dS )a  Best practice generator wrapper around ``Connection.drain_events``.

    Able to drain events forever, with a limit, and optionally ignoring
    timeout errors (a timeout of 1 is often used in environments where
    the socket can get "stuck", and is a best practice for Kombu consumers).

    ``eventloop`` is a generator.

    Examples:
        >>> from kombu.common import eventloop

        >>> def run(conn):
        ...     it = eventloop(conn, timeout=1, ignore_timeouts=True)
        ...     next(it)   # one event consumed, or timed out.
        ...
        ...     for _ in eventloop(conn, timeout=1, ignore_timeouts=True):
        ...         pass  # loop forever.

    It also takes an optional limit parameter, and timeout errors
    are propagated by default::

        for _ in eventloop(connection, limit=1, timeout=1):
            pass

    See Also:
        :func:`itermessages`, which is an event loop bound to one or more
        consumers, that yields any messages received.
    )rd   N)ranger   drain_eventssocketrd   )rm   rc   rd   re   ir   r   r   r      s   r   c              	   K   sH   |j |f| ||dt|jd |jdtj|j |jdfi |S )a  Send reply for request.

    Arguments:
        exchange (kombu.Exchange, str): Reply exchange
        req (~kombu.Message): Original request, a message with
            a ``reply_to`` property.
        producer (kombu.Producer): Producer instance
        retry (bool): If true must retry according to
            the ``reply_policy`` argument.
        retry_policy (Dict): Retry settings.
        **props (Any): Extra properties.
    )r9   rO   rP   reply_tocorrelation_id)routing_keyrs   
serializercontent_encoding)publishdict
propertiesgetserializerstype_to_namecontent_typerv   )r9   reqmsgproducerrO   rP   propsr   r   r   r      s   


r   c           	   	   o   s|    | dd}d}z*t| ||g|R i |D ]\}}|s!|  d}|V  qW |r2||j dS dS |r=||j w w )z,Generator collecting replies from ``queue``.no_ackTFN)
setdefaultr   ackafter_reply_message_receivedr7   )	rm   rK   r2   argsr>   r   receivedr^   r_   r   r   r   r      s&   
r   c                 C   s   t jd| |dd d S )Nz#Connection error: %r. Retry in %ss
T)exc_info)loggererror)excintervalr   r   r   _ensure_errback  s   
r   c              	   c   s,    zd V  W d S  | j | j y   Y d S w r   )connection_errorschannel_errors)rm   r   r   r   _ignore_errors  s   r   c                 O   sB   |rt |  ||i |W  d   S 1 sw   Y  t | S )a  Ignore connection and channel errors.

    The first argument must be a connection object, or any other object
    with ``connection_error`` and ``channel_error`` attributes.

    Can be used as a function:

    .. code-block:: python

        def example(connection):
            ignore_errors(connection, consumer.channel.close)

    or as a context manager:

    .. code-block:: python

        def example(connection):
            with ignore_errors(connection):
                consumer.channel.close()


    Note:
        Connection and channel errors should be properly handled,
        and not ignored.  Using this function is only acceptable in a cleanup
        phase, like when a connection is lost or at shutdown.
    N)r   )rm   funr   r>   r   r   r   ignore_errors  s
   
 r   c                 C   s   |r|| d S d S r   r   )rG   rK   	on_reviver   r   r   revive_connection@  s   r   c                 K   s   |pt }| jdd4}|j|d |j}tt||d}	|j||f||	d|}
|
|i t||d\}}|W  d   S 1 sAw   Y  dS )zFunction wrapper to handle connection errors.

    Ensures function performing broker commands completes
    despite intermittent connection failures.
    T)block)errback)r   )r   r   )rG   N)r   acquireensure_connectiondefault_channelr   r   	autoretryrx   )poolr   r   r>   r   r   optsrm   rK   reviver   retvalrj   r   r   r   r   E  s   $r   c                   @   s@   e Zd ZdZdZdd ZdddZddd	Zd
d Zdd Z	dS )QoSa  Thread safe increment/decrement of a channels prefetch_count.

    Arguments:
        callback (Callable): Function used to set new prefetch count,
            e.g. ``consumer.qos`` or ``channel.basic_qos``.  Will be called
            with a single ``prefetch_count`` keyword argument.
        initial_value (int): Initial prefetch count value..

    Example:
        >>> from kombu import Consumer, Connection
        >>> connection = Connection('amqp://')
        >>> consumer = Consumer(connection)
        >>> qos = QoS(consumer.qos, initial_prefetch_count=2)
        >>> qos.update()  # set initial

        >>> qos.value
        2

        >>> def in_some_thread():
        ...     qos.increment_eventually()

        >>> def in_some_other_thread():
        ...     qos.decrement_eventually()

        >>> while 1:
        ...    if qos.prev != qos.value:
        ...        qos.update()  # prefetch changed so update.

    It can be used with any function supporting a ``prefetch_count`` keyword
    argument::

        >>> channel = connection.channel()
        >>> QoS(channel.basic_qos, 10)


        >>> def set_qos(prefetch_count):
        ...     print('prefetch count now: %r' % (prefetch_count,))
        >>> QoS(set_qos, 10)
    Nc                 C   s   || _ t | _|pd| _d S r+   )callbackr.   RLock_mutexvalue)r<   r   initial_valuer   r   r   r;     s   
zQoS.__init__r   c                 C   sZ   | j  | jr| jt|d | _W d   | jS W d   | jS 1 s%w   Y  | jS )zIncrement the value, but do not update the channels QoS.

        Note:
            The MainThread will be responsible for calling :meth:`update`
            when necessary.
        r   N)r   r   maxr<   nr   r   r   increment_eventually  s   

zQoS.increment_eventuallyc                 C   sx   | j . | jr|  j|8  _| jdk r(d| _W d   | jS W d   | jS W d   | jS 1 s4w   Y  | jS )zDecrement the value, but do not update the channels QoS.

        Note:
            The MainThread will be responsible for calling :meth:`update`
            when necessary.
        r   N)r   r   r   r   r   r   decrement_eventually  s   



zQoS.decrement_eventuallyc                 C   sH   || j kr"|}|tkrtdt d}td| | j|d || _ |S )z#Set channel prefetch_count setting.z(QoS: Disabled: prefetch_count exceeds %rr   zbasic.qos: prefetch_count->%s)prefetch_count)prevPREFETCH_COUNT_MAXr   warningdebugr   )r<   pcount	new_valuer   r   r   set  s   
zQoS.setc                 C   s6   | j  | | jW  d   S 1 sw   Y  dS )z)Update prefetch count with current value.N)r   r   r   )r<   r   r   r   update  s   
$z
QoS.update)r   )
rA   rB   rC   rD   r   r;   r   r   r   r   r   r   r   r   r   Y  s    (

r   )T)NF)r   NN)NNF)NFNr   )NN)6rD   r,   rp   r.   collectionsr   
contextlibr   	functoolsr   	itertoolsr   r   r   r   r   r	   amqpr
   r   rJ   r   r   logr   serializationr   r{   
utils.uuid__all__r   rA   r   r   r   r*   r1   r   rL   r   rS   rN   rM   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   <module>   sT    

	,




	&



!
