a
    _D                     @   s   d dl Z d dlZd dlZd dlmZmZ d dlmZmZ d dl	m
Z
mZmZmZmZ d dl	Z	e	jrrd dl	mZmZ g dZG dd deZG d	d
 d
eZG dd deZG dd deZG dd deZG dd deZG dd deZdS )    N)genioloop)Future"future_set_result_unless_cancelled)UnionOptionalTypeAny	Awaitable)DequeSet)	ConditionEvent	SemaphoreBoundedSemaphoreLockc                   @   s,   e Zd ZdZddddZddddZdS )_TimeoutGarbageCollectorzBase class for objects that periodically clean up timed-out waiters.

    Avoids memory leak in a common pattern like:

        while True:
            yield condition.wait(short_timeout)
            print('looping....')
    Nreturnc                 C   s   t  | _d| _d S )Nr   )collectionsdeque_waiters	_timeoutsself r   ,lib/python3.9/site-packages/tornado/locks.py__init__)   s    
z!_TimeoutGarbageCollector.__init__c                 C   s:   |  j d7  _ | j dkr6d| _ tdd | jD | _d S )N   d   r   c                 s   s   | ]}|  s|V  qd S N)done).0wr   r   r   	<genexpr>2       z<_TimeoutGarbageCollector._garbage_collect.<locals>.<genexpr>)r   r   r   r   r   r   r   r   _garbage_collect-   s    
z)_TimeoutGarbageCollector._garbage_collect)__name__
__module____qualname____doc__r   r&   r   r   r   r   r      s   	r   c                       sx   e Zd ZdZdd fddZedddZdeee	e
jf  ee dd	d
ZdeddddZddddZ  ZS )r   a  A condition allows one or more coroutines to wait until notified.

    Like a standard `threading.Condition`, but does not need an underlying lock
    that is acquired and released.

    With a `Condition`, coroutines can wait to be notified by other coroutines:

    .. testcode::

        from tornado import gen
        from tornado.ioloop import IOLoop
        from tornado.locks import Condition

        condition = Condition()

        async def waiter():
            print("I'll wait right here")
            await condition.wait()
            print("I'm done waiting")

        async def notifier():
            print("About to notify")
            condition.notify()
            print("Done notifying")

        async def runner():
            # Wait for waiter() and notifier() in parallel
            await gen.multi([waiter(), notifier()])

        IOLoop.current().run_sync(runner)

    .. testoutput::

        I'll wait right here
        About to notify
        Done notifying
        I'm done waiting

    `wait` takes an optional ``timeout`` argument, which is either an absolute
    timestamp::

        io_loop = IOLoop.current()

        # Wait up to 1 second for a notification.
        await condition.wait(timeout=io_loop.time() + 1)

    ...or a `datetime.timedelta` for a timeout relative to the current time::

        # Wait up to 1 second.
        await condition.wait(timeout=datetime.timedelta(seconds=1))

    The method returns False if there's no notification before the deadline.

    .. versionchanged:: 5.0
       Previously, waiters could be notified synchronously from within
       `notify`. Now, the notification will always be received on the
       next iteration of the `.IOLoop`.
    Nr   c                    s   t    tj | _d S r    )superr   r   IOLoopcurrentio_loopr   	__class__r   r   r   q   s    
zCondition.__init__c                 C   s.   d| j jf }| jr&|dt| j 7 }|d S )Nz<%sz waiters[%s]>)r0   r'   r   len)r   resultr   r   r   __repr__u   s    zCondition.__repr__timeoutr   c                    sX   t  j |rTddfdd}tj   || fdd S )zWait for `.notify`.

        Returns a `.Future` that resolves ``True`` if the condition is notified,
        or ``False`` after a timeout.
        Nr   c                      s     std    d S NF)r!   r   r&   r   r   waiterr   r   
on_timeout   s    
z"Condition.wait.<locals>.on_timeoutc                    s
     S r    Zremove_timeout_r.   timeout_handler   r   <lambda>   r%   z Condition.wait.<locals>.<lambda>)r   r   appendr   r,   r-   add_timeoutadd_done_callbackr   r6   r:   r   r.   r   r?   r9   r   wait{   s    
zCondition.waitr   )nr   c                 C   sL   g }|r4| j r4| j  }| s|d8 }|| q|D ]}t|d q8dS )zWake ``n`` waiters.r   TN)r   popleftr!   rA   r   )r   rG   waitersr9   r   r   r   notify   s    

zCondition.notifyc                 C   s   |  t| j dS )zWake all waiters.N)rJ   r2   r   r   r   r   r   
notify_all   s    zCondition.notify_all)N)r   )r'   r(   r)   r*   r   strr4   r   r   floatdatetime	timedeltar
   boolrF   intrJ   rK   __classcell__r   r   r/   r   r   5   s   ; r   c                   @   sz   e Zd ZdZddddZedddZeddd	Zddd
dZ	ddddZ
deeeejf  ed dddZdS )r   a  An event blocks coroutines until its internal flag is set to True.

    Similar to `threading.Event`.

    A coroutine can wait for an event to be set. Once it is set, calls to
    ``yield event.wait()`` will not block unless the event has been cleared:

    .. testcode::

        from tornado import gen
        from tornado.ioloop import IOLoop
        from tornado.locks import Event

        event = Event()

        async def waiter():
            print("Waiting for event")
            await event.wait()
            print("Not waiting this time")
            await event.wait()
            print("Done")

        async def setter():
            print("About to set the event")
            event.set()

        async def runner():
            await gen.multi([waiter(), setter()])

        IOLoop.current().run_sync(runner)

    .. testoutput::

        Waiting for event
        About to set the event
        Not waiting this time
        Done
    Nr   c                 C   s   d| _ t | _d S r7   )_valuesetr   r   r   r   r   r      s    zEvent.__init__c                 C   s   d| j j|  rdndf S )Nz<%s %s>rT   clear)r0   r'   is_setr   r   r   r   r4      s    zEvent.__repr__c                 C   s   | j S )z-Return ``True`` if the internal flag is true.rS   r   r   r   r   rV      s    zEvent.is_setc                 C   s.   | j s*d| _ | jD ]}| s|d qdS )zSet the internal flag to ``True``. All waiters are awakened.

        Calling `.wait` once the flag is set will not block.
        TN)rS   r   r!   
set_result)r   futr   r   r   rT      s
    
z	Event.setc                 C   s
   d| _ dS )zkReset the internal flag to ``False``.

        Calls to `.wait` will block until `.set` is called.
        FNrW   r   r   r   r   rU      s    zEvent.clearr5   c                    sj   t   jr d  S j   fdd |du rD S t| }| fdd |S dS )zBlock until the internal flag is true.

        Returns an awaitable, which raises `tornado.util.TimeoutError` after a
        timeout.
        Nc                    s    j | S r    )r   removerY   r   r   r   r@      r%   zEvent.wait.<locals>.<lambda>c                    s      s  S d S r    )r!   Zcancel)Ztfr[   r   r   r@     r%   )r   rS   rX   r   addrC   r   Zwith_timeout)r   r6   Ztimeout_futr   )rY   r   r   rF      s    

z
Event.wait)N)r'   r(   r)   r*   r   rL   r4   rP   rV   rT   rU   r   r   rM   rN   rO   r
   rF   r   r   r   r   r      s   ' r   c                   @   sL   e Zd ZdZeddddZddddZd	ee ee	j
 dd
ddZdS )_ReleasingContextManagerzReleases a Lock or Semaphore at the end of a "with" statement.

        with (yield semaphore.acquire()):
            pass

        # Now semaphore.release() has been called.
    N)objr   c                 C   s
   || _ d S r    )_obj)r   r^   r   r   r   r     s    z!_ReleasingContextManager.__init__r   c                 C   s   d S r    r   r   r   r   r   	__enter__  s    z"_ReleasingContextManager.__enter__Optional[Type[BaseException]])exc_typeexc_valexc_tbr   c                 C   s   | j   d S r    )r_   release)r   rb   rc   rd   r   r   r   __exit__  s    z!_ReleasingContextManager.__exit__)r'   r(   r)   r*   r	   r   r`   r   BaseExceptiontypesTracebackTyperf   r   r   r   r   r]     s   r]   c                       s   e Zd ZdZdedd fddZed fdd	Zddd
dZde	e
eejf  ee dddZddddZde	e e	ej ddddZddddZde	e e	ej ddddZ  ZS )r   aS  A lock that can be acquired a fixed number of times before blocking.

    A Semaphore manages a counter representing the number of `.release` calls
    minus the number of `.acquire` calls, plus an initial value. The `.acquire`
    method blocks if necessary until it can return without making the counter
    negative.

    Semaphores limit access to a shared resource. To allow access for two
    workers at a time:

    .. testsetup:: semaphore

       from collections import deque

       from tornado import gen
       from tornado.ioloop import IOLoop
       from tornado.concurrent import Future

       # Ensure reliable doctest output: resolve Futures one at a time.
       futures_q = deque([Future() for _ in range(3)])

       async def simulator(futures):
           for f in futures:
               # simulate the asynchronous passage of time
               await gen.sleep(0)
               await gen.sleep(0)
               f.set_result(None)

       IOLoop.current().add_callback(simulator, list(futures_q))

       def use_some_resource():
           return futures_q.popleft()

    .. testcode:: semaphore

        from tornado import gen
        from tornado.ioloop import IOLoop
        from tornado.locks import Semaphore

        sem = Semaphore(2)

        async def worker(worker_id):
            await sem.acquire()
            try:
                print("Worker %d is working" % worker_id)
                await use_some_resource()
            finally:
                print("Worker %d is done" % worker_id)
                sem.release()

        async def runner():
            # Join all workers.
            await gen.multi([worker(i) for i in range(3)])

        IOLoop.current().run_sync(runner)

    .. testoutput:: semaphore

        Worker 0 is working
        Worker 1 is working
        Worker 0 is done
        Worker 2 is working
        Worker 1 is done
        Worker 2 is done

    Workers 0 and 1 are allowed to run concurrently, but worker 2 waits until
    the semaphore has been released once, by worker 0.

    The semaphore can be used as an async context manager::

        async def worker(worker_id):
            async with sem:
                print("Worker %d is working" % worker_id)
                await use_some_resource()

            # Now the semaphore has been released.
            print("Worker %d is done" % worker_id)

    For compatibility with older versions of Python, `.acquire` is a
    context manager, so ``worker`` could also be written as::

        @gen.coroutine
        def worker(worker_id):
            with (yield sem.acquire()):
                print("Worker %d is working" % worker_id)
                yield use_some_resource()

            # Now the semaphore has been released.
            print("Worker %d is done" % worker_id)

    .. versionchanged:: 4.3
       Added ``async with`` support in Python 3.5.

    r   Nvaluer   c                    s$   t    |dk rtd|| _d S )Nr   z$semaphore initial value must be >= 0)r+   r   
ValueErrorrS   r   rk   r/   r   r   r   ~  s    
zSemaphore.__init__r   c                    sP   t   }| jdkrdn
d| j}| jr<d|t| j}d|dd |S )Nr   lockedzunlocked,value:{0}z{0},waiters:{1}z<{0} [{1}]>r   )r+   r4   rS   formatr   r2   )r   resZextrar/   r   r   r4     s    
zSemaphore.__repr__c                 C   sJ   |  j d7  _ | jrF| j }| s|  j d8  _ |t|  qFqdS )*Increment the counter and wake one waiter.r   N)rS   r   rH   r!   rX   r]   r8   r   r   r   re     s    
zSemaphore.releaser5   c                    s   t  jdkr. jd8  _t nNj |r|ddfdd}tj   	||
 fdd S )	zDecrement the counter. Returns an awaitable.

        Block if the counter is zero and wait for a `.release`. The awaitable
        raises `.TimeoutError` after the deadline.
        r   r   Nr   c                      s"     st     d S r    )r!   Zset_exceptionr   TimeoutErrorr&   r   r8   r   r   r:     s    z%Semaphore.acquire.<locals>.on_timeoutc                    s
     S r    r;   r<   r>   r   r   r@     r%   z#Semaphore.acquire.<locals>.<lambda>)r   rS   rX   r]   r   rA   r   r,   r-   rB   rC   rD   r   rE   r   acquire  s    

zSemaphore.acquirec                 C   s   t dd S )Nz0Use 'async with' instead of 'with' for SemaphoreRuntimeErrorr   r   r   r   r`     s    zSemaphore.__enter__ra   )typrk   	tracebackr   c                 C   s   |    d S r    r`   )r   rw   rk   rx   r   r   r   rf     s    zSemaphore.__exit__c                    s   |   I d H  d S r    rt   r   r   r   r   
__aenter__  s    zSemaphore.__aenter__rw   rk   tbr   c                    s   |    d S r    re   r   rw   rk   r}   r   r   r   	__aexit__  s    zSemaphore.__aexit__)r   )N)r'   r(   r)   r*   rQ   r   rL   r4   re   r   r   rM   rN   rO   r
   r]   rt   r`   rg   rh   ri   rf   r{   r   rR   r   r   r/   r   r     s*   _	 r   c                       s<   e Zd ZdZd
edd fddZdd fdd	Z  ZS )r   a:  A semaphore that prevents release() being called too many times.

    If `.release` would increment the semaphore's value past the initial
    value, it raises `ValueError`. Semaphores are mostly used to guard
    resources with limited capacity, so a semaphore released too many times
    is a sign of a bug.
    r   Nrj   c                    s   t  j|d || _d S )Nrk   )r+   r   _initial_valuerm   r/   r   r   r     s    zBoundedSemaphore.__init__r   c                    s"   | j | jkrtdt   dS )rr   z!Semaphore released too many timesN)rS   r   rl   r+   re   r   r/   r   r   re     s    zBoundedSemaphore.release)r   )r'   r(   r)   r*   rQ   r   re   rR   r   r   r/   r   r     s   r   c                   @   s   e Zd ZdZddddZedddZdeee	e
jf  ee dd	d
ZddddZddddZdee eej ddddZddddZdee eej ddddZdS )r   a  A lock for coroutines.

    A Lock begins unlocked, and `acquire` locks it immediately. While it is
    locked, a coroutine that yields `acquire` waits until another coroutine
    calls `release`.

    Releasing an unlocked lock raises `RuntimeError`.

    A Lock can be used as an async context manager with the ``async
    with`` statement:

    >>> from tornado import locks
    >>> lock = locks.Lock()
    >>>
    >>> async def f():
    ...    async with lock:
    ...        # Do something holding the lock.
    ...        pass
    ...
    ...    # Now the lock is released.

    For compatibility with older versions of Python, the `.acquire`
    method asynchronously returns a regular context manager:

    >>> async def f2():
    ...    with (yield lock.acquire()):
    ...        # Do something holding the lock.
    ...        pass
    ...
    ...    # Now the lock is released.

    .. versionchanged:: 4.3
       Added ``async with`` support in Python 3.5.

    Nr   c                 C   s   t dd| _d S )Nr   r   )r   _blockr   r   r   r   r     s    zLock.__init__c                 C   s   d| j j| jf S )Nz<%s _block=%s>)r0   r'   r   r   r   r   r   r4     s    zLock.__repr__r5   c                 C   s   | j |S )zAttempt to lock. Returns an awaitable.

        Returns an awaitable, which raises `tornado.util.TimeoutError` after a
        timeout.
        )r   rt   )r   r6   r   r   r   rt     s    zLock.acquirec                 C   s.   z| j   W n ty(   tdY n0 dS )zUnlock.

        The first coroutine in line waiting for `acquire` gets the lock.

        If not locked, raise a `RuntimeError`.
        zrelease unlocked lockN)r   re   rl   rv   r   r   r   r   re     s    zLock.releasec                 C   s   t dd S )Nz+Use `async with` instead of `with` for Lockru   r   r   r   r   r`   '  s    zLock.__enter__ra   r|   c                 C   s   |    d S r    ry   r   r   r   r   rf   *  s    zLock.__exit__c                    s   |   I d H  d S r    rz   r   r   r   r   r{   2  s    zLock.__aenter__c                    s   |    d S r    r~   r   r   r   r   r   5  s    zLock.__aexit__)N)r'   r(   r)   r*   r   rL   r4   r   r   rM   rN   rO   r
   r]   rt   re   r`   rg   rh   ri   rf   r{   r   r   r   r   r   r     s*   $ 
r   )r   rN   rh   Ztornador   r   Ztornado.concurrentr   r   typingr   r   r   r	   r
   TYPE_CHECKINGr   r   __all__objectr   r   r   r]   r   r   r   r   r   r   r   <module>   s"   md 5