import html
import logging
import sys
import weakref
from contextlib import suppress
from timeit import default_timer

from tlz import valmap
from tornado.ioloop import IOLoop

import dask

from ..client import default_client, futures_of
from ..core import CommClosedError, clean_exception, coerce_to_address, connect
from ..protocol.pickle import dumps
from ..utils import LoopRunner, is_kernel, key_split
from .progress import MultiProgress, Progress, format_time

logger = logging.getLogger(__name__)


def get_scheduler(scheduler):
    """Return the scheduler address to connect to.

    Parameters
    ----------
    scheduler : object or None
        ``None`` selects the scheduler address of ``default_client()``;
        anything else is passed through ``coerce_to_address``.
    """
    if scheduler is None:
        return default_client().scheduler.address
    return coerce_to_address(scheduler)


def _find_client_ref(keys):
    """Return a weak reference to the first ``key.client`` found in *keys*.

    Returns ``None`` when no element of *keys* has a ``client`` attribute.
    """
    for key in keys:
        if hasattr(key, "client"):
            return weakref.ref(key.client)
    return None


def _client_attr(client_ref, name, default=None):
    """Read attribute *name* from the client behind *client_ref*.

    Returns *default* when there is no reference, or when the referent has
    already been garbage collected (``client_ref()`` returns ``None``).
    The client is dereferenced on every call, so no strong reference to it
    outlives the call.
    """
    client = client_ref() if client_ref is not None else None
    if client is None:
        return default
    return getattr(client, name)


class ProgressBar:
    """Base class for a single progress bar fed by the scheduler.

    ``listen`` opens a comm to the scheduler, sends a ``"feed"`` request and
    forwards every response to ``self._draw_bar(**response)``. This class does
    not define ``_draw_bar``; the subclasses in this module do.

    Parameters
    ----------
    keys : iterable
        Keys to track. Elements with a ``key`` attribute contribute that
        attribute; the first element with a ``client`` attribute supplies the
        client (held through a weak reference).
    scheduler : object, optional
        Resolved with ``get_scheduler``.
    interval : str or number
        Parsed with ``dask.utils.parse_timedelta`` (default unit seconds) and
        sent as the ``"interval"`` field of the feed request.
    complete : bool
        Forwarded to ``Progress(..., complete=complete)``.
    """

    def __init__(self, keys, scheduler=None, interval="100ms", complete=True):
        self.scheduler = get_scheduler(scheduler)

        # Materialise once: *keys* is walked twice below and could be a
        # one-shot iterator.
        keys = list(keys)
        self.client = _find_client_ref(keys)
        self.keys = {k.key if hasattr(k, "key") else k for k in keys}
        self.interval = dask.utils.parse_timedelta(interval, default="s")
        self.complete = complete
        self._start_time = default_timer()

    @property
    def elapsed(self):
        """Seconds elapsed since this object was constructed."""
        return default_timer() - self._start_time

    async def listen(self):
        """Stream progress messages from the scheduler until completion.

        The loop ends when a response reports status ``"error"`` or
        ``"finished"`` (the comm is closed and ``_draw_stop`` is called), or
        when reading raises ``CommClosedError``.
        """
        # Bind to locals so the closures below do not capture ``self``; both
        # closures are serialised with ``dumps`` and sent over the comm.
        complete = self.complete
        keys = self.keys

        async def setup(scheduler):
            p = Progress(keys, scheduler, complete=complete)
            await p.setup()
            return p

        def function(scheduler, p):
            result = {
                "all": len(p.all_keys),
                "remaining": len(p.keys),
                "status": p.status,
            }
            if p.status == "error":
                result.update(p.extra)
            return result

        self.comm = await connect(
            self.scheduler, **_client_attr(self.client, "connection_args", {})
        )
        logger.debug("Progressbar Connected to scheduler")

        await self.comm.write(
            {
                "op": "feed",
                "setup": dumps(setup),
                "function": dumps(function),
                "interval": self.interval,
            },
            serializers=_client_attr(self.client, "_serializers"),
        )

        while True:
            try:
                response = await self.comm.read(
                    deserializers=_client_attr(self.client, "_deserializers")
                )
            except CommClosedError:
                break
            self._last_response = response
            self.status = response["status"]
            self._draw_bar(**response)
            if response["status"] in ("error", "finished"):
                await self.comm.close()
                self._draw_stop(**response)
                break

        logger.debug("Progressbar disconnected from scheduler")

    def _draw_stop(self, **kwargs):
        """Hook called with the final response; no-op in the base class."""
        pass

    def __del__(self):
        # ``self.comm`` only exists once ``listen`` has connected.
        with suppress(AttributeError):
            self.comm.abort()


class TextProgressBar(ProgressBar):
    """Progress bar written to ``sys.stdout``.

    Parameters
    ----------
    keys, scheduler, interval, complete
        Passed to ``ProgressBar``.
    width : int
        Number of character cells in the bar.
    loop : IOLoop, optional
        Event loop to use; a new ``IOLoop`` is created when falsy.
    start : bool
        When true, ``listen`` is run immediately through
        ``LoopRunner(self.loop).run_sync``.
    **kwargs
        Accepted and ignored.
    """

    def __init__(
        self,
        keys,
        scheduler=None,
        interval="100ms",
        width=40,
        loop=None,
        complete=True,
        start=True,
        **kwargs,
    ):
        super().__init__(keys, scheduler, interval, complete)
        self.width = width
        self.loop = loop or IOLoop()

        if start:
            loop_runner = LoopRunner(self.loop)
            loop_runner.run_sync(self.listen)

    def _draw_bar(self, remaining, all, **kwargs):
        """Redraw the bar in place for ``all - remaining`` of ``all`` tasks."""
        # With nothing to track, report the bar as full.
        frac = (1 - remaining / all) if all else 1.0
        bar = "#" * int(self.width * frac)
        percent = int(100 * frac)
        elapsed = format_time(self.elapsed)
        msg = "\r[{0:<{1}}] | {2}% Completed | {3}".format(
            bar, self.width, percent, elapsed
        )
        # Writing to a closed stream raises ValueError; drawing is best-effort.
        with suppress(ValueError):
            sys.stdout.write(msg)
            sys.stdout.flush()

    def _draw_stop(self, **kwargs):
        """Return the cursor to the start of the line."""
        # Same best-effort guard as ``_draw_bar``.
        with suppress(ValueError):
            sys.stdout.write("\r")
            sys.stdout.flush()


# NOTE(review): the string literals assigned to widget ``.value`` attributes in
# the widget classes below consist only of newlines and plain text. They feed
# ipywidgets ``HTML`` widgets, so they may originally have carried markup that
# is not visible in the text under review -- confirm against version control.


class ProgressWidget(ProgressBar):
    """ProgressBar that uses an IPython ProgressBar widget for the notebook

    See Also
    --------
    progress: User function
    TextProgressBar: Text version suitable for the console
    """

    def __init__(
        self,
        keys,
        scheduler=None,
        interval="100ms",
        complete=False,
        loop=None,
        **kwargs,
    ):
        # ``loop`` and ``**kwargs`` are accepted but not used here.
        super().__init__(keys, scheduler, interval, complete)

        from ipywidgets import HTML, FloatProgress, HBox, VBox

        self.elapsed_time = HTML("")
        self.bar = FloatProgress(min=0, max=1, description="")
        self.bar_text = HTML("")

        self.bar_widget = HBox([self.bar_text, self.bar])
        self.widget = VBox([self.elapsed_time, self.bar_widget])

    def _ipython_display_(self, **kwargs):
        """Schedule ``listen`` on the current IOLoop and display the widget."""
        IOLoop.current().add_callback(self.listen)
        return self.widget._ipython_display_(**kwargs)

    def _draw_stop(self, remaining, status, exception=None, **kwargs):
        """Render the final state: error, or success when nothing remains."""
        if status == "error":
            _, exception, _ = clean_exception(exception)
            self.bar.bar_style = "danger"
            self.elapsed_time.value = (
                "\nException "
                "" + repr(exception) + ":" + format_time(self.elapsed) + " " + "\n"
            )
        elif not remaining:
            self.bar.bar_style = "success"
            self.elapsed_time.value = (
                "\nFinished: " + format_time(self.elapsed) + "\n"
            )

    def _draw_bar(self, remaining, all, **kwargs):
        """Update elapsed time, bar fraction and the ``done / total`` text."""
        ndone = all - remaining
        self.elapsed_time.value = (
            "\nComputing: " + format_time(self.elapsed) + "\n"
        )
        # With nothing to track, report the bar as full.
        self.bar.value = ndone / all if all else 1.0
        self.bar_text.value = "\n%d / %d\n" % (ndone, all)


class MultiProgressBar:
    """Base class for grouped progress bars fed by the scheduler.

    Works like ``ProgressBar`` but tracks keys with ``MultiProgress``, so
    each response carries per-group dictionaries for ``"all"`` and
    ``"remaining"``. This class does not define ``_draw_bar``.

    Parameters
    ----------
    keys : iterable
        Keys to track. Elements with a ``key`` attribute contribute that
        attribute; the first element with a ``client`` attribute supplies the
        client (held through a weak reference).
    scheduler : object, optional
        Resolved with ``get_scheduler``.
    func : callable
        Forwarded to ``MultiProgress(..., func=func)``.
    interval : str or number
        Parsed with ``dask.utils.parse_timedelta`` (default unit seconds) and
        sent as the ``"interval"`` field of the feed request.
    complete : bool
        Forwarded to ``MultiProgress(..., complete=complete)``.
    **kwargs
        Accepted and ignored.
    """

    def __init__(
        self,
        keys,
        scheduler=None,
        func=key_split,
        interval="100ms",
        complete=False,
        **kwargs,
    ):
        self.scheduler = get_scheduler(scheduler)

        # Materialise once: *keys* is walked twice below and could be a
        # one-shot iterator.
        keys = list(keys)
        self.client = _find_client_ref(keys)
        self.keys = {k.key if hasattr(k, "key") else k for k in keys}
        self.func = func
        # Normalised the same way ProgressBar does it.
        self.interval = dask.utils.parse_timedelta(interval, default="s")
        self.complete = complete
        self._start_time = default_timer()

    @property
    def elapsed(self):
        """Seconds elapsed since this object was constructed."""
        return default_timer() - self._start_time

    async def listen(self):
        """Stream grouped progress messages from the scheduler.

        The loop ends when a response reports status ``"error"`` or
        ``"finished"`` (the comm is closed and ``_draw_stop`` is called), or
        when reading raises ``CommClosedError``.
        """
        # Bind to locals so the closures below do not capture ``self``; both
        # closures are serialised with ``dumps`` and sent over the comm.
        complete = self.complete
        keys = self.keys
        func = self.func

        async def setup(scheduler):
            p = MultiProgress(keys, scheduler, complete=complete, func=func)
            await p.setup()
            return p

        def function(scheduler, p):
            result = {
                "all": valmap(len, p.all_keys),
                "remaining": valmap(len, p.keys),
                "status": p.status,
            }
            if p.status == "error":
                result.update(p.extra)
            return result

        self.comm = await connect(
            self.scheduler, **_client_attr(self.client, "connection_args", {})
        )
        logger.debug("Progressbar Connected to scheduler")

        await self.comm.write(
            {
                "op": "feed",
                "setup": dumps(setup),
                "function": dumps(function),
                "interval": self.interval,
            },
            serializers=_client_attr(self.client, "_serializers"),
        )

        while True:
            try:
                response = await self.comm.read(
                    deserializers=_client_attr(self.client, "_deserializers")
                )
            except CommClosedError:
                break
            self._last_response = response
            self.status = response["status"]
            self._draw_bar(**response)
            if response["status"] in ("error", "finished"):
                await self.comm.close()
                self._draw_stop(**response)
                break

        logger.debug("Progressbar disconnected from scheduler")

    def _draw_stop(self, **kwargs):
        """Hook called with the final response; no-op in the base class."""
        pass

    def __del__(self):
        # ``self.comm`` only exists once ``listen`` has connected.
        with suppress(AttributeError):
            self.comm.abort()


class MultiProgressWidget(MultiProgressBar):
    """Multiple progress bar Widget suitable for the notebook

    Displays multiple progress bars for a computation, split on computation
    type.

    See Also
    --------
    progress: User-level function <--- use this
    MultiProgress: Non-visualization component that contains most logic
    ProgressWidget: Single progress bar widget
    """

    def __init__(
        self,
        keys,
        scheduler=None,
        minimum=0,
        interval=0.1,
        func=key_split,
        complete=False,
        **kwargs,
    ):
        # ``minimum`` and ``**kwargs`` are accepted but not used here.
        super().__init__(keys, scheduler, func, interval, complete)
        from ipywidgets import VBox

        # Child widgets are created lazily by ``make_widget`` on the first
        # ``_draw_bar`` call, once the groups are known.
        self.widget = VBox([])

    def make_widget(self, all):
        """Build one bar, counter and label per group in *all*.

        Parameters
        ----------
        all : dict
            Mapping of group name to task count, as carried by the ``"all"``
            field of a response.
        """
        from ipywidgets import HTML, FloatProgress, HBox, VBox

        self.elapsed_time = HTML("")
        self.bars = {key: FloatProgress(min=0, max=1, description="") for key in all}
        self.bar_texts = {key: HTML("") for key in all}
        self.bar_labels = {
            key: HTML(
                "\n"
                + html.escape(key.decode() if isinstance(key, bytes) else key)
                + "\n"
            )
            for key in all
        }

        def keyfunc(kv):
            """Order keys by most numerous, then by string name"""
            return kv[::-1]

        key_order = [k for k, v in sorted(all.items(), key=keyfunc, reverse=True)]

        self.bar_widgets = VBox(
            [
                HBox([self.bar_texts[key], self.bars[key], self.bar_labels[key]])
                for key in key_order
            ]
        )
        self.widget.children = (self.elapsed_time, self.bar_widgets)

    def _ipython_display_(self, **kwargs):
        """Schedule ``listen`` on the current IOLoop and display the widget."""
        IOLoop.current().add_callback(self.listen)
        return self.widget._ipython_display_(**kwargs)

    def _draw_stop(self, remaining, status, exception=None, key=None, **kwargs):
        """Colour every bar by whether its group finished, then show status."""
        for k, v in remaining.items():
            self.bars[k].bar_style = "danger" if v else "success"

        if status == "error":
            _, exception, _ = clean_exception(exception)
            # TODO: mark only the failing key's bar as "danger", i.e.
            # the bar at self.bars[self.func(key)].
            self.elapsed_time.value = (
                "\nException "
                + ""
                + repr(exception)
                + ":"
                + format_time(self.elapsed)
                + " "
                + "\n"
            )
        else:
            self.elapsed_time.value = (
                "\nFinished: " + format_time(self.elapsed) + "\n"
            )

    def _draw_bar(self, remaining, all, status, **kwargs):
        """Update every group's bar fraction and ``done / total`` text."""
        if self.keys and not self.widget.children:
            self.make_widget(all)

        # Set once per update instead of once per group; the guard keeps the
        # behaviour for an empty ``all`` (nothing is touched).
        if all:
            self.elapsed_time.value = (
                "\nComputing: " + format_time(self.elapsed) + "\n"
            )

        for k, ntasks in all.items():
            ndone = ntasks - remaining[k]
            # A group with no tasks is reported as full.
            self.bars[k].value = ndone / ntasks if ntasks else 1.0
            self.bar_texts[k].value = "\n%d / %d\n" % (ndone, ntasks)


def progress(*futures, notebook=None, multi=True, complete=True, **kwargs):
    """Track progress of futures

    This operates differently in the notebook and the console

    *  Notebook:  This returns immediately, leaving an IPython widget on screen
    *  Console:  This blocks until the computation completes

    Parameters
    ----------
    futures : Futures
        A list of futures or keys to track
    notebook : bool (optional)
        Running in the notebook or not (defaults to guess)
    multi : bool (optional)
        Track different functions independently (defaults to True)
    complete : bool (optional)
        Track all keys (True) or only keys that have not yet run (False)
        (defaults to True)

    Notes
    -----
    In the notebook, the output of `progress` must be the last statement
    in the cell. Typically, this means calling `progress` at the end of a
    cell.

    Examples
    --------
    >>> progress(futures)  # doctest: +SKIP
    [########################################] | 100% Completed |  1.7s
    """
    futures = futures_of(futures)
    if not isinstance(futures, (set, list)):
        futures = [futures]
    if notebook is None:
        notebook = is_kernel()  # often but not always correct assumption
    if notebook:
        if multi:
            bar = MultiProgressWidget(futures, complete=complete, **kwargs)
        else:
            bar = ProgressWidget(futures, complete=complete, **kwargs)
        return bar
    else:
        TextProgressBar(futures, complete=complete, **kwargs)