o
    ?Zc                     @   sV  d Z ddlZddlZddlmZ ddlmZmZ ddlm	Z	m
Z
 ddlmZ z]ddlZddlmZ dd	lmZ ejjejjejjejjejjejjejjejjejjf	Zejjejjejjejjejjejjejj ejj!ejjejj"ejj#ejj$ejjejj%ej&fZ'W n e(y   dZd
 ZZ'Y nw dZ)dZ*G dd dej+Z+G dd dej,Z,dS )a  Zookeeper transport module for kombu.

Zookeeper based transport. This transport uses the built-in kazoo Zookeeper
based queue implementation.

**References**

- https://zookeeper.apache.org/doc/current/recipes.html#sc_recipes_Queues
- https://kazoo.readthedocs.io/en/latest/api/recipe/queue.html

**Limitations**
This queue does not offer reliable consumption.  An entry is removed from
the queue prior to being processed.  So if an error occurs, the consumer
has to re-queue the item or it will be lost.

Features
========
* Type: Virtual
* Supports Direct: Yes
* Supports Topic: Yes
* Supports Fanout: No
* Supports Priority: Yes
* Supports TTL: No

Connection String
=================
Connects to a zookeeper node as:

.. code-block::

    zookeeper://SERVER:PORT/VHOST

The <vhost> becomes the base for all the other znodes.  So we can use
it like a vhost.


Transport Options
=================

    N)Empty)bytes_to_strensure_bytes)dumpsloads   )virtual)KazooClient)Queue i  z!Mahendra M <mahendra.m@gmail.com>c                       s   e Zd ZdZdZi Z fddZdd Zdd Zd	d
 Z	dd Z
dd Zdd Zdd Zdd Zdd Zdd Zedd Z  ZS )ChannelzZookeeper Channel.Nc                    s4   t  j|fi | | jjj}d|d| _d S )Nz/{}/)super__init__
connectionclientvirtual_hostformatstrip_vhost)selfr   kwargsvhost	__class__r   O/var/www/chikooza/env/lib/python3.10/site-packages/kombu/transport/zookeeper.pyr   g   s   
zChannel.__init__c                 C   s   t j| j|S N)ospathjoinr   )r   
queue_namer   r   r   	_get_pathl   s   zChannel._get_pathc                 C   s>   | j |d }|d u rt| j| |}|| j |< t| |S r   )_queuesgetr
   r   r!   len)r   r    queuer   r   r   
_get_queueo   s   
zChannel._get_queuec                 K   s&   |  |jtt|| j|dddS )NT)reverse)priority)r&   putr   r   _get_message_priority)r   r%   messager   r   r   r   _put{   s   

zChannel._putc                 C   s,   |  |}| }|d u rt tt|S r   )r&   r#   r   r   r   )r   r%   msgr   r   r   _get   s
   
zChannel._getc                 C   s0   d}|  |}	 | }|d u r	 |S |d7 }q)Nr   Tr   )r&   r#   )r   r%   countr-   r   r   r   _purge   s   
zChannel._purgec                 O   s.   |  |r| | | j| | d S d S r   )
_has_queuer0   r   deleter!   )r   r%   argsr   r   r   r   _delete   s   

zChannel._deletec                 C   s   |  |}t|S r   )r&   r$   r   r%   r   r   r   _size   s   
zChannel._sizec                 K   s   |  |s| |}d S d S r   )r1   r&   )r   r%   r   r   r   r   
_new_queue   s   
zChannel._new_queuec                 C   s   | j | |d uS r   )r   existsr!   r5   r   r   r   r1      s   zChannel._has_queuec              	   C   s   | j j}g }|jrO|jD ]B}|dr|tdd  }|sqz|dd\}}|t|f}W n tyH   ||jkrB||j	p?t
f}n|t
f}Y nw || q|j|j	pUt
f}||vra|d| ddd |D }t|}|  |S )Nzzookeeper://:r   r   ,c                 S   s   g | ]\}}| d | qS )r9   r   ).0hpr   r   r   
<listcomp>   s    z!Channel._open.<locals>.<listcomp>)r   r   alt
startswithr$   splitint
ValueErrorhostnameportDEFAULT_PORTappendinsertr   r	   start)r   conninfohosts	host_porthostrE   conn_strconnr   r   r   _open   s2   


zChannel._openc                 C   s   | j d u r
|  | _ | j S r   )_clientrP   r   r   r   r   r      s   

zChannel.client)__name__
__module____qualname____doc__rQ   r"   r   r!   r&   r,   r.   r0   r4   r6   r7   r1   rP   propertyr   __classcell__r   r   r   r   r   a   s"    	r   c                       sT   e Zd ZdZeZdZeZej	j
e Z
ej	je ZdZdZ fddZdd Z  ZS )		TransportzZookeeper Transport.r   	zookeeperkazooc                    s&   t d u rtdt j|i | d S )Nz"The kazoo library is not installed)r[   ImportErrorr   r   )r   r3   r   r   r   r   r      s   zTransport.__init__c                 C   s   t jS r   )r[   __version__rR   r   r   r   driver_version   s   zTransport.driver_version)rS   rT   rU   rV   r   polling_intervalrF   default_portr   rY   connection_errorsKZ_CONNECTION_ERRORSchannel_errorsKZ_CHANNEL_ERRORSdriver_typedriver_namer   r^   rX   r   r   r   r   rY      s    

rY   )-rV   r   socketr%   r   kombu.utils.encodingr   r   kombu.utils.jsonr   r    r   r[   kazoo.clientr	   kazoo.recipe.queuer
   
exceptionsSystemErrorExceptionConnectionLossExceptionMarshallingErrorExceptionUnimplementedExceptionOperationTimeoutExceptionNoAuthExceptionInvalidACLExceptionAuthFailedExceptionSessionExpiredExceptionrb   RuntimeInconsistencyExceptionDataInconsistencyExceptionBadArgumentsExceptionApiErrorExceptionNoNodeExceptionNodeExistsException NoChildrenForEphemeralsExceptionNotEmptyExceptionInvalidCallbackExceptionerrorrd   r\   rF   
__author__r   rY   r   r   r   r   <module>   sZ   )f