o
    DZc+                     @   s   d Z ddlZddlZddlmZ ddlmZ ddlmZ ddl	m
Z
mZmZmZmZmZ ddlmZ ddlmZ dd	lmZ dd
lmZmZ ddlmZ ddlmZ dZeeddZG dd deZ G dd dZ!G dd de!Z"dS )zIntegration testing utilities.    N)defaultdict)partial)count)AnyCallableDictSequenceTextIOTupleretry_over_timestates)TimeoutError)AsyncResult	ResultSet)truncate)humanize_secondsz4Still waiting for {0}.  Trying again {when}: {exc!r}T)microsecondsc                   @   s   e Zd ZdZdS )SentinelzSignifies the end of something.N)__name__
__module____qualname____doc__ r   r   T/var/www/chikooza/env/lib/python3.10/site-packages/celery/contrib/testing/manager.pyr      s    r   c                   @   s   e Zd ZdZ		d;ddZd<dd	Zd
d Z									d=ddZ			d>ddZdd Z	d?ddZ
d@ddZdAd d!ZdAd"d#Z		$dBd%d&Z		'dCd(d)Z		*dDd+d,Z	dAd-d.Zed/d0 ZdAd1d2Zd3d4 Zd5d6 ZdAd7d8Zd9d: ZdS )EManagerMixinz.Mixin that adds :class:`Manager` capabilities.      @FNc                 C   sF   |d u rt jn|| _|d u rt jn|| _| j j| _|| _|| _d S N)	sysstdoutstderrapp
connectionrecoverable_connection_errors
connerrorsblock_timeoutno_join)selfr&   r'   r    r!   r   r   r   _init_manager   s
   
zManagerMixin._init_manager-c                 C   s   t | | | jd d S )N)file)printr    )r(   ssepr   r   r   remark'   s   zManagerMixin.remarkc                 C   s   dd |D S )Nc                 S   s    g | ]}|j |jjvr|j qS r   )idbackend_cache).0resr   r   r   
<listcomp>-   s     z0ManagerMixin.missing_results.<locals>.<listcomp>r   )r(   rr   r   r   missing_results+   s   zManagerMixin.missing_resultsthingr   
   皙?      ?      @c              	      s@   |si n|} fdd}j ||f||||||	d|S )zWait for event to happen.

        The `catch` argument specifies the exception that means the event
        has not happened yet.
        c                    s>   t |}rtj t|dd| d r| || |S )Nin )whenexc)nextwarnE_STILL_WAITINGformatr   )r@   	intervalsretriesintervaldescemit_warningerrbackr(   r   r   on_errorF   s   z'ManagerMixin.wait_for.<locals>.on_error)argskwargsrK   max_retriesinterval_startinterval_stepr   )r(   funcatchrI   rM   rN   rK   rO   rP   rQ   interval_maxrJ   optionsrL   r   rH   r   wait_for/   s   
zManagerMixin.wait_for   {Gz?      ?c	           
   
   K   s2   z| j ||||||||dW S  |y   Y dS w )z;Make sure something does not happen (at least for a while).)rI   rO   rP   rQ   rT   rJ   zShould not have happened: N)rV   AssertionError)
r(   rR   rS   rI   rO   rP   rQ   rT   rJ   rU   r   r   r   ensure_not_for_a_whileX   s   z#ManagerMixin.ensure_not_for_a_whilec                 O   s   t |i |S r   r   )r(   rM   rN   r   r   r   r   i      zManagerMixin.retry_over_timec           	         s  | j rd S t|ts| j|g}g   fdd}|rt|ntdD ]d}g  d d < z|jd
||d|W   S  tjt	fyl } z$| 
|}| dt|t  t|td||d W Y d }~q#d }~w | jy } z| d|d W Y d }~q#d }~ww td	)Nc                    s     |  d S r   )append)task_idvaluereceivedr   r   	on_results   r\   z$ManagerMixin.join.<locals>.on_resultr   )callback	propagatez#Still waiting for {}/{}: [{}]: {!r}z, !zjoin: connection lost: z!Test failed: Missing task resultsr   )r'   
isinstancer   r"   ranger   getsockettimeoutr   r7   r/   rD   lenr   joinr%   rZ   )	r(   r6   rd   rO   rN   rb   ir@   waiting_forr   r`   r   rl   l   s2   

zManagerMixin.join      @c                 C   s   | j jj|dS Nrj   )r"   controlinspect)r(   rj   r   r   r   rs      s   zManagerMixin.inspectc                 c   s(    |  |j| p
i }| E d H  d S r   )rs   
query_taskitems)r(   idsrj   tasksr   r   r   query_tasks   s   zManagerMixin.query_tasksc           	      C   sH   t t}| j||dD ]\}}| D ]\}\}}|| | qq|S rp   )r   setrx   ru   add)	r(   rv   rj   r   hostnamereplyr^   state_r   r   r   query_task_states   s   zManagerMixin.query_task_states waiting for tasks to be acceptedc                 K      | j | j|f||d|S N)rG   rI   assert_task_worker_stateis_acceptedr(   rv   rG   rI   policyr   r   r   assert_accepted      zManagerMixin.assert_accepted waiting for tasks to be receivedc                 K   r   r   r   r   r   r   r   assert_received   r   zManagerMixin.assert_received,waiting for tasks to be started or completedc                 K   r   r   )assert_task_state_from_resultis_result_task_in_progress)r(   async_resultsrG   rI   r   r   r   r   ,assert_result_tasks_in_progress_or_completed   s   z9ManagerMixin.assert_result_tasks_in_progress_or_completedc                 K   $   | j t| j|||dtffi |S rp   rV   r   true_or_raiser   )r(   rR   resultsrG   r   r   r   r   r      s   z*ManagerMixin.assert_task_state_from_resultc                    s"   t jt jf t fdd| D S )Nc                 3   s    | ]}|j  v V  qd S r   )r}   )r3   resultpossible_statesr   r   	<genexpr>   s    z:ManagerMixin.is_result_task_in_progress.<locals>.<genexpr>)r   STARTEDSUCCESSall)r   rN   r   r   r   r      s   z'ManagerMixin.is_result_task_in_progressc                 K   r   rp   r   )r(   rR   rv   rG   r   r   r   r   r      s   z%ManagerMixin.assert_task_worker_statec                 K   s   | j g d|fi |S )N)reservedactiveready_ids_matches_stater(   rv   rN   r   r   r   is_received   s
   zManagerMixin.is_receivedc                 K   s   | j ddg|fi |S )Nr   r   r   r   r   r   r   r      s   zManagerMixin.is_acceptedc                    s&   | j ||dt fdd|D S )Nrq   c                 3   s4    | ] t  fd dfddD D V  qdS )c                 3   s    | ]} |v V  qd S r   r   )r3   r-   tr   r   r      s    z<ManagerMixin._ids_matches_state.<locals>.<genexpr>.<genexpr>c                    s   g | ]} | qS r   r   )r3   kr   r   r   r5      s    z=ManagerMixin._ids_matches_state.<locals>.<genexpr>.<listcomp>N)any)r3   expected_statesr   r   r   r      s
    "
z2ManagerMixin._ids_matches_state.<locals>.<genexpr>)r   r   )r(   r   rv   rj   r   r   r   r      s   zManagerMixin._ids_matches_statec                 O   s   ||i |}|st  |S r   )r   )r(   rR   rM   rN   r4   r   r   r   r      s   zManagerMixin.true_or_raise)r   FNN)r*   )	r8   r   NNr9   r:   r;   r<   F)r8   rW   r:   rX   rY   F)Fr9   )ro   )r;   )r;   r   )r;   r   )r;   r   )r   r   r   r   r)   r/   r7   rV   r[   r   rl   rs   rx   r   r   r   r   r   staticmethodr   r   r   r   r   r   r   r   r   r   r      sX    



*






	




r   c                   @   s   e Zd ZdZdd ZdS )Managerz(Test helpers for task integration tests.c                 K   s   || _ | jdi | d S )Nr   )r"   r)   )r(   r"   rN   r   r   r   __init__   s   zManager.__init__N)r   r   r   r   r   r   r   r   r   r      s    r   )#r   ri   r   collectionsr   	functoolsr   	itertoolsr   typingr   r   r   r   r	   r
   kombu.utils.functionalr   celeryr   celery.exceptionsr   celery.resultr   r   celery.utils.textr   celery.utils.timer   _humanize_secondsrC   	Exceptionr   r   r   r   r   r   r   <module>   s&      :