o
    ?Zc                     @   s   d Z ddlZddlmZ ddlmZ ddlmZmZ ddl	m
Z
 ddlmZ zdd	lmZ W n ey9   dZY nw d
d ejD ZG dd dejZG dd dejZdS )a  Azure Storage Queues transport module for kombu.

More information about Azure Storage Queues:
https://azure.microsoft.com/en-us/services/storage/queues/

Features
========
* Type: Virtual
* Supports Direct: *Unreviewed*
* Supports Topic: *Unreviewed*
* Supports Fanout: *Unreviewed*
* Supports Priority: *Unreviewed*
* Supports TTL: *Unreviewed*

Connection String
=================

Connection string has the following format:

.. code-block::

    azurestoragequeues://:STORAGE_ACCOUNT_ACCESS kEY@STORAGE_ACCOUNT_NAME

Note that if the access key for the storage account contains a slash, it will
have to be regenerated before it can be used in the connection URL.

Transport Options
=================

* ``queue_name_prefix``
    N)Empty)safe_str)dumpsloads)cached_property   )virtual)QueueServicec                 C   s   i | ]}t |d qS )-   )ord).0c r   X/var/www/chikooza/env/lib/python3.10/site-packages/kombu/transport/azurestoragequeues.py
<dictcomp>1   s    
r   c                       s   e Zd ZdZdZdZi ZdZe Z	 fddZ
 fddZefd	d
Zdd Z fddZdd ZdddZdd Zdd Zedd Zedd Zedd Zedd Z  ZS ) ChannelzAzure Storage Queues channel.zkombu%(vhost)sNTc                    s@   t d u rtdt j|i | | j D ]}|| j|< qd S )NzGAzure Storage Queues transport requires the azure-storage-queue library)r	   ImportErrorsuper__init__queue_servicelist_queues_queue_name_cache)selfargskwargs
queue_name	__class__r   r   r   ?   s   zChannel.__init__c                    s,   |r| j | t j||g|R i |S N)_noack_queuesaddr   basic_consume)r   queueno_ackr   r   r   r   r   r!   I   s   zChannel.basic_consumec                 C   s   t t||S )z=Format AMQP queue name into a valid Azure Storage Queue name.)strr   	translate)r   nametabler   r   r   entity_nameP   s   zChannel.entity_namec                 C   sR   |  | j| }z| j| W S  ty(   | jj|dd | }| j|< | Y S w )zEnsure a queue exists.F)fail_on_exist)r(   queue_name_prefixr   KeyErrorr   create_queue)r   r"   qr   r   r   _ensure_queueT   s   zChannel._ensure_queuec                    s4   |  |}| j|d | j| t | dS )zDelete queue by name.N)r(   r   popr   delete_queuer   _delete)r   r"   r   r   r   r   r   r   r1   ^   s   
zChannel._deletec                 K   s$   |  |}t|}| j|| dS )zPut message onto queue.N)r.   r   r   put_message)r   r"   messager   r-   encoded_messager   r   r   _pute   s   
zChannel._putc                 C   s\   |  |}| jj|d|d}|st |d }| j|j}t|}| j||j|j	 |S )z/Try to retrieve a single message off ``queue``.r   )num_messagestimeoutr   )
r.   r   get_messagesr   decode_functioncontentr   delete_messageidpop_receipt)r   r"   r7   r-   messagesr3   raw_contentr:   r   r   r   _getk   s   

zChannel._getc                 C   s   |  |}| j|}|jS )z)Return the number of messages in a queue.)r.   r   get_queue_metadataapproximate_message_count)r   r"   r-   metadatar   r   r   _size|   s   
zChannel._sizec                 C   s$   |  |}| |}| j| |S )z'Delete all current messages in a queue.)r.   rD   r   clear_messages)r   r"   r-   nr   r   r   _purge   s   

zChannel._purgec                 C   s&   | j d u rt| jj| jjd| _ | j S )N)account_nameaccount_key)_queue_servicer	   conninfohostnamepasswordr   r   r   r   r      s   
zChannel.queue_servicec                 C   s   | j jS r   )
connectionclientrN   r   r   r   rK      s   zChannel.conninfoc                 C   s
   | j jjS r   )rO   rP   transport_optionsrN   r   r   r   rQ      s   
zChannel.transport_optionsc                 C   s   | j ddS )Nr*    )rQ   getrN   r   r   r   r*      s   zChannel.queue_name_prefixr   )__name__
__module____qualname____doc__domain_formatrJ   r   r#   setr   r   r!   CHARS_REPLACE_TABLEr(   r.   r1   r5   r@   rD   rG   propertyr   rK   rQ   r   r*   __classcell__r   r   r   r   r   6   s0    





r   c                   @   s   e Zd ZdZeZdZdZdS )	TransportzAzure Storage Queues transport.r   N)rT   rU   rV   rW   r   polling_intervaldefault_portr   r   r   r   r]      s
    r]   )rW   stringr"   r   kombu.utils.encodingr   kombu.utils.jsonr   r   kombu.utils.objectsr   rR   r   azure.storage.queuer	   r   punctuationrZ   r   r]   r   r   r   r   <module>   s"     i