o
    ?Zc                     @   s|  d Z ddlZddlZddlZddlmZ ddlmZ ddlmZ ddl	m
Z
 ddlmZ ddlmZ dd	lmZmZ dd
lmZ ddlmZ ddlmZ ddlmZmZmZ ddlmZ ddlmZmZ ddl m!Z! ddl"m#Z# ddl$m%Z% ddl&m'Z' zddl(Z(W n e)y   dZ(Y nw zddl(m*Z* W n e)y   dZ*Y nw edZ+e+j,e+j-Z.Z-dZ/dZ0dZ1g dZ2eddZ3dd Z4dd Z5G d d! d!e6Z7ed"d# Z8d$d% Z9G d&d' d'Z:G d(d) d)e:e(j;Z<G d*d+ d+e:e(j=j>Z?G d,d- d-e(j=j@ZAG d.d/ d/e'jBZBG d0d1 d1ZCG d2d3 d3e'jDZDG d4d5 d5e'jEZEe*r,G d6d7 d7e*jFe(jGZHG d8d9 d9eDZIG d:d; d;eEZJdS )<a  Redis transport module for Kombu.

Features
========
* Type: Virtual
* Supports Direct: Yes
* Supports Topic: Yes
* Supports Fanout: Yes
* Supports Priority: Yes
* Supports TTL: No

Connection String
=================
Connection string has the following format:

.. code-block::

    redis://[USER:PASSWORD@]REDIS_ADDRESS[:PORT][/VIRTUALHOST]
    rediss://[USER:PASSWORD@]REDIS_ADDRESS[:PORT][/VIRTUALHOST]

To use sentinel for dynamic Redis discovery,
the connection string has following format:

.. code-block::

    sentinel://[USER:PASSWORD@]SENTINEL_ADDRESS[:PORT]

Transport Options
=================
* ``sep``
* ``ack_emulation``: (bool) If set to True transport will
  simulate Acknowledge of AMQP protocol.
* ``unacked_key``
* ``unacked_index_key``
* ``unacked_mutex_key``
* ``unacked_mutex_expire``
* ``visibility_timeout``
* ``unacked_restore_limit``
* ``fanout_prefix``
* ``fanout_patterns``
* ``global_keyprefix``: (str) The global key prefix to be prepended to all keys
  used by Kombu
* ``socket_timeout``
* ``socket_connect_timeout``
* ``socket_keepalive``
* ``socket_keepalive_options``
* ``queue_order_strategy``
* ``max_connections``
* ``health_check_interval``
* ``retry_on_timeout``
* ``priority_steps``
    N)bisect)
namedtuple)contextmanager)Empty)time)promise)InconsistencyErrorVersionMismatch)
get_logger)register_after_fork)bytes_to_str)ERRREADpoll)accepts_argument)dumpsloads)cached_property)cycle_by_name)
_parse_url   )virtual)sentinelzkombu.transport.redisi     )r         	   error_classes_t)connection_errorschannel_errorsc               	   C   s^   ddl m}  t| dr| j}n| j}ttjjt	t
jtt| j| j| jf tjj|| j| jf S )z$Return tuple of redis error classes.r   
exceptionsInvalidData)redisr!   hasattrr"   	DataErrorr   r   	Transportr   r   socketerrorIOErrorOSErrorConnectionErrorAuthenticationErrorTimeoutErrorr   InvalidResponseResponseError)r!   r%    r0   K/var/www/chikooza/env/lib/python3.10/site-packages/kombu/transport/redis.pyget_redis_error_classesw   s(   
r2   c                  C   s   ddl m}  | jS )z1Return the redis ConnectionError exception class.r   r    )r#   r!   r+   r    r0   r0   r1   get_redis_ConnectionError   s   r3   c                   @      e Zd ZdZdS )	MutexHeldz)Raised when another party holds the lock.N__name__
__module____qualname____doc__r0   r0   r0   r1   r5      s    r5   c                 c   s    | j ||d}d}z(|jdd}|rdV  nt W |r1z|  W dS  tjjy0   Y dS w dS |rGz|  W w  tjjyF   Y w w w )zTAcquire redis lock in non blocking way.

    Raise MutexHeld if not successful.
    timeoutF)blockingN)lockacquirer5   releaser#   r!   LockNotOwnedError)clientnameexpirer>   lock_acquiredr0   r0   r1   Mutex   s,   rF   c                 C      |    d S N)_after_forkchannelr0   r0   r1   _after_fork_cleanup_channel      rL   c                       sd   e Zd ZdZg dZdddddddddd	Zd
d Z fddZ fddZdddZ	  Z
S )GlobalKeyPrefixMixina  Mixin to provide common logic for global key prefixing.

    Overriding all the methods used by Kombu with the same key prefixing logic
    would be cumbersome and inefficient. Hence, we override the command
    execution logic that is called by all commands.
    )HDELHGETHSETLLENLPUSHPUBLISHRPUSHRPOPSADDSREMSETSMEMBERSZADDZREMZREVRANGEBYSCOREr   N)
args_startargs_end   r   )DELBRPOPEVALSHAc                    s   t |}|d}| jv r jt|d  |d< n<| jv rV j| d } j| d }|dkr7|d | ng }g }|d urE||d  }| fdd||| D  | }|g|S )Nr   r^   r_   c                       g | ]	} j t| qS r0   global_keyprefixstr.0argselfr0   r1   
<listcomp>       z5GlobalKeyPrefixMixin._prefix_args.<locals>.<listcomp>)listpopPREFIXED_SIMPLE_COMMANDSrg   rh   PREFIXED_COMPLEX_COMMANDS)rm   argscommandr^   r_   pre_args	post_argsr0   rl   r1   _prefix_args   s"   




z!GlobalKeyPrefixMixin._prefix_argsc                    sH   t  j||fi |}|dkr"|r"|\}}|t| jd }||fS |S )zParse a response from the Redis server.

        Method wraps ``redis.parse_response()`` to remove prefixes of keys
        returned by redis command.
        rc   N)superparse_responselenrg   )rm   
connectioncommand_nameoptionsretkeyvalue	__class__r0   r1   rz      s   z#GlobalKeyPrefixMixin.parse_responsec                       t  j| |i |S rH   ry   execute_commandrx   rm   rt   kwargsr   r0   r1   r         z$GlobalKeyPrefixMixin.execute_commandTc                 C   s   t | j| j||| jdS )Nrg   )PrefixedRedisPipelineconnection_poolresponse_callbacksrg   )rm   transaction
shard_hintr0   r0   r1   pipeline   s   zGlobalKeyPrefixMixin.pipeline)TN)r7   r8   r9   r:   rr   rs   rx   rz   r   r   __classcell__r0   r0   r   r1   rN      s    rN   c                   @   s    e Zd ZdZdd Zdd ZdS )PrefixedStrictRedisz@Returns a ``StrictRedis`` client that prefixes the keys it uses.c                 O   s,   | dd| _tjj| g|R i | d S Nrg    )rq   rg   r#   Redis__init__r   r0   r0   r1   r   	  s   zPrefixedStrictRedis.__init__c                 K   s   t | jfd| ji|S )Nrg   )PrefixedRedisPubSubr   rg   )rm   r   r0   r0   r1   pubsub  s   zPrefixedStrictRedis.pubsubN)r7   r8   r9   r:   r   r   r0   r0   r0   r1   r     s    r   c                   @   s   e Zd ZdZdd ZdS )r   a   Custom Redis pipeline that takes global_keyprefix into consideration.

    As the ``PrefixedStrictRedis`` client uses the `global_keyprefix` to prefix
    the keys it uses, the pipeline called by the client must be able to prefix
    the keys as well.
    c                 O   s.   | dd| _tjjj| g|R i | d S r   )rq   rg   r#   rB   Pipeliner   r   r0   r0   r1   r     s    zPrefixedRedisPipeline.__init__N)r7   r8   r9   r:   r   r0   r0   r0   r1   r     s    r   c                       sD   e Zd ZdZdZ fddZdd Z fddZ fd	d
Z  Z	S )r   zCRedis pubsub client that takes global_keyprefix into consideration.)	SUBSCRIBEUNSUBSCRIBE
PSUBSCRIBEPUNSUBSCRIBEc                    s$   | dd| _t j|i | d S r   )rq   rg   ry   r   r   r   r0   r1   r   ,  s   zPrefixedRedisPubSub.__init__c                    s8   t |}|d}| jv r fdd|D }|g|S )Nr   c                    re   r0   rf   ri   rl   r0   r1   rn   5  ro   z4PrefixedRedisPubSub._prefix_args.<locals>.<listcomp>)rp   rq   PUBSUB_COMMANDS)rm   rt   ru   r0   rl   r1   rx   0  s   



z PrefixedRedisPubSub._prefix_argsc                    sF   t  j|i |}|du r|S |^}}}|g fdd|D |S )zParse a response from the Redis server.

        Method wraps ``PubSub.parse_response()`` to remove prefixes of keys
        returned by redis command.
        Nc                    s   g | ]}|t  jd  qS rH   )r{   rg   )rj   rK   rl   r0   r1   rn   N  s    z6PrefixedRedisPubSub.parse_response.<locals>.<listcomp>)ry   rz   )rm   rt   r   r   message_typechannelsmessager   rl   r1   rz   <  s   z"PrefixedRedisPubSub.parse_responsec                    r   rH   r   r   r   r0   r1   r   R  r   z#PrefixedRedisPubSub.execute_command)
r7   r8   r9   r:   r   r   rx   rz   r   r   r0   r0   r   r1   r   "  s    r   c                       s   e Zd ZdZdZ fddZ fddZd#dd	Z fd
dZd$ddZ	e
d%ddZd#ddZd&ddZd'ddZedd Zedd Zedd Zedd  Zed!d" Z  ZS )(QoSzRedis Ack Emulation.Tc                    s   t  j|i | d| _d S )Nr   )ry   r   _vrestore_countr   r   r0   r1   r   [  s   
zQoS.__init__c              	      s   |j }|d |d }}tjd dkr|t ig}nt |g}|  (}|j| jg|R  | j|t	|j
||g  t || W d    d S 1 sNw   Y  d S )Nexchangerouting_keyr   r   )delivery_infor#   VERSIONr   pipe_or_acquirezaddunacked_index_keyhsetunacked_keyr   _rawexecutery   append)rm   r   delivery_tagdeliveryEXRK	zadd_argspiper   r0   r1   r   _  s   

"z
QoS.appendNc                 C   sT   | j |}| jD ]	}| j||d q
W d    n1 sw   Y  | j  d S )NrB   )rK   conn_or_acquire
_deliveredrestore_by_tagclear)rm   rB   tagr0   r0   r1   restore_unackedp  s   
zQoS.restore_unackedc                    s   |  |  t | d S rH   )_remove_from_indicesr   ry   ack)rm   r   r   r0   r1   r   v  s   zQoS.ackFc                 C   s    |r	| j |dd | | d S NT)leftmost)r   r   )rm   r   requeuer0   r0   r1   rejectz     z
QoS.rejectc                 c   sL    |r|V  d S | j |}| V  W d    d S 1 sw   Y  d S rH   )rK   r   r   )rm   r   rB   r0   r0   r1   r     s   
"zQoS.pipe_or_acquirec                 C   sF   |  |}|| j|| j|W  d    S 1 sw   Y  d S rH   )r   zremr   hdelr   )rm   r   r   r0   r0   r1   r     s   
$zQoS._remove_from_indicesr   
   c           	   
   C   s   |  j d7  _ | j d | rd S | j X}t | j }z7t|| j| j% |j| j	|d|o/||dd}|p7g D ]
\}}| 
|| q8W d    n1 sMw   Y  W n	 ty\   Y n	w W d    d S W d    d S 1 spw   Y  d S )Nr   r   T)startnum
withscores)r   rK   r   r   visibility_timeoutrF   unacked_mutex_keyunacked_mutex_expirezrevrangebyscorer   r   r5   )	rm   r   r   intervalrB   ceilvisibler   scorer0   r0   r1   restore_visible  s2   
"zQoS.restore_visiblec                    sP    fdd}j |}||j W d    d S 1 s!w   Y  d S )Nc                    sT   |  j}|   |  |r(tt|\}}}j||||   d S d S rH   )hgetr   multir   r   r   rK   _do_restore_message)r   pMr   r   r   rm   r   r0   r1   restore_transaction  s   z/QoS.restore_by_tag.<locals>.restore_transaction)rK   r   r   r   )rm   r   rB   r   r   r0   r   r1   r     s   "zQoS.restore_by_tagc                 C      | j jS rH   )rK   r   rl   r0   r0   r1   r        zQoS.unacked_keyc                 C   r   rH   )rK   r   rl   r0   r0   r1   r     r   zQoS.unacked_index_keyc                 C   r   rH   )rK   r   rl   r0   r0   r1   r     r   zQoS.unacked_mutex_keyc                 C   r   rH   )rK   r   rl   r0   r0   r1   r     r   zQoS.unacked_mutex_expirec                 C   r   rH   )rK   r   rl   r0   r0   r1   r     r   zQoS.visibility_timeoutrH   FNN)r   r   r   )NF)r7   r8   r9   r:   restore_at_shutdownr   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r0   r0   r   r1   r   V  s.    








r   c                   @   s   e Zd ZdZeeB ZdZdZdd Z	dd Z
dd	 Zd
d Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zd d! Zd"d# Zd(d$d%Zed&d' ZdS ))MultiChannelPollerz%Async I/O poller for Redis transport.FNc                 C   s(   t  | _i | _i | _t | _t  | _d S rH   )set	_channels_fd_to_chan_chan_to_sockr   poller
after_readrl   r0   r0   r1   r     s
   zMultiChannelPoller.__init__c              
   C   sX   | j  D ]}z| j| W q ttfy   Y qw | j  | j  | j   d S rH   )	r   valuesr   
unregisterKeyError
ValueErrorr   r   r   )rm   fdr0   r0   r1   close  s   

zMultiChannelPoller.closec                 C      | j | d S rH   )r   addrm   rK   r0   r0   r1   r        zMultiChannelPoller.addc                 C   r   rH   )r   discardr   r0   r0   r1   r     r   zMultiChannelPoller.discardc              	   C   s.   z
| j |j W d S  ttfy   Y d S w rH   )r   r   _sockAttributeError	TypeErrorrm   r|   r0   r0   r1   _on_connection_disconnect  s
   z,MultiChannelPoller._on_connection_disconnectc                 C   sr   |||f| j v r| ||| |jjd u r|j  |jj}||f| j| < || j |||f< | j|| j	 d S rH   )
r   _unregisterr|   r   connectr   filenor   register
eventflags)rm   rK   rB   typesockr0   r0   r1   	_register  s   
zMultiChannelPoller._registerc                 C   s   | j | j|||f  d S rH   )r   r   r   )rm   rK   rB   r   r0   r0   r1   r     s   zMultiChannelPoller._unregisterc                 C   s:   t |dd d u r|jd|_|jjd uo|||f| jv S )Nr|   _)getattrr   get_connectionr|   r   r   )rm   rK   rB   cmdr0   r0   r1   _client_registered  s
   z%MultiChannelPoller._client_registeredc                 C   sB   ||j df}| ||j dsd|_| j|  |js|  dS dS )zEnable BRPOP mode for channel.rc   FN)rB   r  _in_pollr   _brpop_start)rm   rK   identr0   r0   r1   _register_BRPOP  s   
z"MultiChannelPoller._register_BRPOPc                 C   s<   |  ||jdsd|_| ||jd |js|  dS dS )zEnable LISTEN mode for channel.LISTENFN)r  	subclient
_in_listenr   
_subscriber   r0   r0   r1   _register_LISTEN  s   z#MultiChannelPoller._register_LISTENc                 C   s:   | j D ]}|jr|j r| | |jr| | qd S rH   )r   active_queuesqoscan_consumer  active_fanout_queuesr  r   r0   r0   r1   on_poll_start  s   



z MultiChannelPoller.on_poll_startc                 C   s(   || _ | jD ]}|jj|jd  S d S N)r   )r   r   r  r   unacked_restore_limit)rm   r   rK   r0   r0   r1   on_poll_init  s   

zMultiChannelPoller.on_poll_initc                 C   s*   | j D ]}|jr|jj|jd  S qd S r  )r   r  r  r   r  r   r0   r0   r1   maybe_restore_messages  s   

z)MultiChannelPoller.maybe_restore_messagesc                 C   s<   | j D ]}|jd}|d urtt|dd r|  qd S )Nr  check_health)r   __dict__getcallabler   r  )rm   rK   rB   r0   r0   r1   maybe_check_subclient_health'  s   
z/MultiChannelPoller.maybe_check_subclient_healthc                 C   s,   | j | \}}|j r|j|   d S d S rH   )r   r  r  handlers)rm   r   chanr   r0   r0   r1   on_readable/  s   
zMultiChannelPoller.on_readablec                 C   s>   |t @ r| || fS |t@ r| j| \}}|| d S d S rH   )r   r  r   r   _poll_error)rm   r   eventr  r   r0   r0   r1   handle_event4  s   zMultiChannelPoller.handle_eventc           	      C   s   d| _ z]| jD ]}|jr|j r| | |jr| | q| j	|}|rZ|D ]0\}}| 
||}|rY W d| _ | jrWz| j }W n
 tyN   Y d S w |  | js=d S d S q)|   t d| _ | jr~z| j }W n	 tyw   Y w w |  | jsgw )NTF)_in_protected_readr   r  r  r  r  r  r  r   r   r  r   rq   r   r  r   )	rm   callbackr<   rK   eventsr   r  r   funr0   r0   r1   r  ;  sJ   



zMultiChannelPoller.getc                 C   s   | j S rH   )r   rl   r0   r0   r1   fdsY  s   zMultiChannelPoller.fdsrH   )r7   r8   r9   r:   r   r   r   r   r   r   r   r   r   r   r   r   r  r  r  r  r  r  r  r  r  r  propertyr$  r0   r0   r0   r1   r     s0    

	
r   c                       s  e Zd ZdZeZdZdZdZdZdZ	dZ
dZdZdZi ZdZdZd	Zd
ZdZdZdZeZdZdZdZdZdZdZeZdZ dZ!dZ"dZ#dZ$dZ%e&j'j(d Z(e)rQe)j*ndZ+e)rXe)j,ndZ- fddZ.dd Z/dd Z0dd Z1	drddZ2dr fdd	Z3dd Z4 fdd Z5d!d" Z6 fd#d$Z7d%d& Z8d'd( Z9d)d* Z:d+d, Z;d-d. Z<d/d0 Z=d1d2 Z>dsd4d5Z?d6d7 Z@d8d9 ZAd:d; ZBd<d= ZCd>d? ZDd@dA ZEdBdC ZFdDdE ZGdrdFdGZHdHdI ZIdJdK ZJdLdM ZKdNdO ZLdPdQ ZM fdRdSZNdTdU ZOdVdW ZP		dtdXdYZQdrdZd[ZRdrd\d]ZSdrd^d_ZTd`da ZUeVdudbdcZWeXddde ZYeXdfdg ZZe[dhdi Z\e[djdk Z]dldm Z^dndo Z_eXdpdq Z`  ZaS )vChannelzRedis Channel.NFTz_kombu.binding.%sz/{db}.zunackedunacked_indexunacked_mutexi,  i  r   r   round_robin)sepack_emulationr   r   r   r   r   r  fanout_prefixfanout_patternsrg   socket_timeoutsocket_connect_timeoutsocket_keepalivesocket_keepalive_optionsqueue_order_strategymax_connectionshealth_check_intervalretry_on_timeoutpriority_stepsc                    s   t  j|i | | jstj| _t| j | _|  | _	| 
 | _t | _t | _i | _| j| jd| _| jrBt| jtrA| j| _nd| _z| j  W n tyX   |    w | jj|  | jj| _td urpt| t  d S d S )N)rc   r  r   )!ry   r   r,  r   r   r   r3  _queue_cycle_get_clientClient_get_response_errorr/   r   r  auto_delete_queues_fanout_to_queue_brpop_read_receiver  r-  
isinstancerh   keyprefix_fanoutrB   ping	Exception_disconnect_poolsr|   cycler   r   r   rL   r   r   r0   r1   r     s4   


zChannel.__init__c                 C   rG   rH   )rD  rl   r0   r0   r1   rI     rM   zChannel._after_forkc                 C   s@   | j }| j}d  | _| _ |d ur|  |d ur|  d S d S rH   )_pool_async_pool
disconnect)rm   pool
async_poolr0   r0   r1   rD    s   zChannel._disconnect_poolsc                 C   sH   | j |u rd | _ | j|u rd | _| jr | jjr"| jj| d S d S d S rH   )r  r	  r|   rE  r   r   r0   r0   r1   r     s   

z!Channel._on_connection_disconnectc                 C   s   z3zd|d d< d|d d d< W n	 t y   Y nw | ||D ]}|r(|jn|j|t| q!W d S  tyD   td|dd Y d S w )NTheadersredelivered
propertiesr   zCould not restore message: %rexc_info)r   _lookuplpushrpushr   rC  crit)rm   payloadr   r   r   r   queuer0   r0   r1   r     s   zChannel._do_restore_messagec                    sd   j s	t |S |j fdd} }||j W d    d S 1 s+w   Y  d S )Nc                    sT   |  j}|   | j |r(tt|\}}}||||   d S d S rH   )r   r   r   r   r   r   r   )r   Pr   r   r   r   r0   r1   r     s   z-Channel._restore.<locals>.restore_transaction)r,  ry   _restorer   r   r   r   )rm   r   r   r   rB   r   r   r1   rW    s   
"zChannel._restorec                 C   s   | j |ddS r   )rW  )rm   r   r0   r0   r1   _restore_at_beginning$  s   zChannel._restore_at_beginningc                    sT   || j v r| j | \}}| j| || j|< t j|g|R i |}|   |S rH   )_fanout_queuesr  r   r=  ry   basic_consume_update_queue_cycle)rm   rU  rt   r   r   r   r   r   r0   r1   rZ  '  s   

zChannel.basic_consumec                 C   s8   | j }|r|jjr|jjt| j|fS | |S d S rH   )r|   rE  r   r   r   r   _basic_cancel)rm   consumer_tagr|   r0   r0   r1   basic_cancel;  s   
zChannel.basic_cancelc                    s   z| j | }W n
 ty   Y d S w z| j| W n	 ty#   Y nw | | z| j| \}}| j| W n	 tyA   Y nw t 	|}| 
  |S rH   )_tag_to_queuer   r  remove_unsubscribe_fromrY  r=  rq   ry   r^  r[  )rm   r]  rU  r   r   r   r   r0   r1   r\  H  s(   
zChannel._basic_cancelc                 C   s.   |r| j rd| j|d|gS d| j|gS )Nr   /)r.  joinrA  )rm   r   r   r0   r0   r1   _get_publish_topic\  s   
zChannel._get_publish_topicc                 C   s   | j | \}}| ||S rH   )rY  rd  )rm   rU  r   r   r0   r0   r1   _get_subscribe_topica  s   zChannel._get_subscribe_topicc                    sN    fdd j D }|sd S  j}|jjd u r|j  |j _|| d S )Nc                    s   g | ]}  |qS r0   )re  rj   rU  rl   r0   r1   rn   f  s    z&Channel._subscribe.<locals>.<listcomp>)r  r  r|   r   r   r	  
psubscribe)rm   keyscr0   rl   r1   r
  e  s   

zChannel._subscribec                 C   s6   |  |}| j}|jr|jjr||g d S d S d S rH   )re  r  r|   r   unsubscribe)rm   rU  topicri  r0   r0   r1   ra  p  s
   
zChannel._unsubscribe_fromc                 C   s   t |d dkr|d dkrd|_d S t |d dkr.|d |d |d |d f\}}}}n|d d |d |d f\}}}}||||dS )	Nr   rj  ra   Fpmessager   r   )r   patternrK   data)r   
subscribed)rm   rB   rr   rm  rK   rn  r0   r0   r1   _handle_messagev  s   & zChannel._handle_messagec                 C   sz   | j }g }z
|| | W n	 ty   Y nw |jd ur9|jjddr9|| | |jd ur9|jjdds%t|S )Nr   r;   )r  r   _receive_oner   r|   can_readany)rm   ri  r   r0   r0   r1   r?    s   zChannel._receivec              	   C   s  d }z|  }W n | jy   d | _ w t|ttfr|| ||}t|d dr~t|d }|d r|d dkrC|	d\}}}z
t
t|d }W n ttfyg   td|t|d d	 d
d t w |dd
d }| j|| j|  dS d S d S d S )Nr   r   rK   rn  r   rb  .z&Cannot process event on channel %r: %si   r   rN  T)rz   r   r	  r@  rp   tuplerq  r   endswith	partitionr   r   r   warnreprr   splitr|   _deliverr=  )rm   ri  responserT  rK   r   r   r   r0   r0   r1   rr    s<   
zChannel._receive_oner   c                    sr   j tj  sd S  fddjD |pdg }jj_dg|}jr0j	|}jjj
|  d S )Nc                    s"   g | ]} D ]} ||qqS r0   )
_q_for_pri)rj   prirU  queuesrm   r0   r1   rn     s
    z(Channel._brpop_start.<locals>.<listcomp>r   rc   )r8  consumer{   r  r7  rB   r|   r  rg   rx   send_command)rm   r<   rh  command_argsr0   r  r1   r    s   

zChannel._brpop_startc                 K   s   zJz| j j| j jdfi |}W n | jy   | j j   w |rH|\}}t|| jdd }| j	| | j
tt|| W d | _dS t d | _w )Nrc   r   r   T)rB   rz   r|   r   rH  r   rsplitr+  r8  rotater|  r   r  r   )rm   r~   
dest__itemdestitemr0   r0   r1   r>    s(   

zChannel._brpop_readc                 K   s,   |dkr| j   d S | j| jj| d S )Nr  )r  rz   rB   r|   )rm   r   r~   r0   r0   r1   r    s   zChannel._poll_errorc                 C   sd   |   $}| jD ]}|| ||}|r$tt|  W  d    S qt 1 s+w   Y  d S rH   )r   r7  rpopr~  r   r   r   )rm   rU  rB   r  r  r0   r0   r1   _get  s   

zChannel._getc              	   C   s   |   @}| +}| jD ]}|| ||}q| }tdd |D W  d    W  d    S 1 s7w   Y  W d    d S 1 sGw   Y  d S )Nc                 s   s     | ]}t |tjr|V  qd S rH   )r@  numbersIntegral)rj   sizer0   r0   r1   	<genexpr>  s    
z Channel._size.<locals>.<genexpr>)r   r   r7  llenr~  r   sum)rm   rU  rB   r   r  sizesr0   r0   r1   _size  s   


"zChannel._sizec                 C   s$   |  |}|r| | j | S |S rH   )priorityr+  )rm   rU  r  r0   r0   r1   r~    s   
zChannel._q_for_pric                 C   s   | j }|t||d  S )Nr   )r7  r   )rm   nstepsr0   r0   r1   r    s   zChannel.priorityc                 K   sT   | j |dd}|  }|| ||t| W d   dS 1 s#w   Y  dS )zDeliver message.F)reverseN)_get_message_priorityr   rQ  r~  r   )rm   rU  r   r   r  rB   r0   r0   r1   _put  s   
"zChannel._putc                 K   sF   |   }|| ||t| W d   dS 1 sw   Y  dS )zDeliver fanout message.N)r   publishrd  r   )rm   r   r   r   r   rB   r0   r0   r1   _put_fanout  s   

"zChannel._put_fanoutc                 K   s   |r
| j | d S d S rH   )r<  r   )rm   rU  auto_deleter   r0   r0   r1   
_new_queue  s   zChannel._new_queuec              	   C   s   |  |jdkr||ddf| j|< |   }|| j|f | j|p%d|p(d|p+dg W d    d S 1 s:w   Y  d S )Nfanout#*r   )	typeofr   replacerY  r   saddkeyprefix_queuer+  rc  )rm   r   r   rm  rU  rB   r0   r0   r1   _queue_bind  s   

"zChannel._queue_bindc           
   	   O   s   | j | | j|ddO}|| j|f | j|pd|p d|p#dg | }| j	D ]}	|
| ||	}q/|  W d    n1 sIw   Y  W d    d S W d    d S 1 saw   Y  d S )NrB   r   r   )r<  r   r   r  sremr  r+  rc  r   r7  deleter~  r   )
rm   rU  r   r   rm  rt   r   rB   r   r  r0   r0   r1   _delete  s    


"zChannel._deletec              	   K   s   |   9}| $}| jD ]}|| ||}qt| W  d    W  d    S 1 s0w   Y  W d    d S 1 s@w   Y  d S rH   )r   r   r7  existsr~  rt  r   )rm   rU  r   rB   r   r  r0   r0   r1   
_has_queue  s   



"zChannel._has_queuec                    sh    j | }  !}||}|sg W  d    S  fdd|D W  d    S 1 s-w   Y  d S )Nc                    s    g | ]}t t| jqS r0   )rv  r   r{  r+  )rj   valrl   r0   r1   rn   )  s     z%Channel.get_table.<locals>.<listcomp>)r  r   smembers)rm   r   r   rB   r   r0   rl   r1   	get_table!  s   


$zChannel.get_tablec              	   C   s   |   E}| 0}| jD ]}| ||}|||}q| }t|d d d W  d    W  d    S 1 s<w   Y  W d    d S 1 sLw   Y  d S )Nra   )r   r   r7  r~  r  r  r   r  )rm   rU  rB   r   r  priqr  r0   r0   r1   _purge+  s   


"zChannel._purgec                    sp   d| _ | js1| jj|  | jd}|d ur)| jD ]}|| jv r(| j	||d q| 
  |   t   d S )NTrB   r   )_closingclosedr|   rE  r   r  r  rY  r<  queue_deleterD  _close_clientsry   r   )rm   rB   rU  r   r0   r1   r   4  s   

zChannel.closec                 C   sL   dD ]!}z| j | }|jd }|_|  W q tt| jfy#   Y qw d S )N)rB   r  )r  r|   rH  r   r   r/   )rm   attrrB   r|   r0   r0   r1   r  D  s   
zChannel._close_clientsc                 C   sd   t |tjs0|r|dkrt}n|dr|dd  }zt|}W |S  ty/   td|w |S )Nrb  r   z/Database is int between 0 and limit - 1, not {})r@  r  r  
DEFAULT_DB
startswithintr   format)rm   vhostr0   r0   r1   _prepare_virtual_hostN  s    

zChannel._prepare_virtual_hostc                 K   s   |S rH   r0   )rm   r1  r2  paramsr0   r0   r1   _filter_tcp_connparams]  s   zChannel._filter_tcp_connparamsc                    s  | j j}|jpd|jp| j j|j|j|j| j| j	| j
| j| j| j| jd}| j}t|dr8t|jds8|d |jrRz||j | j|d< W n	 tyQ   Y nw |d }d|v rt|\}}}}}	}
}|dkr| jdi |}|jtjd	|
 d
fi | |dd  |dd  |dd  ||d< |	|d< |dd  |dd  | |dd |d< |  |dp| j}|rG  fddd|}|}||d< |S )Nz	127.0.0.1)hostportvirtual_hostusernamepasswordr4  r/  r0  r1  r2  r5  r6  r   r5  connection_classr  z://r'   rb  )r  pathr0  r1  r2  r  r  r  r  dbc                       s   e Zd Z fddZ  ZS )z'Channel._connparams.<locals>.Connectionc                    s   t    |  d S rH   )ry   rH  r   rl   )r   rK   r0   r1   rH    s   
z2Channel._connparams.<locals>.Connection.disconnect)r7   r8   r9   rH  r   r0   rJ   r   r1   
Connection  s    r  r0   )r|   rB   hostnamer  default_portr  useridr  r4  r/  r0  r1  r2  r5  r6  r  r$   r   r   rq   sslupdateconnection_class_sslr   r   r  r#   UnixDomainSocketConnectionr  r  )rm   asynchronousconninfo
connparams
conn_classr  schemer   r  r  r  queryconnection_clsr  r0   rJ   r1   _connparamsa  sr   



zChannel._connparamsc                 C   s    |r	| j | jdS | j | jdS )N)r   )r:  rJ  rI  rm   r  r0   r0   r1   _create_client  r   zChannel._create_clientc                 C   s0   | j |d}| jj|d d| _tjdi |S )Nr  r  )r  r0   )r  rA  r  r#   ConnectionPool)rm   r  r  r0   r0   r1   	_get_pool  s   zChannel._get_poolc                 C   s4   t jdk rtdt | jrtjt| jdS t jS )N)r   ra   r   zSRedis transport requires redis-py versions 3.2.0 or later. You have {0.__version__}r   )	r#   r   r	   r  rg   	functoolspartialr   StrictRedisrl   r0   r0   r1   r9    s   
zChannel._get_clientc                 c   s    |r|V  d S |   V  d S rH   r  rm   rB   r0   r0   r1   r     s   
zChannel.conn_or_acquirec                 C   s   | j d u r
|  | _ | j S rH   )rF  r  rl   r0   r0   r1   rI    s   

zChannel.poolc                 C   s   | j d u r| jdd| _ | j S )NTr  )rG  r  rl   r0   r0   r1   rJ    s   
zChannel.async_poolc                 C   s   | j ddS )z+Client used to publish messages, BRPOP etc.Tr  r  rl   r0   r0   r1   rB     s   zChannel.clientc                 C   s   | j dd}| S )z1Pub/Sub connection used to consume fanout queues.Tr  )r  r   r  r0   r0   r1   r    s   zChannel.subclientc                 C   s   | j | j d S rH   )r8  r  r  rl   r0   r0   r1   r[    s   zChannel._update_queue_cyclec                 C   s   ddl m} |jS )Nr   r    )r#   r!   r/   )rm   r!   r0   r0   r1   r;    s   zChannel._get_response_errorc                    s    fdd j D S )z<Set of queues being consumed from (excluding fanout queues).c                    s   h | ]	}| j vr|qS r0   )r  rf  rl   r0   r1   	<setcomp>  s    
z(Channel.active_queues.<locals>.<setcomp>)_active_queuesrl   r0   rl   r1   r    s   zChannel.active_queuesr   )r   r   rH   )br7   r8   r9   r:   r   _client
_subclientr  supports_fanoutr  rA  r+  r  r	  rY  r,  r   r   r   r   r  r   PRIORITY_STEPSr7  r/  r0  r1  r2  r6  r4  DEFAULT_HEALTH_CHECK_INTERVALr5  r-  r.  rg   r3  rG  rF  r   r&  from_transport_optionsr#   r  r  SSLConnectionr  r   rI   rD  r   r   rW  rX  rZ  r^  r\  rd  re  r
  ra  rq  r?  rr  r  r>  r  r  r  r~  r  r  r  r  r  r  r  r  r  r   r  r  r  r  r  r  r9  r   r   r%  rI  rJ  r   rB   r  r[  r;  r  r   r0   r0   r   r1   r&  ^  s    %	

	

	



H




r&  c                       st   e Zd ZdZeZdZeZdZdZ	e
jjjdeg ddZer$e \ZZ fddZd	d
 Zdd Zdd Z  ZS )r&   zRedis Transport.Nr#   T)directrk  r  )r  exchange_typec                    s.   t d u rtdt j|i | t | _d S )Nz)Missing redis library (pip install redis))r#   ImportErrorry   r   r   rE  r   r   r0   r1   r     s   zTransport.__init__c                 C   s   t jS rH   )r#   __version__rl   r0   r0   r1   driver_version  s   zTransport.driver_versionc                    s   | j j jj | jfdd}|_ fddj 	dj
 |jjdt}	|j d S )Nc                    sD   | j r	| j   jr z	j W d S  ty   Y d S w d S rH   )r   r`  r$  on_tickr   )r|   )rE  loopr  r0   r1   _on_disconnect  s   z:Transport.register_with_event_loop.<locals>._on_disconnectc                      s       fddj D  d S )Nc                    s   g | ]} ||qS r0   r0   )rj   r   )
add_readerr  r0   r1   rn      s    zMTransport.register_with_event_loop.<locals>.on_poll_start.<locals>.<listcomp>)r$  r0   )r  rE  cycle_poll_startr  r0   r1   r    s   z9Transport.register_with_event_loop.<locals>.on_poll_startr   r5  )rE  r  r   r  r  r  r   r  r   call_repeatedlyr  rB   transport_optionsr  r  r  )rm   r|   r  r  r5  r0   )r  rE  r  r  r  r  r1   register_with_event_loop
  s$   z"Transport.register_with_event_loopc                 C   s   | j | dS )z1Handle AIO event for one of our file descriptors.N)rE  r  )rm   r   r0   r0   r1   r  ,  s   zTransport.on_readable)r7   r8   r9   r:   r&  polling_intervalDEFAULT_PORTr  driver_typedriver_namer   r&   
implementsextend	frozensetr#   r2   r   r   r   r  r  r  r   r0   r0   r   r1   r&     s"    

"r&   c                   @   r4   )SentinelManagedSSLConnectionzConnect to a Redis server using Sentinel + TLS.

        Use Sentinel to identify which Redis server is the current master
        to connect to and when connecting to the Master server, use an
        SSL Connection.
        Nr6   r0   r0   r0   r1   r  2  s    r  c                   @   sH   e Zd ZdZejd ZerejndZere	ndZ
d	ddZd	ddZdS )
SentinelChannela  Channel with explicit Redis Sentinel knowledge.

    Broker url is supposed to look like:

    .. code-block::

        sentinel://0.0.0.0:26379;sentinel://0.0.0.0:26380/...

    where each sentinel is separated by a `;`.

    Other arguments for the sentinel should come from the transport options
    (see `transport_options` of :class:`~kombu.connection.Connection`).

    You must provide at least one option in Transport options:
     * `master_name` - name of the redis group to poll

    Example:

    .. code-block:: python

        >>> import kombu
        >>> c = kombu.Connection(
             'sentinel://sentinel1:26379;sentinel://sentinel2:26379',
             transport_options={'master_name': 'mymaster'}
        )
        >>> c.connect()
    )master_namemin_other_sentinelssentinel_kwargsNFc           	      C   s   |  |}| }|dd  |dd  g }| jjjD ]}t|}|jdkr6|jp-| jj	}|
|j|f q|sD|
|d |d f tj|ft| ddt| dd d|}t| dd }|d u rftd	||| jjS )
Nr  r  r   r  r   r   )r  r   r  z1'master_name' transport option must be specified.)r  copyrq   r|   rB   altr   r  r  r  r   r  r   Sentinelr   r   
master_forr:  r   )	rm   r  r  additional_params	sentinelsurlr  sentinel_instr  r0   r0   r1   _sentinel_managed_poold  s@   



z&SentinelChannel._sentinel_managed_poolc                 C   s
   |  |S rH   )r	  r  r0   r0   r1   r    s   
zSentinelChannel._get_poolr   )r7   r8   r9   r:   r&  r  r   SentinelManagedConnectionr  r  r  r	  r  r0   r0   r0   r1   r  ?  s    

%r  c                   @   s   e Zd ZdZdZeZdS )SentinelTransportzRedis Sentinel Transport.ig  N)r7   r8   r9   r:   r  r  r&  r0   r0   r0   r1   r    s    r  )Kr:   r  r  r'   r   collectionsr   
contextlibr   rU  r   r   viner   kombu.exceptionsr   r	   	kombu.logr
   kombu.utils.compatr   kombu.utils.encodingr   kombu.utils.eventior   r   r   kombu.utils.functionalr   kombu.utils.jsonr   r   kombu.utils.objectsr   kombu.utils.schedulingr   kombu.utils.urlr   r   r   r#   r  r   loggercriticalry  rS  r  r  r  r  r   r2   r3   rC  r5   rF   rL   rN   r   r   rB   r   r   PubSubr   r   r   r&  r&   r
  r  r  r  r  r0   r0   r0   r1   <module>   s    5

Q4i       D
N