a
    2`/b                 	   @  sv  U d dl mZ d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dl	Z	d dl
Z
d dlZd dlZd dlZd dlmZmZmZ d dlmZmZmZmZmZmZmZ d dlmZ d dlmZ d dlmZ d dl m!Z! d d	l"m#Z# d d
l$m%Z%m&Z&m'Z'm(Z( e%r0ddl)m*Z* ddl+m,Z, ddl-m.Z. ddl/m0Z0 d dl1m2Z2m3Z3m4Z4m5Z5 d dl6m7Z7m8Z8 d dl9Z9d dl:m;Z; d dl<m=Z= d dl>m?Z?m@Z@mAZAmBZBmCZCmDZDmEZE ddlFmGZGmHZHmIZImJZJmKZK ddlLmMZM ddlGmNZNmOZOmPZP ddlQmRZRmSZS ddlTmUZU ddlVmWZWmXZXmYZYmZZZm[Z[m\Z\ ddl]m^Z^ ddl)m_Z_ ddl`maZambZb ddlcmdZd ddlemfZf dd lgmhZh dd!limjZj dd"lkm"Z"mlZl dd#lmmnZn dd$lompZp dd%lqmrZr dd&lsmtZs dd'lumvZv dd(lumwZx dd)lKmyZymzZzm{Z{m|Z|m}Z}m~Z~mZmZmZmZmZmZmZmZmZmZmZ dd*lmZmZmZ dd+lmZmZmZ dd,lmZ eeZe9jd-Zd.Zh d/Zd0d1hZenergZd2ed3< i Zd4ed5< i Zd4ed6< eBe9jd7Zed8g d9ZG d:d; d;eZG d<d= d=ZG d>d? d?eZG d@dA dAehZdAdBdCdDZddFdBdGdHZdIdJ ZwG dKdL dLeZe=fdMdBdNdOZddPdQZd gZeydRdSZdTdU ZdddefdVdWZdXdY ZeydRdSZe ZdZdBd[d\Zd]d^ Zd_gZe"jd`fdadbZdcdd Zdedf Zdgdh Zdidj Zdkdl ZddMdmdndodpZddqdMdmdrdsdtZddvdwZejZze^ġ dk r&ełW n eefy@   Y n"0 dxdy Zeedz< d{d| Zeedz< d}d~ Zdd ZdS )    )annotationsN)defaultdictdeque
namedtuple)Callable
Collection	ContainerIterableIteratorMappingMutableMapping)Executor)suppress)	timedeltaisawaitable)PicklingError)TYPE_CHECKINGAnyClassVarLiteral   )WorkerPluginActorClient)Nanny)firstkeymapmergepluck)IOLoopPeriodicCallback)istask)	CPU_COUNT)applyformat_bytesfuncnameparse_bytesparse_timedelta	stringifytypename)comm
preloadingprofilesystemutils)BatchedSend)Commconnectget_address_host)address_from_user_argsparse_address)OFFLOAD_THRESHOLD)CommClosedErrorStatuscoerce_to_addresserror_messagepingpong	send_recv)nvml)_get_plugin_name)WorkDir	WorkSpace)get_handlers)time)
ServerNode)setproctitle)pickleto_serialize)PubSubWorkerExtension)Security)ShuffleWorkerExtension)safe_sizeof)ThreadPoolExecutor)secede)LRUTimeoutError_maybe_complexget_iphas_argimport_filein_async_calliscoroutinefunctionjson_load_robust	key_split
log_errorsoffloadparse_portsrecursive_to_dictsilence_loggingthread_statewarn_on_duration)gather_from_workers	pack_dataretry_operation)ThrottledGCdisable_gc_diagnosisenable_gc_diagnosis)get_versionszdistributed.admin.pdb-on-errz--no-value-sentinel-->   readyconstrainedresumed	executingwaiting	cancelledlong-runningrg   rh   z
list[type]DEFAULT_EXTENSIONS"dict[str, Callable[[Worker], Any]]DEFAULT_METRICSDEFAULT_STARTUP_INFORMATIONz'distributed.scheduler.default-data-sizeSerializedTaskfunctionargskwargstaskc                   @  s   e Zd ZdS )InvalidTransitionN)__name__
__module____qualname__ r|   r|   1lib/python3.9/site-packages/distributed/worker.pyrx      s   rx   c                   @  s.  e Zd ZU dZded< ded< ded< ded< d	ed
< ded< ded< ded< ded< ded< ded< ded< ded< ded< ded< ded< ded< ded< d ed!< d	ed"< d	ed#< ded$< d	ed%< d&ed'< d(ed)< ded*< ded+< d<d-d.Zd/d0 Zdd1d2d3Zd4d5d6dd7d8d9Zd(d1d:d;Z	d,S )=	TaskStatea	  Holds volatile state relating to an individual Dask task


    * **dependencies**: ``set(TaskState instances)``
        The data needed by this key to run
    * **dependents**: ``set(TaskState instances)``
        The keys that use this dependency.
    * **duration**: ``float``
        Expected duration the a task
    * **priority**: ``tuple``
        The priority this task given by the scheduler.  Determines run order.
    * **state**: ``str``
        The current state of the task. One of ["waiting", "ready", "executing",
        "fetch", "memory", "flight", "long-running", "rescheduled", "error"]
    * **who_has**: ``set(worker)``
        Workers that we believe have this data
    * **coming_from**: ``str``
        The worker that current task data is coming from if task is in flight
    * **waiting_for_data**: ``set(keys of dependencies)``
        A dynamic version of dependencies.  All dependencies that we still don't
        have for a particular key.
    * **resource_restrictions**: ``{str: number}``
        Abstract resources required to run a task
    * **exception**: ``str``
        The exception caused by running a task if it erred
    * **traceback**: ``str``
        The exception caused by running a task if it erred
    * **type**: ``type``
        The type of a particular piece of data
    * **suspicious_count**: ``int``
        The number of times a dependency has not been where we expected it
    * **startstops**: ``[{startstop}]``
        Log of transfer, load, and compute times for a task
    * **start_time**: ``float``
        Time at which task begins running
    * **stop_time**: ``float``
        Time at which task finishes running
    * **metadata**: ``dict``
        Metadata related to task. Stored metadata should be msgpack
        serializable (e.g. int, string, list, dict).
    * **nbytes**: ``int``
        The size of a particular piece of data
    * **annotations**: ``dict``
        Task annotations

    Parameters
    ----------
    key: str
    run_spec: SerializedTask
        A named tuple containing the ``function``, ``args``, ``kwargs`` and
        ``task`` associated with this `TaskState` instance. This defaults to
        ``None`` and can remain empty if it is a dependency that this worker
        will receive from another worker.

    strkeyobjectrun_specset[TaskState]dependencies
dependentsfloat | Nonedurationztuple[int, ...] | Noneprioritystateset[str]who_hascoming_fromwaiting_for_datawaitersdictresource_restrictionszException | None	exception
str | Noneexception_textzobject | None	tracebacktraceback_textztype | Nonetypeintsuspicious_countz
list[dict]
startstops
start_time	stop_timemetadatanbytesdict | Noner   booldone	_previous_nextNc                 C  s   |d usJ || _ || _t | _t | _d | _d | _d| _t | _d | _	t | _
t | _i | _d | _d| _d | _d| _d | _d| _g | _d | _d | _i | _d | _d | _d| _d | _d | _d S )Nreleased r   F)r   r   setr   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   )selfr   r   r|   r|   r}   __init__   s8    zTaskState.__init__c                 C  s   d| j d| j dS )Nz<TaskState  >)r   r   r   r|   r|   r}   __repr__   s    zTaskState.__repr__returnc                 C  s   | j }|d ur|S tS N)r   DEFAULT_DATA_SIZE)r   r   r|   r|   r}   
get_nbytes  s    zTaskState.get_nbytesr|   excludeContainer[str])r   r   c                C  s   t | |ddS )a~  Dictionary representation for debugging purposes.
        Not type stable and not intended for roundtrips.

        See also
        --------
        Client.dump_cluster_state
        distributed.utils.recursive_to_dict

        Notes
        -----
        This class uses ``_to_dict_no_nest`` instead of ``_to_dict``.
        When a task references another task, just print the task repr. All tasks
        should neatly appear under Worker.tasks. This also prevents a RecursionError
        during particularly heavy loads, which have been observed to happen whenever
        there's an acyclic dependency chain of ~200+ tasks.
        T)r   members)r\   )r   r   r|   r|   r}   _to_dict_no_nest  s    zTaskState._to_dict_no_nestc                 C  s   | j tv ptdd | jD S )Nc                 s  s   | ]}|j tv V  qd S r   )r   
PROCESSING).0dep_tsr|   r|   r}   	<genexpr>  s   z)TaskState.is_protected.<locals>.<genexpr>)r   r   anyr   r   r|   r|   r}   is_protected  s    zTaskState.is_protected)N)
ry   rz   r{   __doc____annotations__r   r   r   r   r   r|   r|   r|   r}   r~      sB   
8
r~   c                   @  s   e Zd ZdZd ddddZddd	d
dZddddZddddZdddddZddddZ	ddddZ
ddddZdS )!UniqueTaskHeapzA heap of TaskState objects ordered by TaskState.priority
    Ties are broken by string comparison of the key. Keys are guaranteed to be
    unique. Iterating over this object returns the elements in priority order.
    r|   zCollection[TaskState])
collectionc                 C  s0   dd |D | _ dd |D | _t| j d S )Nc                 S  s   h | ]
}|j qS r|   r   r   tsr|   r|   r}   	<setcomp>%      z*UniqueTaskHeap.__init__.<locals>.<setcomp>c                 S  s   g | ]}|j |j|fqS r|   )r   r   r   r|   r|   r}   
<listcomp>&  r   z+UniqueTaskHeap.__init__.<locals>.<listcomp>)_known_heapheapqheapify)r   r   r|   r|   r}   r   $  s    zUniqueTaskHeap.__init__r~   None)r   r   c                 C  sD   t |tsJ |j| jvr@t| j|j|j|f | j|j dS )zAdd a new TaskState instance to the heap. If the key is already
        known, no object is added.

        Note: This does not update the priority / heap order in case priority
        changes.
        N)	
isinstancer~   r   r   r   heappushr   r   addr   r   r|   r|   r}   push)  s    zUniqueTaskHeap.pushr   c                 C  s"   t | j\}}}| j| |S )z1Pop the task with highest priority from the heap.)r   heappopr   r   remove)r   _r   r   r|   r|   r}   pop5  s    zUniqueTaskHeap.popc                 C  s   | j d d S )zDGet the highest priority TaskState without removing it from the heapr      )r   r   r|   r|   r}   peek;  s    zUniqueTaskHeap.peekr   r   )xr   c                 C  s   t |tr|j}|| jv S r   )r   r~   r   r   )r   r   r|   r|   r}   __contains__?  s    
zUniqueTaskHeap.__contains__zIterator[TaskState]c                 C  s   dd t | jD S )Nc                 s  s   | ]\}}}|V  qd S r   r|   )r   r   r   r|   r|   r}   r   E  r   z*UniqueTaskHeap.__iter__.<locals>.<genexpr>)sortedr   r   r|   r|   r}   __iter__D  s    zUniqueTaskHeap.__iter__r   c                 C  s
   t | jS r   )lenr   r   r|   r|   r}   __len__G  s    zUniqueTaskHeap.__len__r   c                 C  s   dt | j dt|  dS )N<: z items>)r   ry   r   r   r|   r|   r}   r   J  s    zUniqueTaskHeap.__repr__N)r|   )ry   rz   r{   r   r   r   r   r   r   r   r   r   r|   r|   r|   r}   r     s   r   c                .      s^  e Zd ZU dZe Zded< e Zded< ded< ded	< d
ed< ded< ded< ded< ded< ded< ded< ded< ded< ded< ded< ded< ded< ded < d!ed"< d#ed$< d%ed&< d'ed(< d)ed*< ded+< d,ed-< d.ed/< ded0< ded1< ded2< ded3< d4ed5< d6ed7< d6ed8< ded9< d:ed;< d<ed=< ded>< ded?< ded@< dedA< dedB< dCedD< dCedE< dCedF< dGedH< dIedJ< dKedL< dMedN< dOedP< dQedR< dSedT< dUedV< dWedX< dUedY< dUedZ< dKed[< dUed\< d:ed]< dKed^< ded_< d`eda< d`edb< dcedd< dcede< dcedf< d:edg< dhedi< djedk< d'edl< dWedm< dnedo< dnedp< dnedq< dreds< dtedu< dvedw< dxedy< d:edz< d{ed|< d}ed~< ded< dCed< ded< dCed< d:ed< dZ	ded< i Z
d'ed< d'ed< ded< ded< d:ed< ded< d'ed< dced< d:ed< ded< ded< ded< dddddddddddddddddddddddddeeddddddddddddddddd*dUdWdUdWdddUddd:ddddWdddddUdddddddddddUdUdWdUdUd:dKdddddddd+ fddZdd Zedd Zdd ZeddddZeddddZedd Zedd ZeddÄ Zejjddń ZddddǄZdhdddɄZdd˄ Zdd̈́ ZdddΜdddhdќ fddӄZddՄ Zddׄ Z ddل Z!ddۄ Z"dd݄ Z#ddd߄Z$dd Z%ddddZ&dddZ' fddZ(dd Z)d fdd	Z*dddZ+dd:dddZ,dd Z-dd Z.dddZ/ddd:dKdddZ0ddd Z1dd Z2dd Z3dd Z4dddKd	d
dZ5dKddKddddZ6ddde7ddddddKdddCdd:dKdddZ8dd Z9dd Z:dd Z;dd Z<dd Z=d d! Z>d"d# Z?d$d% Z@d&d' ZAd(d) ZBd*d+ ZCd,d- ZDd.d/ ZEd0d1 ZFd2d3 ZGd4d5 ZHd6d7 ZId8d9 ZJd:d; ZKd<d= ZLd>d? ZMd@dA ZNdBdC ZOdDdE ZPdFdG ZQe7fdHdIZRe7fdJdKZSe7fdLdMZTdNdO ZUdPdQ ZVdRdS ZWdTdU ZXdVdW ZYdXdY ZZdZd[ Z[d\d] Z\d^d_ Z]d`da Z^dbdc Z_dKdddedfZ`dhdgdhdiZaddjdkdlZbdmdn Zcdodp Zddqdr Zedsdt Zfdudv Zgdwdx Zhedydz Zid{d|d}d~dZjdCdCdhddKddddZkdKd{ddddZldd ZmdKddddZndddddZodd ZpdKddddZqddKddUd:ddddZrdddZsdddZtdddZudddZvdddddZwdddZxdKd:dddZydd Zzdd Z{dd Z|dd Z}dd Z~dd Zdd ZdddZdddZdddZdd Zdd Zdd Zdd ZdÐdĄ ZdŐdƄ ZdǐdȄ Zdɐdʄ Zdːd̄ Zd͐d΄ ZdϐdЄ Zdѐd҄ ZdӐdԄ Zeddd֐dׄZddcdՐd؜dِdڄZdKddېd܄Z  ZS (  WorkeraY   Worker node in a Dask distributed cluster

    Workers perform two functions:

    1.  **Serve data** from a local dictionary
    2.  **Perform computation** on that data and on data from peers

    Workers keep the scheduler informed of their data and use that scheduler to
    gather data from other workers when necessary to perform a computation.

    You can start a worker with the ``dask-worker`` command line application::

        $ dask-worker scheduler-ip:port

    Use the ``--help`` flag to see more options::

        $ dask-worker --help

    The rest of this docstring is about the internal state the the worker uses
    to manage and track internal computations.

    **State**

    **Informational State**

    These attributes don't change significantly during execution.

    * **nthreads:** ``int``:
        Number of nthreads used by this worker process
    * **executors:** ``dict[str, concurrent.futures.Executor]``:
        Executors used to perform computation. Always contains the default
        executor.
    * **local_directory:** ``path``:
        Path on local machine to store temporary files
    * **scheduler:** ``rpc``:
        Location of scheduler.  See ``.ip/.port`` attributes.
    * **name:** ``string``:
        Alias
    * **services:** ``{str: Server}``:
        Auxiliary web servers running on this worker
    * **service_ports:** ``{str: port}``:
    * **total_out_connections**: ``int``
        The maximum number of concurrent outgoing requests for data
    * **total_in_connections**: ``int``
        The maximum number of concurrent incoming requests for data
    * **comm_threshold_bytes**: ``int``
        As long as the total number of bytes in flight is below this threshold
        we will not limit the number of outgoing connections for a single tasks
        dependency fetch.
    * **batched_stream**: ``BatchedSend``
        A batched stream along which we communicate to the scheduler
    * **log**: ``[(message)]``
        A structured and queryable log.  See ``Worker.story``

    **Volatile State**

    These attributes track the progress of tasks that this worker is trying to
    complete.  In the descriptions below a ``key`` is the name of a task that
    we want to compute and ``dep`` is the name of a piece of dependent data
    that we want to collect from others.

    * **tasks**: ``{key: TaskState}``
        The tasks currently executing on this worker (and any dependencies of those tasks)
    * **data:** ``{key: object}``:
        Prefer using the **host** attribute instead of this, unless
        memory_limit and at least one of memory_target_fraction or
        memory_spill_fraction values are defined, in that case, this attribute
        is a zict.Buffer, from which information on LRU cache can be queried.
    * **data.memory:** ``{key: object}``:
        Dictionary mapping keys to actual values stored in memory. Only
        available if condition for **data** being a zict.Buffer is met.
    * **data.disk:** ``{key: object}``:
        Dictionary mapping keys to actual values stored on disk. Only
        available if condition for **data** being a zict.Buffer is met.
    * **data_needed**: UniqueTaskHeap
        The tasks which still require data in order to execute, prioritized as a heap
    * **ready**: [keys]
        Keys that are ready to run.  Stored in a LIFO stack
    * **constrained**: [keys]
        Keys for which we have the data to run, but are waiting on abstract
        resources like GPUs.  Stored in a FIFO deque
    * **executing_count**: ``int``
        A count of tasks currently executing on this worker
    * **executed_count**: int
        A number of tasks that this worker has run in its lifetime
    * **long_running**: {keys}
        A set of keys of tasks that are running and have started their own
        long-running clients.
    * **has_what**: ``{worker: {deps}}``
        The data that we care about that we think a worker has
    * **pending_data_per_worker**: ``{worker: UniqueTaskHeap}``
        The data on each worker that we still want, prioritized as a heap
    * **in_flight_tasks**: ``int``
        A count of the number of tasks that are coming to us in current
        peer-to-peer connections
    * **in_flight_workers**: ``{worker: {task}}``
        The workers from which we are currently gathering data and the
        dependencies we expect from those connections
    * **comm_bytes**: ``int``
        The total number of bytes in flight
    * **threads**: ``{key: int}``
        The ID of the thread on which the task ran
    * **active_threads**: ``{int: key}``
        The keys currently running on active threads
    * **waiting_for_data_count**: ``int``
        A count of how many tasks are currently waiting for data
    * **generation**: ``int``
        Counter that decreases every time the compute-task handler is invoked by the
        Scheduler. It is appended to TaskState.priority and acts as a tie-breaker
        between tasks that have the same priority on the Scheduler, determining a
        last-in-first-out order between them.

    Parameters
    ----------
    scheduler_ip: str, optional
    scheduler_port: int, optional
    scheduler_file: str, optional
    ip: str, optional
    data: MutableMapping, type, None
        The object to use for storage, builds a disk-backed LRU dict by default
    nthreads: int, optional
    loop: tornado.ioloop.IOLoop
    local_directory: str, optional
        Directory where we place local resources
    name: str, optional
    memory_limit: int, float, string
        Number of bytes of memory that this worker should use.
        Set to zero for no limit.  Set to 'auto' to calculate
        as system.MEMORY_LIMIT * min(1, nthreads / total_cores)
        Use strings or numbers like 5GB or 5e9
    memory_target_fraction: float or False
        Fraction of memory to try to stay beneath
        (default: read from config key distributed.worker.memory.target)
    memory_spill_fraction: float or false
        Fraction of memory at which we start spilling to disk
        (default: read from config key distributed.worker.memory.spill)
    memory_pause_fraction: float or False
        Fraction of memory at which we stop running new tasks
        (default: read from config key distributed.worker.memory.pause)
    max_spill: int, string or False
        Limit of number of bytes to be spilled on disk.
        (default: read from config key distributed.worker.memory.max-spill)
    executor: concurrent.futures.Executor, dict[str, concurrent.futures.Executor], "offload"
        The executor(s) to use. Depending on the type, it has the following meanings:
            - Executor instance: The default executor.
            - Dict[str, Executor]: mapping names to Executor instances. If the
              "default" key isn't in the dict, a "default" executor will be created
              using ``ThreadPoolExecutor(nthreads)``.
            - Str: The string "offload", which refer to the same thread pool used for
              offloading communications. This results in the same thread being used
              for deserialization and computation.
    resources: dict
        Resources that this worker has like ``{'GPU': 2}``
    nanny: str
        Address on which to contact nanny, if it exists
    lifetime: str
        Amount of time like "1 hour" after which we gracefully shut down the worker.
        This defaults to None, meaning no explicit shutdown time.
    lifetime_stagger: str
        Amount of time like "5 minutes" to stagger the lifetime value
        The actual lifetime will be selected uniformly at random between
        lifetime +/- lifetime_stagger
    lifetime_restart: bool
        Whether or not to restart a worker after it has reached its lifetime
        Default False
    kwargs: optional
        Additional parameters to ServerNode constructor

    Examples
    --------

    Use the command line to start a worker::

        $ dask-scheduler
        Start scheduler at 127.0.0.1:8786

        $ dask-worker 127.0.0.1:8786
        Start worker at:               127.0.0.1:1234
        Registered with scheduler at:  127.0.0.1:8786

    See Also
    --------
    distributed.scheduler.Scheduler
    distributed.nanny.Nanny
    z!ClassVar[weakref.WeakSet[Worker]]
_instancesz!ClassVar[weakref.WeakSet[Client]]_initialized_clientszdict[str, TaskState]tasksr   waiting_for_data_countzdefaultdict[str, set[str]]has_whatz defaultdict[str, UniqueTaskHeap]pending_data_per_workerzNanny | Nonenannyzthreading.Lock_lockr   data_neededzdict[str, set[str]]in_flight_workerstotal_out_connectionstotal_in_connectionscomm_threshold_bytescomm_nbytesr   _missing_dep_flightzdict[str, int]threadsactive_threads_lockzdict[int, str]active_threadsr   active_keysz defaultdict[str, dict[str, Any]]profile_keysz.deque[tuple[float, dict[str, dict[str, Any]]]]profile_keys_historyzdict[str, Any]profile_recentz#deque[tuple[float, dict[str, Any]]]profile_history
generationz	list[str]rg   z
deque[str]rh   
_executing_in_flight_tasksexecuted_countlong_runningzdeque[tuple]logzdeque[dict[str, Any]]incoming_transfer_logoutgoing_transfer_logtarget_message_sizer   validatezdict[tuple[str, str], Callable]_transitions_table_transition_counterincoming_countoutgoing_countoutgoing_current_countrepetitively_busyfloat	bandwidthlatencyprofile_cycle_intervalrB   Z	workspacerA   _workdirr   local_directoryzClient | None_clientz#defaultdict[str, tuple[float, int]]bandwidth_workersz$defaultdict[type, tuple[float, int]]bandwidth_typeszlist[preloading.Preload]preloadsr   contact_address
int | None_start_port_start_host
_interface	_protocol_dashboard_address
_dashboard_http_prefixnthreadszdict[str, float]total_resourcesavailable_resourcesr   death_timeoutlifetimelifetime_staggerlifetime_restartr   
extensionsrJ   securityconnection_argsmemory_limitzfloat | Literal[False]memory_target_fractionmemory_spill_fractionmemory_pause_fractionzint | Literal[False]	max_spillzMutableMapping[str, Any]datazdict[str, Actor | None]actorsr"   loop	reconnectzdict[str, Executor]	executorsr2   batched_streamr   namescheduler_delayzdict[str, BatchedSend]stream_commsheartbeat_intervalheartbeat_activeNz
Any | None_ipython_kernelservicesservice_specsro   metricsstartup_informationlow_level_profiler	schedulerexecution_statememory_monitor_interval_memory_monitoringrc   _throttled_gczdict[str, WorkerPlugin]pluginsztuple[WorkerPlugin, ...]_pending_pluginsTauto1sZ200msF/r|   )*scheduler_filer  r*  	local_dirr  r4  r.  r+  r#  executor	resourcessilence_logsr  preloadpreload_argvr!  r  r1  r;  r$  r%  r&  r'  r   r6  r7  r(  	interfacehostportprotocoldashboard_address	dashboardhttp_prefixr   r>  r8  r   r	  r  r  r  zIOLoop | Noner   r   zstr | floatz:Executor | dict[str, Executor] | Literal['offload'] | Nonezdict[str, float] | Nonezlist[str] | Nonez"list[str] | list[list[str]] | Nonez Security | dict[str, Any] | Nonezfloat | Literal[False] | Nonez#float | str | Literal[False] | Nonezlist[type] | Nonez%Mapping[str, Callable[[Worker], Any]]zMutableMapping[str, Any] | Callable[[], MutableMapping[str, Any]] | tuple[Callable[..., MutableMapping[str, Any]], dict[str, Any]] | Nonezbool | None)+scheduler_ipscheduler_portrC  r  r*  rD  r  r4  r.  r+  r#  rE  rF  rG  r  rH  rI  r!  r  r1  r;  r$  r%  r&  r'  r   r6  r7  r(  rJ  rK  rL  rM  rN  rO  rP  r   r>  r8  r   r  r  r  c       *   :   0     s
  i  _ d _tt _tt _|% _t	  _
t  _i  _tjd _tjd _td _d _t  _i  _t	  _i  _t  _ttj _tdd _t  _tdd _ d _!g  _"t  _#t  _$t  _%d _&t  _'td _(tdd _)|(d u rtjd	}(|( _* j+ j, j- j. j/ j0 j1 j2 j3 j4 j5 j6 j7 j8 j4 j4 j9 j: j; j< j= j> j4 j? j@ jA jB jC j3 jD j= j< jE jF jG j3 j3 jH j4 j3 jI jJ jK jL jM jN j4d
/ _Od _Ptdd _Qd _Rtdd _Sd _Td _Ud _VtWtjd _Xtdd  _Ytdd  _Zd _[d  _\|)d u rltjd})t]|)dd})|)sJ  ^t_ |d urt`ad |}|stjdptbc }tbjd|dd tbjef|d}tgdd< thtbjei| _j jjjkdd _l jljm _nW d    n1 s*0    Y  |sFtjd}|sXtjd}|d usfJ |d ustJ tojp || jnd _q|rtr|}.|.d }/nH|d u rtjd d rtjd }/n |d u rts|}/nts||f}/| _t|!d u r$|/ud!}0tv|0d"kr|0d }!|!s$J |  _w| _x|rlty|\}1}2|2zd#d$krl|2{d%slt|d&|2 | _}|! _~|pt _|d u rtjd'd }t|tsJ |pi  _|pi   _t]| _i  _|rt|d( t|trtf i |}|pt  _t jts"J  jd) _t| j _|d urL|n
tjd* _|d urh|n
tjd+ _|d ur|n
tjd, _|d u rtjd-}|d.u rd.ntW| _t|tr| _nt|r|  _nt|tr|d f i |d$  _nr jrz js& jrzd$d/lm}3  jrTt j jpL j }4ntj}4|3tbjef jnd0|4 jd1 _ni  _i  _|pt  _|
 _tjtd$d2d3d4 _t dkrtd$d5d3 jd6< |d7kr jd7  jd8< n.t|tr j| n|d ur| jd8< d8 jvr<t jd9d3 jd8< td: jd; _|	 _d _i  _d. _d  _ jntjevrtjed jn i  _|pi  _|" _|# _|$ _|rt|ni  _|rt|ni  _|'d u rtjd<}'|' _ j j j j j j jt j j j j jÈ jĈ jň jƈ jǈ jȈ jɈ jʈ jd=}5 j j̈ j͈ jΈ j jψ jЈ jd>}6t҃ jf |5|6 j jd?|-  |/ _Ո jjֈ j d@ _t]|dd _tو jڈ jdA }7|7 jdB< tه fdCddD}7|7 jdE< tو jdA}7|7 jdF< | _t]|dd _d. _߈ j	rX jd u	s<J tو j jdA }7|7 jdG< |d u 	rft}|D ]}8|8  	qjtt_dH _tdI t]tjdJdd}9tو j|9dA }7|7 jdK< tو j|)dA }7|7 jdL< i  _|& _|*d u 	rtjdM}*t]|* _|+d u 
rtjdN}+t]|+}+|,d u 
r6tjdO},|, _ j
rt  jt d" d$ |+ 7  _ j j j tj  d S )PNr   z'distributed.worker.connections.outgoingz'distributed.worker.connections.incomingg    cAi  )maxleng    ׇA順 zdistributed.scheduler.validate)/)rl   ri   )rl   fetch)rl   r   )rl   rk   )rl   	forgotten)rl   memory)rl   error)ri   rW  )ri   rX  )ri   r   ri   rk   ri   rU  )ri   missing)rh   rj   )rh   r   )rX  r   )rj   rX  )rj   rm   )rj   rW  )rj   r   )rj   rescheduled)rU  flight)rU  r   )r]  rX  )r]  rU  )r]  rW  )r]  r[  )r]  r   )rm   rX  )rm   rW  )rm   r\  )rm   r   )rW  r   )r[  rU  )r[  r   )r[  rX  )rg   rX  )rg   rj   )rg   r   )r   rX  )r   rU  )r   rV  )r   rW  )r   rk   )rk   rh   )rk   rg   )rk   r   zdistributed.scheduler.bandwidthc                   S  s   dS Nr   r   r|   r|   r|   r|   r}   <lambda>  r   z!Worker.__init__.<locals>.<lambda>c                   S  s   dS r^  r|   r|   r|   r|   r}   r`    r   gMbP?z distributed.worker.profile.cyclems)defaultz2The local_dir keyword has moved to local_directoryztemporary-directoryT)exist_okzdask-worker-spacerA  zCreating scratch directories is taking a surprisingly long time. This is often due to running workers on a network file system. Consider specifying a local-directory to point workers to write scratch data to a local disk.zworker-prefixzdistributed.worker.preloadzdistributed.worker.preload-argv)Zfile_diraddresszscheduler-addressz://r   :r   [z;Host address with IPv6 must be bracketed like '[::1]'; got zdistributed.worker.resources)levelworkerz distributed.worker.memory.targetzdistributed.worker.memory.spillzdistributed.worker.memory.pausez#distributed.worker.memory.max-spillF)SpillBufferZstorage)targetr'  zDask-Actor-Threads)Zthread_name_prefix)rZ   actorzDask-GPU-ThreadsgpurZ   rb  zDask-Default-ThreadsZ2msintervalr*  z$distributed.worker.profile.low-level)gatherrunrun_coroutineget_dataupdate_dataZ	free_keys	terminateZpingupload_filestart_ipython
call_stackr/   Zprofile_metadataget_logskeysversionsactor_executeactor_attributez
plugin-addzplugin-removeget_monitor_info)closecancel-computezacquire-replicascompute-task	free-keysremove-replicaszsteal-requestworker-status-change)handlersstream_handlersio_loopr"  )r9  Ziolooprj    	heartbeatc                     s    j ddiS )Nop
keep-aliver-  sendr|   r   r|   r}   r`    r   i`  r  find-missingrW  )loggerzdask-worker [not started]z#distributed.worker.profile.intervalr/   zprofile-cyclez$distributed.worker.lifetime.durationz#distributed.worker.lifetime.staggerz#distributed.worker.lifetime.restart)r   r   r   r   r   r   r   r   	threadingLockr   r   r   daskconfiggetr   r   r   r   r   r   r   r   r   r   r/   creater   r   r   r   r   r   rg   rh   r   r   r   r   r   r   r   transition_cancelled_resumedtransition_cancelled_fetchtransition_cancelled_releasedtransition_cancelled_waitingtransition_cancelled_forgottentransition_cancelled_memorytransition_cancelled_errortransition_generic_memorytransition_generic_errortransition_generic_releasedtransition_resumed_waitingtransition_resumed_fetchtransition_resumed_missing transition_constrained_executingtransition_executing_error!transition_executing_long_runningtransition_executing_memorytransition_executing_released transition_executing_rescheduledtransition_fetch_flighttransition_flight_errortransition_flight_fetchtransition_flight_memorytransition_flight_missingtransition_flight_releasedtransition_long_running_memorytransition_memory_releasedtransition_missing_fetchtransition_missing_releasedtransition_ready_executingtransition_released_fetchtransition_released_forgottentransition_released_memorytransition_released_waitingtransition_waiting_constrainedtransition_waiting_readyr   r  r   r  r   r  r  r  r)   r  r  r  r  r  r*   Z_setup_loggingr  warningswarnosgetcwdmakedirspathjoinr_   rB   abspathZ
_workspaceZnew_work_dirr
  Zdir_pathr  r.   Zprocess_preloadsr  rW   r;   r  splitr   r  r  r7   count
startswith
ValueErrorr  r  r%   r  r   r   r  copyr  r  r   r]   rJ   r!  Zget_connection_argsr"  parse_memory_limitr#  r$  r%  r&  r'  r   r(  callabletupleZspillrk  sysmaxsizer)  r"   currentr*  r+  r1   _offload_executorrM   r,  r?   device_get_countupdater2   r-  r.  r/  r0  r2  r3  insertr4  r5  r  r  r  r6  r7  r8  rq  rr  rs  rt  ru  handle_free_keysr  r=   rw  rx  get_call_stackget_profileget_profile_metadatarz  r{  r|  r}  r~  
plugin_addplugin_remover  handle_cancel_computehandle_acquire_replicashandle_compute_taskhandle_remove_replicashandle_steal_requesthandle_worker_status_changesuperr   rpcr9  rf  r:  r1  r#   r  periodic_callbacksfind_missingZ_addressr;  r<  memory_monitorrn   rc   r=  rF   trigger_profilecycle_profiler>  r?  r  r  randomr  
call_laterclose_gracefullyr   r   r   ):r   rQ  rR  rC  r  r*  rD  r  r4  r.  r+  r#  rE  rF  rG  r  rH  rI  r!  r  r1  r;  r$  r%  r&  r'  r   r6  r7  r(  rJ  rK  rL  rM  rN  rO  rP  r   r>  r8  r   r	  r  r  r  rv   ZcfgZscheduler_addrZprotocol_addressr   Zhost_addressrk  rl  r  r  pcextZprofile_trigger_interval	__class__r   r}   r   m  s   :







2




*
































zWorker.__init__c                 C  sz   | j | jkrd| j  nd}d| jj d| j| d| jj  dt| j d| j d| j d	t| j	 d
| j
 d| j dS )Nz, name: r   r   r   z
, status: z
, stored: z, running: rB  z	, ready: z, comm: z, waiting: r   )r.  rf  r  ry   statusr   r(  executing_countr  rg   in_flight_tasksr   )r   r.  r|   r|   r}   r   F  s"    zWorker.__repr__c                 C  s   | j jS r   )Z_deque_handlerr   r   r|   r|   r}   logsR  s    zWorker.logsc                 C  s   | j d||d d S )Nz	log-event)r  topicmsgr  )r   r  r  r|   r|   r}   	log_eventV  s    zWorker.log_eventr   c                 C  s
   t | jS r   )r   r   r   r|   r|   r}   r  _  s    zWorker.executing_countc                 C  s
   t | jS r   )r   r   r   r|   r|   r}   r  c  s    zWorker.in_flight_tasksc                 C  s   | j S ) For API compatibility with Nanny)rf  r   r|   r|   r}   worker_addressg  s    zWorker.worker_addressc                 C  s   t jddd | jS )r  z4The local_dir attribute has moved to local_directoryr   
stacklevel)r  r  r  r   r|   r|   r}   rD  l  s    zWorker.local_dirc                 C  s
   | j d S )Nrb  )r,  r   r|   r|   r}   rE  t  s    zWorker.executorc                 C  s   t j| | |   dS )z@Override Server.status to notify the Scheduler of status changesN)rE   r  __set___send_worker_status_change)r   valuer|   r|   r}   r  x  s    zWorker.statusc                 C  sR   | j r2| j jr2| j j s2| j d| jjd n| jtjkrN| jd| j	 d S )Nr  )r  r  皙?)
r-  r-   closedr  _statusr.  r:   r*  r  r  r   r|   r|   r}   r  ~  s    
z!Worker._send_worker_status_changec              
     s   z| j j\}}W n ty*   d\}}Y n0 t| jt| j t| j| j| jt| j	t
t| jd||dd}|| j  | j D ]F\}}z*|| }t|r|I d H }||| W q ty   Y q0 q|S )Nr_  )totalworkerstypes)rW  Zdisk)rj   Z	in_memoryrg   Z	in_flightr  Zspilled_nbytes)r(  Zspilled_totalAttributeErrorr   r  r   rg   r  r  r  r   r,   r  r  monitorrecentr6  itemsr   
setdefault	Exception)r   Zspilled_memoryZspilled_diskoutkZmetricresultr|   r|   r}   get_metrics  s6    

zWorker.get_metricsc              	     sV   i }| j  D ]B\}}z&|| }t|r2|I d H }|||< W q tyN   Y q0 q|S r   )r7  r  r   r  )r   r  r  fvr|   r|   r}   get_startup_information  s    
zWorker.get_startup_informationc                 C  s    t | j| j| jj| j| jdS )N)r   idr9  r  r#  )r   ry   r  r9  rf  r  r#  r   r|   r|   r}   identity  s    zWorker.identityr   zComm | Noner   )r-   r   r   c                  s   t  j d}| j| j| jt| jdd | j D | j	| j
| j| j| j| j| j| j| j| j|  tjj| j| jd}||  fdd| D }t| dS )zDictionary representation for debugging purposes.
        Not type stable and not intended for roundtrips.

        See also
        --------
        Worker.identity
        Client.dump_cluster_state
        distributed.utils.recursive_to_dict
        r   c                 S  s   i | ]\}}|t |qS r|   )list)r   wr  r|   r|   r}   
<dictcomp>  s   z#Worker._to_dict.<locals>.<dictcomp>)r  rg   rh   r   r   r   r  r  r   r   r   r#  r$  r%  r&  r  r  r   r   c                   s   i | ]\}}| vr||qS r|   r|   r   r  r  r   r|   r}   r    r   )r  _to_dictr  rg   rh   r  r   r   r  r   r  r  r   r   r   r#  r$  r%  r&  rz  r  r  r   r   r  r\   )r   r-   r   infoextrar  r   r}   r    s4    
zWorker._to_dictc                   sj   j d    j d   t } jd u r4 j _td z<t }t jjfi  j	I d H }d|_
t |_|jtdd j jj
t j j j
dd  j D d	d  j D t  j j j j jt t   I d H    I d H d
dgdI d H  |j!dgd}|I d H }|"dr>t#|d  t }|| d } $||  |d |  _%t&j' _W qW q> t(y   td jj t)*dI d H  Y q> t+y   td Y q>0 q>|d dkrt,d|n@t)j- fdd|d  D  I d H  td jj td  j./|  j d /   j d /   j01 j2| d S )Nr  r  1-------------------------------------------------zWorker->Schedulerzregister-workerFc                 S  s"   i | ]}|j d kr|j| qS )rW  )r   r   r   r   r|   r|   r}   r    s   
z3Worker._register_with_scheduler.<locals>.<dictcomp>c                 S  s   i | ]\}}|t |qS r|   )r,   r  r|   r|   r}   r    r   )r  Zreplyrf  r  r{  r  r.  r   r  nowrF  r#  r  r4  r   pidr|  r6  r  Zmsgpackserializersdeserializerswarningr   rD   Waiting to connect to: %26s皙?z&Timed out when connecting to schedulerr  OKz#Unexpected response from register: c                 3  s    | ]\}} j ||d V  qdS ))r.  pluginNr  )r   r.  r!  r   r|   r}   r   1  s   z2Worker._register_with_scheduler.<locals>.<genexpr>zworker-pluginsz        Registered to: %26s)3r  stoprD   r  rf  r  r  r4   r9  r"  r.  weakrefrefZ_serverwriter   r  r  r(  r  r   valuesr  r  r#  r  service_portsr   r  getpidrf   r	  r  readr  r  _update_latencyr/  r:   runningOSErrorasynciosleeprP   r  rq  r-  startr*  add_callbackhandle_scheduler)r   r0  Z_startr-   ZfutureresponseZ_endmiddler|   r   r}   _register_with_scheduler  s    







zWorker._register_with_schedulerc                 C  s2   |d | j d  | _ | jd ur.| jd | d S )Nr  ffffff?r  )r  digestsr   )r   r  r|   r|   r}   r+  ?  s    
zWorker._update_latencyc              
     s   j rtd d S d _ td j zz t t jj j 	 I d H  fdd j
D dI d H }t }| d } |  |d d	krt  jtjv rt d
 k rtdI d H  q jtjv r  I d H  W W d _ d S |d |  _|d d  jd _ j   j  W n tyj   tjddd  jsf jddI d H  Y nL ty } z2dt|v r jddI d H  n|W Y d }~n
d }~0 0 W d _ nd _ 0 d S )NzHeartbeat skipped: channel busyTzHeartbeat: %sc                   s(   i | ] }| j v r| j | j qS r|   )r   r   r   r   r   r0  r|   r}   r  Q  s   
z$Worker.heartbeat.<locals>.<dictcomp>)rf  r  r6  rj   r   r  r[        ?{Gz?FrD   zheartbeat-intervalr  r  zHeartbeat to scheduler failedexc_inforeportzTimed out trying to connect)r2  r  debugrf  rD   rb   r9  Zheartbeat_workerr  r	  r   r+  r  r:   ANY_RUNNINGr.  r/  r5  r/  r  callback_timer  clearr  r9   r  r+  r  r-  r   )r   r3  endr4  er|   r9  r}   r  D  sT    



zWorker.heartbeatc              
     s   zz | j || j| jgdI d H  W n0 tyR } zt|  W Y d }~n
d }~0 0 W | jr| jtj	v rt
d | j| j q| jddI d H  n@| jr| jtj	v rt
d | j| j n| jddI d H  0 d S )N)Zevery_cyclez0Connection to scheduler broken.  Reconnecting...Fr>  )Zhandle_streamensure_communicatingensure_computingr  r  r   r+  r  r:   rA  r  r*  r1  r  r  )r   r-   rE  r|   r|   r}   r2  y  s    


zWorker.handle_schedulerc                 C  s6   ddl m} | jdu r,|| jd| itd| _| j S )zUStart an IPython kernel

        Returns Jupyter connection info dictionary.
        r   )rx  Nrj  )ipnsr   )Z_ipython_utilsrx  r3  rH  r  Zget_connection_info)r   r-   rx  r|   r|   r}   rx    s    
zWorker.start_ipythonc              
     s   t j| j|  fdd}t|dk r2||}nt||I d H }|rzt  tj	  W n2 t
y } zt| |W Y d }~n
d }~0 0 dt|dS )Nc                   sR   t | tr|  } t d"}||  |  W d    n1 sD0    Y  | S )Nwb)r   r   encodeopenr&  flush)r(  r
  Zout_filenamer|   r}   func  s    

&z Worker.upload_file.<locals>.funci'  r   )r  r   )r  r  r  r  r   rZ   rT   cache_loadsr(  rC  r  r  r   )r   r-   filenamer(  loadrO  rE  r|   rN  r}   rw    s    

zWorker.upload_filec                 C  s
   t | jS r   )r  r(  r   r|   r|   r}   r{    s    zWorker.keyszdict[str, list[str]])r   c                   sl    fdd|  D }t| j jdI d H \}}} j|dd |r`td||| d|dS d	d
iS d S )Nc                   s*   i | ]"\}}| j vr|d d |D qS )c                 S  s   g | ]}t |qS r|   )r;   )r   Zaddrr|   r|   r}   r     r   z,Worker.gather.<locals>.<dictcomp>.<listcomp>r(  r  r   r|   r}   r    s   
z!Worker.gather.<locals>.<dictcomp>)r  whoF)r(  r?  z4Could not find data: %s on workers: %s (who_has: %s)zpartial-fail)r  r{  r  r   )r  r`   r  rf  ru  r  r  )r   r   r  Zmissing_keysZmissing_workersr|   r   r}   rq    s     


zWorker.gatherr   c                 C  sV   t |r| j n| jj|d| jj| jjd}t dkrR| jj|d< | jj	|d< |S )N)r0  )range_queryr  	last_timer   gpu_namegpu_memory_total)
r   r  r  rU  r  rV  r?   r  rW  rX  )r   r  r0  r  r|   r|   r}   r    s    	zWorker.get_monitor_infoc                   s   j r  j tjtjtjfv r d S  j tju s6J  j t  I d H  t  t	 j
}|D ]}t j| j j jd} jd} jdv r| }tt jj|d< z j|fi |I d H  W nN ty } z4t|dkr|jtjkrW Y d }~qZn W Y d }~qZd }~0 0 | _ q<qZtd j d j
 t tj d j!d	} "| j#  j$rzd
d l%}W n t&y   t'(d Y n0 |j)j*j+ j, j-  j!d t j _. j/d u r҈ j _/ j0D ]}| I d H  q؈ 1 j. zd j2j3 j. j4f }	W n& t5y:    j2j3  j. }	Y n0 t'6d j t'6d|	  j78 D ]*\}
}t'6d9|
 j.d t:|  q`t'6d jj t'6d t'6d j;  j<rt'6dt= j< t'6d j> t?d j  t@jA fdd jBD ddiI d H }dd |D }t|dkrft|dkr^t'Cd |D ]}t'CtD| qH|d
 d _B E I d H   F   S ) N)rK  rL  rJ  rM  r!  rj  )ZtcpZtlsZdefault_hostr   zCould not start Worker on host z
with port zdistributed.worker.http.routes)servermodulesre  r   z4To start diagnostics web server please install Bokehrd  z%s%s:%dz      Start worker at: %26sz         Listening to: %26sz  {:>16} at: {:>26}rg  r  r  z              Threads: %26dz               Memory: %26sz      Local Directory: %26szdask-worker [%s]c                 3  s   | ]} j |d dV  qdS )F)r!  catch_errorsNr"  r   r!  r   r|   r}   r   9  s   zWorker.start.<locals>.<genexpr>Zreturn_exceptionsTc                 S  s   g | ]}t |tr|qS r|   )r   r  )r   r  r|   r|   r}   r   ?  r   z Worker.start.<locals>.<listcomp>zVMultiple plugin exceptions raised. All exceptions will be logged, the first is raised.r|   )Gr  r:   r  closingclosing_gracefullyZ	undefinedr  r0  re   r[   r  r6   r  r  r  r!  Zget_listen_argsr  rR   r5   r9  rf  Zlistenr-  r   errnoZ
EADDRINUSEZ_start_addressr  rC   r  r  r  r  Zstart_http_serverr  r  Zdistributed.dashboard.workerImportErrorr  r@  rO  rj  r4   Zhttp_applicationZhttp_serverrH  r.  r  Zstart_servicesZlistenerre  rL  r  r  r(  r  formatr   r  r#  r'   r  rF   r.  rq  r?  rX  reprr5  Zstart_periodic_callbacks)r   ZportsrL  Zstart_addressrv   rE  ZroutesZdistributedrH  Zlistening_addressr  r  Zplugins_msgsZplugins_exceptionsexcr  r   r}   r0    s    






$

zWorker.startc                 O  s   t jddd | j|i |S )Nz'Worker._close has moved to Worker.closer   r  )r  r  r  )r   ru   rv   r|   r|   r}   _closeP  s    zWorker._close   c              	     sh  t  J  jtjtjfv r:  I d H  W d    d S d _t  zt	d j
 W n tyt   t	d Y n0  jtjvrt	d j tj _ jD ]}| I d H  q|r jr  j}| I d H  W d    n1 s0    Y  td  fdd j D }tjdd	 |D  I d H   j D ]}	|	  q: jrt fd
d	tjD stjD ],}
d|
_|
jr|
  I d H  n|
   qrt!t"t#B |rވ j$d urt% j&j' j$|d|I d H  W d    n1 s0    Y   j&( I d H   j)*   +   j,dkr<t-dI d H   j.rl j.j/rl j.j/ sl j.0ddi  j.rt!t#(  j. t1|dI d H  W d    n1 s0    Y   j2 D ]J}|t3j4u rԐqt5|t6r|j7j89  |j:||d n|j:|d q    j  I d H  tj _t;   I d H  td W d    n1 sZ0    Y  dS )NFzStopping worker at %szStopping workerz%Closed worker has not yet started: %szdask-worker [closing]c                   s    g | ]}t |d r| qS )teardown)hasattrrf  r\  r   r|   r}   r   p  s   
z Worker.close.<locals>.<listcomp>c                 s  s   | ]}t |r|V  qd S r   r   )r   Ztdr|   r|   r}   r   v  r   zWorker.close.<locals>.<genexpr>c                 3  s&   | ]}| kr|j tjv r|V  qd S r   r  r:   rA  r   r  r   r|   r}   r   ~  s   T)rf  safeZucxg?r  zclose-stream)Zseconds)waittimeout)rk  zdask-worker [closed]r   )<rY   r  r:   r  r]  finishedr+  rd   r  r  rf  r  rA  r  rf  r   r  r  rF   r>  r'  r.  rq  r  r#  r  r   r   r   r   Z_asynchronousasynchronousr  r   EnvironmentErrorrP   r  wait_forr9  Z
unregisterZ	close_rpcr
  releaseZstop_servicesr  r/  r-  r-   r  r   r,  r1   r  r   rM   Z_work_queueZqueuerC  Zshutdownr  )r   r?  rl  r   Zexecutor_waitrj  rH  rZ	teardownsr  crE  r  r   r}   r  T  s    


,


*


8(zWorker.closec                   s   | j tjtjfv r |  I dH  | j tjkr0dS |du r>| j}td| j	 | j
j| j	gdddI dH  | jd| dI dH  dS )zGracefully shut down a worker

        This first informs the scheduler that we're shutting down, and asks it
        to move our data elsewhere.  Afterwards, we close as normal
        NzClosing worker gracefully: %sF)r  Zclose_workersr   T)rj  r   )r  r:   r]  r^  rm  r  r  r  r  rf  r9  Zretire_workersr  )r   Zrestartr|   r|   r}   r    s    
zWorker.close_gracefullyr>  c                   s   | j f d|i|I d H  dS )Nr?  r   )r  )r   r?  rv   r|   r|   r}   rv    s    zWorker.terminatec                   s,   t d |  I d H  | jtjks(J d S )Nz)wait_until_closed has moved to finished())r  r  rm  r  r:   r  r   r|   r|   r}   wait_until_closed  s    
zWorker.wait_until_closedc                   sR    j vr>tdjdj  <  fdd}j| j   | d S )NZ1msro  c                    s@   t  fi jI d H } d| _| ddiI d H  |  d S )NzWorker->Workerr  Zconnection_stream)r4   r"  r.  r&  r0  )r-   rf  Zbcommr   r|   r}   batched_send_connect  s    z3Worker.send_to_worker.<locals>.batched_send_connect)r0  r2   r*  r1  r  )r   rf  r  rv  r|   ru  r}   send_to_worker  s    

	zWorker.send_to_workerc                   s~  t  }|d u r j}|r8|r8t|jt jkr8|d } jtjkrNd}d}nd}|dur j|krt	
d j| j|| ddiS   jd7  _ fd	d
|D }t|t|k rt|t| D ]:}	|	 jv rddlm}
 |
t j|	  j|	 d||	< qddd
 | D d} fdd
|D }t  } jd urR jd ||  t  }zz<|j||dI d H }|j|dI d H }|dksJ |W n2 ty   t	jd j|dd |   Y n0 W   jd8  _n  jd8  _0 t  } jd ur jd ||  ttd | }  jd7  _|| p>d} j| j | j || d ||||||| d	 tj S )Nr   r   z: Throttling outgoing connections because worker is paused.r   FzUWorker %s has too many open connections to respond to data request from %s (%d/%d).%sr  busyc                   s"   i | ]}| j v r| j | qS r|   rS  r   r  r   r|   r}   r    r   z#Worker.get_data.<locals>.<dictcomp>r   rj  r   c                 S  s   i | ]\}}|t |qS r|   )rH   r  r|   r|   r}   r  (  r   )r  r(  c                   s$   i | ]}| j v r| j | jqS r|   r   r   ry  r   r|   r}   r  )  r   zget-data-load-durationr  r  z$failed during get data with %s -> %sTr<  zget-data-send-durationr:  )	r0  r#  r4  r   rT  r{  r  
compressedr  )!rD   r   r5   Zpeer_addressrf  r  r:   pausedr  r  r@  r   r   r)  rm  r   r   r  r7  r   r&  r*  r-  r   abortsumfilterr'  r  r   appendr/  Z
dont_reply)r   r-   r{  rT  r  max_connectionsr0  Zthrottle_msgr(  r  r   r  r   r#  r|  r3  total_bytesr   r|   r   r}   rt    s    	
"

 
zWorker.get_datazdict[str, object])r(  r?  stimulus_idc                 C  s8  |d u rdt   }i }g }| D ]\}}z| j| }d|f||< W n ty   t| | j|< }z| j|||d}	W n< ty }
 z$t|
}|t|	 i}W Y d }
~
nd }
~
0 0 |
|	 Y n0 | j|d|t  f q$|r|dt||d | j||d |D ]}| j| qdd | D d	d
S )Nzupdate-data-rW  r  zreceive-from-scatteradd-keysr  r{  r  c                 S  s   i | ]\}}|t |qS r|   )sizeofr  r|   r|   r}   r  z  r   z&Worker.update_data.<locals>.<dictcomp>r   )r   r  )rD   r  r   KeyErrorr~   _put_key_in_memoryr  r<   r  r'  r  r   r  r  transitionsr-  r  )r   r(  r?  r  recommendationsZscheduler_messagesr   r  r   recsrE  r  r|   r|   r}   ru  W  s2    
&zWorker.update_datac                 C  sN   | j d||t f i }|D ]}| j|}|rd||< q| j||d dS )a  
        Handler to be called by the scheduler.

        The given keys are no longer referred to and required by the scheduler.
        The worker is now allowed to release the key, if applicable.

        This does not guarantee that the memory is released since the worker may
        still decide to hold on to the data and task since it is required by an
        upstream dependency.
        r  r   r  N)r   r  rD   r   r  r  )r   r{  r  r  r   r   r|   r|   r}   r  |  s    
zWorker.handle_free_keysc                 C  s   | j d||t f i }g }|D ]X}| j|}|du s"|jdkrFq"| sp| j |jd|t f d||< q"|| q"|r| j d||t f | j	d||d | j
||d	 d
S )a  Stream handler notifying the worker that it might be holding unreferenced,
        superfluous data.

        This should not actually happen during ordinary operations and is only intended
        to correct any erroneous state. An example where this is necessary is if a
        worker fetches data for a downstream task but that task is released before the
        data arrives. In this case, the scheduler will notify the worker that it may be
        holding this unnecessary data, if the worker hasn't released the data itself,
        already.

        This handler does not guarantee the task nor the data to be actually
        released but only asks the worker to release the data on a best effort
        guarantee. This protects from race conditions where the given keys may
        already have been rescheduled for compute in which case the compute
        would win and this handler is ignored.

        For stronger guarantees, see handler free_keys
        r  NrW  zremove-replica-confirmedr   zremove-replica-rejectedr  r  r  r   )r   r  rD   r   r  r   r   r   r-  r  r  )r   r{  r  r  Zrejectedr   r   r|   r|   r}   r    s(    

zWorker.handle_remove_replicasc                   sn   |  D ]D\}}|| jv r8| j|  || j|  7  < n
|| j|< || j|< qt| jj| j| jdI d H  d S )N)rF  rj  )r  r  r  rb   r9  set_resourcesr  )r   rF  rr  quantityr|   r|   r}   r    s    

zWorker.set_resourcesc                 C  sT   | j |}|rP|jtdhB v rP| j|d|t f |jr@J | j|d|d dS )z
        Cancel a task on a best effort basis. This is only possible while a task
        is in state `waiting` or `ready`.
        Nothing will happen otherwise.
        rk   r  r   r  N)	r   r  r   READYr   r  rD   r   
transition)r   r   reasonr   r|   r|   r}   r    s
    
zWorker.handle_cancel_computezCollection[str]zdict[str, Collection[str]])r{  r   r  c                C  sL   i }|D ]&}| j |d|d}|jdkrd||< q| | | j||d d S )N)r   r   r   r  rW  rU  r  )ensure_task_existsr   update_who_hasr  )r   r{  r   r  r  r   r   r|   r|   r}   r    s    	


zWorker.handle_acquire_replicasztuple[int, ...]r~   )r   r   r  r   c                C  st   z| j | }td|| W n$ ty@   t| | j |< }Y n0 |jsV|sPJ ||_| j|d|j|t	 f |S )Nz+Data task %s already known (stimulus_id=%s)zensure-task-exists)
r   r  r@  r  r~   r   r   r  r   rD   )r   r   r   r  r   r|   r|   r}   r    s    
zWorker.ensure_task_exists)rt   ru   rv   rw   r   r   rm  r   zdict[str, int] | None)r   r   r   r   r   rm  r  c                C  s  | j |d|t f z | j| }td||d W n$ tyZ   t| | j|< }Y n0 t|||||_	t
|tszJ || jf }|  jd8  _|rd | j|j< d |_d |_d|_d|_||_||_|
r|
|_||_i }g }|D ],}| j|||d}|j| |j| q|jth dB v r(nP|jdkrNd||< || | n*|jd	v rdd
||< ntd| d| |D ]}| j | q|| !| | j"||d |	d ur|	# D ]\}}|| j| _$qd S )Nr  z)Asked to compute an already known task %s)rw   r  r   r   r  >   rj   rk   ri   rW  >   r   r[  rX  r]  rU  rl   rk   z"Unexpected task state encountered r   r  )%r   r  rD   r   r  r@  r  r~   rr   r   r   r  r   r)  r   r   r   r   r   r   r   r   r   r  r   r   r   r   r  _get_task_finished_msgRuntimeErrorr-  r  r  r  r  r   )r   r   r   r   r   rt   ru   rv   rw   r   r   rm  r   r  r   r  Zscheduler_msgs
dependencyr   r  r  r|   r|   r}   r    sb    



zWorker.handle_compute_taskc                C  sN   | j r"|jdksJ |jd us"J | j| d|_d|_| j| i g fS )Nr[  rU  F)r   r   r   r   discardr   r   r   r   r   r  r|   r|   r}   r  [  s    zWorker.transition_missing_fetchc                C  s6   | j | | j||d\}}|j| jv s.J ||fS Nr  )r   r  r  r   r   )r   r   r  r  smsgsr|   r|   r}   r  f  s    
z"Worker.transition_missing_releasedc                C  s*   |j s
J d|_| j| d|_ i g fS )Nr[  F)r   r   r   r   r  r|   r|   r}   r  n  s
    
z Worker.transition_flight_missingc                C  s^   | j r"|jdksJ |jd us"J |jD ]}| j| | q(d|_d|_| j| i g fS )Nr   rU  F)r   r   r   r   r   r   r   r   )r   r   r  r  r|   r|   r}   r  u  s    
z Worker.transition_released_fetchc                C  sX   | j |j|d i }|jD ]&}|js|jttB dhB vrd||< q|jsPd||< |g fS )N)r  rW  r   rV  )release_keyr   r   r   r   r  r   r   )r   r   r  r  r  r|   r|   r}   r    s    

z"Worker.transition_generic_releasedc                  s    j r0|jdksJ t fdd|jD s0J i }|j  |jD ]8}|jdksD|j| |j| |jdvrDd||< qD|jr  jd7  _n|j	rd||< nd	||< d
|_|g fS )Nr   c                 3  s   | ]}|j  jv V  qd S r   )r   r   r   dr   r|   r}   r     r   z5Worker.transition_released_waiting.<locals>.<genexpr>rW  >   rU  r]  rU  r   rh   rg   rk   )
r   r   allr   r   rC  r   r   r   r   )r   r   r  r  r   r|   r   r}   r    s$    





z"Worker.transition_released_waitingc                C  sD   | j r|jdksJ |jsJ d|_d|_||_| j| i g fS )NrU  Fr]  )r   r   r   r   r   r   r   )r   r   rj  r  r|   r|   r}   r    s    
zWorker.transition_fetch_flightc                C  s,   | j ||d\}}|d|jd ||fS )Nr  zrelease-worker-data)r  r   )r  r  r   r   r   r  r  r  r|   r|   r}   r    s    z!Worker.transition_memory_releasedc                  s~    j rb|jdksJ |jrJ t fdd|jD s:J tdd |jD sRJ |j jvsbJ d|_ j|j i g fS )Nrk   c                 3  s&   | ]}|j  jv p|j  jv V  qd S r   r   r(  r)  r   depr   r|   r}   r     s   z8Worker.transition_waiting_constrained.<locals>.<genexpr>c                 s  s   | ]}|j d kV  qdS )rW  Nr   r  r|   r|   r}   r     r   rh   )	r   r   r   r  r   r   rg   rh   r  r  r|   r   r}   r    s    
z%Worker.transition_waiting_constrainedc                C  s"   |di}d|j | jdg}||fS Nr   Z
reschedule)r  r   rj  )r   rf  r  r|   r|   r}   #transition_long_running_rescheduled  s    z*Worker.transition_long_running_rescheduledc                C  sT   |j  D ]\}}| j|  |7  < q
| j| |di}d|j| jdg}||fS r  )r   r  r  r   r  r   rf  )r   r   r  resourcer  r  r  r|   r|   r}   r    s    z'Worker.transition_executing_rescheduledc                C  s   | j rr|jdksJ |j| jvs$J |jr.J |jd us<J |jD ].}|j| jv sb|j| jv sbJ |jdksBJ qBd|_t	
| j|j|jf i g fS )Nrk   rW  rg   )r   r   r   rg   r   r   r   r(  r)  r   r   )r   r   r  r  r|   r|   r}   r    s    

zWorker.transition_waiting_readyc          	      C  sl   i g  }}|j dkr0| j||||||d\}}n$|j dkrT| j||||||d\}}|jrd|j||< ||fS )Nrj   r  r]  )r   r  r  r   )	r   r   r   r   r   r   r  r  Zmsgsr|   r|   r}   r    s,    




z!Worker.transition_cancelled_errorc             	   C  sb   ||_ ||_||_||_d|_dd|j| j|j|j |j|j|jd}|jrX|j|d< i |gfS )NrX  
task-erred)r  r  r   threadr   r   r   r   r   )	r   r   r   r   r   r   r   r  r   )r   r   r   r   r   r   r  Zsmsgr|   r|   r}   r    s"    
zWorker.transition_generic_errorc          	      C  sH   |j  D ]\}}| j|  |7  < q
| j| | j||||||dS r  )r   r  r  r   r  r  )	r   r   r   r   r   r   r  r  r  r|   r|   r}   r  	  s    z!Worker.transition_executing_errorc                C  sJ   i g  }}|j r<|j}|j|kr2| j||d\}}|||< n||_||fS )a  `resumed` is an intermediate degenerate state which splits further up
        into two states depending on what the last signal / next state is
        intended to be. There are only two viable choices depending on whether
        the task is required to be fetched from another worker `resumed(fetch)`
        or the task shall be computed on this worker `resumed(waiting)`.

        The only viable state transitions ending up here are

        flight -> cancelled -> resumed(waiting)

        or

        executing -> cancelled -> resumed(fetch)

        depending on the origin. Equally, only `fetch`, `waiting` or `released`
        are allowed output states.

        See also `transition_resumed_waiting`
        r  )r   r   r  )r   r   finishr  r  r  
next_stater|   r|   r}   _transition_from_resumed#	  s    



zWorker._transition_from_resumedc                C  s   | j |d|dS )5
        See Worker._transition_from_resumed
        rU  r  r  r  r|   r|   r}   r  E	  s    zWorker.transition_resumed_fetchc                C  s   | j |d|dS )r  r[  r  r  r  r|   r|   r}   r  K	  s    z!Worker.transition_resumed_missingc                C  s   | j |d|dS )r  rk   r  r  r  r|   r|   r}   r  Q	  s    z!Worker.transition_resumed_waitingc                C  sJ   |j r|dig fS |jdkr,|j|_i g fS |jdks:J |dig fS d S )Nr   r]  rj   rZ  r   r   r   r  r|   r|   r}   r  W	  s    
z!Worker.transition_cancelled_fetchc                C  s   ||_ d|_i g fS )Nri   )r   r   )r   r   nextr  r|   r|   r}   r  a	  s    z#Worker.transition_cancelled_resumedc                C  sJ   |j r|dig fS |jdkr,|j|_i g fS |jdks:J |dig fS d S )Nr   rj   r]  rY  r  r  r|   r|   r}   r  f	  s    
z#Worker.transition_cancelled_waitingc                C  s    d|_ |jsi g fS |dig fS )NrV  r   )r   r   r  r|   r|   r}   r  p	  s    z%Worker.transition_cancelled_forgottenc                C  s   |j sd|_i g fS |j}| j| | j| |j D ]\}}| j|  |7  < q<| j||d\}}|dkrz|||< ||fS )Nr   r  )	r   r   r   r  r   r   r  r  r  )r   r   r  r  r  r  r  r  r|   r|   r}   r  v	  s    
z$Worker.transition_cancelled_releasedc                C  s"   |j |_d|_d|_ d|_i g fS )Nr   rl   F)r   r   r   r   r  r|   r|   r}   r  	  s
    z$Worker.transition_executing_releasedc                C  s   |  j d7  _ | j|||dS )Nr   r  r  )r   r  r   r   r  r  r|   r|   r}   r  	  s    z%Worker.transition_long_running_memoryc          
   
   C  s   |t u r$|j| jvr$td| d|jd urT|j D ]\}}| j|  |7  < q8| j| | j	| d |_
z| j|||d}W nD ty } z,t|}|t| i}|g fW  Y d }~S d }~0 0 |j| jv s|j| jv sJ | |g}	||	fS )NzTried to transition task z# to `memory` without data availabler  )no_valuer   r(  r  r   r  r  r   r  r   r   r  r  r<   r  r'  r)  r  )
r   r   r  r  r  r  r  rE  r  r  r|   r|   r}   r  	  s&    

z Worker.transition_generic_memoryc                C  sd   | j r:|jdks |j| jv s J |jr*J |j| jvs:J | j| |  jd7  _| j	|||dS )Nrj   r   r  )
r   r   r   r   r   rg   r   r  r   r  r  r|   r|   r}   r  	  s    
z"Worker.transition_executing_memoryc                C  s   | j rf|jrJ |j| jvs J |jtv s.J |j| jvs>J |jD ] }|j| jv sD|j| jv sDJ qD|j	
 D ]\}}| j|  |8  < qpd|_| j| | jj| j|j|d i g fS )Nrj   r  )r   r   r   r(  r   r  rg   r   r)  r   r  r  r   r   r*  r1  execute)r   r   r  r  r  r  r|   r|   r}   r  	  s    

z'Worker.transition_constrained_executingc                  s    j rZ|jrJ |j jvs J |jtv s.J |j jvs>J t fdd|jD sZJ d|_ j	
|  jj j|j|d i g fS )Nc                 3  s&   | ]}|j  jv p|j  jv V  qd S r   r  r  r   r|   r}   r   	  s   z4Worker.transition_ready_executing.<locals>.<genexpr>rj   r  )r   r   r   r(  r   r  rg   r  r   r   r   r*  r1  r  r  r|   r   r}   r  	  s    
z!Worker.transition_ready_executingc                C  sh   |j r\i }d|_d |_d|_ |js,d||< n(| j| |jD ]}| j| | q>|g fS i g fS d S )NrU  Fr[  )r   r   r   r   r   r   r   )r   r   r  r  r  r|   r|   r}   r  	  s    

zWorker.transition_flight_fetchc                C  s(   | j | d |_| j||||||dS r  )r   r  r   r  )r   r   r   r   r   r   r  r|   r|   r}   r  	  s    zWorker.transition_flight_errorc                C  s2   |j r| j||dS d|_d|_d|_i g fS d S )Nr  r]  r   rl   )r   r  r   r   r   r  r|   r|   r}   r  	  s    z!Worker.transition_flight_releasedc                C  s   ||j ig fS r   )r   r  r|   r|   r}   r  
  s    z"Worker.transition_cancelled_memoryc                C  sF   d|_ | j| | j|j d|j|dg}| j| j i |fS )Nrm   )r  r   compute_duration)	r   r   r  r   r   r   r  r1  rG  )r   r   r  r  r  r|   r|   r}   r  
  s    z(Worker.transition_executing_long_runningc             
   C  s   i }z| j |||d}W nV tyn } z>t|}d|d |d |d |d f||< |g fW  Y d }~S d }~0 0 d|jg|dg}||fS 	Nr  rX  r   r   r   r   r  r  )r  r  r<   r   r   r   r  r  r  rE  r  r  r|   r|   r}   r  
  s     
z!Worker.transition_released_memoryc             
   C  s   | j | d |_i }z| j|||d}W nV ty } z>t|}d|d |d |d |d f||< |g fW  Y d }~S d }~0 0 d|jg|dg}||fS r  )r   r  r   r  r  r<   r   r  r|   r|   r}   r  &
  s$    
zWorker.transition_flight_memoryc                C  sp   i }| j r"tdd |jD r"J |jD ](}|j| |jdkr(|js(d||< q(d|_| j|jd  |g fS )Nc                 s  s   | ]}|j d kV  qdS )rV  Nr  r  r|   r|   r}   r   ?
  r   z7Worker.transition_released_forgotten.<locals>.<genexpr>r   rV  )	r   r   r   r   r  r   r   r   r   )r   r   r  r  r  r|   r|   r}   r  ;
  s    

z$Worker.transition_released_forgottenc             
   O  s  t |tr|rJ |^}}|d u s,|j|kr4i g fS |j}| j||f}|d ur|  jd7  _||g|R d|i|\}}	| jd|j||fi | nd||fvrXzz| j|d|d\}}	|||g|R }
t |
tr|
^}}n
|
d }}| j||g|R d|i\}}|	| |	|7 }	W n2 t
yT   t
d| d| d	|j d Y n0 nt
d| d| d	|j | j|j|||jd
d | D |t f ||	fS )Nr   r  r  r   r  r|   zImpossible transition from z to z for c                 S  s   i | ]\}}|j |qS r|   r   )r   r   newr|   r|   r}   r  ~
  r   z&Worker._transition.<locals>.<dictcomp>)r   r  r   r   r  r  _notify_pluginsr   _transitionr  rx   r   r  r  rD   )r   r   r  r  ru   rv   r0  rO  r  r  r  Zv_stateZv_argsZb_recsZb_smsgsr|   r|   r}   r  J
  s^    






zWorker._transition)r  c                K  sD   | j ||fd|i|\}}|D ]}| j| q | j||d dS )as  Transition a key from its current state to the finish state

        Examples
        --------
        >>> self.transition('x', 'waiting')
        {'x': 'processing'}

        Returns
        -------
        Dictionary of recommendations for future transitions

        See Also
        --------
        Scheduler.transitions: transitive version of this function
        r  r  N)r  r-  r  r  )r   r   r  r  rv   r  r  r  r|   r|   r}   r  
  s    zWorker.transition)r  c                C  s   g }|  }t }|rT| \}}|| | j|||d\}}	|| ||	7 }q| jrn|D ]}| | q^| j	 s|D ]}
| j
|
 q|ntdt| dS )zProcess transitions until none are left

        This includes feedback from previous transitions and continues until we
        reach a steady state
        r  z@BatchedSend closed while transitioning tasks. %d tasks not sent.N)r  r   popitemr   r  r  r   validate_taskr-  r  r  r  r@  r   )r   r  r  r  Zremaining_recsr   r   r  Za_recsZa_smsgsr  r|   r|   r}   r  
  s&    



zWorker.transitions)r  c                C  s.   |j dkr*| j|d||d |j dks*J d S )Nrj   rm   r  r  )r   r  )r   r   r  r  r|   r|   r}   maybe_transition_long_running
  s    
z$Worker.maybe_transition_long_runningc                 C  s6   | j | }|jdkt|j|td| jv || jv dS )Nrj   r   )rj   r   heapr(  )r   r   r   r   r!   rg   r(  )r   r   r   r|   r|   r}   stateof
  s    
zWorker.stateofc                   s"   dd  D   fdd| j D S )Nc                 S  s    g | ]}t |tr|jn|qS r|   )r   r~   r   r8  r|   r|   r}   r   
  r   z Worker.story.<locals>.<listcomp>c                   s<   g | ]4 t  fd dD s4t  fddD r qS )c                 3  s   | ]}| v V  qd S r   r|   r8  r  r|   r}   r   
  r   z*Worker.story.<locals>.<listcomp>.<genexpr>c                 3  s0   | ](} D ]}t |tttfr
||v V  q
qd S r   )r   r  r  r   )r   r   rs  r  r|   r}   r   
  s   )r   )r   r{  r  r}   r   
  s   )r   )r   r{  r|   r  r}   story
  s    
zWorker.storyc           
        s~  dt   }g }jrbtjjk s6jjk rbtdtjtjj j	 }|j
dkrjqfdd|jD }|s|jd usJ || qtj  fdd|D }|rt|ntt|jksJ |j\}}jd||t  f  j|7  _|j< fdd	|D }j||d
 jjj|||d q|D ]}	j|	 qfd S )Nzensure-communicating-z5Ensure communicating. Pending: %d. Connections: %d/%drU  c                   s   g | ]}| j vr|qS r|   )r   ri  r   r|   r}   r   
  r   z/Worker.ensure_communicating.<locals>.<listcomp>c                   s   g | ]}t | kr|qS r|   )r5   ri  )rK  r|   r}   r   
  r   zgather-dependenciesc                   s   i | ]} j | d fqS )r]  )r   r  )r   rj  r|   r}   r    r   z/Worker.ensure_communicating.<locals>.<dictcomp>r  )rj  	to_gathertotal_nbytesr  )rD   r   r   r   r   r   r   r  r@  r   r   r   r   r  r5   rf  r  choicer  select_keys_for_gatherr   r   r  r*  r1  
gather_depr   )
r   r  Zskipped_worker_in_flightr   r  localr  r  r  elr|   )rK  r   rj  r}   rF  
  sX    





zWorker.ensure_communicatingc              	   C  s   |j | jvr(|j | jvr(td| d|j}|jd u s@|d u rz| j|j  }W n tyn   | j|j  }Y n0 t||_t| }|_~zt|}W n" t	y   t
j|jdd}Y n0 dd|j |j| j|j |t||jd}|jr|j|d< |S )	NTask z
 not ready   rM  task-finishedr   )r  r  r   r   r  r   r,   r   r   )r   r(  r)  r  r   r   r  r  dumps_functionr   rG   dumpsry   r   r  r,   r   r   )r   r   typr  Ztyp_serializedr  r|   r|   r}   r    s6    


zWorker._get_task_finished_msgc                C  s   |j | jv rd|_i S i }|j | jv r4|| j|j < n8t }|| j|j < t }|| dkrl|jd||d d|_|jdu rt||_t	||_	|j
D ]6}|j| |js|jdkr|  jd8  _d||< q| j|j d	|t f |S )
at  
        Put a key into memory and set data related task state attributes.
        On success, generate recommendations for dependents.

        This method does not generate any scheduler messages since this method
        cannot distinguish whether it has to be an `add-task` or a
        `task-finished` signal. The caller is required to generate this message
        on success.

        Raises
        ------
        TypeError:
            In case the data is put into the in memory buffer and an exception
            occurs during spilling, this raises an exception. This has to be
            handled by the caller since most callers generate scheduler messages
            on success (see comment above) but we need to signal that this was
            not successful.
            Can only trigger if spill to disk is enabled and the task is not an
            actor.
        rW  g{Gz?z
disk-writeactionr0  r#  Nrk   r   rg   zput-in-memory)r   r(  r   r)  rD   r   r  r   r  r   r   r   r  r   r   )r   r   r  r  r  r0  r#  r  r|   r|   r}   r  3  s0    





zWorker._put_key_in_memoryc                 C  sz   t |tsJ |h}| j|  }| j| }|rr| }|jdkrDq,||  | jkrXqr||j	 || 7 }q,||fS )NrU  )
r   r   r   r   r   r   r   r   r   r   )r   rj  r  depsr  Lr   r|   r|   r}   r  g  s    

zWorker.select_keys_for_gatherc                 C  s   t dt | jS )NzThe attribute `Worker.total_comm_bytes` has been renamed to `comm_threshold_bytes`. Future versions will only support the new name.)r  r  FutureWarningr   r   r|   r|   r}   total_comm_bytesy  s
    zWorker.total_comm_byteszIterable[str]z+tuple[set[str], set[str], TaskState | None])to_gather_keysr   c                 C  s   t  }t  }|D ]`}| j|}|du r*q|jdv r@|| q|jdv rV|| qtd|j d|j dqd}|D ]"}|jrtt	|j} qqz|}qzdd |D }|||fS )	a  Filter a list of keys before scheduling coroutines to fetch data from workers.

        Returns
        -------
        in_flight_keys:
            The subset of keys in to_gather_keys in state `flight` or `resumed`
        cancelled_keys:
            The subset of tasks in to_gather_keys in state `cancelled` or `memory`
        cause:
            The task to attach startstops of this transfer to
        N)r]  ri   >   rW  rl   r  z found in illegal state z;. Only states `flight`, `resumed` and `cancelled` possible.c                 S  s   h | ]
}|j qS r|   r   r   r|   r|   r}   r     r   z0Worker._filter_deps_for_fetch.<locals>.<setcomp>)
r   r   r  r   r   r  r   r   r  iter)r   r  r  cancelled_keysr   r   causeZin_flight_keysr|   r|   r}   _filter_deps_for_fetch  s*    

	zWorker._filter_deps_for_fetch)r0  r#  r(  r  rj  r   c                   st  t  fdd|D }|jd| j | j |d || pBd}|| } j| j | j || d  j | fdd|D |||d	 |d
kr jd |d   _ j| \}	}
|	| |
d f j|< ttt	|
 }t|dkr|\} j| \}	}
|	| |
d f j|<  jd urN jd ||   jd |  jd t|   jd7  _d S )Nc                 3  s   | ]} j |  V  qd S r   )r   r   r8  r   r|   r}   r     r   z7Worker._update_metrics_received_data.<locals>.<genexpr>Ztransfer)r  r0  r#  sourcer;  g       @c                   s   i | ]}| j | jqS r|   r{  r8  r   r|   r}   r    r   z8Worker._update_metrics_received_data.<locals>.<dictcomp>)r0  r#  r4  r   r{  r  r  rT  i@B r6  r  r   ztransfer-bandwidthztransfer-durationztransfer-count)r  r   r  r/  r   r  r  r   mapr   r'  r   r  r7  r   Zcountersr  )r   r0  r#  r(  r  rj  r  r   r  ZbwZcntr  r  r|   r   r}   _update_metrics_received_data  sF    
z$Worker._update_metrics_received_data)rj  r  r  c                  s  | j tjvrdS i }t X i }t }t }zz| |\}}}	|s| jd|||t f W W |  j	|8  _	|
dddk}
|
di }|
r| jd|||t f | j|D ]}| j| }d|_||v r|jd	krd
||< nd||< q||v rd|| f||< q|
r d||< q||vr|j| | j| |j | j|d|t f | jd||d |jr|dnd||< q~~| j||d |   |
sd| _n8|  jd7  _tdd| j  I dH  | j| I dH  |   W d   dS |	sJ ~| jd|||t f tdt||	| t }t | j!||| j"dI dH }t }|d dkrW W |  j	|8  _	|
dddk}
|
di }|
r| jd|||t f | j|D ]}| j| }d|_||v r |jd	krd
||< nd||< n||v rd|| f||< nv|
r,d||< nf||vr|j| | j| |j | j|d|t f | jd||d |jrdnd||< q~~| j||d |   |
sd| _n8|  jd7  _tdd| j  I dH  | j| I dH  |   W d   dS | j#|||d |	|d | jd|t|d |t f W n t$y   t%d| | j|}| j&| | jd|||t f |D ]}| j| }|j'| qY n t(y< } zjt%| | jrt)rddl*}|+  t,|}| j| D ] }| j| }t-|. ||< q W Y d}~n
d}~0 0 W |  j	|8  _	|
dddk}
|
di }|
r| jd|||t f | j|D ]}| j| }d|_||v r|jd	krd
||< nd||< n||v rd|| f||< nv|
rd||< nf||vr|j| | j| |j | j|d|t f | jd||d |jr\dnd||< q~~| j||d |   |
sd| _n8|  jd7  _tdd| j  I dH  | j| I dH  |   n|  j	|8  _	|
dddk}
|
di }|
r| jd|||t f | j|D ]}| j| }d|_||v rf|jd	kr\d
||< nd||< n||v rd|| f||< nv|
rd||< nf||vr(|j| | j| |j | j|d|t f | jd||d |jrdnd||< q(~~| j||d |   |
s$d| _n8|  jd7  _tdd| j  I dH  | j| I dH  |   0 W d   n1 s|0    Y  dS )a  Gather dependencies for a task from a worker who has them

        Parameters
        ----------
        worker : str
            Address of worker to gather dependencies from
        to_gather : list
            Keys of dependencies to gather from worker -- this is not
            necessarily equivalent to the full list of dependencies of ``dep``
            as some dependencies may already be present on this worker.
        total_nbytes : int
            Total number of bytes for all the dependencies in to_gather combined
        Nznothing-to-gatherr  r   rx  r(  zbusy-gatherTrl   r   rU  rW  zmissing-depzmissing-data)r  Zerrant_workerr   r[  r  r   r   r  g      ?zrequest-depz#Request %d keys for task %s from %s)rT  )r0  r#  r(  r  rj  zreceive-depz+Worker stream died during communication: %szreceive-dep-failed)/r  r:   rA  rY   r   r  r   r  rD   r   r  r   r   r   r   r   r   r  r   r   r-  r  r  rG  r  r.  r/  query_who_hasrF  r  r@  r   get_data_from_workerr  rf  r  r-  r   r   r   r  LOG_PDBpdb	set_tracer<   r  r'  )r   rj  r  r  r  r  r3  r  r  r  rx  r(  r  r   r0  r#  r   rE  r  r  r  r|   r|   r}   r    s   

;







$


























zWorker.gather_depc              
     s&  t   | js W d    d S z| jr>| jD ]}|jr.J q.dt  }t| jjdd | jD dI d H }dd | D }| | i }| jD ]}|jrd||< q| j	||d W | j
d	 j| j
d
 _|   |   n&| j
d	 j| j
d
 _|   |   0 W d    n1 s0    Y  d S )Nzfind-missing-c                 S  s   g | ]
}|j qS r|   r   r   r|   r|   r}   r   }  r   z'Worker.find_missing.<locals>.<listcomp>r  c                 S  s   i | ]\}}|r||qS r|   r|   r  r|   r|   r}   r    r   z'Worker.find_missing.<locals>.<dictcomp>rU  r  r  r  )rY   r   r   r   rD   rb   r9  r  r  r  r  rB  rF  rG  )r   r   r  r   r  r|   r|   r}   r  q  s>    







zWorker.find_missing)r  r   c                   sN   t  4 t| jj|dI d H }| | |W  d    S 1 s@0    Y  d S )Nr  )rY   rb   r9  r   r  )r   r  r   r|   r|   r}   r    s    
zWorker.query_who_has)r   r   c              
   C  s   z|  D ]\}}|sq
|| jv r
| j| }| j|v rf| j| jdkrftd| j| t|| jh }|j	| |D ]$}| j
| | | j| | qvq
W nD ty } z,t| trdd l}|   W Y d }~n
d }~0 0 d S )NrW  zDScheduler claims worker %s holds data for task %s which is not true.r   )r  r   rf  r   r  r@  r.  r   r   r  r   r   r   r   r  r   r  r  r  )r   r   r  r  r   rj  rE  r  r|   r|   r}   r    s.    


zWorker.update_who_hasc                 C  sZ   | j |}|d ur|jnd }d|||d}| j| |tdhB v rV| j|d|d d S )Nzsteal-response)r  r   r   r  rk   r   r  )r   r  r   r-  r  r  r  )r   r   r  r   r   r3  r|   r|   r}   r    s    zWorker.handle_steal_request)r  r   c                 C  sD   t j| }|t jkr:| jt jvr:td| j| |   n|| _d S )Nz*Invalid Worker.status transition: %s -> %s)	r:   lookupr^  r  rA  r  rX  r  r  )r   r  Z
new_statusr|   r|   r}   r    s    


z"Worker.handle_worker_status_changezTaskState | None)r   r  r  r?  r   c              
   C  s  z| j rt|trJ | j| }|j}d|_td|||d |rd| j|dd|i|t	 f n| j|d|t	 f || j
v rz| j
|= W n  ty   tjddd Y n0 || jv r| j|= |jD ]}| j| |j q|j  || jv r| j|= |jd ur>|jd	kr>|j D ]\}}	| j|  |	7  < q |jD ]}
|j|
 |
j| qD|j  d |_d |_d |_d
|_| j| | j| |  d||||| W nZ t!y   Y nH t"y } z.t#| t$rdd l%}|&   W Y d }~n
d }~0 0 d S )Nr   zRelease key %s)r   r  r  zrelease-keyr  z$Tried to delete %s but no file foundTr<  rj   Fr  r   )'r   r   r~   r   r   r  r@  r   r  rD   r(  FileNotFoundErrorrX  r)  r   r   r  r   rC  r   r   r  r  r   r   r   r   r   r   r   r   r   r  r9   r  r   r  r  r  )r   r   r  r  r?  r   Zstate_beforerj  r  r  r  rE  r  r|   r|   r}   r    sb    








zWorker.release_keyc                 C  s   t | |||||dS N)rt   ru   rv   rk  rr  )r   r-   rt   ru   rk  rv   r|   r|   r}   rr     s    z
Worker.runc                 C  s   t | |||||dS r  r  )r   r-   rt   ru   rv   rk  r|   r|   r}   rs  #  s    zWorker.run_coroutinec                   s  t dd  t|tr"t|}|d u r2t|}|s:J || jv rV| j|dI d H  || j|< t	d|  t
|drz"|j| d}t|r|I d H }W nD ty } z,|s t|}|W  Y d }~W  d    S d }~0 0 ddiW  d    S 1  s0    Y  d S )	NFr  )r.  zStarting Worker plugin %ssetuprz  r  r   )rY   r   bytesrG   loadsr@   r>  r  r  r  rg  r  r   r  r<   )r   r!  r.  r[  r  rE  r  r|   r|   r}   r  &  s*    




(zWorker.plugin_addc                   s   t dd td|  z8| j|}t|drR|j| d}t|rR|I d H }W n> ty } z&t	|}|W  Y d }~W  d    S d }~0 0 ddiW  d    S 1 s0    Y  d S )NFr  zRemoving Worker plugin rf  rz  r  r   )
rY   r  r  r>  r   rg  rf  r   r  r<   )r   r.  r!  r  rE  r  r|   r|   r}   r  C  s    
(zWorker.plugin_remove)rv   c                   s   |pi }| dd}|}| j| }t||}t|d | }zpt|r\||i |I d H }	nB|r| j| jd t|||| j	|| j
| j	I d H }	n||i |}	dt|	dW S  ty }
 zdt|
dW  Y d }
~
S d }
~
0 0 d S )	Nseparate_threadT.rm  r   r  r  rX  r  r   )r   r)  getattrrX   rV   r*  run_in_executorr,  apply_function_actorr:  r   r   rH   r  )r   rm  rt   ru   rv   r  r   rO  r.  r  exr|   r|   r}   r}  R  s2    

zWorker.actor_executec              
   C  sX   z t | j| |}dt|dW S  tyR } zdt|dW  Y d }~S d }~0 0 d S )Nr   r  rX  r  )r  r)  rH   r  )r   rm  Z	attributer  r  r|   r|   r}   r~  u  s
    zWorker.actor_attribute)r   r   c                 C  s@   | j | }|jsdS |j D ]\}}| j| |k r dS qdS )NTF)r   r   r  r  )r   r   r   r  Zneededr|   r|   r}   meets_resource_constraints|  s    
z!Worker.meets_resource_constraintsc          
   
     s
  t |jts|jS zvt }t|jtkrHttg|jR  I d H \}}}nt|j \}}}t }|| dkr~|j	d||d |||fW S  t
y } zbtjddd | j	|jd|t f t|}	|	d | j|d	fi |	d
|i  W Y d }~n
d }~0 0 d S )Nr;  Zdeserializer  zCould not deserialize taskTr<  zdeserialize-errorr  rX  r  )r   r   rr   rD   r  r8   rZ   _deserializer   r  r  r  rX  r   r   r<   r   r  )
r   r   r  r0  rt   ru   rv   r#  rE  emsgr|   r|   r}   _maybe_deserialize_task  s6     

zWorker._maybe_deserialize_taskc              
   C  s^  | j tjtjfv rd S zdt  }| jr| j| jk r| jd }| j	|d }|d u s`|j
dkrl| j  q$| |r| j  | j|d|d q$qq$| jr| j| jk rt| j\}}| j	|}|d u rqq|j| jv r| j|d|d q|j
tv r| j|d|d qW nH tyX } z.t| trBdd l}|   W Y d }~n
d }~0 0 d S )Nzensure-computing-r   rh   rj   r  rW  )r  r:   r}  r^  rD   rh   r  r  r   r  r   popleftr  r  rg   r   r   r   r(  r  r  r  r   r  r  r  )r   r  r   r   r   rE  r  r|   r|   r}   rG    s:    





zWorker.ensure_computingc                  s  | j tjtjtjhv rd S || jvr(d S | j| }zz*|jdkr~td| d|_	| j
|d|d W W |   |   d S | jr|jrJ |jdksJ |jd usJ | j||dI d H \}}}| |||\}}|jd urd|jv r|jd }	nd}	|	| jv s
J ||jksJ | j|j z| j|	 }
t |_t|r^t|||| jI d H }n`d	tt|
v r| j|
t |||| j!|j| j"| j#| j
I d H }n| j|
t$|||| jI d H }W | j%|j n| j%|j 0 |j}| j&|}|sJ | '|d|_	|j|d
< |(dd }|j)*d|d |d d |d | j+|j< i }|d dkr|d |_,|d |_d|f||< | j-d ur| j-d |d |d   ntt.|(dt/rd||< nXt0dtt1|d d t2|ddt3|dd|d  d|d |d |d |d  f||< | j4||d td!|j| | jr`|jdksTJ |jr`J W np t5y } zV|s~J tj6d"|jdd# t7|}|(d$ | j
|dfi |d%|i W Y d }~n
d }~0 0 W |   |   n|   |   0 d S )&Nrl   zATrying to execute task %s which is not in executing state anymoreTr   r  rj   rE  rb  rM   r   r  Zcomputer0  r#  r  r  r  r  r   r   rW  ztask-durationactual-exceptionr\  zGCompute Failed
Function:  %s
args:      %s
kwargs:    %s
Exception: %r
r  max_lenr   rX  r   r   r   z*Send compute response to scheduler: %s, %sz&Exception during execution of task %s.r<  r  r  )8r  r:   r]  r  r^  r   r   r  r@  r   r  rG  rF  r   r   r   r  _prepare_args_for_executionr   r,  r   r   r   rD   r   rV   apply_function_asyncr/  r   r   r*  r  apply_functionr:  r   r   apply_function_simpler  r  r  r   r   r  r   r   r7  r   
Rescheduler  r(   convert_args_to_strconvert_kwargs_to_strr  r  rX  r<   )r   r   r  r   rt   ru   rv   args2kwargs2rE  rE  r  r  r  rc  r  r|   r|   r}   r    s    


x


	 







 
zWorker.executec              	   C  s   t  }i }|jD ]Z}|j}z| j| ||< W q tyh   ddlm} |t| j| | j	|| ||< Y q0 qt
||ttfd}	t
||ttfd}
t  }|| dkr|jd||d | jd ur| jd ||  |	|
fS )Nr   r   )Z	key_typesg{Gzt?z	disk-readr  zdisk-load-duration)rD   r   r   r(  r  rm  r   r   r)  rf  ra   r  r   r   r  r7  r   )r   r   ru   rv   r0  r(  r  r  r   r  r  r#  r|   r|   r}   r  T  s"    
&
z"Worker._prepare_args_for_executionc                   s`   j r
dS d _ d} jj}| j}| j } fdd}||  jrV| jkrVtd|d  t	 } j j
p| j }d}|| }	||kr6 jjstdt|t j q6 j }
|
d	krАq6||
7 }|d
7 }t	 | dkrtdI dH  t	 }| j}||	kr||kr j  | j}q|| |rVtd|t| d _ |S )zTrack this process's memory usage and act accordingly

        If we rise above 70% memory use, start dumping data to disk.

        If we rise above 80% memory use, stop execution of new tasks
        NTr   c                   s   |  j  } jrj| jkrj j   jtjkrtdt	|d t
|  j d urZt
 j nd tj _nT jtjkrtdt	|d t
|  j d urt
 j nd tj _      d S )Nz^Worker is at %d%% memory usage. Pausing worker.  Process memory: %s -- Worker memory limit: %sd   r   z^Worker is at %d%% memory usage. Resuming worker. Process memory: %s -- Worker memory limit: %s)r#  r&  r=  collectr  r:   r,  r  r  r   r'   r}  rG  rF  )rW  fracr   r|   r}   check_pause|  s2    


	

	z*Worker.memory_monitor.<locals>.check_pausez>Worker is at %.0f%% memory usage. Start spilling data to disk.r  zUnmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker.html#memtrim for more information. -- Unmanaged memory: %s -- Worker memory limit: %sr   r:  zMoved %d tasks worth %s to diskF)r<  r  procZmemory_infoZrssr#  r%  r  r@  rD   r$  r(  Zfastr  r'   Zevictr.  r/  r=  r  )r   r  r  rW  r	  r
  r0  rl  r  ZneedZweightr|   r   r}   r  l  sb    



	


zWorker.memory_monitorc                 C  sR   t  | j }| jt  }| _| j||f | j|t| j	f | j	
  d S r   )rD   r/  r   r/   r  r   r  r   r   r   rC  )r   r  profr|   r|   r}   r    s
    zWorker.cycle_profilec           
        s  | j s
dS t }| j | j  }W d   n1 s60    Y  t   fdd|D  i }| jrrdd |D }  D ]`\}}|durzt|| }|	|}t
j|d| jdd}t
|d| t
j|d| j| dd qzt }	| jdur| jd |	|  dS )	zz
        Get a frame from all actively computing threads

        Merge these frames into existing profile counts
        Nc                   s   i | ]}| | qS r|   r|   r   identframesr|   r}   r    r   z*Worker.trigger_profile.<locals>.<dictcomp>c                 S  s   i | ]}|t |qS r|   )r/   Zll_get_stackr  r|   r|   r}   r    r   Tzdistributed/worker.py)r#  zprofile-duration)r   rD   r   r  r  _current_framesr8  r  rX   r  r/   Zprocessr   Z	llprocessr   r7  r   )
r   r0  r   Zllframesr  framer   Zllframer   r#  r|   r  r}   r    s0    (

zWorker.trigger_profilec                   s6  t  | j }|r| jj n$d u r*| j nfdd| jD  |d u rLd}nt |f}|d u rhd }n"t |fd }|t	 krd }|dkr|d u rt
  n,|d u rt	 n|} fddt||D  tjtd  }	 st S |d u r2|d u s
||k r2d u r| j}
n
| j }
t|	|
}	|	S )Nc                   s$   g | ]\}} |v r||  fqS r|   r|   r   tr  r   r|   r}   r   
  r   z&Worker.get_profile.<locals>.<listcomp>r   r   c                   s   g | ]} | qS r|   r|   r   i)historyr|   r}   r     r   )rD   r/  r  r/   r   r   bisectZbisect_leftZbisect_rightr   r  ranger    r!   r  r   r   )r   r0  r#  r   rY  r  ZistartZistopZiistopr  r  r|   )r  r   r}   r    s6    



zWorker.get_profilec                   s   d u }t  | j }p| p"d  fdd| jD  fdd| jD d}|r|d || jd f |d |d	d
 | j D f |S )Nr   c                   s4   g | ],\}} |  k r k rn q||d  fqS r  r|   r  r0  r#  r|   r}   r   2  s   z/Worker.get_profile_metadata.<locals>.<listcomp>c                   s>   g | ]6\}} |  k r k rn q|d d |  D fqS )c                 S  s   i | ]\}}||d  qS r  r|   )r   r  r  r|   r|   r}   r  6  r   z:Worker.get_profile_metadata.<locals>.<listcomp>.<dictcomp>)r  )r   r  r  r  r|   r}   r   5  s   )countsr{  r  r  r{  c                 S  s   i | ]\}}||d  qS r  r|   r  r|   r|   r}   r  >  r   z/Worker.get_profile_metadata.<locals>.<dictcomp>)rD   r/  r   r   r  r   r   r  )r   r0  r#  Z
add_recentr  r  r|   r  r}   r  ,  s"    
zWorker.get_profile_metadatac                   s   | j 8 t  | j } fdd| D  W d    n1 sD0    Y  d urlfdd  D  dd   D }|S )Nc                   s   i | ]\}}| | qS r|   r|   )r   r  r  r  r|   r}   r  F  r   z)Worker.get_call_stack.<locals>.<dictcomp>c                   s   i | ]\}}| v r||qS r|   r|   r   r  r  r  r|   r}   r  H  r   c                 S  s   i | ]\}}|t |qS r|   )r/   ry  r  r|   r|   r}   r  J  r   )r   r  r  r   r  r  )r   r{  r   r  r|   )r  r{  r}   r  B  s    
4zWorker.get_call_stackc              	   O  sr   | j  D ]b\}}t||r
|dkr0tdt zt|||i | W q
 tyj   tj	d|dd Y q
0 q
d S )Nr  zThe `WorkerPlugin.release_key` hook is deprecated and will be removed in a future version. A similar event can now be caught by filtering for a `finish=='released'` event in the `WorkerPlugin.transition` hook.z!Plugin '%s' failed with exceptionTr<  )
r>  r  rg  r  r  r  r  r  r  r  )r   method_nameru   rv   r.  r!  r|   r|   r}   r  M  s    
zWorker._notify_pluginsc                 C  sX   |j | jv s|j | jv sJ t|jts,J |jr6J |j | jvsFJ |jdksTJ d S )NrW  )	r   r(  r)  r   r   r   r   rg   r   r   r|   r|   r}   validate_task_memoryd  s
    
zWorker.validate_task_memoryc                 C  sz   |j dksJ |jd usJ |j| jvs,J |jr6J |jD ]8}|j dksXJ | ||j| jv s<|j| jv s<J q<d S )Nrj   rW  )r   r   r   r(  r   r   r  r)  )r   r   r  r|   r|   r}   validate_task_executingk  s    

zWorker.validate_task_executingc                   sh   |j td jv sJ |j  jvs&J |jdks4J |jr>J |jrHJ t fdd|jD sdJ d S )Nr   rj   c                 3  s&   | ]}|j  jv p|j  jv V  qd S r   r  r  r   r|   r}   r   z  s   z-Worker.validate_task_ready.<locals>.<genexpr>)	r   r!   rg   r(  r   r   r   r  r   r   r|   r   r}   validate_task_readyt  s    

zWorker.validate_task_readyc                   sT   |j  jvsJ |jdksJ |jr(J |jrP|jrPt fdd|jD rPJ d S )Nrk   c                 3  s   | ]}|j  jv V  qd S r   )r   r(  r  r   r|   r}   r     r   z/Worker.validate_task_waiting.<locals>.<genexpr>)r   r(  r   r   r   r   r  r   r|   r   r}   validate_task_waiting~  s
    
zWorker.validate_task_waitingc                   sn   |j  jvsJ | jv sJ t fdd|jD r:J |jsDJ |j jv sTJ |j  j|j v sjJ d S )Nc                 3  s   | ]}|j  jv V  qd S r   )r   rg   r  r   r|   r}   r     r   z.Worker.validate_task_flight.<locals>.<genexpr>)r   r(  r   r   r   r   r   r   r|   r   r}   validate_task_flight  s    
zWorker.validate_task_flightc                 C  sx   |j | jvsJ | j|jvs J |jr*J || jv s8J |jsBJ |jD ]*}|j | j| v s`J || j| v sHJ qHd S r   )r   r(  rf  r   r   r   r   r   )r   r   r  r|   r|   r}   validate_task_fetch  s    


zWorker.validate_task_fetchc                   sV    j | jvsJ  jrJ  jr$J t fdd| j D rDJ  | jv sRJ d S )Nc                 3  s   | ]} j |v V  qd S r   r   r   r   r   r|   r}   r     r   z/Worker.validate_task_missing.<locals>.<genexpr>)r   r(  r   r   r   r   r'  r   r   r|   r'  r}   validate_task_missing  s
    

 zWorker.validate_task_missingc                 C  s(   |j | jvsJ |jsJ |js$J d S r   )r   r(  r   r   r   r|   r|   r}   validate_task_cancelled  s    
zWorker.validate_task_cancelledc                 C  s(   |j | jvsJ |jsJ |js$J d S r   )r   r(  r   r   r   r|   r|   r}   validate_task_resumed  s    
zWorker.validate_task_resumedc                   s    j | jvsJ  jrJ  jr$J  | jvs2J  | jvs@J  | jvsNJ  | jvs\J t fdd| j	 D r|J  j
rJ  jrJ  jrJ  jrJ d S )Nc                 3  s   | ]} j |v V  qd S r   r   r&  r'  r|   r}   r     r   z0Worker.validate_task_released.<locals>.<genexpr>)r   r(  r   r   r   r   r   r   r   r'  r   r   r   r   r   r|   r'  r}   validate_task_released  s    

 


zWorker.validate_task_releasedc              
   C  sh  z|j | jv r"| j|j  |ks"J |jdkr8| | n|jdkrN| | n|jdkrd| | n|jdkrz| | n|jdkr| | nl|jdkr| | nV|jdkr| 	| n@|jdkr| 
| n*|jd	kr| | n|jd
kr| | W nd tyb } zJt| tr0dd l}|  td|d| | d|W Y d }~n
d }~0 0 d S )NrW  rk   r[  rl   ri   rg   rj   r]  rU  r   r   z"Invalid TaskState encountered for z	.
Story:

)r   r   r   r   r#  r(  r)  r*  r"  r!  r$  r%  r+  r  r  r   r  r  r  AssertionErrorr  )r   r   rE  r  r|   r|   r}   r    sB    










zWorker.validate_taskc           
   
   C  s  | j tjvrd S zT| jdks"J d}| j D ]}|jd usBJ |jD ]}|j| j	| v sHJ qH|j
D ]$}|jd uszJ ||jv shJ |qh|jr|d7 }|jD ]Z}|j| jv sJ |jth dB v s|| jv s|j| jsJ ||| || |fqq0| j|ksJ | j	 D ],\}}|D ]}|| j| jv s&J q&q| j D ]}| | qRW nV ty } z<| j| j t| trdd l}	|	   W Y d }~n
d }~0 0 d S )Nr   r   >   rj   rU  r]  r[  )r  r:   rA  r  r   r'  r   r   r   r   r   r   r   r  r   issubsetr   r  r   r  r  r  r*  r1  r  r  r   r  r  r  )
r   r   r   rj  r  Zts_waitr{  r  rE  r  r|   r|   r}   validate_state  sN    




zWorker.validate_stater   c                 C  sZ   | j @ | jr"| jW  d    S |  W  d    S W d    n1 sL0    Y  d S r   )r   r  _get_clientr   r|   r|   r}   client  s    zWorker.client)rl  r   c              
   C  s  |du rt jd}t|d}zddlm} | }W n tyF   Y n`0 ddlm} |j	rj|j	j
| j	j
kst|jtr|j| j	j
kst|j|r|jj| j	j
kr|| _| js
ddlm} t| j}|| j	| j| jd	|d	d
|d| _tj| j |s
| jjdks
J | jS )zGet local client attached to this worker

        If no such client exists, create one

        See Also
        --------
        get_client
        N!distributed.comm.timeouts.connectsr   )default_clientr   )Clusterr   Trj  )r*  r!  Zset_as_defaultrn  Zdirect_to_workersr.  rl  r,  )r  r  r  r*   r1  r4  r  Zdistributed.deploy.clusterr5  r9  rf  r   Z
_start_argr   Zscheduler_addressr  r   rU   r*  r!  r   r   r   r  )r   rl  r4  r1  r5  r   rn  r|   r|   r}   r0    sN    



	



zWorker._get_clientc                 C  s   | j t  S )a  Get the key of the task we are currently running

        This only makes sense to run within a task

        Examples
        --------
        >>> from dask.distributed import get_worker
        >>> def f():
        ...     return get_worker().get_current_task()

        >>> future = client.submit(f)  # doctest: +SKIP
        >>> future.result()  # doctest: +SKIP
        'f-1234'

        See Also
        --------
        get_worker
        )r   r  	get_identr   r|   r|   r}   get_current_taskR  s    zWorker.get_current_task)NN)N)NNT)Fr   )Tre  TTF)N)T)NNNN)TN)NN)NNT)r|   TN)r|   NT)NNT)N)NNr|   N)NN)NNNF)r   N)N)N)ry   rz   r{   r   r$  WeakSetr   r   r   r3  r4  rp   rq   r   r   propertyr  r  r  r  r  rD  rE  rE   r  setterr  r	  r  r  r  r5  r+  r  r2  rx  rw  r{  rq  r  r0  rd  r  r  rv  rt  rw  rt  ru  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  rF  r  r  r  r  r  r  r  r  r  r  r  r  r  rr  rs  r  r  r}  r~  r  r  rG  r  r  r  r  r  r  r  r  r  r   r!  r"  r#  r$  r%  r(  r)  r*  r+  r  r/  r1  r0  r7  __classcell__r|   r|   r  r}   r   N  sp  
 ; j   \
	



$
,O5vn	
a %-,V	"


; 
	8 45 -    I   #
% n!)	
#4<r   r   c                   C  sZ   zt jd W S  tyT   ztdd tjD W  Y S  tyN   tdY n0 Y n0 dS )a  Get the worker currently running this task

    Examples
    --------
    >>> def f():
    ...     worker = get_worker()  # The worker on which this task is running
    ...     return worker.address

    >>> future = client.submit(f)  # doctest: +SKIP
    >>> future.result()  # doctest: +SKIP
    'tcp://127.0.0.1:47373'

    See Also
    --------
    get_client
    worker_client
    rj  c                 s  s   | ]}|j tjv r|V  qd S r   rh  ri  r|   r|   r}   r   ~  s   zget_worker.<locals>.<genexpr>zNo workers foundN)r^   r:  r   r   r   r   StopIterationr  r|   r|   r|   r}   
get_workerh  s    r=  Tr   c                 C  s   |du rt jd}t|d}| r0|r0t| } z
t }W n tyL   Y n0 | r^|jj	| krj|j
|dS ddlm} z| }W n ty   d}Y n0 |r| r|jj	| kr|S | r|| |dS tddS )a  Get a client while within a task.

    This client connects to the same scheduler to which the worker is connected

    Parameters
    ----------
    address : str, optional
        The address of the scheduler to connect to. Defaults to the scheduler
        the worker is connected to.
    timeout : int or str
        Timeout (in seconds) for getting the Client. Defaults to the
        ``distributed.comm.timeouts.connect`` configuration value.
    resolve_address : bool, default True
        Whether to resolve `address` to its canonical form.

    Returns
    -------
    Client

    Examples
    --------
    >>> def f():
    ...     client = get_client(timeout="10s")
    ...     futures = client.map(lambda x: x + 1, range(10))  # spawn many tasks
    ...     results = client.gather(futures)
    ...     return sum(results)

    >>> future = client.submit(f)  # doctest: +SKIP
    >>> future.result()  # doctest: +SKIP
    55

    See Also
    --------
    get_worker
    worker_client
    secede
    Nr2  r3  )rl  r   r   z.No global client found and no address provided)r  r  r  r*   r-   resolve_addressr=  r  r9  rf  r0  r1  r   r  )rf  rl  r>  rj  r   r1  r|   r|   r}   
get_client  s*    '



r?  c                  C  sJ   t  } t  t tj }| jj| j| jtj	 |dtj	 dt  d dS )a  
    Have this task secede from the worker's thread pool

    This opens up a new scheduling slot and a new thread for a new task. This
    enables the client to schedule tasks on this node, which is
    especially useful while waiting for other jobs to finish (e.g., with
    ``client.gather``).

    Examples
    --------
    >>> def mytask(x):
    ...     # do some work
    ...     client = get_client()
    ...     futures = client.map(...)  # do some remote work
    ...     secede()  # while that work happens, remove ourself from the pool
    ...     return client.gather(futures)  # return gathered results

    See Also
    --------
    get_client
    get_worker
    zsecede--r  N)
r=  
tpe_secederD   r^   r   r*  r1  r  r   r   )rj  r   r|   r|   r}   rN     s    
rN   c                   @  s   e Zd ZdZdS )r  a  Reschedule this task

    Raising this exception will stop the current execution of the task and ask
    the scheduler to reschedule this task, possibly on a different machine.

    This does not guarantee that the task will move onto a different machine.
    The scheduler will proceed through its normal heuristics to determine the
    optimal machine to accept this task.  The machine will likely change if the
    load across the cluster has significantly changed since first scheduling
    the task.
    N)ry   rz   r{   r   r|   r|   r|   r}   r    s   r  r  c                 C  s   | d u rd S | dkr,t tjtd||  } ttt8 t| } t| tr`| dkr`t | tj } W d    n1 st0    Y  t| t	rt
| } nt | } t| tjS )Nr@  r   )r   r0   ZMEMORY_LIMITminr   r  	TypeErrorr  r   r   r)   )r#  r  Ztotal_coresr|   r|   r}   r    s    ,

r  c                   sF   du rj  du rj  fdd}t|ddI dH S )a  Get keys from worker

    The worker has a two step handshake to acknowledge when data has been fully
    delivered.  This function implements that handshake.

    See Also
    --------
    Worker.get_data
    Worker.gather_dep
    utils_comm.gather_data_from_workers
    Nc               
     s    I d H } d| _zpt|  ddI d H }z|d }W n ty\   td|Y n0 |dkrv| dI d H  |W |  S |  0 d S )Nz#Ephemeral Worker->Worker for gatherrt  )r  r  r  r{  rT  r  r  zUnexpected responser   )r4   r.  r>   r  r  r&  Zreuse)r-   r3  r  r  r{  r  r  r  rT  rj  r|   r}   	_get_data'  s,    	z'get_data_from_worker.<locals>._get_datar  )Z	operation)r  r  rb   )r  r{  rj  rT  r  r  r  rE  r|   rD  r}   r    s    r  r  )r  c                 C  sL   t | dk rBzt|  }W n$ ty<   t| }|t| < Y n0 |S t| S )z'Load a function from bytes, cache bytesrT  )r   rP  r  rG   r  )Zbytes_objectr  r|   r|   r}   loads_functionH  s    
rF  c                 C  st   | durt | } |r(t|tr(t|}|r@t|tr@t|}|turb| sT|sT|rXJ t} |f}| |pjd|ppi fS )z<Deserialize task inputs and regularize to func, args, kwargsNr|   )rF  r   r  rG   r  r  execute_taskrs   r|   r|   r}   r  T  s    

r  c                 C  sL   t | r,| d | dd  }}|tt| S t| trDttt| S | S dS )zEvaluate a nested task

    >>> inc = lambda x: x + 1
    >>> execute_task((inc, 1))
    2
    >>> execute_task((sum, [1, 2, (inc, 3)]))
    7
    r   r   N)r$   r  rG  r   r  )rw   rO  ru   r|   r|   r}   rG  e  s    	
rG  r  c                 C  s   z0t  t|  }W d   n1 s$0    Y  W nv ty   tj| dd}t|dk rt  |t| < W d   n1 sz0    Y  Y n  ty   tj| dd}Y n0 |S )z)Dump a function to bytes, cache functionsNr  r  rT  )_cache_lockcache_dumpsr  rG   r  r   rC  )rO  r  r|   r|   r}   r  |  s    **r  c                 C  s   t | r| d tu rdttt| dd sdt| d t| d d}t| dkr`t| d |d< |S ttt| dd st| d t| dd dS t| S )	uK  Serialize a dask task

    Returns a dict of bytestrings that can each be loaded with ``loads``

    Examples
    --------
    Either returns a task as a function, args, kwargs dict

    >>> from operator import add
    >>> dumps_task((add, 1))  # doctest: +SKIP
    {'function': b' 	_operatoradd.'
     'args': b'   KK.'}

    Or as a single task blob if it can't easily decompose the result.  This
    happens either if the task is highly nested, or if it isn't a task at all

    >>> dumps_task(1)  # doctest: +SKIP
    {'task': b'       K.'}
    r   r   Nr   )rt   ru   r     rv   )	r$   r&   r   r  rQ   r  
warn_dumpsr   rH   )rw   r  r|   r|   r}   
dumps_task  s    "rL  Fg    .Ac                 C  sv   || dd}t d srt||krrdt d< t| }t|dkrX|dd d |d	d  }td
tt||f  |S )z6Dump an object to bytes, warn if those bytes are larger  r  r   TF   N2   z ... iaN  Large object of size %s detected in task graph: 
  %s
Consider scattering large objects ahead of time
with client.scatter to reduce scheduler burden and 
keep data on workers

    future = client.submit(func, big_data)    # bad

    big_future = client.scatter(big_data)     # good
    future = client.submit(func, big_future)  # good)_warn_dumps_warnedr   r   r  r  r'   )objr  limitbr3  r|   r|   r}   rK    s    rK  c           
      C  s   t  }| |||< W d   n1 s*0    Y  t t_|t_|t_t| |||}	| ||= W d   n1 sv0    Y  |	S )Run a function, collect information

    Returns
    -------
    msg: dictionary with status, result/error, timings, etc..
    N)r  r6  rD   r^   r   r:  r   r  )
rt   ru   rv   r:  r   r   r   
time_delayr  r  r|   r|   r}   r     s    &$r   c           
   
   C  s   t  }t }z~z| |i |}W n< ty^ } z$t|}d|d< ||d< W Y d}~n.d}~0 0 dd|t||dur|t|ndd}W t }	nt }	0 || |d< |	| |d	< ||d
< |S )rS  r  r  r  Nr  r   r  r  r  r   r   r0  r#  r  r  r6  rD   r  r<   r  r   
rt   ru   rv   rT  r  r0  r  rE  r  rD  r|   r|   r}   r    s&    r  c           
   
     s   t  }t }zz| |i |I dH }W n< tyd } z$t|}d|d< ||d< W Y d}~n.d}~0 0 dd|t||durt|ndd}W t }	nt }	0 || |d< |	| |d	< ||d
< |S )rS  Nr  r  r  r  r   rU  r0  r#  r  rV  rW  r|   r|   r}   r    s&    r  c           	      C  s   t  }| |||< W d   n1 s*0    Y  |t_|t_dt_| |i |}| ||= W d   n1 st0    Y  |S )rS  NT)r  r6  r^   r:  r   rm  )	rt   ru   rv   r:  r   r   r   r  r  r|   r|   r}   r  )  s    	&$r  c                 C  sN   G dd d}|   } d| v r0|t| d | d< d| v rJ|t| d | d< | S )zMake a worker msg, which contains args and kwargs, safe to cast to str:
    allowing for some arguments to raise exceptions during conversion and
    ignoring them.
    c                   @  s   e Zd Zdd Zdd ZdS )zget_msg_safe_str.<locals>.Reprc                 S  s   || _ || _d S r   Z_fZ_val)r   r
  valr|   r|   r}   r   J  s    z'get_msg_safe_str.<locals>.Repr.__init__c                 S  s   |  | jS r   rX  r   r|   r|   r}   r   N  s    z'get_msg_safe_str.<locals>.Repr.__repr__N)ry   rz   r{   r   r   r|   r|   r|   r}   ReprI  s   rZ  ru   rv   )r  r  r  )r  rZ  r|   r|   r}   get_msg_safe_strC  s    r[  r   )r  r   c              	   C  s   d}dd t t| D }t| D ]|\}}zt|}W n tyL   d}Y n0 |||< |t|d 7 }|dur"||kr"dd|d|d	  d|   S q"d
d|S )zwConvert args to a string, allowing for some arguments to raise
    exceptions during conversion and ignoring them.
    r   c                 S  s   g | ]}d qS r   r|   r  r|   r|   r}   r   ^  r   z'convert_args_to_str.<locals>.<listcomp> < could not convert arg to str >r   Nz({}, r   z({}))r  r   	enumeraterb  r  ra  r  )ru   r  lengthstrsr  argsargr|   r|   r}   r  Y  s    
*r  r   )rv   r  r   c           	   	   C  s   d}dd t t| D }t|  D ]\}\}}zt|}W n tyT   d}Y n0 t|d | }|||< |t|d 7 }|dur&||kr&dd	|d|d
  d|   S q&dd	|S )zyConvert kwargs to a string, allowing for some arguments to raise
    exceptions during conversion and ignoring them.
    r   c                 S  s   g | ]}d qS r\  r|   r  r|   r|   r}   r   q  r   z)convert_kwargs_to_str.<locals>.<listcomp>r]  r   r   Nz{{{}r^  r   z{{{}}})r  r   r_  r  rb  r  ra  r  )	rv   r  r`  ra  r  argnamerb  rc  Zskwargr|   r|   r}   r  l  s    
*r  r|   c           
        sT  |pi }t |}|d u r$t|}n
td |s>|s>J d|rLt |}|rZt |}t|drl| |d< t|dr~| |d< tdt| zP|s||i |}n8|r||i |I d H }n| j	j
|g|R i | d }W nb ty@ } zHtjdtt|d d t|ddt|ddd	d
 t|}	W Y d }~nd }~0 0 dt|d}	|	S )Nz\The is_coro= parameter is deprecated. We now automatically detect coroutines/async functionszCombination not supportedZdask_workerZdask_schedulerzRun out-of-band function %rz2Run Failed
Function: %s
args:     %s
kwargs:   %s
r  r  Tr<  r   r  )rG   r  rV   r  r  rS   r  r  r(   r*  r1  r  r  r   r  r  r<   rH   )
rY  r-   rt   ru   rv   Zis_corork  r  rE  r3  r|   r|   r}   rr    sF    







rr  c                   s   t tjI d H }|S r   )rZ   r?   Z	real_time)rj  r  r|   r|   r}   
gpu_metric  s    re  rn  c                 C  s   t  S r   )r?   Zone_timerz  r|   r|   r}   gpu_startup  s    rf  c                  O  sd   z
t  }W n ty   Y n40 tdd | D dd | D d}|d| tj| i | dS )zvDask print function
    This prints both wherever this function is run, and also in the user's
    client session
    c                 s  s   | ]}t |V  qd S r   r+   )r   rb  r|   r|   r}   r     r   zprint.<locals>.<genexpr>c                 S  s   i | ]\}}|t |qS r|   rg  r  r|   r|   r}   r    r   zprint.<locals>.<dictcomp>ru   rv   printN)r=  r  r  r  r  builtinsri  )ru   rv   rj  r  r|   r|   r}   ri    s    
ri  c                  O  sD   z
t  }W n ty   Y n0 |d| |d tj| i | dS )zDask warn function
    This raises a warning both wherever this function is run, and also
    in the user's client session
    r  rh  N)r=  r  r  r  r  )ru   rv   rj  r|   r|   r}   r    s    
r  )NNT)NNNN)N)N)r|   NNT)Z
__future__r   r.  r  rj  r_  r   loggingr  r  r  r  r  r$  collectionsr   r   r   collections.abcr   r   r   r	   r
   r   r   concurrent.futuresr   
contextlibr   Zdatetimer   inspectr   rG   r   typingr   r   r   r   Zdiagnostics.pluginr   rm  r   r1  r   r   r   Ztlzr   r   r    r!   Ztornado.ioloopr"   r#   r  Z	dask.corer$   Zdask.systemr%   Z
dask.utilsr&   r'   r(   r)   r*   r+   r,   r   r-   r.   r/   r0   r1   Zbatchedr2   r3   r4   r5   Zcomm.addressingr6   r7   Z
comm.utilsr8   corer9   r:   r;   r<   r=   r>   Zdiagnosticsr?   r@   Z	diskutilsrA   rB   httprC   r6  rD   ZnoderE   Z	proctitlerF   rM  rH   ZpubsubrI   r!  rJ   ZshufflerK   r  rL   ZthreadpoolexecutorrM   rN   rA  rO   rP   rQ   rR   rS   rT   rU   rV   rW   rX   rY   rZ   r[   r\   r]   r^   r_   Z
utils_commr`   ra   rb   Z
utils_perfrc   rd   re   r|  rf   Z	getLoggerry   r  r  r  r  r  r   r  rn   r   rp   rq   r   rr   r  rx   r~   r   r   r=  r?  r  r  r  Zjob_counterrP  rF  r  rG  rI  r  rH  r  rL  rO  r  rK  r   r  r  r  r[  r  r  rr  r   Z_global_workersr  r  re  rf  ri  r  r|   r|   r|   r}   <module>   s  $	$
 L
	
 0                              8D"    
4

$$
-