o
    ?Zc                     @   sz   d Z ddlmZ ddlmZ ddlmZmZ dZej	Z
dd	d
ZG dd dejZG dd dejZG dd dejZdS )zYCarrot compatibility interface.

See https://pypi.org/project/carrot/ for documentation.
    )count   )	messaging)ExchangeQueue)	PublisherConsumerFNc                 c   s<    |j |d tdD ]}|r||kr d S |  V  qd S )N)no_ackr   )consumer   drain_events)
connectionconsumerr	   limit	iteration r   B/var/www/chikooza/env/lib/python3.10/site-packages/kombu/compat.py_iterconsume   s   r   c                       sp   e Zd ZdZdZdZdZdZdZdZ				d fdd	Z
d	d
 Z fddZdd Zdd Zedd Z  ZS )r   zCarrot compatible producer. directTFNc           	         s   |r|}|p| j | _ |p| j| _|p| j| _|d ur|| _|d ur$|| _t| j ts9t| j | j| j| j| jd| _ t j|| j fi | d S )N)nametyperouting_keyauto_deletedurable)	exchangeexchange_typer   r   r   
isinstancer   super__init__)	selfr   r   r   r   r   r   channelkwargs	__class__r   r   r   #   s"   zPublisher.__init__c                 O   s   | j |i |S N)publish)r   argsr!   r   r   r   send:      zPublisher.sendc                    s   t    d| _d S NT)r   close_closedr   r"   r   r   r*   =   s   

zPublisher.closec                 C      | S r$   r   r,   r   r   r   	__enter__A      zPublisher.__enter__c                 G      |    d S r$   r*   r   exc_infor   r   r   __exit__D      zPublisher.__exit__c                 C   s   | j S r$   )r    r,   r   r   r   backendG   s   zPublisher.backend)NNNNNN)__name__
__module____qualname____doc__r   r   r   r   r   r+   r   r'   r*   r.   r4   propertyr6   __classcell__r   r   r"   r   r      s$    r   c                       s   e Zd ZdZdZdZdZdZdZdZ	dZ
dZ			d fdd	Z fd	d
Zdd Zdd Zdd Zdd Zd ddZdd Zd!ddZd"ddZd!ddZd ddZ  ZS )#r   zCarrot compatible consumer.r   r   TFNc	           
         s   |  | _|d ur|| _|d ur|| _|d ur|| _|p| j| _|p$| j| _|p*| j| _|p0| j| _t	| j| j| j| j| jd}t
| j|| j| j| j| jd}t j| j|fi |	 d S )N)r   r   r   r   )r   r   r   	exclusiver   )r    r6   r   r=   r   queuer   r   r   r   r   r   r   )
r   r   r>   r   r   r   r   r=   r   r!   r"   r   r   r   X   s2   
zConsumer.__init__c                       || _ t | d S r$   r6   r   reviver   r    r"   r   r   rA   v      zConsumer.revivec                 C   s   |    | j  d| _d S r)   )cancelr6   r*   r+   r,   r   r   r   r*   z   s   

zConsumer.closec                 C   r-   r$   r   r,   r   r   r   r.      r/   zConsumer.__enter__c                 G   r0   r$   r1   r2   r   r   r   r4      r5   zConsumer.__exit__c                 C   s   | j ddS )NT)infinite)	iterqueuer,   r   r   r   __iter__   r5   zConsumer.__iter__c                 C   s8   |d u r| j }| jd |}|r|r| |j| |S )Nr   )r	   queuesgetreceivepayload)r   r	   enable_callbacksmessager   r   r   fetch   s   zConsumer.fetchc                 C   s   t d)Nz Use fetch(enable_callbacks=True))NotImplementedErrorr,   r   r   r   process_next      zConsumer.process_nextc                 C   s   |d urt d|  S )Nz&discard_all does not implement filters)rO   purge)r   
filterfuncr   r   r   discard_all   s
   zConsumer.discard_allc                 C      t | j| ||S r$   r   r   r   r   r	   r   r   r   iterconsume   r(   zConsumer.iterconsumec                 C   s   |  |}t|S r$   )rX   list)r   r   itr   r   r   wait   s   
zConsumer.waitc                 c   s>    t  D ]}|  }|s|d u s|r||kr d S |V  qd S r$   )r   rN   )r   r   rE   items_since_startitemr   r   r   rF      s   
zConsumer.iterqueue)NNNNNNNNFr$   )NN)r7   r8   r9   r:   r>   r   r   r   r   r=   r   r+   r   rA   r*   r.   r4   rG   rN   rP   rT   rX   r[   rF   r<   r   r   r"   r   r   L   s2    
	


r   c                       sX   e Zd Z		d fdd	ZdddZdd Zd	d
 Zdd Z fddZdd Z	  Z
S )ConsumerSetNc           
         s   |r	d| _ || _nd| _ | | _g }|r |D ]}||j q|r7| D ]\}}	|tj|fi |	 q&t	 j
| j|fi | d S )NTF)_provided_channelr6   r    extendrH   itemsappendr   	from_dictr   r   )
r   r   rd   	consumersr    r!   rH   r   
queue_namequeue_optionsr"   r   r   r      s   
zConsumerSet.__init__Fc                 C   rU   r$   rV   rW   r   r   r   rX      r(   zConsumerSet.iterconsumec                 C   s   |   S r$   )rR   r,   r   r   r   rT      rQ   zConsumerSet.discard_allc                 K   s   |  tj|fi |S r$   )	add_queuer   rd   )r   r>   optionsr   r   r   add_consumer_from_dict   s   z"ConsumerSet.add_consumer_from_dictc                 C   s   |j D ]}| | qd S r$   )rH   rh   )r   r   r>   r   r   r   add_consumer   s   
zConsumerSet.add_consumerc                    r?   r$   r@   rB   r"   r   r   rA      rC   zConsumerSet.revivec                 C   s    |    | js| j  d S d S r$   )rD   r`   r    r*   r,   r   r   r   r*      s   zConsumerSet.close)NNNr^   )r7   r8   r9   r   rX   rT   rj   rk   rA   r*   r<   r   r   r"   r   r_      s    
r_   )FN)r:   	itertoolsr   r   r   entityr   r   __all__rd   entry_to_queuer   Producerr   r   r_   r   r   r   r   <module>   s    
3^