o
    ?Zc                  	   @   s  d Z ddlZddlmZmZ ddlmZ ddlmZ ddl	m
Z
 ddlmZ zddlZdd	lmZ dd
lmZ W n eyG   d Z ZZY nw dZdZdZeeZG dd dejZG dd dejZedureddd  ejejddG dd dZedkred e  AZ!ed"ej#j$ej#j% e& Z'ed"ej#j$ e!(eZ)e'(de) W d   n1 sw   Y  e!*  W d   dS 1 sw   Y  dS dS )a  Pyro transport module for kombu.

Pyro transport, and Kombu Broker daemon.

Requires the :mod:`Pyro4` library to be installed.

Features
========
* Type: Virtual
* Supports Direct: Yes
* Supports Topic: Yes
* Supports Fanout: No
* Supports Priority: No
* Supports TTL: No

Connection String
=================

To use the Pyro transport with Kombu, use an url of the form:

.. code-block::

    pyro://localhost/kombu.broker

The hostname is where the transport will be looking for a Pyro name server,
which is used in turn to locate the kombu.broker Pyro service.
This broker can be launched by simply executing this transport module directly,
with the command: ``python -m kombu.transport.pyro``

Transport Options
=================
    N)EmptyQueue)reraise)
get_logger)cached_property   )virtual)NamingError)SerializerBasei#  z5Unable to locate pyro nameserver on host {0.hostname}zKUnable to lookup '{0.virtual_host}' in pyro nameserver on host {0.hostname}c                       s~   e Zd ZdZ fddZdd Zdd Zdd	 ZdddZdd Z	dd Z
dd Zdd Zdd Zdd Zedd Z  ZS )ChannelzPyro Channel.c                    s"   t    | jr| j  d S d S N)supercloseshared_queues_pyroReleaseself	__class__ J/var/www/chikooza/env/lib/python3.10/site-packages/kombu/transport/pyro.pyr   A   s   
zChannel.closec                 C   s
   | j  S r   )r   get_queue_namesr   r   r   r   queuesF      
zChannel.queuesc                 K   s    ||   vr| j| d S d S r   r   r   	new_queuer   queuekwargsr   r   r   
_new_queueI   s   zChannel._new_queuec                 K      | j |S r   )r   	has_queuer   r   r   r   
_has_queueM      zChannel._has_queueNc                 C   s   |  |}| j|S r   )
_queue_forr   get)r   r   timeoutr   r   r   _getP   s   
zChannel._getc                 C   s   ||   vr| j| |S r   r   r   r   r   r   r   r$   T   s   zChannel._queue_forc                 K   s   |  |}| j|| d S r   )r$   r   put)r   r   messager   r   r   r   _putY   s   
zChannel._putc                 C   r    r   )r   sizer(   r   r   r   _size]   r#   zChannel._sizec                 O   s   | j | d S r   )r   delete)r   r   argsr   r   r   r   _delete`   s   zChannel._deletec                 C   r    r   )r   purger(   r   r   r   _purgec   r#   zChannel._purgec                 C   s   d S r   r   r(   r   r   r   after_reply_message_receivedf   s   z$Channel.after_reply_message_receivedc                 C   s   | j jS r   )
connectionr   r   r   r   r   r   i      zChannel.shared_queuesr   )__name__
__module____qualname____doc__r   r   r   r"   r'   r$   r+   r-   r0   r2   r3   r   r   __classcell__r   r   r   r   r   >   s    
r   c                       sT   e Zd ZdZeZe ZeZ	d Z
Z fddZdd Zdd Zed	d
 Z  ZS )	TransportzPyro Transport.pyroc                    s    t  j|fi | | j| _d S r   )r   __init__global_statestate)r   clientr   r   r   r   r=   {   s   zTransport.__init__c              	   C   s   t d | j}ztj|j| jd}W n ty+   tttt	
|t d  Y nw z||j}t|W S  tyQ   tttt
|t d  Y d S w )Nz0trying Pyro nameserver to find the broker daemon)hostport   )loggerdebugr@   r<   locateNShostnamedefault_portr	   r   E_NAMESERVERformatsysexc_infolookupvirtual_hostProxyE_LOOKUP)r   conninfo
nameserverurir   r   r   _open   s&   




zTransport._openc                 C   s   t jS r   )r<   __version__r   r   r   r   driver_version   s   zTransport.driver_versionc                 C   s   |   S r   )rT   r   r   r   r   r      r5   zTransport.shared_queues)r6   r7   r8   r9   r   r   BrokerStater>   DEFAULT_PORTrH   driver_typedriver_namer=   rT   rV   r   r   r:   r   r   r   r   r;   n   s    r;   zqueue.Emptyc                 C   s   t  S r   )r   )clsdatar   r   r   <lambda>   s    r]   single)instance_modec                   @   sX   e Zd ZdZdd Zdd Zdd Zdd	 Zd
d Zdd Z	dd Z
dd Zdd ZdS )KombuBrokerzmKombu Broker used by the Pyro transport.

        You have to run this as a separate (Pyro) service.
        c                 C   s
   i | _ d S r   r   r   r   r   r   r=      r   zKombuBroker.__init__c                 C   s
   t | jS r   )listr   r   r   r   r   r      r   zKombuBroker.get_queue_namesc                 C   s   || j v rd S t | j |< d S r   )r   r   r(   r   r   r   r      s   
zKombuBroker.new_queuec                 C   s
   || j v S r   ra   r(   r   r   r   r!      r   zKombuBroker.has_queuec                 C   s   | j | jddS )NF)block)r   r%   r(   r   r   r   r%      s   zKombuBroker.getc                 C   s   | j | | d S r   )r   r)   )r   r   r*   r   r   r   r)      s   zKombuBroker.putc                 C   s   | j |  S r   )r   qsizer(   r   r   r   r,      s   zKombuBroker.sizec                 C   s   | j |= d S r   ra   r(   r   r   r   r.      r#   zKombuBroker.deletec                 C   s0   	 z| j | jdd W n
 ty   Y d S w q)NTF)blocking)r   r%   r   r(   r   r   r   r1      s   zKombuBroker.purgeN)r6   r7   r8   r9   r=   r   r   r!   r%   r)   r,   r.   r1   r   r   r   r   r`      s    r`   __main__z,Launching Broker for Kombu's Pyro transport.z'(Expecting a Pyro name server at {}:{})zAYou can connect with Kombu using the url 'pyro://{}/kombu.broker'zkombu.broker)+r9   rK   r   r   r   kombu.exceptionsr   	kombu.logr   kombu.utils.objectsr    r   Pyro4r<   Pyro4.errorsr	   
Pyro4.utilr
   ImportErrorrX   rI   rP   r6   rD   r   r;   register_dict_to_classexposebehaviorr`   printDaemondaemonrJ   configNS_HOSTNS_PORTrF   nsregisterrS   requestLoopr   r   r   r   <module>   sV    "0*
*




"