o
    ?Zc                     @   s~   d Z ddlZddlmZ ddlmZ ddlmZ ddlm	Z	 ddl
mZ dd	lmZ d
d ZG dd deZG dd dZdS )z%Generic resource pool implementation.    N)deque)Empty)	LifoQueue   )
exceptions)register_after_fork)lazyc                 C   s$   z|    W d S  ty   Y d S w N)force_close_all	Exception)resource r   D/var/www/chikooza/env/lib/python3.10/site-packages/kombu/resource.py_after_fork_cleanup_resource   s
   r   c                   @   s   e Zd ZdZdd ZdS )r   z#Last in first out version of Queue.c                 C   s   t  | _d S r	   )r   queue)selfmaxsizer   r   r   _init      zLifoQueue._initN)__name__
__module____qualname____doc__r   r   r   r   r   r      s    r   c                   @   s   e Zd ZdZejZdZd&ddZdd Zdd	 Z	d'd
dZ
dd Zdd Zdd Zdd Zdd Zdd Zdd Zd(ddZd)ddZedd  Zejd!d  Zejd"rfe
ZeZd#Zd$d Z
d%d ZdS dS )*ResourcezPool of resources.FNc                 C   s^   || _ |pd| _d| _|d ur|n| j| _t | _t | _| jr)td ur)t| t	 | 
  d S )Nr   F)_limitpreload_closedclose_after_forkr   	_resourceset_dirtyr   r   setup)r   limitr   r   r   r   r   __init__"   s   

zResource.__init__c                 C   s   t d)Nzsubclass responsibility)NotImplementedErrorr   r   r   r   r!   1   s   zResource.setupc                 C   s6   | j rt| j| j kr| | j | j|   d S r	   )r"   lenr    LimitExceededr   
put_nowaitnewr%   r   r   r   _add_when_empty4   s   zResource._add_when_emptyc                    s   j rtdjrM	 z
jj||d W n ty"     Y n)w z  W n tyC   t	 t
r=j      w j  nqn   fdd}| _ S )av  Acquire resource.

        Arguments:
            block (bool): If the limit is exceeded,
                then block until there is an available item.
            timeout (float): Timeout to wait
                if ``block`` is true.  Default is :const:`None` (forever).

        Raises:
            LimitExceeded: if block is false and the limit has been exceeded.
        zAcquire on closed poolr   )blocktimeoutc                      s      dS )a  Release resource so it can be used by another thread.

            Warnings:
                The caller is responsible for discarding the object,
                and to never use the resource again.  A new resource must
                be acquired if so needed.
            N)releaser   Rr   r   r   r-   `   s   z!Resource.acquire.<locals>.release)r   RuntimeErrorr"   r   getr   r*   prepareBaseException
isinstancer   r(   r-   r    addr)   )r   r+   r,   r-   r   r.   r   acquire<   s4   

	zResource.acquirec                 C   s   |S r	   r   r   r   r   r   r   r2   m      zResource.preparec                 C   s   |   d S r	   )closer7   r   r   r   close_resourcep   r   zResource.close_resourcec                 C      d S r	   r   r7   r   r   r   release_resources   r8   zResource.release_resourcec                 C   s    | j r	| j| | | dS )zqReplace existing resource with a new instance.

        This can be used in case of defective resources.
        N)r"   r    discardr:   r7   r   r   r   replacev   s   zResource.replacec                 C   s:   | j r| j| | j| | | d S | | d S r	   )r"   r    r=   r   r(   r<   r:   r7   r   r   r   r-      s
   zResource.releasec                 C   r;   r	   r   r7   r   r   r   collect_resource   r8   zResource.collect_resourcec                 C   s   | j rdS d| _ | j}| j}	 z| }W n	 ty   Y nw z| | W n	 ty/   Y nw q	 z|j }W n
 tyC   Y dS w z| | W n	 tyT   Y nw q2)zClose and remove all resources in the pool (also those in use).

        Used to close resources from parent processes after fork
        (e.g. sockets/connections).
        NT)	r   r    r   popKeyErrorr?   AttributeErrorr   
IndexError)r   dirtyr   dresresr   r   r   r
      s:   	zResource.force_close_allc                 C   s   | j }| jr"d|  k r| j k r"n n|s"|s td| j |d}|| _ |r7z|   W n	 ty6   Y nw |   ||k rI| j|dkd d S d S )Nr   z,Can't shrink pool when in use: was={} now={}T)collect)r   r    r0   formatr
   r   r!   _shrink_down)r   r"   forceignore_errorsreset
prev_limitr   r   r   resize   s(   $zResource.resizeTc                 C   s   G dd d}| j }t|d| - t|j| jkr6|j }|r&| | t|j| jksW d    d S W d    d S 1 sAw   Y  d S )Nc                   @   s   e Zd Zdd Zdd ZdS )z#Resource._shrink_down.<locals>.Noopc                 S   r;   r	   r   r%   r   r   r   	__enter__   r8   z-Resource._shrink_down.<locals>.Noop.__enter__c                 S   r;   r	   r   )r   typevalue	tracebackr   r   r   __exit__   r8   z,Resource._shrink_down.<locals>.Noop.__exit__N)r   r   r   rO   rS   r   r   r   r   Noop   s    rT   mutex)r   getattrr&   r   r"   popleftr?   )r   rG   rT   r   r/   r   r   r   rI      s   

"zResource._shrink_downc                 C   s   | j S r	   )r   r%   r   r   r   r"      s   zResource.limitc                 C   s   |  | d S r	   )rN   )r   r"   r   r   r   r"      s   KOMBU_DEBUG_POOLr   c                 O   s   dd l }| jd  }| _td| d| jj  | j|i |}||_td| d| jj  t|ds7g |_|j	|
  |S )Nr   r   +z	 ACQUIRE -acquired_by)rR   _next_resource_idprint	__class__r   _orig_acquire_resource_idhasattrr[   appendformat_stack)r   argskwargsrR   idrr   r   r   r6      s   
c                 C   sR   |j }td| d| jj  | |}td| d| jj  |  jd8  _|S )NrY   z	 RELEASE rZ   r   )r`   r]   r^   r   _orig_releaser\   )r   r   rf   rg   r   r   r   r-      s   
)NNN)FN)FFF)T)r   r   r   r   r   r'   r   r#   r!   r*   r6   r2   r:   r<   r>   r-   r?   r
   rN   rI   propertyr"   setterosenvironr1   r_   rh   r\   r   r   r   r   r      s8    

1	
!


r   )r   rk   collectionsr   r   r   r   
_LifoQueue r   utils.compatr   utils.functionalr   r   r   r   r   r   r   <module>   s    