o
    ?Zc7                     @   s   d Z ddlZddlmZ ddlZddlmZmZmZ ddlm	Z	 ddl
mZ ddlmZ ddlmZ dd	lmZmZ dd
lmZ ddlmZ ddlmZ dZdZG dd dZG dd dejZG dd dejZdS )a  MongoDB transport module for kombu.

Features
========
* Type: Virtual
* Supports Direct: Yes
* Supports Topic: Yes
* Supports Fanout: Yes
* Supports Priority: Yes
* Supports TTL: Yes

Connection String
=================
 *Unreviewed*

Transport Options
=================

* ``connect_timeout``,
* ``ssl``,
* ``ttl``,
* ``capped_queue_size``,
* ``default_hostname``,
* ``default_port``,
* ``default_database``,
* ``messages_collection``,
* ``routing_collection``,
* ``broadcast_collection``,
* ``queues_collection``,
* ``calc_queue_size``,
    N)Empty)MongoClienterrors
uri_parser)
CursorType)VersionMismatch)_detect_environment)bytes_to_str)dumpsloads)cached_property   )virtualto_rabbitmq_queue_argumentsz3Kombu requires MongoDB version 1.3+ (server is {0})zKKombu requires MongoDB version 2.2+ (server is {0}) for TTL indexes supportc                   @   sF   e Zd ZdZdd Zdd Zdd Zdd	d
Zdd Zdd Z	e	Z
dS )BroadcastCursorzCursor for broadcast queues.c                 C   s   || _ | jdd d S )NF)rewind)_cursorpurge)selfcursor r   M/var/www/chikooza/env/lib/python3.10/site-packages/kombu/transport/mongodb.py__init__@   s   zBroadcastCursor.__init__c                 C   s   | j  | j S N)r   count_offsetr   r   r   r   get_sizeE      zBroadcastCursor.get_sizec                 C   s   | j   d S r   )r   closer   r   r   r   r    H   s   zBroadcastCursor.closeTc                 C   s.   |r| j   | j  | _| j | j| _ d S r   )r   r   r   r   skip)r   r   r   r   r   r   K   s   
zBroadcastCursor.purgec                 C   s   | S r   r   r   r   r   r   __iter__S   s   zBroadcastCursor.__iter__c              
   C   sd   	 zt | j}W n tjjy' } zdt|v r"|   W Y d }~q  d }~ww 	 |  jd7  _|S )NTznot valid at serverr   )nextr   pymongor   OperationFailurestrr   r   )r   msgexcr   r   r   __next__V   s   
zBroadcastCursor.__next__N)T)__name__
__module____qualname____doc__r   r   r    r   r"   r)   r#   r   r   r   r   r   =   s    
r   c                       sT  e Zd ZdZdZi ZdZdZdZdZ	dZ
dZdZdZd	Zd
ZdZdZejjd Z fddZdd Zdd Z fddZdd Zdd Zdd Zdd Zdd Z fd d!ZdEd#d$Zd%d& Z d'd( Z!dEd)d*Z"d+d, Z#d-d. Z$d/d0 Z%e&d1d2 Z'e&d3d4 Z(e&d5d6 Z)e&d7d8 Z*e&d9d: Z+d;d< Z,d=d> Z-d?d@ Z.dAdB Z/dCdD Z0  Z1S )FChannelzMongoDB Channel.TFNi z	127.0.0.1ii  kombu_defaultmessageszmessages.routingzmessages.broadcastzmessages.queues)connect_timeoutsslttlcapped_queue_sizedefault_hostnamedefault_portdefault_databasemessages_collectionrouting_collectionbroadcast_collectionqueues_collectioncalc_queue_sizec                    s"   t  j|i | i | _| j d S r   )superr   _broadcast_cursorsclient)r   vargskwargs	__class__r   r   r      s   
zChannel.__init__c                 K   s4   | j r| jjd|i||| |dddd d S d S )N_id	x-expires)rD   options	expire_atTupsert)r3   queuesupdate_get_expire)r   queuerA   r   r   r   
_new_queue   s   

zChannel._new_queuec                 C   s   || j v rz	t| |}W n ty   d }Y nw | jjd|idtjfgdd}| jr1| 	| |d u r8t
 tt|d S )NrM   priorityT)querysortremovepayload)_fanout_queuesr#   _get_broadcast_cursorStopIterationr0   find_and_modifyr$   	ASCENDINGr3   _update_queues_expirer   r   r	   )r   rM   r'   r   r   r   _get   s    


zChannel._getc                    s>   | j s	t |S || jv r| | S | jd|i S NrM   )	r<   r=   _sizerT   rU   r   r0   findr   r   rM   rB   r   r   r\      s
   
zChannel._sizec                 K   s@   t ||| j|ddd}| jr| |d|d< | j| d S )NT)reverse)rS   rM   rO   zx-message-ttlrG   )r
   _get_message_priorityr3   rL   r0   insert)r   rM   messagerA   datar   r   r   _put   s   zChannel._putc                 K   s   | j t||d d S )N)rS   rM   )	broadcastra   r
   )r   exchangerb   routing_keyrA   r   r   r   _put_fanout   s   zChannel._put_fanoutc                 C   s:   |  |}|| jv r| |  |S | jd|i |S r[   )r\   rT   rU   r   r0   rR   )r   rM   sizer   r   r   _purge   s   

zChannel._purgec                 C   s:   t | jj| d }| jd|i}|t dd |D B S )Ntablerf   c                 s   s&    | ]}|d  |d |d fV  qdS )rg   patternrM   Nr   ).0rr   r   r   	<genexpr>   s
    
z$Channel.get_table.<locals>.<genexpr>)	frozensetstate	exchangesroutingr]   )r   rf   localRoutesbrokerRoutesr   r   r   	get_table   s   

zChannel.get_tablec                 C   sl   |  |jdkr| |||| || j|< ||||d}| }| jr+| |d|d< | jj||dd d S )Nfanout)rf   rM   rg   rl   rE   rG   TrH   )	typeoftype_create_broadcast_cursorrT   copyr3   rL   rs   rK   )r   rf   rg   rl   rM   lookuprc   r   r   r   _queue_bind   s   
zChannel._queue_bindc                    s   | j d|i | jr| jd|i t j|fi | || jv rAz| j|}W n
 t	y4   Y d S w |
  | j| d S d S )NrM   rD   )rs   rR   r3   rJ   r=   queue_deleterT   r>   popKeyErrorr    )r   rM   rA   r   rB   r   r   r~      s   
zChannel.queue_delete
mongodb://c                 C   s  | j j}|j}||s|| }|t|d  s|| j7 }|jrBd|vrB|d\}}|j}|jr8|d|j 7 }|d | d | }|j	rH|j	n| j
}t||}|d pW|j}	|	dv r_| j}	d| j| jrlt| jd nd d}
|
|d	  | |
}
||	|
fS )
N@z://:database)/NTi  )auto_start_requestr2   connectTimeoutMSrF   )
connectionr?   hostname
startswithlenr5   useridsplitpasswordportr6   r   	parse_urivirtual_hostr7   r2   r1   intrK   _prepare_client_options)r   schemer?   r   headtailcredentialsr   parseddbnamerF   r   r   r   
_parse_uri  s4   



zChannel._parse_uric                 C   sB   t jdkr|dd  t|dtrt jj}||d  |d< |S )N   r   readpreference)r$   version_tupler   
isinstancegetr   read_preferences_MONGOS_MODES)r   rF   modesr   r   r   r   5  s   
zChannel._prepare_client_optionsc                 K   s   t |fi |S r   r   )r   	argumentsrA   r   r   r   prepare_queue_arguments=  r   zChannel.prepare_queue_argumentsc                 C   s   | j |d\}}}||d< t }|dkrddlm} |  n|dkr,ddlm} |  tdi |}|| }	| d }
|
	d	d }
t
tt|
	d
}|dk rYtt|
| jrg|dk rgtt|
|	S )N)r   hostgeventr   )monkeyeventlet)monkey_patchversion-.)r   r   )   r   r   )r   r   r   r   	patch_allr   r   r   server_infor   tuplemapr   r   E_SERVER_VERSIONformatr3   E_NO_TTL_INDEXES)r   r   r   r   confenvr   r   	mongoconnr   version_strr   r   r   r   _open@  s&   
zChannel._openc                 C   s*   | j | v r	dS |j| j | jdd dS )z0Create capped collection for broadcast messages.NT)ri   capped)r:   collection_namescreate_collectionr4   r   r   r   r   r   _create_broadcast[  s   
zChannel._create_broadcastc                 C   s   || j  }|jg ddd || j dg || j }|ddg | jrC|jdgdd |jdgdd || j jdgdd d	S d	S )
zEnsure indexes on collections.)rM   r   )rO   r   )rD   r   T)
backgroundr   )rf   r   )rG   r   r   )expireAfterSecondsN)r8   ensure_indexr:   r9   r3   r;   )r   r   r0   rs   r   r   r   _ensure_indexesd  s   



zChannel._ensure_indexesc                 C   s    |   }| | | | |S )zActually creates connection.)r   r   r   r   r   r   r   _create_clientw  s   

zChannel._create_clientc                 C   s   |   S r   )r   r   r   r   r   r?     s   zChannel.clientc                 C      | j | j S r   )r?   r8   r   r   r   r   r0        zChannel.messagesc                 C   r   r   )r?   r9   r   r   r   r   rs     r   zChannel.routingc                 C   r   r   )r?   r:   r   r   r   r   re     r   zChannel.broadcastc                 C   r   r   )r?   r;   r   r   r   r   rJ     r   zChannel.queuesc              	   C   s6   z| j | W S  ty   | | j| d d | Y S w r   )r>   r   rz   rT   r^   r   r   r   rU     s   zChannel._get_broadcast_cursorc                 C   sR   t jdkrd|itjd}nd|idd}| jjdi |}t| }| j|< |S )Nr   rM   )filtercursor_typeT)rP   tailabler   )r$   r   r   TAILABLEre   r]   r   r>   )r   rf   rg   rl   rM   rP   r   retr   r   r   rz     s   
z Channel._create_broadcast_cursorc              	   C   sn   t |tr| jd|i}|sdS |d }n|}z|d | }W n ttfy,   Y dS w |  tj|d S )zGet expiration header named `argument` of queue definition.

        Note:
            `queue` must be either queue name or options itself.
        rD   NrF   r   )milliseconds)	r   r&   rJ   find_oner   	TypeErrorget_nowdatetime	timedelta)r   rM   argumentdocrc   valuer   r   r   rL     s   

zChannel._get_expirec                 C   sT   |  |d}|s
dS | jjd|idd|iidd | jjd|idd|iidd dS )	z,Update expiration field on queues documents.rE   NrM   z$setrG   T)multirD   )rL   rs   rK   rJ   )r   rM   rG   r   r   r   rY     s   
zChannel._update_queues_expirec                 C   s
   t j  S )zReturn current time in UTC.)r   utcnowr   r   r   r   r     s   
zChannel.get_now)r   )2r*   r+   r,   r-   supports_fanoutrT   r2   r3   r1   r4   r<   r5   r6   r7   r8   r9   r:   r;   r   r.   from_transport_optionsr   rN   rZ   r\   rd   rh   rj   rv   r}   r~   r   r   r   r   r   r   r   r   r?   r0   rs   re   rJ   rU   rz   rL   rY   r   __classcell__r   r   rB   r   r.   l   sb    
	

)
	




r.   c                   @   sn   e Zd ZdZeZdZdZejZej	j
ejf Z
ej	jejejf ZdZdZej	jjeg ddZdd	 Zd
S )	TransportzMongoDB Transport.Tr   mongodbr$   )directtopicrw   )exchange_typec                 C   s   t jS r   )r$   r   r   r   r   r   driver_version  s   zTransport.driver_versionN)r*   r+   r,   r-   r.   can_parse_urlpolling_intervalr6   r   r   connection_errorsr   ConnectionFailurechannel_errorsr%   driver_typedriver_name
implementsextendrp   r   r   r   r   r   r     s&    
r   )r-   r   rM   r   r$   r   r   r   pymongo.cursorr   kombu.exceptionsr   kombu.utils.compatr   kombu.utils.encodingr	   kombu.utils.jsonr
   r   kombu.utils.objectsr    r   baser   r   r   r   r.   r   r   r   r   r   <module>   s(    /  l