# -*- coding: utf-8 -*- # Advice: use repr(our_file.read()) to print the full output of tqdm # (else '\r' will replace the previous lines and you'll see only the latest. from __future__ import print_function import csv import os import re import sys from contextlib import contextmanager from functools import wraps from warnings import catch_warnings, simplefilter from pytest import importorskip, mark, raises, skip from tqdm import TqdmDeprecationWarning, TqdmWarning, tqdm, trange from tqdm.contrib import DummyTqdmFile from tqdm.std import EMA, Bar try: from StringIO import StringIO except ImportError: from io import StringIO from io import IOBase # to support unicode strings from io import BytesIO class DeprecationError(Exception): pass # Ensure we can use `with closing(...) as ... :` syntax if getattr(StringIO, '__exit__', False) and getattr(StringIO, '__enter__', False): def closing(arg): return arg else: from contextlib import closing try: _range = xrange except NameError: _range = range try: _unicode = unicode except NameError: _unicode = str nt_and_no_colorama = False if os.name == 'nt': try: import colorama # NOQA except ImportError: nt_and_no_colorama = True # Regex definitions # List of control characters CTRLCHR = [r'\r', r'\n', r'\x1b\[A'] # Need to escape [ for regex # Regular expressions compilation RE_rate = re.compile(r'[^\d](\d[.\d]+)it/s') RE_ctrlchr = re.compile("(%s)" % '|'.join(CTRLCHR)) # Match control chars RE_ctrlchr_excl = re.compile('|'.join(CTRLCHR)) # Match and exclude ctrl chars RE_pos = re.compile(r'([\r\n]+((pos\d+) bar:\s+\d+%|\s{3,6})?[^\r\n]*)') def pos_line_diff(res_list, expected_list, raise_nonempty=True): """ Return differences between two bar output lists. To be used with `RE_pos` """ res = [(r, e) for r, e in zip(res_list, expected_list) for pos in [len(e) - len(e.lstrip('\n'))] # bar position if r != e # simple comparison if not r.startswith(e) # start matches or not ( # move up at end (maybe less due to closing bars) any(r.endswith(end + i * '\x1b[A') for i in range(pos + 1) for end in [ ']', # bar ' ']) # cleared or '100%' in r # completed bar or r == '\n') # final bar or r[(-1 - pos) * len('\x1b[A'):] == '\x1b[A'] # too many moves up if raise_nonempty and (res or len(res_list) != len(expected_list)): if len(res_list) < len(expected_list): res.extend([(None, e) for e in expected_list[len(res_list):]]) elif len(res_list) > len(expected_list): res.extend([(r, None) for r in res_list[len(expected_list):]]) raise AssertionError( "Got => Expected\n" + '\n'.join('%r => %r' % i for i in res)) return res class DiscreteTimer(object): """Virtual discrete time manager, to precisely control time for tests""" def __init__(self): self.t = 0.0 def sleep(self, t): """Sleep = increment the time counter (almost no CPU used)""" self.t += t def time(self): """Get the current time""" return self.t def cpu_timify(t, timer=None): """Force tqdm to use the specified timer instead of system-wide time()""" if timer is None: timer = DiscreteTimer() t._time = timer.time t._sleep = timer.sleep t.start_t = t.last_print_t = t._time() return timer class UnicodeIO(IOBase): """Unicode version of StringIO""" def __init__(self, *args, **kwargs): super(UnicodeIO, self).__init__(*args, **kwargs) self.encoding = 'U8' # io.StringIO supports unicode, but no encoding self.text = '' self.cursor = 0 def __len__(self): return len(self.text) def seek(self, offset): self.cursor = offset def tell(self): return self.cursor def write(self, s): self.text = self.text[:self.cursor] + s + self.text[self.cursor + len(s):] self.cursor += len(s) def read(self, n=-1): _cur = self.cursor self.cursor = len(self) if n < 0 else min(_cur + n, len(self)) return self.text[_cur:self.cursor] def getvalue(self): return self.text def get_bar(all_bars, i=None): """Get a specific update from a whole bar traceback""" # Split according to any used control characters bars_split = RE_ctrlchr_excl.split(all_bars) bars_split = list(filter(None, bars_split)) # filter out empty splits return bars_split if i is None else bars_split[i] def progressbar_rate(bar_str): return float(RE_rate.search(bar_str).group(1)) def squash_ctrlchars(s): """Apply control characters in a string just like a terminal display""" curline = 0 lines = [''] # state of fake terminal for nextctrl in filter(None, RE_ctrlchr.split(s)): # apply control chars if nextctrl == '\r': # go to line beginning (simplified here: just empty the string) lines[curline] = '' elif nextctrl == '\n': if curline >= len(lines) - 1: # wrap-around creates newline lines.append('') # move cursor down curline += 1 elif nextctrl == '\x1b[A': # move cursor up if curline > 0: curline -= 1 else: raise ValueError("Cannot go further up") else: # print message on current line lines[curline] += nextctrl return lines def test_format_interval(): """Test time interval format""" format_interval = tqdm.format_interval assert format_interval(60) == '01:00' assert format_interval(6160) == '1:42:40' assert format_interval(238113) == '66:08:33' def test_format_num(): """Test number format""" format_num = tqdm.format_num assert float(format_num(1337)) == 1337 assert format_num(int(1e6)) == '1e+6' assert format_num(1239876) == '1' '239' '876' def test_format_meter(): """Test statistics and progress bar formatting""" try: unich = unichr except NameError: unich = chr format_meter = tqdm.format_meter assert format_meter(0, 1000, 13) == " 0%| | 0/1000 [00:13= (3,): assert format_meter(0, 1000, 13, ncols=68, prefix='fullwidth: ') == ( "fullwidth: 0%| | 0/1000 [00:13= (bigstep - 1) and ((i - (bigstep - 1)) % smallstep) == 0: timer.sleep(1e-2) if i >= 3 * bigstep: break assert "15%" in our_file.getvalue() # Test different behavior with and without mininterval timer = DiscreteTimer() total = 1000 mininterval = 0.1 maxinterval = 10 with closing(StringIO()) as our_file: with tqdm(total=total, file=our_file, miniters=None, smoothing=1, mininterval=mininterval, maxinterval=maxinterval) as tm1: with tqdm(total=total, file=our_file, miniters=None, smoothing=1, mininterval=0, maxinterval=maxinterval) as tm2: cpu_timify(tm1, timer) cpu_timify(tm2, timer) # Fast iterations, check if dynamic_miniters triggers timer.sleep(mininterval) # to force update for t1 tm1.update(total / 2) tm2.update(total / 2) assert int(tm1.miniters) == tm2.miniters == total / 2 # Slow iterations, check different miniters if mininterval timer.sleep(maxinterval * 2) tm1.update(total / 2) tm2.update(total / 2) res = [tm1.miniters, tm2.miniters] assert res == [(total / 2) * mininterval / (maxinterval * 2), (total / 2) * maxinterval / (maxinterval * 2)] # Same with iterable based tqdm timer1 = DiscreteTimer() # need 2 timers for each bar because zip not work timer2 = DiscreteTimer() total = 100 mininterval = 0.1 maxinterval = 10 with closing(StringIO()) as our_file: t1 = tqdm(_range(total), file=our_file, miniters=None, smoothing=1, mininterval=mininterval, maxinterval=maxinterval) t2 = tqdm(_range(total), file=our_file, miniters=None, smoothing=1, mininterval=0, maxinterval=maxinterval) cpu_timify(t1, timer1) cpu_timify(t2, timer2) for i in t1: if i == ((total / 2) - 2): timer1.sleep(mininterval) if i == (total - 1): timer1.sleep(maxinterval * 2) for i in t2: if i == ((total / 2) - 2): timer2.sleep(mininterval) if i == (total - 1): timer2.sleep(maxinterval * 2) assert t1.miniters == 0.255 assert t2.miniters == 0.5 t1.close() t2.close() def test_delay(): """Test delay""" timer = DiscreteTimer() with closing(StringIO()) as our_file: t = tqdm(total=2, file=our_file, leave=True, delay=3) cpu_timify(t, timer) timer.sleep(2) t.update(1) assert not our_file.getvalue() timer.sleep(2) t.update(1) assert our_file.getvalue() t.close() def test_min_iters(): """Test miniters""" with closing(StringIO()) as our_file: for _ in tqdm(_range(3), file=our_file, leave=True, mininterval=0, miniters=2): pass out = our_file.getvalue() assert '| 0/3 ' in out assert '| 1/3 ' not in out assert '| 2/3 ' in out assert '| 3/3 ' in out with closing(StringIO()) as our_file: for _ in tqdm(_range(3), file=our_file, leave=True, mininterval=0, miniters=1): pass out = our_file.getvalue() assert '| 0/3 ' in out assert '| 1/3 ' in out assert '| 2/3 ' in out assert '| 3/3 ' in out def test_dynamic_min_iters(): """Test purely dynamic miniters (and manual updates and __del__)""" with closing(StringIO()) as our_file: total = 10 t = tqdm(total=total, file=our_file, miniters=None, mininterval=0, smoothing=1) t.update() # Increase 3 iterations t.update(3) # The next two iterations should be skipped because of dynamic_miniters t.update() t.update() # The third iteration should be displayed t.update() out = our_file.getvalue() assert t.dynamic_miniters t.__del__() # simulate immediate del gc assert ' 0%| | 0/10 [00:00<' in out assert '40%' in out assert '50%' not in out assert '60%' not in out assert '70%' in out # Check with smoothing=0, miniters should be set to max update seen so far with closing(StringIO()) as our_file: total = 10 t = tqdm(total=total, file=our_file, miniters=None, mininterval=0, smoothing=0) t.update() t.update(2) t.update(5) # this should be stored as miniters t.update(1) out = our_file.getvalue() assert all(i in out for i in ("0/10", "1/10", "3/10")) assert "2/10" not in out assert t.dynamic_miniters and not t.smoothing assert t.miniters == 5 t.close() # Check iterable based tqdm with closing(StringIO()) as our_file: t = tqdm(_range(10), file=our_file, miniters=None, mininterval=None, smoothing=0.5) for _ in t: pass assert t.dynamic_miniters # No smoothing with closing(StringIO()) as our_file: t = tqdm(_range(10), file=our_file, miniters=None, mininterval=None, smoothing=0) for _ in t: pass assert t.dynamic_miniters # No dynamic_miniters (miniters is fixed manually) with closing(StringIO()) as our_file: t = tqdm(_range(10), file=our_file, miniters=1, mininterval=None) for _ in t: pass assert not t.dynamic_miniters def test_big_min_interval(): """Test large mininterval""" with closing(StringIO()) as our_file: for _ in tqdm(_range(2), file=our_file, mininterval=1E10): pass assert '50%' not in our_file.getvalue() with closing(StringIO()) as our_file: with tqdm(_range(2), file=our_file, mininterval=1E10) as t: t.update() t.update() assert '50%' not in our_file.getvalue() def test_smoothed_dynamic_min_iters(): """Test smoothed dynamic miniters""" timer = DiscreteTimer() with closing(StringIO()) as our_file: with tqdm(total=100, file=our_file, miniters=None, mininterval=1, smoothing=0.5, maxinterval=0) as t: cpu_timify(t, timer) # Increase 10 iterations at once timer.sleep(1) t.update(10) # The next iterations should be partially skipped for _ in _range(2): timer.sleep(1) t.update(4) for _ in _range(20): timer.sleep(1) t.update() assert t.dynamic_miniters out = our_file.getvalue() assert ' 0%| | 0/100 [00:00<' in out assert '20%' in out assert '23%' not in out assert '25%' in out assert '26%' not in out assert '28%' in out def test_smoothed_dynamic_min_iters_with_min_interval(): """Test smoothed dynamic miniters with mininterval""" timer = DiscreteTimer() # In this test, `miniters` should gradually decline total = 100 with closing(StringIO()) as our_file: # Test manual updating tqdm with tqdm(total=total, file=our_file, miniters=None, mininterval=1e-3, smoothing=1, maxinterval=0) as t: cpu_timify(t, timer) t.update(10) timer.sleep(1e-2) for _ in _range(4): t.update() timer.sleep(1e-2) out = our_file.getvalue() assert t.dynamic_miniters with closing(StringIO()) as our_file: # Test iteration-based tqdm with tqdm(_range(total), file=our_file, miniters=None, mininterval=0.01, smoothing=1, maxinterval=0) as t2: cpu_timify(t2, timer) for i in t2: if i >= 10: timer.sleep(0.1) if i >= 14: break out2 = our_file.getvalue() assert t.dynamic_miniters assert ' 0%| | 0/100 [00:00<' in out assert '11%' in out and '11%' in out2 # assert '12%' not in out and '12%' in out2 assert '13%' in out and '13%' in out2 assert '14%' in out and '14%' in out2 @mark.slow def test_rlock_creation(): """Test that importing tqdm does not create multiprocessing objects.""" mp = importorskip('multiprocessing') if not hasattr(mp, 'get_context'): skip("missing multiprocessing.get_context") # Use 'spawn' instead of 'fork' so that the process does not inherit any # globals that have been constructed by running other tests ctx = mp.get_context('spawn') with ctx.Pool(1) as pool: # The pool will propagate the error if the target method fails pool.apply(_rlock_creation_target) def _rlock_creation_target(): """Check that the RLock has not been constructed.""" import multiprocessing as mp patch = importorskip('unittest.mock').patch # Patch the RLock class/method but use the original implementation with patch('multiprocessing.RLock', wraps=mp.RLock) as rlock_mock: # Importing the module should not create a lock from tqdm import tqdm assert rlock_mock.call_count == 0 # Creating a progress bar should initialize the lock with closing(StringIO()) as our_file: with tqdm(file=our_file) as _: # NOQA pass assert rlock_mock.call_count == 1 # Creating a progress bar again should reuse the lock with closing(StringIO()) as our_file: with tqdm(file=our_file) as _: # NOQA pass assert rlock_mock.call_count == 1 def test_disable(): """Test disable""" with closing(StringIO()) as our_file: for _ in tqdm(_range(3), file=our_file, disable=True): pass assert our_file.getvalue() == '' with closing(StringIO()) as our_file: progressbar = tqdm(total=3, file=our_file, miniters=1, disable=True) progressbar.update(3) progressbar.close() assert our_file.getvalue() == '' def test_infinite_total(): """Test treatment of infinite total""" with closing(StringIO()) as our_file: for _ in tqdm(_range(3), file=our_file, total=float("inf")): pass def test_nototal(): """Test unknown total length""" with closing(StringIO()) as our_file: for _ in tqdm(iter(range(10)), file=our_file, unit_scale=10): pass assert "100it" in our_file.getvalue() with closing(StringIO()) as our_file: for _ in tqdm(iter(range(10)), file=our_file, bar_format="{l_bar}{bar}{r_bar}"): pass assert "10/?" in our_file.getvalue() def test_unit(): """Test SI unit prefix""" with closing(StringIO()) as our_file: for _ in tqdm(_range(3), file=our_file, miniters=1, unit="bytes"): pass assert 'bytes/s' in our_file.getvalue() def test_ascii(): """Test ascii/unicode bar""" # Test ascii autodetection with closing(StringIO()) as our_file: with tqdm(total=10, file=our_file, ascii=None) as t: assert t.ascii # TODO: this may fail in the future # Test ascii bar with closing(StringIO()) as our_file: for _ in tqdm(_range(3), total=15, file=our_file, miniters=1, mininterval=0, ascii=True): pass res = our_file.getvalue().strip("\r").split("\r") assert '7%|6' in res[1] assert '13%|#3' in res[2] assert '20%|##' in res[3] # Test unicode bar with closing(UnicodeIO()) as our_file: with tqdm(total=15, file=our_file, ascii=False, mininterval=0) as t: for _ in _range(3): t.update() res = our_file.getvalue().strip("\r").split("\r") assert u"7%|\u258b" in res[1] assert u"13%|\u2588\u258e" in res[2] assert u"20%|\u2588\u2588" in res[3] # Test custom bar for bars in [" .oO0", " #"]: with closing(StringIO()) as our_file: for _ in tqdm(_range(len(bars) - 1), file=our_file, miniters=1, mininterval=0, ascii=bars, ncols=27): pass res = our_file.getvalue().strip("\r").split("\r") for b, line in zip(bars, res): assert '|' + b + '|' in line def test_update(): """Test manual creation and updates""" res = None with closing(StringIO()) as our_file: with tqdm(total=2, file=our_file, miniters=1, mininterval=0) as progressbar: assert len(progressbar) == 2 progressbar.update(2) assert '| 2/2' in our_file.getvalue() progressbar.desc = 'dynamically notify of 4 increments in total' progressbar.total = 4 progressbar.update(-1) progressbar.update(2) res = our_file.getvalue() assert '| 3/4 ' in res assert 'dynamically notify of 4 increments in total' in res def test_close(): """Test manual creation and closure and n_instances""" # With `leave` option with closing(StringIO()) as our_file: progressbar = tqdm(total=3, file=our_file, miniters=10) progressbar.update(3) assert '| 3/3 ' not in our_file.getvalue() # Should be blank assert len(tqdm._instances) == 1 progressbar.close() assert len(tqdm._instances) == 0 assert '| 3/3 ' in our_file.getvalue() # Without `leave` option with closing(StringIO()) as our_file: progressbar = tqdm(total=3, file=our_file, miniters=10, leave=False) progressbar.update(3) progressbar.close() assert '| 3/3 ' not in our_file.getvalue() # Should be blank # With all updates with closing(StringIO()) as our_file: assert len(tqdm._instances) == 0 with tqdm(total=3, file=our_file, miniters=0, mininterval=0, leave=True) as progressbar: assert len(tqdm._instances) == 1 progressbar.update(3) res = our_file.getvalue() assert '| 3/3 ' in res # Should be blank assert '\n' not in res # close() called assert len(tqdm._instances) == 0 exres = res.rsplit(', ', 1)[0] res = our_file.getvalue() assert res[-1] == '\n' if not res.startswith(exres): raise AssertionError("\n<<< Expected:\n{0}\n>>> Got:\n{1}\n===".format( exres + ', ...it/s]\n', our_file.getvalue())) # Closing after the output stream has closed with closing(StringIO()) as our_file: t = tqdm(total=2, file=our_file) t.update() t.update() t.close() def test_ema(): """Test exponential weighted average""" ema = EMA(0.01) assert round(ema(10), 2) == 10 assert round(ema(1), 2) == 5.48 assert round(ema(), 2) == 5.48 assert round(ema(1), 2) == 3.97 assert round(ema(1), 2) == 3.22 def test_smoothing(): """Test exponential weighted average smoothing""" timer = DiscreteTimer() # -- Test disabling smoothing with closing(StringIO()) as our_file: with tqdm(_range(3), file=our_file, smoothing=None, leave=True) as t: cpu_timify(t, timer) for _ in t: pass assert '| 3/3 ' in our_file.getvalue() # -- Test smoothing # 1st case: no smoothing (only use average) with closing(StringIO()) as our_file2: with closing(StringIO()) as our_file: t = tqdm(_range(3), file=our_file2, smoothing=None, leave=True, miniters=1, mininterval=0) cpu_timify(t, timer) with tqdm(_range(3), file=our_file, smoothing=None, leave=True, miniters=1, mininterval=0) as t2: cpu_timify(t2, timer) for i in t2: # Sleep more for first iteration and # see how quickly rate is updated if i == 0: timer.sleep(0.01) else: # Need to sleep in all iterations # to calculate smoothed rate # (else delta_t is 0!) timer.sleep(0.001) t.update() n_old = len(tqdm._instances) t.close() assert len(tqdm._instances) == n_old - 1 # Get result for iter-based bar a = progressbar_rate(get_bar(our_file.getvalue(), 3)) # Get result for manually updated bar a2 = progressbar_rate(get_bar(our_file2.getvalue(), 3)) # 2nd case: use max smoothing (= instant rate) with closing(StringIO()) as our_file2: with closing(StringIO()) as our_file: t = tqdm(_range(3), file=our_file2, smoothing=1, leave=True, miniters=1, mininterval=0) cpu_timify(t, timer) with tqdm(_range(3), file=our_file, smoothing=1, leave=True, miniters=1, mininterval=0) as t2: cpu_timify(t2, timer) for i in t2: if i == 0: timer.sleep(0.01) else: timer.sleep(0.001) t.update() t.close() # Get result for iter-based bar b = progressbar_rate(get_bar(our_file.getvalue(), 3)) # Get result for manually updated bar b2 = progressbar_rate(get_bar(our_file2.getvalue(), 3)) # 3rd case: use medium smoothing with closing(StringIO()) as our_file2: with closing(StringIO()) as our_file: t = tqdm(_range(3), file=our_file2, smoothing=0.5, leave=True, miniters=1, mininterval=0) cpu_timify(t, timer) t2 = tqdm(_range(3), file=our_file, smoothing=0.5, leave=True, miniters=1, mininterval=0) cpu_timify(t2, timer) for i in t2: if i == 0: timer.sleep(0.01) else: timer.sleep(0.001) t.update() t2.close() t.close() # Get result for iter-based bar c = progressbar_rate(get_bar(our_file.getvalue(), 3)) # Get result for manually updated bar c2 = progressbar_rate(get_bar(our_file2.getvalue(), 3)) # Check that medium smoothing's rate is between no and max smoothing rates assert a <= c <= b assert a2 <= c2 <= b2 @mark.skipif(nt_and_no_colorama, reason="Windows without colorama") def test_deprecated_nested(): """Test nested progress bars""" # TODO: test degradation on windows without colorama? # Artificially test nested loop printing # Without leave our_file = StringIO() try: tqdm(total=2, file=our_file, nested=True) except TqdmDeprecationWarning: if """`nested` is deprecated and automated. Use `position` instead for manual control.""" not in our_file.getvalue(): raise else: raise DeprecationError("Should not allow nested kwarg") def test_bar_format(): """Test custom bar formatting""" with closing(StringIO()) as our_file: bar_format = ('{l_bar}{bar}|{n_fmt}/{total_fmt}-{n}/{total}' '{percentage}{rate}{rate_fmt}{elapsed}{remaining}') for _ in trange(2, file=our_file, leave=True, bar_format=bar_format): pass out = our_file.getvalue() assert "\r 0%| |0/2-0/20.0None?it/s00:00?\r" in out # Test unicode string auto conversion with closing(StringIO()) as our_file: bar_format = r'hello world' with tqdm(ascii=False, bar_format=bar_format, file=our_file) as t: assert isinstance(t.bar_format, _unicode) def test_custom_format(): """Test adding additional derived format arguments""" class TqdmExtraFormat(tqdm): """Provides a `total_time` format parameter""" @property def format_dict(self): d = super(TqdmExtraFormat, self).format_dict total_time = d["elapsed"] * (d["total"] or 0) / max(d["n"], 1) d.update(total_time=self.format_interval(total_time) + " in total") return d with closing(StringIO()) as our_file: for _ in TqdmExtraFormat( range(10), file=our_file, bar_format="{total_time}: {percentage:.0f}%|{bar}{r_bar}"): pass assert "00:00 in total" in our_file.getvalue() def test_eta(capsys): """Test eta bar_format""" from datetime import datetime as dt for _ in trange(999, miniters=1, mininterval=0, leave=True, bar_format='{l_bar}{eta:%Y-%m-%d}'): pass _, err = capsys.readouterr() assert "\r100%|{eta:%Y-%m-%d}\n".format(eta=dt.now()) in err def test_unpause(): """Test unpause""" timer = DiscreteTimer() with closing(StringIO()) as our_file: t = trange(10, file=our_file, leave=True, mininterval=0) cpu_timify(t, timer) timer.sleep(0.01) t.update() timer.sleep(0.01) t.update() timer.sleep(0.1) # longer wait time t.unpause() timer.sleep(0.01) t.update() timer.sleep(0.01) t.update() t.close() r_before = progressbar_rate(get_bar(our_file.getvalue(), 2)) r_after = progressbar_rate(get_bar(our_file.getvalue(), 3)) assert r_before == r_after def test_disabled_unpause(capsys): """Test disabled unpause""" with tqdm(total=10, disable=True) as t: t.update() t.unpause() t.update() print(t) out, err = capsys.readouterr() assert not err assert out == ' 0%| | 0/10 [00:00= t0 assert t0 <= t2 t3 = tqdm(total=10, file=our_file) t4 = tqdm(total=10, file=our_file) t5 = tqdm(total=10, file=our_file) t5.close() t6 = tqdm(total=10, file=our_file) assert t3 != t4 assert t3 > t2 assert t5 == t6 t6.close() t4.close() t3.close() t2.close() t1.close() t0.close() def test_repr(): """Test representation""" with closing(StringIO()) as our_file: with tqdm(total=10, ascii=True, file=our_file) as t: assert str(t) == ' 0%| | 0/10 [00:00 out3.count('\r') assert out4.count(", ".join(expected_order)) == 2 # Test setting postfix string directly with closing(StringIO()) as our_file: with trange(10, file=our_file, desc='pos2 bar', bar_format='{r_bar}', postfix=None) as t5: t5.set_postfix_str("Hello", False) t5.set_postfix_str("World") out5 = our_file.getvalue() assert "Hello" not in out5 out5 = out5[1:-1].split(', ')[3:] assert out5 == ["World"] def test_postfix_direct(): """Test directly assigning non-str objects to postfix""" with closing(StringIO()) as our_file: with tqdm(total=10, file=our_file, miniters=1, mininterval=0, bar_format="{postfix[0][name]} {postfix[1]:>5.2f}", postfix=[{'name': "foo"}, 42]) as t: for i in range(10): if i % 2: t.postfix[0]["name"] = "abcdefghij"[i] else: t.postfix[1] = i t.update() res = our_file.getvalue() assert "f 6.00" in res assert "h 6.00" in res assert "h 8.00" in res assert "j 8.00" in res @contextmanager def std_out_err_redirect_tqdm(tqdm_file=sys.stderr): orig_out_err = sys.stdout, sys.stderr try: sys.stdout = sys.stderr = DummyTqdmFile(tqdm_file) yield orig_out_err[0] # Relay exceptions except Exception as exc: raise exc # Always restore sys.stdout/err if necessary finally: sys.stdout, sys.stderr = orig_out_err def test_file_redirection(): """Test redirection of output""" with closing(StringIO()) as our_file: # Redirect stdout to tqdm.write() with std_out_err_redirect_tqdm(tqdm_file=our_file): with tqdm(total=3) as pbar: print("Such fun") pbar.update(1) print("Such", "fun") pbar.update(1) print("Such ", end="") print("fun") pbar.update(1) res = our_file.getvalue() assert res.count("Such fun\n") == 3 assert "0/3" in res assert "3/3" in res def test_external_write(): """Test external write mode""" with closing(StringIO()) as our_file: # Redirect stdout to tqdm.write() for _ in trange(3, file=our_file): del tqdm._lock # classmethod should be able to recreate lock with tqdm.external_write_mode(file=our_file): our_file.write("Such fun\n") res = our_file.getvalue() assert res.count("Such fun\n") == 3 assert "0/3" in res assert "3/3" in res def test_unit_scale(): """Test numeric `unit_scale`""" with closing(StringIO()) as our_file: for _ in tqdm(_range(9), unit_scale=9, file=our_file, miniters=1, mininterval=0): pass out = our_file.getvalue() assert '81/81' in out def patch_lock(thread=True): """decorator replacing tqdm's lock with vanilla threading/multiprocessing""" try: if thread: from threading import RLock else: from multiprocessing import RLock lock = RLock() except (ImportError, OSError) as err: skip(str(err)) def outer(func): """actual decorator""" @wraps(func) def inner(*args, **kwargs): """set & reset lock even if exceptions occur""" default_lock = tqdm.get_lock() try: tqdm.set_lock(lock) return func(*args, **kwargs) finally: tqdm.set_lock(default_lock) return inner return outer @patch_lock(thread=False) def test_threading(): """Test multiprocess/thread-realted features""" pass # TODO: test interleaved output #445 def test_bool(): """Test boolean cast""" def internal(our_file, disable): kwargs = {'file': our_file, 'disable': disable} with trange(10, **kwargs) as t: assert t with trange(0, **kwargs) as t: assert not t with tqdm(total=10, **kwargs) as t: assert bool(t) with tqdm(total=0, **kwargs) as t: assert not bool(t) with tqdm([], **kwargs) as t: assert not t with tqdm([0], **kwargs) as t: assert t with tqdm(iter([]), **kwargs) as t: assert t with tqdm(iter([1, 2, 3]), **kwargs) as t: assert t with tqdm(**kwargs) as t: try: print(bool(t)) except TypeError: pass else: raise TypeError("Expected bool(tqdm()) to fail") # test with and without disable with closing(StringIO()) as our_file: internal(our_file, False) internal(our_file, True) def backendCheck(module): """Test tqdm-like module fallback""" tn = module.tqdm tr = module.trange with closing(StringIO()) as our_file: with tn(total=10, file=our_file) as t: assert len(t) == 10 with tr(1337) as t: assert len(t) == 1337 def test_auto(): """Test auto fallback""" from tqdm import auto, autonotebook backendCheck(autonotebook) backendCheck(auto) def test_wrapattr(): """Test wrapping file-like objects""" data = "a twenty-char string" with closing(StringIO()) as our_file: with closing(StringIO()) as writer: with tqdm.wrapattr(writer, "write", file=our_file, bytes=True) as wrap: wrap.write(data) res = writer.getvalue() assert data == res res = our_file.getvalue() assert '%.1fB [' % len(data) in res with closing(StringIO()) as our_file: with closing(StringIO()) as writer: with tqdm.wrapattr(writer, "write", file=our_file, bytes=False) as wrap: wrap.write(data) res = our_file.getvalue() assert '%dit [' % len(data) in res def test_float_progress(): """Test float totals""" with closing(StringIO()) as our_file: with trange(10, total=9.6, file=our_file) as t: with catch_warnings(record=True) as w: simplefilter("always", category=TqdmWarning) for i in t: if i < 9: assert not w assert w assert "clamping frac" in str(w[-1].message) def test_screen_shape(): """Test screen shape""" # ncols with closing(StringIO()) as our_file: with trange(10, file=our_file, ncols=50) as t: list(t) res = our_file.getvalue() assert all(len(i) == 50 for i in get_bar(res)) # no second/third bar, leave=False with closing(StringIO()) as our_file: kwargs = {'file': our_file, 'ncols': 50, 'nrows': 2, 'miniters': 0, 'mininterval': 0, 'leave': False} with trange(10, desc="one", **kwargs) as t1: with trange(10, desc="two", **kwargs) as t2: with trange(10, desc="three", **kwargs) as t3: list(t3) list(t2) list(t1) res = our_file.getvalue() assert "one" in res assert "two" not in res assert "three" not in res assert "\n\n" not in res assert "more hidden" in res # double-check ncols assert all(len(i) == 50 for i in get_bar(res) if i.strip() and "more hidden" not in i) # all bars, leave=True with closing(StringIO()) as our_file: kwargs = {'file': our_file, 'ncols': 50, 'nrows': 2, 'miniters': 0, 'mininterval': 0} with trange(10, desc="one", **kwargs) as t1: with trange(10, desc="two", **kwargs) as t2: assert "two" not in our_file.getvalue() with trange(10, desc="three", **kwargs) as t3: assert "three" not in our_file.getvalue() list(t3) list(t2) list(t1) res = our_file.getvalue() assert "one" in res assert "two" in res assert "three" in res assert "\n\n" not in res assert "more hidden" in res # double-check ncols assert all(len(i) == 50 for i in get_bar(res) if i.strip() and "more hidden" not in i) # second bar becomes first, leave=False with closing(StringIO()) as our_file: kwargs = {'file': our_file, 'ncols': 50, 'nrows': 2, 'miniters': 0, 'mininterval': 0, 'leave': False} t1 = tqdm(total=10, desc="one", **kwargs) with tqdm(total=10, desc="two", **kwargs) as t2: t1.update() t2.update() t1.close() res = our_file.getvalue() assert "one" in res assert "two" not in res assert "more hidden" in res t2.update() res = our_file.getvalue() assert "two" in res def test_initial(): """Test `initial`""" with closing(StringIO()) as our_file: for _ in tqdm(_range(9), initial=10, total=19, file=our_file, miniters=1, mininterval=0): pass out = our_file.getvalue() assert '10/19' in out assert '19/19' in out def test_colour(): """Test `colour`""" with closing(StringIO()) as our_file: for _ in tqdm(_range(9), file=our_file, colour="#beefed"): pass out = our_file.getvalue() assert '\x1b[38;2;%d;%d;%dm' % (0xbe, 0xef, 0xed) in out with catch_warnings(record=True) as w: simplefilter("always", category=TqdmWarning) with tqdm(total=1, file=our_file, colour="charm") as t: assert w t.update() assert "Unknown colour" in str(w[-1].message) with closing(StringIO()) as our_file2: for _ in tqdm(_range(9), file=our_file2, colour="blue"): pass out = our_file2.getvalue() assert '\x1b[34m' in out def test_closed(): """Test writing to closed file""" with closing(StringIO()) as our_file: for i in trange(9, file=our_file, miniters=1, mininterval=0): if i == 5: our_file.close() def test_reversed(capsys): """Test reversed()""" for _ in reversed(tqdm(_range(9))): pass out, err = capsys.readouterr() assert not out assert ' 0%' in err assert '100%' in err def test_contains(capsys): """Test __contains__ doesn't iterate""" with tqdm(list(range(9))) as t: assert 9 not in t assert all(i in t for i in _range(9)) out, err = capsys.readouterr() assert not out assert ' 0%' in err assert '100%' not in err