o
    ?Zcv                     @   sH  d Z ddlZddlZddlZddlZddlmZ ddlmZ ddl	m
Z
 ddlmZmZ ddlmZmZ ddlmZ d	d
lmZ dZdeeeZejdkrpddlZddlZddlZejZdZ ej!Z"e# Z$dd Z%dd Z&n ejdkrddl'Z'ddl'mZm"Z"m Z  dd Z%dd Z&ne(dG dd dej)Z)G dd dej*Z*dS )a=	  File-system Transport module for kombu.

Transport using the file-system as the message store. Messages written to the
queue are stored in `data_folder_in` directory and
messages read from the queue are read from `data_folder_out` directory. Both
directories must be created manually. Simple example:

* Producer:

.. code-block:: python

    import kombu

    conn = kombu.Connection(
        'filesystem://', transport_options={
            'data_folder_in': 'data_in', 'data_folder_out': 'data_out'
        }
    )
    conn.connect()

    test_queue = kombu.Queue('test', routing_key='test')

    with conn as conn:
        with conn.default_channel as channel:
            producer = kombu.Producer(channel)
            producer.publish(
                        {'hello': 'world'},
                        retry=True,
                        exchange=test_queue.exchange,
                        routing_key=test_queue.routing_key,
                        declare=[test_queue],
                        serializer='pickle'
            )

* Consumer:

.. code-block:: python

    import kombu

    conn = kombu.Connection(
        'filesystem://', transport_options={
            'data_folder_in': 'data_out', 'data_folder_out': 'data_in'
        }
    )
    conn.connect()

    def callback(body, message):
        print(body, message)
        message.ack()

    test_queue = kombu.Queue('test', routing_key='test')

    with conn as conn:
        with conn.default_channel as channel:
            consumer = kombu.Consumer(
                conn, [test_queue], accept=['pickle']
            )
            consumer.register_callback(callback)
            with consumer:
                conn.drain_events(timeout=1)

Features
========
* Type: Virtual
* Supports Direct: Yes
* Supports Topic: Yes
* Supports Fanout: No
* Supports Priority: No
* Supports TTL: No

Connection String
=================
Connection string is in the following format:

.. code-block::

    filesystem://

Transport Options
=================
* ``data_folder_in`` - directory where are messages stored when written
  to queue.
* ``data_folder_out`` - directory from which are messages read when read from
  queue.
* ``store_processed`` - if set to True, all processed messages are backed up to
  ``processed_folder``.
* ``processed_folder`` - directory where are backed up processed files.
    N)Empty)	monotonic)ChannelError)bytes_to_strstr_to_bytes)dumpsloads)cached_property   )virtual)r
   r   r   .ntc                 C   s$   t |  }t ||ddt dS )Create file lock.r         N)	win32file_get_osfhandlefileno
LockFileEx__overlapped)fileflagshfile r   P/var/www/chikooza/env/lib/python3.10/site-packages/kombu/transport/filesystem.pylocky   s   r   c                 C   s"   t |  }t |ddt dS )Remove file lock.r   r   N)r   r   r   UnlockFileExr   )r   r   r   r   r   unlock~   s   r   posix)LOCK_EXLOCK_NBLOCK_SHc                 C   s   t |  | dS )r   N)fcntlflockr   )r   r   r   r   r   r      s   c                 C   s   t |  t j dS )r   N)r"   r#   r   LOCK_UN)r   r   r   r   r      s   z9Filesystem plugin only defined for NT and POSIX platformsc                   @   sl   e Zd ZdZdd Zdd Zdd Zdd	 Zed
d Z	e
dd Ze
dd Ze
dd Ze
dd ZdS )ChannelzFilesystem Channel.c                 K   s   d ttt d t |}tj| j	|}z0zt
|d}t|t |tt| W n ty=   td|dw W t| |  dS t| |  w )zPut `message` onto `queue`.z{}_{}.{}.msgi  wbzCannot add file z to directoryN)formatintroundr   uuiduuid4ospathjoindata_folder_outopenr   r   writer   r   OSErrorr   r   close)selfqueuepayloadkwargsfilenamefr   r   r   _put   s$   



zChannel._putc                 C   s   d| d }t | j}t|}t|dkrz|d}||dk r#q| jr*| j}nt	
 }ztt j| j|| W n	 tyE   Y nw t j||}zt|d}| }|  | jsct | W n tys   td|dw tt|S t )zGet next message from `queue`.r   .msgr   rbzCannot read file z from queue.)r,   listdirdata_folder_insortedlenpopfindstore_processedprocessed_foldertempfile
gettempdirshutilmover-   r.   r2   r0   readr3   remover   r   r   r   )r4   r5   
queue_findfolderr8   rD   r9   r6   r   r   r   _get   s@   



zChannel._getc                 C   s   d}d| d }t | j}t|dkrD| }z||dk r"W qt j| j|}t | |d7 }W n	 t	y=   Y nw t|dks|S )z!Remove all messages from `queue`.r   r   r;   r
   )
r,   r=   r>   r@   rA   rB   r-   r.   rJ   r2   r4   r5   countrK   rL   r8   r   r   r   _purge   s    
zChannel._purgec                 C   sX   d}d| d}t | j}t|dkr*| }||dk r q|d7 }t|dks|S )z<Return the number of messages in `queue` as an :class:`int`.r   r   r;   r
   )r,   r=   r>   r@   rA   rB   rN   r   r   r   _size   s   	zChannel._sizec                 C   s
   | j jjS N)
connectionclienttransport_optionsr4   r   r   r   rU      s   
zChannel.transport_optionsc                 C      | j ddS )Nr>   data_inrU   getrV   r   r   r   r>         zChannel.data_folder_inc                 C   rW   )Nr/   data_outrY   rV   r   r   r   r/     r[   zChannel.data_folder_outc                 C   rW   )NrC   FrY   rV   r   r   r   rC     r[   zChannel.store_processedc                 C   rW   )NrD   	processedrY   rV   r   r   r   rD   	  r[   zChannel.processed_folderN)__name__
__module____qualname____doc__r:   rM   rP   rQ   propertyrU   r	   r>   r/   rC   rD   r   r   r   r   r%      s     '



r%   c                       s@   e Zd ZdZeZe ZdZdZ	dZ
 fddZdd Z  ZS )	TransportzFilesystem Transport.r   
filesystemc                    s    t  j|fi | | j| _d S rR   )super__init__global_statestate)r4   rT   r7   	__class__r   r   rf     s   zTransport.__init__c                 C   s   dS )NzN/Ar   rV   r   r   r   driver_version  s   zTransport.driver_version)r^   r_   r`   ra   r%   r   BrokerStaterg   default_portdriver_typedriver_namerf   rk   __classcell__r   r   ri   r   rc     s    rc   )+ra   r,   rG   rE   r*   r5   r   timer   kombu.exceptionsr   kombu.utils.encodingr   r   kombu.utils.jsonr   r   kombu.utils.objectsr	    r   VERSIONr.   mapstr__version__name
pywintypeswin32conr   LOCKFILE_EXCLUSIVE_LOCKr   r!   LOCKFILE_FAIL_IMMEDIATELYr    
OVERLAPPEDr   r   r   r"   RuntimeErrorr%   rc   r   r   r   r   <module>   sD    Z



z