o
    ?Zc                     @   s   d Z ddlZddlmZmZ ddlmZ ddlmZ ddl	m
Z
 ddlmZ ddlmZ dd	lmZ dd
lmZ ddlmZ ddlmZ ddlmZ ddlmZmZ dZdeeeZe  Z!G dd dej"Z"G dd dej#Z#dS )a?  SQLAlchemy Transport module for kombu.

Kombu transport using SQL Database as the message store.

Features
========
* Type: Virtual
* Supports Direct: yes
* Supports Topic: yes
* Supports Fanout: no
* Supports Priority: no
* Supports TTL: no

Connection String
=================

.. code-block::

    sqla+SQL_ALCHEMY_CONNECTION_STRING
    sqlalchemy+SQL_ALCHEMY_CONNECTION_STRING

For details about ``SQL_ALCHEMY_CONNECTION_STRING`` see SQLAlchemy Engine Configuration documentation.
Examples:

.. code-block::

    # PostgreSQL with default driver
    sqla+postgresql://scott:tiger@localhost/mydatabase

    # PostgreSQL with psycopg2 driver
    sqla+postgresql+psycopg2://scott:tiger@localhost/mydatabase

    # PostgreSQL with pg8000 driver
    sqla+postgresql+pg8000://scott:tiger@localhost/mydatabase

    # MySQL with default driver
    sqla+mysql://scott:tiger@localhost/foo

    # MySQL with mysqlclient driver (a maintained fork of MySQL-Python)
    sqla+mysql+mysqldb://scott:tiger@localhost/foo

    # MySQL with PyMySQL driver
    sqla+mysql+pymysql://scott:tiger@localhost/foo

Transport Options
=================

* ``queue_tablename``: Name of table storing queues.
* ``message_tablename``: Name of table storing messages.

Moreover parameters of :func:`sqlalchemy.create_engine()` function can be passed as transport options.
    N)dumpsloads)Empty)create_engine)OperationalError)sessionmaker)virtual)cached_property)bytes_to_str   )Message)	ModelBase)Queue)class_registrymetadata)r      r   .c                       s   e Zd ZdZdZi Z fddZdd Zdd Zd	d
 Z	e
dd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zedd Zedd  Z  ZS )!ChannelzThe channel class.Nc                    s&   |  |jj t j|fi | d S N)_configure_entity_tablenamesclienttransport_optionssuper__init__)self
connectionkwargs	__class__ Y/var/www/chikooza/env/lib/python3.10/site-packages/kombu/transport/sqlalchemy/__init__.pyr   V   s   zChannel.__init__c                 C   s2   | dd| _| dd| _| jo| j d S  d S )Nqueue_tablenamekombu_queuemessage_tablenamekombu_message)getr!   r#   	queue_clsmessage_cls)r   optsr   r   r    r   Z   s   z$Channel._configure_entity_tablenamesc                 C   s<   | j j}|j }|dd  |dd  t|jfi |S )Nr!   r#   )r   r   r   copypopr   hostname)r   conninfor   r   r   r    _engine_from_confige   s
   
zChannel._engine_from_configc                 C   s   | j j}|j| jvrEt1 |j| jv r | j|j W  d    S |  }t|d}t| ||f| j|j< W d    n1 s@w   Y  | j|j S )N)bind)	r   r   r+   _engines_MUTEXr-   r   r   
create_all)r   r,   engineSessionr   r   r    _openl   s   


zChannel._openc                 C   s$   | j d u r|  \}}| | _ | j S r   )_sessionr4   )r   _r3   r   r   r    session|   s   
zChannel.sessionc              	   C   s   | j | j| jj|k }|sitM | j | j| jj|k }|r0|W  d    S | |}| j | z| j   W n t	yP   | j 
  Y n	w W d    |S W d    |S 1 sdw   Y  |S r   )r7   queryr&   filternamefirstr0   addcommitr   rollbackr   queueobjr   r   r    _get_or_create   s4   


zChannel._get_or_createc                 K   s   |  | d S r   )rB   )r   r@   r   r   r   r    
_new_queue      zChannel._new_queuec                 K   sV   |  |}| t||}| j| z| j  W d S  ty*   | j  Y d S w r   )rB   r'   r   r7   r<   r=   r   r>   )r   r@   payloadr   rA   messager   r   r    _put   s   
zChannel._putc                 C   s   |  |}| jjjdkr| jd z<| j| j | jj	|j
k| jjdk| jj| jj
d }|rLd|_tt|jW | j  S t | j  w )NsqlitezBEGIN IMMEDIATE TRANSACTIONFr   )rB   r7   r.   r:   executer8   r'   with_for_updater9   queue_ididvisibleorder_bysent_atlimitr;   r   r
   rE   r=   r   )r   r@   rA   msgr   r   r    _get   s&   


zChannel._getc                 C   s(   |  |}| j| j| jj|jkS r   )rB   r7   r8   r'   r9   rK   rL   r?   r   r   r    
_query_all   s   
zChannel._query_allc                 C   sB   |  |jdd}z| j  W |S  ty    | j  Y |S w )NF)synchronize_session)rS   deleter7   r=   r   r>   )r   r@   countr   r   r    _purge   s   zChannel._purgec                 C   s   |  | S r   )rS   rV   )r   r@   r   r   r    _size   rD   zChannel._sizec                 C   sf   |t vr/t! |t v rt | W  d    S tt||tf|W  d    S 1 s*w   Y  t | S r   )r   r0   typestrr   )r   r:   basensr   r   r    _declarative_cls   s    zChannel._declarative_clsc                 C      |  dtd| jiS )Nr   __tablename__)r]   	QueueBaser!   r   r   r   r    r&      
   zChannel.queue_clsc                 C   r^   )Nr   r_   )r]   MessageBaser#   ra   r   r   r    r'      rb   zChannel.message_cls)__name__
__module____qualname____doc__r5   r/   r   r   r-   r4   propertyr7   rB   rC   rG   rR   rS   rW   rX   r]   r	   r&   r'   __classcell__r   r   r   r    r   P   s,    
	
r   c                   @   s2   e Zd ZdZeZdZdZdZdZe	fZ
dd ZdS )		TransportzThe transport class.Tr   sql
sqlalchemyc                 C   s   dd l }|jS )Nr   )rl   __version__)r   rl   r   r   r    driver_version   s   zTransport.driver_versionN)rd   re   rf   rg   r   can_parse_urldefault_portdriver_typedriver_namer   connection_errorsrn   r   r   r   r    rj      s    rj   )$rg   	threadingjsonr   r   r@   r   rl   r   sqlalchemy.excr   sqlalchemy.ormr   kombu.transportr   kombu.utilsr	   kombu.utils.encodingr
   modelsr   rc   r   r   r`   r   r   VERSIONjoinmaprZ   rm   RLockr0   r   rj   r   r   r   r    <module>   s(    8 