from collections import defaultdict from os.path import splitext from contextlib import contextmanager from pathlib import Path import networkx as nx from networkx.utils import create_random_state, create_py_random_state import inspect, itertools, collections import re, gzip, bz2 __all__ = [ "not_implemented_for", "open_file", "nodes_or_number", "preserve_random_state", "random_state", "np_random_state", "py_random_state", "argmap", ] def not_implemented_for(*graph_types): """Decorator to mark algorithms as not implemented Parameters ---------- graph_types : container of strings Entries must be one of "directed", "undirected", "multigraph", or "graph". Returns ------- _require : function The decorated function. Raises ------ NetworkXNotImplemented If any of the packages cannot be imported Notes ----- Multiple types are joined logically with "and". For "or" use multiple @not_implemented_for() lines. Examples -------- Decorate functions like this:: @not_implemented_for("directed") def sp_function(G): pass # rule out MultiDiGraph @not_implemented_for("directed","multigraph") def sp_np_function(G): pass # rule out all except DiGraph @not_implemented_for("undirected") @not_implemented_for("multigraph") def sp_np_function(G): pass """ if ("directed" in graph_types) and ("undirected" in graph_types): raise ValueError("Function not implemented on directed AND undirected graphs?") if ("multigraph" in graph_types) and ("graph" in graph_types): raise ValueError("Function not implemented on graph AND multigraphs?") if not set(graph_types) < {"directed", "undirected", "multigraph", "graph"}: raise KeyError( "use one or more of directed, undirected, multigraph, graph. " f"You used {graph_types}" ) # 3-way logic: True if "directed" input, False if "undirected" input, else None dval = ("directed" in graph_types) or not ("undirected" in graph_types) and None mval = ("multigraph" in graph_types) or not ("graph" in graph_types) and None errmsg = f"not implemented for {' '.join(graph_types)} type" def _not_implemented_for(g): if (mval is None or mval == g.is_multigraph()) and ( dval is None or dval == g.is_directed() ): raise nx.NetworkXNotImplemented(errmsg) return g return argmap(_not_implemented_for, 0) # To handle new extensions, define a function accepting a `path` and `mode`. # Then add the extension to _dispatch_dict. fopeners = { ".gz": gzip.open, ".gzip": gzip.open, ".bz2": bz2.BZ2File, } _dispatch_dict = defaultdict(lambda: open, **fopeners) # type: ignore def open_file(path_arg, mode="r"): """Decorator to ensure clean opening and closing of files. Parameters ---------- path_arg : string or int Name or index of the argument that is a path. mode : str String for opening mode. Returns ------- _open_file : function Function which cleanly executes the io. Examples -------- Decorate functions like this:: @open_file(0,"r") def read_function(pathname): pass @open_file(1,"w") def write_function(G, pathname): pass @open_file(1,"w") def write_function(G, pathname="graph.dot"): pass @open_file("pathname","w") def write_function(G, pathname="graph.dot"): pass @open_file("path", "w+") def another_function(arg, **kwargs): path = kwargs["path"] pass Notes ----- Note that this decorator solves the problem when a path argument is specified as a string, but it does not handle the situation when the function wants to accept a default of None (and then handle it). Here is an example of how to handle this case:: @open_file("path") def some_function(arg1, arg2, path=None): if path is None: fobj = tempfile.NamedTemporaryFile(delete=False) else: # `path` could have been a string or file object or something # similar. In any event, the decorator has given us a file object # and it will close it for us, if it should. fobj = path try: fobj.write("blah") finally: if path is None: fobj.close() Normally, we'd want to use "with" to ensure that fobj gets closed. However, the decorator will make `path` a file object for us, and using "with" would undesirably close that file object. Instead, we use a try block, as shown above. When we exit the function, fobj will be closed, if it should be, by the decorator. """ def _open_file(path): # Now we have the path_arg. There are two types of input to consider: # 1) string representing a path that should be opened # 2) an already opened file object if isinstance(path, str): ext = splitext(path)[1] elif isinstance(path, Path): # path is a pathlib reference to a filename ext = path.suffix path = str(path) else: # could be None, or a file handle, in which case the algorithm will deal with it return path, lambda: None fobj = _dispatch_dict[ext](path, mode=mode) return fobj, lambda: fobj.close() return argmap(_open_file, path_arg, try_finally=True) def nodes_or_number(which_args): """Decorator to allow number of nodes or container of nodes. With this decorator, the specified argument can be either a number or a container of nodes. If it is a number, the nodes used are `range(n)`. This allows `nx.complete_graph(50)` in place of `nx.complete_graph(list(range(50)))`. And it also allows `nx.complete_graph(any_list_of_nodes)`. Parameters ---------- which_args : string or int or sequence of strings or ints If string, the name of the argument to be treated. If int, the index of the argument to be treated. If more than one node argument is allowed, can be a list of locations. Returns ------- _nodes_or_numbers : function Function which replaces int args with ranges. Examples -------- Decorate functions like this:: @nodes_or_number("nodes") def empty_graph(nodes): # nodes is converted to a list of nodes @nodes_or_number(0) def empty_graph(nodes): # nodes is converted to a list of nodes @nodes_or_number(["m1", "m2"]) def grid_2d_graph(m1, m2, periodic=False): # m1 and m2 are each converted to a list of nodes @nodes_or_number([0, 1]) def grid_2d_graph(m1, m2, periodic=False): # m1 and m2 are each converted to a list of nodes @nodes_or_number(1) def full_rary_tree(r, n) # presumably r is a number. It is not handled by this decorator. # n is converted to a list of nodes """ def _nodes_or_number(n): try: nodes = list(range(n)) except TypeError: nodes = tuple(n) else: if n < 0: msg = "Negative number of nodes not valid: {n}" raise nx.NetworkXError(msg) return (n, nodes) try: iter_wa = iter(which_args) except TypeError: iter_wa = (which_args,) return argmap(_nodes_or_number, *iter_wa) def preserve_random_state(func): """Decorator to preserve the numpy.random state during a function. .. deprecated:: 2.6 This is deprecated and will be removed in NetworkX v3.0. Parameters ---------- func : function function around which to preserve the random state. Returns ------- wrapper : function Function which wraps the input function by saving the state before calling the function and restoring the function afterward. Examples -------- Decorate functions like this:: @preserve_random_state def do_random_stuff(x, y): return x + y * numpy.random.random() Notes ----- If numpy.random is not importable, the state is not saved or restored. """ import warnings msg = "preserve_random_state is deprecated and will be removed in 3.0." warnings.warn(msg, DeprecationWarning) try: import numpy as np @contextmanager def save_random_state(): state = np.random.get_state() try: yield finally: np.random.set_state(state) def wrapper(*args, **kwargs): with save_random_state(): np.random.seed(1234567890) return func(*args, **kwargs) wrapper.__name__ = func.__name__ return wrapper except ImportError: return func def np_random_state(random_state_argument): """Decorator to generate a `numpy.random.RandomState` instance. The decorator processes the argument indicated by `random_state_argument` using :func:`nx.utils.create_random_state`. The argument value can be a seed (integer), or a `numpy.random.RandomState` instance or (`None` or `numpy.random`). The latter options use the glocal random number generator used by `numpy.random`. The result is a `numpy.random.RandomState` instance. Parameters ---------- random_state_argument : string or int The name or index of the argument to be converted to a `numpy.random.RandomState` instance. Returns ------- _random_state : function Function whose random_state keyword argument is a RandomState instance. Examples -------- Decorate functions like this:: @np_random_state("seed") def random_float(seed=None): return seed.rand() @np_random_state(0) def random_float(rng=None): return rng.rand() @np_random_state(1) def random_array(dims, random_state=1): return random_state.rand(*dims) See Also -------- py_random_state """ return argmap(create_random_state, random_state_argument) def random_state(random_state_argument): """Decorator to generate a `numpy.random.RandomState` instance. .. deprecated:: 2.7 This function is a deprecated alias for `np_random_state` and will be removed in version 3.0. Use np_random_state instead. """ import warnings warnings.warn( ( "`random_state` is a deprecated alias for `np_random_state`\n" "and will be removed in version 3.0. Use `np_random_state` instead." ), DeprecationWarning, stacklevel=2, ) return np_random_state(random_state_argument) def py_random_state(random_state_argument): """Decorator to generate a random.Random instance (or equiv). The decorator processes the argument indicated by `random_state_argument` using :func:`nx.utils.create_py_random_state`. The argument value can be a seed (integer), or a random number generator:: If int, return a random.Random instance set with seed=int. If random.Random instance, return it. If None or the `random` package, return the global random number generator used by `random`. If np.random package, return the global numpy random number generator wrapped in a PythonRandomInterface class. If np.random.RandomState instance, return it wrapped in PythonRandomInterface If a PythonRandomInterface instance, return it Parameters ---------- random_state_argument : string or int The name of the argument or the index of the argument in args that is to be converted to the random.Random instance or numpy.random.RandomState instance that mimics basic methods of random.Random. Returns ------- _random_state : function Function whose random_state_argument is converted to a Random instance. Examples -------- Decorate functions like this:: @py_random_state("random_state") def random_float(random_state=None): return random_state.rand() @py_random_state(0) def random_float(rng=None): return rng.rand() @py_random_state(1) def random_array(dims, seed=12345): return seed.rand(*dims) See Also -------- np_random_state """ return argmap(create_py_random_state, random_state_argument) class argmap: """A decorator to apply a map to arguments before calling the function This class provides a decorator that maps (transforms) arguments of the function before the function is called. Thus for example, we have similar code in many functions to determine whether an argument is the number of nodes to be created, or a list of nodes to be handled. The decorator provides the code to accept either -- transforming the indicated argument into a list of nodes before the actual function is called. This decorator class allows us to process single or multiple arguments. The arguments to be processed can be specified by string, naming the argument, or by index, specifying the item in the args list. Parameters ---------- func : callable The function to apply to arguments *args : iterable of (int, str or tuple) A list of parameters, specified either as strings (their names), ints (numerical indices) or tuples, which may contain ints, strings, and (recursively) tuples. Each indicates which parameters the decorator should map. Tuples indicate that the map function takes (and returns) multiple parameters in the same order and nested structure as indicated here. try_finally : bool (default: False) When True, wrap the function call in a try-finally block with code for the finally block created by `func`. This is used when the map function constructs an object (like a file handle) that requires post-processing (like closing). Examples -------- Most of these examples use `@argmap(...)` to apply the decorator to the function defined on the next line. In the NetworkX codebase however, `argmap` is used within a function to construct a decorator. That is, the decorator defines a mapping function and then uses `argmap` to build and return a decorated function. A simple example is a decorator that specifies which currency to report money. The decorator (named `convert_to`) would be used like:: @convert_to("US_Dollars", "income") def show_me_the_money(name, income): print(f"{name} : {income}") And the code to create the decorator might be:: def convert_to(currency, which_arg): def _convert(amount): if amount.currency != currency: amount = amount.to_currency(currency) return amount return argmap(_convert, which_arg) Despite this common idiom for argmap, most of the following examples use the `@argmap(...)` idiom to save space. Here's an example use of argmap to sum the elements of two of the functions arguments. The decorated function:: @argmap(sum, "xlist", "zlist") def foo(xlist, y, zlist): return xlist - y + zlist is syntactic sugar for:: def foo(xlist, y, zlist): x = sum(xlist) z = sum(zlist) return x - y + z and is equivalent to (using argument indexes):: @argmap(sum, "xlist", 2) def foo(xlist, y, zlist): return xlist - y + zlist or:: @argmap(sum, "zlist", 0) def foo(xlist, y, zlist): return xlist - y + zlist Transforming functions can be applied to multiple arguments, such as:: def swap(x, y): return y, x # the 2-tuple tells argmap that the map `swap` has 2 inputs/outputs. @argmap(swap, ("a", "b")): def foo(a, b, c): return a / b * c is equivalent to:: def foo(a, b, c): a, b = swap(a, b) return a / b * c More generally, the applied arguments can be nested tuples of strings or ints. The syntax `@argmap(some_func, ("a", ("b", "c")))` would expect `some_func` to accept 2 inputs with the second expected to be a 2-tuple. It should then return 2 outputs with the second a 2-tuple. The returns values would replace input "a" "b" and "c" respectively. Similarly for `@argmap(some_func, (0, ("b", 2)))`. Also, note that an index larger than the number of named parameters is allowed for variadic functions. For example:: def double(a): return 2 * a @argmap(double, 3) def overflow(a, *args): return a, args print(overflow(1, 2, 3, 4, 5, 6)) # output is 1, (2, 3, 8, 5, 6) **Try Finally** Additionally, this `argmap` class can be used to create a decorator that initiates a try...finally block. The decorator must be written to return both the transformed argument and a closing function. This feature was included to enable the `open_file` decorator which might need to close the file or not depending on whether it had to open that file. This feature uses the keyword-only `try_finally` argument to `@argmap`. For example this map opens a file and then makes sure it is closed:: def open_file(fn): f = open(fn) return f, lambda: f.close() The decorator applies that to the function `foo`:: @argmap(open_file, "file", try_finally=True) def foo(file): print(file.read()) is syntactic sugar for:: def foo(file): file, close_file = open_file(file) try: print(file.read()) finally: close_file() and is equivalent to (using indexes):: @argmap(open_file, 0, try_finally=True) def foo(file): print(file.read()) Here's an example of the try_finally feature used to create a decorator:: def my_closing_decorator(which_arg): def _opener(path): if path is None: path = open(path) fclose = path.close else: # assume `path` handles the closing fclose = lambda: None return path, fclose return argmap(_opener, which_arg, try_finally=True) which can then be used as:: @my_closing_decorator("file") def fancy_reader(file=None): # this code doesn't need to worry about closing the file print(file.read()) Notes ----- An object of this class is callable and intended to be used when defining a decorator. Generally, a decorator takes a function as input and constructs a function as output. Specifically, an `argmap` object returns the input function decorated/wrapped so that specified arguments are mapped (transformed) to new values before the decorated function is called. As an overview, the argmap object returns a new function with all the dunder values of the original function (like `__doc__`, `__name__`, etc). Code for this decorated function is built based on the original function's signature. It starts by mapping the input arguments to potentially new values. Then it calls the decorated function with these new values in place of the indicated arguments that have been mapped. The return value of the original function is then returned. This new function is the function that is actually called by the user. Three additional features are provided. 1) The code is lazily compiled. That is, the new function is returned as an object without the code compiled, but with all information needed so it can be compiled upon it's first invocation. This saves time on import at the cost of additional time on the first call of the function. Subsequent calls are then just as fast as normal. 2) If the "try_finally" keyword-only argument is True, a try block follows each mapped argument, matched on the other side of the wrapped call, by a finally block closing that mapping. We expect func to return a 2-tuple: the mapped value and a function to be called in the finally clause. This feature was included so the `open_file` decorator could provide a file handle to the decorated function and close the file handle after the function call. It even keeps track of whether to close the file handle or not based on whether it had to open the file or the input was already open. So, the decorated function does not need to include any code to open or close files. 3) The maps applied can process multiple arguments. For example, you could swap two arguments using a mapping, or transform them to their sum and their difference. This was included to allow a decorator in the `quality.py` module that checks that an input `partition` is a valid partition of the nodes of the input graph `G`. In this example, the map has inputs `(G, partition)`. After checking for a valid partition, the map either raises an exception or leaves the inputs unchanged. Thus many functions that make this check can use the decorator rather than copy the checking code into each function. More complicated nested argument structures are described below. The remaining notes describe the code structure and methods for this class in broad terms to aid in understanding how to use it. Instantiating an `argmap` object simply stores the mapping function and the input identifiers of which arguments to map. The resulting decorator is ready to use this map to decorate any function. Calling that object (`argmap.__call__`, but usually done via `@my_decorator`) a lazily compiled thin wrapper of the decorated function is constructed, wrapped with the necessary function dunder attributes like `__doc__` and `__name__`. That thinly wrapped function is returned as the decorated function. When that decorated function is called, the thin wrapper of code calls `argmap._lazy_compile` which compiles the decorated function (using `argmap.compile`) and replaces the code of the thin wrapper with the newly compiled code. This saves the compilation step every import of networkx, at the cost of compiling upon the first call to the decorated function. When the decorated function is compiled, the code is recursively assembled using the `argmap.assemble` method. The recursive nature is needed in case of nested decorators. The result of the assembly is a number of useful objects. sig : the function signature of the original decorated function as constructed by :func:`argmap.signature`. This is constructed using `inspect.signature` but enhanced with attribute strings `sig_def` and `sig_call`, and other information specific to mapping arguments of this function. This information is used to construct a string of code defining the new decorated function. wrapped_name : a unique internally used name constructed by argmap for the decorated function. functions : a dict of the functions used inside the code of this decorated function, to be used as `globals` in `exec`. This dict is recursively updated to allow for nested decorating. mapblock : code (as a list of strings) to map the incoming argument values to their mapped values. finallys : code (as a list of strings) to provide the possibly nested set of finally clauses if needed. mutable_args : a bool indicating whether the `sig.args` tuple should be converted to a list so mutation can occur. After this recursive assembly process, the `argmap.compile` method constructs code (as strings) to convert the tuple `sig.args` to a list if needed. It joins the defining code with appropriate indents and compiles the result. Finally, this code is evaluated and the original wrapper's implementation is replaced with the compiled version (see `argmap._lazy_compile` for more details). Other `argmap` methods include `_name` and `_count` which allow internally generated names to be unique within a python session. The methods `_flatten` and `_indent` process the nested lists of strings into properly indented python code ready to be compiled. More complicated nested tuples of arguments also allowed though usually not used. For the simple 2 argument case, the argmap input ("a", "b") implies the mapping function will take 2 arguments and return a 2-tuple of mapped values. A more complicated example with argmap input `("a", ("b", "c"))` requires the mapping function take 2 inputs, with the second being a 2-tuple. It then must output the 3 mapped values in the same nested structure `(newa, (newb, newc))`. This level of generality is not often needed, but was convenient to implement when handling the multiple arguments. See Also -------- not_implemented_for open_file nodes_or_number random_state py_random_state networkx.community.quality.require_partition require_partition """ def __init__(self, func, *args, try_finally=False): self._func = func self._args = args self._finally = try_finally @staticmethod def _lazy_compile(func): """Compile the source of a wrapped function Assemble and compile the decorated function, and intrusively replace its code with the compiled version's. The thinly wrapped function becomes the decorated function. Parameters ---------- func : callable A function returned by argmap.__call__ which is in the process of being called for the first time. Returns ------- func : callable The same function, with a new __code__ object. Notes ----- It was observed in NetworkX issue #4732 [1] that the import time of NetworkX was significantly bloated by the use of decorators: over half of the import time was being spent decorating functions. This was somewhat improved by a change made to the `decorator` library, at the cost of a relatively heavy-weight call to `inspect.Signature.bind` for each call to the decorated function. The workaround we arrived at is to do minimal work at the time of decoration. When the decorated function is called for the first time, we compile a function with the same function signature as the wrapped function. The resulting decorated function is faster than one made by the `decorator` library, so that the overhead of the first call is 'paid off' after a small number of calls. References ---------- [1] https://github.com/networkx/networkx/issues/4732 """ real_func = func.__argmap__.compile(func.__wrapped__) func.__code__ = real_func.__code__ func.__globals__.update(real_func.__globals__) func.__dict__.update(real_func.__dict__) return func def __call__(self, f): """Construct a lazily decorated wrapper of f. The decorated function will be compiled when it is called for the first time, and it will replace its own __code__ object so subsequent calls are fast. Parameters ---------- f : callable A function to be decorated. Returns ------- func : callable The decorated function. See Also -------- argmap._lazy_compile """ if inspect.isgeneratorfunction(f): def func(*args, __wrapper=None, **kwargs): yield from argmap._lazy_compile(__wrapper)(*args, **kwargs) else: def func(*args, __wrapper=None, **kwargs): return argmap._lazy_compile(__wrapper)(*args, **kwargs) # standard function-wrapping stuff func.__name__ = f.__name__ func.__doc__ = f.__doc__ func.__defaults__ = f.__defaults__ func.__kwdefaults__.update(f.__kwdefaults__ or {}) func.__module__ = f.__module__ func.__qualname__ = f.__qualname__ func.__dict__.update(f.__dict__) func.__wrapped__ = f # now that we've wrapped f, we may have picked up some __dict__ or # __kwdefaults__ items that were set by a previous argmap. Thus, we set # these values after those update() calls. # If we attempt to access func from within itself, that happens through # a closure -- which trips an error when we replace func.__code__. The # standard workaround for functions which can't see themselves is to use # a Y-combinator, as we do here. func.__kwdefaults__["_argmap__wrapper"] = func # this self-reference is here because functools.wraps preserves # everything in __dict__, and we don't want to mistake a non-argmap # wrapper for an argmap wrapper func.__self__ = func # this is used to variously call self.assemble and self.compile func.__argmap__ = self return func __count = 0 @classmethod def _count(cls): """Maintain a globally-unique identifier for function names and "file" names Note that this counter is a class method reporting a class variable so the count is unique within a Python session. It could differ from session to session for a specific decorator depending on the order that the decorators are created. But that doesn't disrupt `argmap`. This is used in two places: to construct unique variable names in the `_name` method and to construct unique fictitious filenames in the `_compile` method. Returns ------- count : int An integer unique to this Python session (simply counts from zero) """ cls.__count += 1 return cls.__count _bad_chars = re.compile("[^a-zA-Z0-9_]") @classmethod def _name(cls, f): """Mangle the name of a function to be unique but somewhat human-readable The names are unique within a Python session and set using `_count`. Parameters ---------- f : str or object Returns ------- name : str The mangled version of `f.__name__` (if `f.__name__` exists) or `f` """ f = f.__name__ if hasattr(f, "__name__") else f fname = re.sub(cls._bad_chars, "_", f) return f"argmap_{fname}_{cls._count()}" def compile(self, f): """Compile the decorated function. Called once for a given decorated function -- collects the code from all argmap decorators in the stack, and compiles the decorated function. Much of the work done here uses the `assemble` method to allow recursive treatment of multiple argmap decorators on a single decorated function. That flattens the argmap decorators, collects the source code to construct a single decorated function, then compiles/executes/returns that function. The source code for the decorated function is stored as an attribute `_code` on the function object itself. Note that Python's `compile` function requires a filename, but this code is constructed without a file, so a fictitious filename is used to describe where the function comes from. The name is something like: "argmap compilation 4". Parameters ---------- f : callable The function to be decorated Returns ------- func : callable The decorated file """ sig, wrapped_name, functions, mapblock, finallys, mutable_args = self.assemble( f ) call = f"{sig.call_sig.format(wrapped_name)}#" mut_args = f"{sig.args} = list({sig.args})" if mutable_args else "" body = argmap._indent(sig.def_sig, mut_args, mapblock, call, finallys) code = "\n".join(body) locl = {} globl = dict(functions.values()) filename = f"{self.__class__} compilation {self._count()}" compiled = compile(code, filename, "exec") exec(compiled, globl, locl) func = locl[sig.name] func._code = code return func def assemble(self, f): """Collects components of the source for the decorated function wrapping f. If `f` has multiple argmap decorators, we recursively assemble the stack of decorators into a single flattened function. This method is part of the `compile` method's process yet separated from that method to allow recursive processing. The outputs are strings, dictionaries and lists that collect needed info to flatten any nested argmap-decoration. Parameters ---------- f : callable The function to be decorated. If f is argmapped, we assemble it. Returns ------- sig : argmap.Signature The function signature as an `argmap.Signature` object. wrapped_name : str The mangled name used to represent the wrapped function in the code being assembled. functions : dict A dictionary mapping id(g) -> (mangled_name(g), g) for functions g referred to in the code being assembled. These need to be present in the ``globals`` scope of ``exec`` when defining the decorated function. mapblock : list of lists and/or strings Code that implements mapping of parameters including any try blocks if needed. This code will precede the decorated function call. finallys : list of lists and/or strings Code that implements the finally blocks to post-process the arguments (usually close any files if needed) after the decorated function is called. mutable_args : bool True if the decorator needs to modify positional arguments via their indices. The compile method then turns the argument tuple into a list so that the arguments can be modified. """ # first, we check if f is already argmapped -- if that's the case, # build up the function recursively. # > mapblock is generally a list of function calls of the sort # arg = func(arg) # in addition to some try-blocks if needed. # > finallys is a recursive list of finally blocks of the sort # finally: # close_func_1() # finally: # close_func_2() # > functions is a dict of functions used in the scope of our decorated # function. It will be used to construct globals used in compilation. # We make functions[id(f)] = name_of_f, f to ensure that a given # function is stored and named exactly once even if called by # nested decorators. if hasattr(f, "__argmap__") and f.__self__ is f: ( sig, wrapped_name, functions, mapblock, finallys, mutable_args, ) = f.__argmap__.assemble(f.__wrapped__) functions = dict(functions) # shallow-copy just in case else: sig = self.signature(f) wrapped_name = self._name(f) mapblock, finallys = [], [] functions = {id(f): (wrapped_name, f)} mutable_args = False if id(self._func) in functions: fname, _ = functions[id(self._func)] else: fname, _ = functions[id(self._func)] = self._name(self._func), self._func # this is a bit complicated -- we can call functions with a variety of # nested arguments, so long as their input and output are tuples with # the same nested structure. e.g. ("a", "b") maps arguments a and b. # A more complicated nesting like (0, (3, 4)) maps arguments 0, 3, 4 # expecting the mapping to output new values in the same nested shape. # The ability to argmap multiple arguments was necessary for # the decorator `nx.algorithms.community.quality.require_partition`, and # while we're not taking full advantage of the ability to handle # multiply-nested tuples, it was convenient to implement this in # generality because the recursive call to `get_name` is necessary in # any case. applied = set() def get_name(arg, first=True): nonlocal mutable_args if isinstance(arg, tuple): name = ", ".join(get_name(x, False) for x in arg) return name if first else f"({name})" if arg in applied: raise nx.NetworkXError(f"argument {arg} is specified multiple times") applied.add(arg) if arg in sig.names: return sig.names[arg] elif isinstance(arg, str): if sig.kwargs is None: raise nx.NetworkXError( f"name {arg} is not a named parameter and this function doesn't have kwargs" ) return f"{sig.kwargs}[{arg!r}]" else: if sig.args is None: raise nx.NetworkXError( f"index {arg} not a parameter index and this function doesn't have args" ) mutable_args = True return f"{sig.args}[{arg - sig.n_positional}]" if self._finally: # here's where we handle try_finally decorators. Such a decorator # returns a mapped argument and a function to be called in a # finally block. This feature was required by the open_file # decorator. The below generates the code # # name, final = func(name) #<--append to mapblock # try: #<--append to mapblock # ... more argmapping and try blocks # return WRAPPED_FUNCTION(...) # ... more finally blocks # finally: #<--prepend to finallys # final() #<--prepend to finallys # for a in self._args: name = get_name(a) final = self._name(name) mapblock.append(f"{name}, {final} = {fname}({name})") mapblock.append("try:") finallys = ["finally:", f"{final}()#", "#", finallys] else: mapblock.extend( f"{name} = {fname}({name})" for name in map(get_name, self._args) ) return sig, wrapped_name, functions, mapblock, finallys, mutable_args @classmethod def signature(cls, f): r"""Construct a Signature object describing `f` Compute a Signature so that we can write a function wrapping f with the same signature and call-type. Parameters ---------- f : callable A function to be decorated Returns ------- sig : argmap.Signature The Signature of f Notes ----- The Signature is a namedtuple with names: name : a unique version of the name of the decorated function signature : the inspect.signature of the decorated function def_sig : a string used as code to define the new function call_sig : a string used as code to call the decorated function names : a dict keyed by argument name and index to the argument's name n_positional : the number of positional arguments in the signature args : the name of the VAR_POSITIONAL argument if any, i.e. \*theseargs kwargs : the name of the VAR_KEYWORDS argument if any, i.e. \*\*kwargs These named attributes of the signature are used in `assemble` and `compile` to construct a string of source code for the decorated function. """ sig = inspect.signature(f, follow_wrapped=False) def_sig = [] call_sig = [] names = {} kind = None args = None kwargs = None npos = 0 for i, param in enumerate(sig.parameters.values()): # parameters can be position-only, keyword-or-position, keyword-only # in any combination, but only in the order as above. we do edge # detection to add the appropriate punctuation prev = kind kind = param.kind if prev == param.POSITIONAL_ONLY != kind: # the last token was position-only, but this one isn't def_sig.append("/") if prev != param.KEYWORD_ONLY == kind != param.VAR_POSITIONAL: # param is the first keyword-only arg and isn't starred def_sig.append("*") # star arguments as appropriate if kind == param.VAR_POSITIONAL: name = "*" + param.name args = param.name count = 0 elif kind == param.VAR_KEYWORD: name = "**" + param.name kwargs = param.name count = 0 else: names[i] = names[param.name] = param.name name = param.name count = 1 # assign to keyword-only args in the function call if kind == param.KEYWORD_ONLY: call_sig.append(f"{name} = {name}") else: npos += count call_sig.append(name) def_sig.append(name) fname = cls._name(f) def_sig = f'def {fname}({", ".join(def_sig)}):' if inspect.isgeneratorfunction(f): _return = "yield from" else: _return = "return" call_sig = f"{_return} {{}}({', '.join(call_sig)})" return cls.Signature(fname, sig, def_sig, call_sig, names, npos, args, kwargs) Signature = collections.namedtuple( "Signature", [ "name", "signature", "def_sig", "call_sig", "names", "n_positional", "args", "kwargs", ], ) @staticmethod def _flatten(nestlist, visited): """flattens a recursive list of lists that doesn't have cyclic references Parameters ---------- nestlist : iterable A recursive list of objects to be flattened into a single iterable visited : set A set of object ids which have been walked -- initialize with an empty set Yields ------ Non-list objects contained in nestlist """ for thing in nestlist: if isinstance(thing, list): if id(thing) in visited: raise ValueError("A cycle was found in nestlist. Be a tree.") else: visited.add(id(thing)) yield from argmap._flatten(thing, visited) else: yield thing _tabs = " " * 64 @staticmethod def _indent(*lines): """Indent list of code lines to make executable Python code Indents a tree-recursive list of strings, following the rule that one space is added to the tab after a line that ends in a colon, and one is removed after a line that ends in an hashmark. Parameters ---------- *lines : lists and/or strings A recursive list of strings to be assembled into properly indented code. Returns ------- code : str Examples -------- argmap._indent(*["try:", "try:", "pass#", "finally:", "pass#", "#", "finally:", "pass#"]) renders to '''try: try: pass# finally: pass# # finally: pass#''' """ depth = 0 for line in argmap._flatten(lines, set()): yield f"{argmap._tabs[:depth]}{line}" depth += (line[-1:] == ":") - (line[-1:] == "#")