o
    ?Zc9                     @   s  d Z ddlZddlZddlmZmZ ddlmZ ddlmZ ddl	m
Z
 ddlmZ dd	lmZmZmZmZ dd
lmZ ddlmZmZ ddlmZ ddlmZ ddlmZ ddlmZmZ ddlm Z  ddl!m"Z" dZ#dZ$dZ%ee&Z'e'j(e'j)Z(Z)G dd dZ*G dd dZ+dS )zGeneric process mailbox.    N)defaultdictdeque)contextmanager)copy)count)time   )ConsumerExchangeProducerQueue)LamportClock)maybe_declareoid_from)InconsistencyError)
get_logger)match)maybe_evaluatereprcall)cached_property)uuid
   zA node named {node.hostname} is already using this process mailbox!

Maybe you forgot to shutdown the other node or did not do so properly?
Or if you meant to start multiple nodes on the same host please make sure
you give each node a unique node name!
)NodeMailboxc                   @   s   e Zd ZdZdZdZdZdZdZ		dddZ	dddZ
dd	 Zd
d ZdddZ		dddZdddZdd Zdd ZdddZeZdd ZdS )r   zMailbox node.Nc                 C   s:   || _ || _|| _|| _| jjj| _|d u ri }|| _d S N)channelmailboxhostnamestateclockadjustadjust_clockhandlers)selfr   r   r   r"   r    r$   B/var/www/chikooza/env/lib/python3.10/site-packages/kombu/pidbox.py__init__6   s   
zNode.__init__Tc                    sP    j  j} fdd}||_t|p j|gf||d u r! j jn|d|S )Nc                    s   |rt tj d d S d S )N)node)warningswarnW_PIDBOX_IN_USEformat)namemessages	consumersr#   r$   r%   verify_exclusiveD   s   z'Node.Consumer.<locals>.verify_exclusive)no_ackaccept)r   	get_queuer   on_declaredr	   r   r2   )r#   r   r1   r2   optionsqueuer0   r$   r/   r%   r	   A   s   zNode.Consumerc                 C   s   || j |j< |S r   )r"   __name__)r#   funr$   r$   r%   handlerO   s   zNode.handlerc                 C   s   t d|dd d S )NzCannot decode message: %rr   exc_info)error)r#   messageexcr$   r$   r%   on_decode_errorS   s   zNode.on_decode_errorc                 C   s&   | j ||p| jg| jd}|  |S )N)r   	callbacksr?   )r	   handle_messager?   consume)r#   r   callbackconsumerr$   r$   r%   listenV   s   
zNode.listenc           	   
   K   s   |pi }t dt|d|d|| |r| jp| j}z|||}W n& ty'     tyE } ztd|dd dt|i}W Y d }~nd }~ww |rX| j| j	|i|d |d	 |d
 |S )Nz1pidbox received method %s [reply_to:%s ticket:%s]r$   )kwargszpidbox command error: %rr   r:   r<   exchangerouting_key)rG   rH   ticket)
debugr   handle_callhandle_cast
SystemExit	Exceptionr<   reprreplyr   )	r#   method	argumentsreply_torI   rF   handlerP   r>   r$   r$   r%   dispatch]   s*   zNode.dispatchc                 C   s$   |si n|}| j | | jfi |S r   )r"   r   r#   rQ   rR   r$   r$   r%   rT   r   s   zNode.handlec                 C      |  ||S r   rT   rV   r$   r$   r%   rK   v      zNode.handle_callc                 C   rW   r   rX   rV   r$   r$   r%   rL   y   rY   zNode.handle_castc                 C   s   | d}| d}| d}|r| |j dpd | j}d}|r*||v r)d}n|r7|r7t|||r6d}nd}|rC| jdi |S d S )	Ndestinationpatternmatcherr   r   FTr$   )getr!   headersr   r   rU   )r#   bodyr=   rZ   r[   r\   r   run_dispatchr$   r$   r%   rA   |   s&   


zNode.handle_messagec                 K   s"   | j j||||| j| j jd d S )N)r   
serializer)r   _publish_replyr   ra   )r#   datarG   rH   rI   rF   r$   r$   r%   rP      s   
z
Node.replyNNNN)NTNNN)NNNr   )r7   
__module____qualname____doc__r   r   r"   r   r   r&   r	   r9   r?   rE   rU   rT   rK   rL   rA   dispatch_from_messagerP   r$   r$   r$   r%   r   $   s.    





r   c                   @   s  e Zd ZdZeZdZdZdZdZ	dZ
dZdZdgZdZ				d0dd	Zd
d Zd1ddZ		d1ddZd2ddZd2ddZ		d3ddZdd Zedd Zdd Zed4ddZ	d4dd Z			d5d!d"Z				d6d$d%Z		d3d&d'Zd(d) Zd*d+ Z e!d,d- Z"ed.d/ Z#dS )7r   zProcess Mailbox.z	%s.pidboxzreply.%s.pidboxNdirectjson      $@c                 C   s   || _ || _|| _|d u rt n|| _| | j | j| _| | j | _t	t
| _|d u r/| jn|| _|d u r9| jn|| _|| _|	| _|
| _|| _|| _d S r   )	namespace
connectiontyper   r   _get_exchangerG   _get_reply_exchangereply_exchanger   r   	unclaimedr2   ra   	queue_ttlqueue_expiresreply_queue_ttlreply_queue_expires_producer_pool)r#   rm   ro   rn   r   r2   ra   producer_poolrt   ru   rv   rw   r$   r$   r%   r&      s   

zMailbox.__init__c                 C   s   t | }||_|S r   )r   rn   )r#   rn   boundr$   r$   r%   __call__   s   zMailbox.__call__c                 C   s    |pt  }| j||||| dS )N)r   )socketgethostnamenode_cls)r#   r   r   r   r"   r$   r$   r%   r      s   zMailbox.Nodec              	   C   s$   |si n|}| j |||d|||dS )NT)rP   timeoutrC   r   
_broadcast)r#   rZ   commandrF   r   rC   r   r$   r$   r%   call      
zMailbox.callc                 C   s   |si n|}| j |||ddS NF)rP   r   )r#   rZ   r   rF   r$   r$   r%   cast   s   zMailbox.castc                 C   s   |si n|}| j ||ddS r   r   )r#   r   rF   r$   r$   r%   abcast   s   zMailbox.abcastr   c              	   C   s$   |si n|}| j ||d||||dS )NT)rP   r   limitrC   r   r   )r#   r   rF   r   r   rC   r   r$   r$   r%   
multi_call   r   zMailbox.multi_callc              	   C   s0   | j }t| d| jj | j|dd| j| jdS )N.FT)rG   rH   durableauto_deleteexpiresmessage_ttl)oidr   rr   r,   rw   rv   )r#   r   r$   r$   r%   get_reply_queue   s   zMailbox.get_reply_queuec                 C   s   |   S r   )r   r/   r$   r$   r%   reply_queue      zMailbox.reply_queuec                 C   s(   t | d| j d| jdd| j| jdS )Nr   z.pidboxFT)rG   r   r   r   r   )r   rm   rG   ru   rt   )r#   r   r$   r$   r%   r3      s   zMailbox.get_queuec                 c   s^    |r|V  d S | j r&| j  }|V  W d    d S 1 sw   Y  d S t|ddV  d S )NF)auto_declare)ry   acquirer   )r#   producerr   r$   r$   r%   producer_or_acquire  s   
"zMailbox.producer_or_acquirec           	   	   K   s   |p| j j}t|dddd}| ||3}z|j|f|||g|| j ddd| W n	 ty6   Y n	w W d    d S W d    d S 1 sJw   Y  d S )Nrj   	transientF)exchange_typedelivery_moder   )rI   r   T)rG   rH   declarer^   retry)rn   default_channelr
   r   publishr   forwardr   )	r#   rP   rG   rH   rI   r   r   optschanr$   r$   r%   rb     s2   

"zMailbox._publish_replyc              	   C   s   ||||	|
d}|p| j j}| j}|r't| | |j|| jj| jdd |p+| j	}| 
||#}|j||j|g| j |rEt | ndd|dd W d    d S 1 sXw   Y  d S )N)rQ   rR   rZ   r[   r\   )rG   rH   )rI   rS   r   )r   r   T)rG   r   r^   ra   r   )rn   r   rG   r   r   updaterr   r,   r   ra   r   r   r   r   r   )r#   ro   rR   rZ   reply_ticketr   r   ra   r   r[   r\   r=   r   rG   r$   r$   r%   _publish  s2   

"zMailbox._publishFc                 C   s   |d urt |ttfstdt||
d ur2t |
ts2|d ur2t |ts2tdt|
t||p5i }|r;t p<d }|pB| jj	}|d u rQ|rQ|rOt
|pPd }|	pU| j}	| j|||||||	|
|d	 |rp| j|||||dS d S )Nz'destination must be a list/tuple not {}z.pattern and matcher must be strings not {}, {})rZ   r   r   r   ra   r[   r\   )r   r   rC   r   )
isinstancelisttuple
ValueErrorr+   ro   strr   rn   r   lenra   r   _collect)r#   r   rR   rZ   rP   r   r   rC   r   ra   r[   r\   r   r   r$   r$   r%   r   6  sL   

zMailbox._broadcastc              
      s  |d u r| j }|p| jj}| j}t||g|dd}	g | j| jj zW S  t	y1   Y nw  fdd}
|	
|
 zD|	1 |rKt|pMt D ]}z	| jj|d W qN tjyd   Y  nw W  d    W ||j S 1 sxw   Y  W ||j d S ||j w )NT)r2   r1   c                    sp   |j j} |dp
d |d}|rt |krd S |d}|kr/r(|  |  d S | |  d S )Nr   r   r   rI   )r^   r]   r   append)r_   r=   headerr   this_idr!   rC   	responsesrI   rs   r$   r%   
on_messagen  s   
z$Mailbox._collect.<locals>.on_message)r   )r2   rn   r   r   r	   rs   r   r    popKeyErrorregister_callbackranger   drain_eventsr|   r   after_reply_message_receivedr,   )r#   rI   r   r   rC   r   r2   r   r6   rD   r   ir$   r   r%   r   ]  s8   
zMailbox._collectc                 C   s   t | j| |dddS )NFr   ro   r   r   )r
   exchange_fmt)r#   rm   ro   r$   r$   r%   rp     
   
zMailbox._get_exchangec                 C   s   t | j| ddddS )Nrj   Fr   r   )r
   reply_exchange_fmt)r#   rm   r$   r$   r%   rq     r   zMailbox._get_reply_exchangec                 C   s   t | S r   )r   r/   r$   r$   r%   r     r   zMailbox.oidc                 C   s
   t | jS r   )r   rx   r/   r$   r$   r%   ry     s   
zMailbox.producer_pool)
rj   NNNNNNNNrl   rd   r   )Nr   NNNre   )NNNNNNNN)
NNFr   NNNNNN)$r7   rf   rg   rh   r   r~   r   r   rm   rn   ro   rG   rr   r2   ra   r&   r{   r   r   r   r   r   r   r   r3   r   r   rb   r   r   r   rp   rq   propertyr   ry   r$   r$   r$   r%   r      sj    











(
,
r   ),rh   r|   r(   collectionsr   r   
contextlibr   r   	itertoolsr   r    r	   r
   r   r   clocksr   commonr   r   
exceptionsr   logr   r\   r   utils.functionalr   r   utils.objectsr   
utils.uuidr   REPLY_QUEUE_EXPIRESr*   __all__r7   loggerrJ   r<   r   r   r$   r$   r$   r%   <module>   s0    r