# -*- coding: utf-8 -*- # pylint: disable=too-few-public-methods """Collection of application detection utilities.""" __all__ = [ 'join', 'Linux', 'Mac', 'Win', 'DetectedApplication', 'Group', 'CheckConfiguredRoots', 'CheckKnownRoots', 'check_known_mac_roots', 'AppendExecutable', ] import inspect import itertools import os import typing from anaconda_navigator import config as navigator_config if typing.TYPE_CHECKING: import typing_extensions from anaconda_navigator.config import user as user_config class Source(typing_extensions.Protocol): """ Initial source of application details. It is not required for application details to be complete - they still might be extended with filters. """ def __call__(self) -> typing.Iterator['DetectedApplication']: """Iterate through detected applications.""" class Filter(typing_extensions.Protocol): """ Filters for application details. Filters are allowed to generate new application details, as well as modify existing ones before forwarding them further. """ def __call__(self, parent: typing.Iterator['DetectedApplication']) -> typing.Iterator['DetectedApplication']: """Iterate through detected applications.""" def is_source( instance: typing.Callable[..., typing.Iterator['DetectedApplication']], ) -> 'typing_extensions.TypeGuard[Source]': """ Check if `instance` is :class:`~Source` instance. This check is a rough one, as we don't have a strict type limitations. """ signature: inspect.Signature = inspect.signature(instance) return len(signature.parameters) == 0 def is_filter( instance: typing.Callable[..., typing.Iterator['DetectedApplication']], ) -> 'typing_extensions.TypeGuard[Filter]': """ Check if `instance` is :class:`~Filter` instance. This check is a rough one, as we don't have a strict type limitations. """ signature: inspect.Signature = inspect.signature(instance) if len(signature.parameters) != 1: return False parent: typing.Optional[inspect.Parameter] = signature.parameters.get('parent', None) return (parent is not None) and (parent.kind == inspect.Parameter.POSITIONAL_OR_KEYWORD) def join(root: typing.Optional[str], *args: typing.Optional[str]) -> typing.Optional[str]: """Join path parts, if all of them are not :code:`None`.""" if root is None: return None if any(arg is None for arg in args): return None args = typing.cast(typing.Tuple[str, ...], args) return os.path.abspath(os.path.join(root, *args)) class Linux: """Collection of Linux locations.""" def __init__(self) -> None: """Initialize new :class:`~Linux` instance.""" raise NotImplementedError() root: typing.ClassVar[typing.Optional[str]] = None home: typing.ClassVar[typing.Optional[str]] = None if navigator_config.LINUX: root = '/' home = os.path.expanduser('~') class Mac: """Collection of OS X locations.""" def __init__(self) -> None: """Initialize new :class:`~Mac` instance.""" raise NotImplementedError() root: typing.ClassVar[typing.Optional[str]] = None home: typing.ClassVar[typing.Optional[str]] = None applications: typing.ClassVar[typing.Optional[str]] = None user_applications: typing.ClassVar[typing.Optional[str]] = None if navigator_config.MAC: root = '/' home = os.path.expanduser('~') applications = os.path.join(root, 'Applications') user_applications = os.path.join(home, 'Applications') class Win: """Collection of Windows locations.""" def __init__(self) -> None: """Initialize new :class:`~Win` instance.""" raise NotImplementedError() home: typing.ClassVar[typing.Optional[str]] = None program_files_x86: typing.ClassVar[typing.Optional[str]] = None program_files_x64: typing.ClassVar[typing.Optional[str]] = None program_data: typing.ClassVar[typing.Optional[str]] = None local_app_data: typing.ClassVar[typing.Optional[str]] = None local_app_data_low: typing.ClassVar[typing.Optional[str]] = None roaming_app_data: typing.ClassVar[typing.Optional[str]] = None if navigator_config.WIN: from anaconda_navigator.external import knownfolders # pylint: disable=import-outside-toplevel home = os.path.expanduser('~') program_files_x86 = knownfolders.get_folder_path(knownfolders.FOLDERID.ProgramFilesX86)[0] # type: ignore program_files_x64 = knownfolders.get_folder_path(knownfolders.FOLDERID.ProgramFilesX64)[0] # type: ignore program_data = knownfolders.get_folder_path(knownfolders.FOLDERID.ProgramData)[0] # type: ignore local_app_data = knownfolders.get_folder_path(knownfolders.FOLDERID.LocalAppData)[0] # type: ignore local_app_data_low = knownfolders.get_folder_path(knownfolders.FOLDERID.LocalAppDataLow)[0] # type: ignore roaming_app_data = knownfolders.get_folder_path(knownfolders.FOLDERID.RoamingAppData)[0] # type: ignore KEEP: typing.Any = object() class DetectedApplication: """Description of the detected application.""" __slots__ = ('__root', '__executable', '__version') def __init__( self, *, root: typing.Optional[str] = None, executable: typing.Optional[str] = None, version: typing.Optional[str] = None, ) -> None: """Initialize new :class:`~DetectedApplication`.""" self.__root: 'typing_extensions.Final[typing.Optional[str]]' = root self.__executable: 'typing_extensions.Final[typing.Optional[str]]' = executable self.__version: 'typing_extensions.Final[typing.Optional[str]]' = version @property def complete(self) -> bool: # noqa: D401 """All required values are provided.""" return all( item is not None for item in (self.__root, self.__executable, self.__version) ) @property def root(self) -> str: # noqa: D401 """Root directory of the detected application.""" if self.__root is None: raise AttributeError('Incomplete detected application description') return self.__root @property def executable(self) -> str: # noqa: D401 """Path to the executable of the detected application.""" if self.__executable is None: raise AttributeError('Incomplete detected application description') return self.__executable @property def version(self) -> str: # noqa: D401 """Version of the detected application.""" if self.__version is None: raise AttributeError('Incomplete detected application description') return self.__version def replace( self, *, root: typing.Optional[str] = KEEP, executable: typing.Optional[str] = KEEP, version: typing.Optional[str] = KEEP, ) -> 'DetectedApplication': """Prepare a copy of current instance with some additional replacements.""" if root is KEEP: root = self.__root if executable is KEEP: executable = self.__executable if version is KEEP: version = self.__version return DetectedApplication(root=root, executable=executable, version=version) class Group: """ Group of application detecting sources and filters. Order in which you provide sources and filters matters. Output of each source is chained with all previously detected applications (if such). Filters are applied to all previously detected applications. """ __slots__ = ('__content',) def __init__(self, *args: typing.Union['Source', 'Filter']) -> None: """Initialize new :class:`~Group` instance.""" if len(args) <= 0: raise TypeError('at least one source must be provided') if not is_source(args[0]): raise TypeError('first argument must be a Source') arg: typing.Union['Source', 'Filter'] for arg in args: if (not is_source(arg)) and (not is_filter(arg)): raise TypeError(f'each argument must be source- of filter-compatible, not {type(arg).__name__}') self.__content: 'typing_extensions.Final[typing.Tuple[typing.Union[Source, Filter], ...]]' = args def __call__(self) -> typing.Iterator[DetectedApplication]: """Iterate through detected applications.""" result: typing.Iterator[DetectedApplication] = typing.cast('Source', self.__content[0])() arg: typing.Union['Source', 'Filter'] for arg in itertools.islice(self.__content, 1, len(self.__content), 1): if is_source(arg): result = itertools.chain(result, arg()) else: arg = typing.cast('Filter', arg) result = arg(result) return result def collect_str(source: typing.Union[None, str, typing.Iterable[typing.Any]]) -> typing.Iterator[str]: """Collect string values from the recursive collections/iterables of :class:`~str` or :code:`None` instances.""" stack: typing.List[typing.Union[None, str, typing.Iterable[typing.Any]]] = [source] while stack: current: typing.Union[None, str, typing.Iterable[typing.Any]] = stack.pop() if current is None: continue if isinstance(current, str): yield current else: offset: int = len(stack) child: typing.Union[None, str, typing.Iterable[typing.Any]] for child in current: stack.insert(offset, child) class CheckConfiguredRoots: """ Get application root from the configuration. Multiple configuration keys might be provided as `args`. Configuration, from which value should be retrieved, must be provided as a keyword argument. If configuration section is not provided - "main" will be used by default. """ __slots__ = ('__content', '__configuration', '__section') def __init__( self, *args: typing.Union[None, str, typing.Iterable[typing.Optional[str]]], configuration: 'user_config.UserConfig', section: str = 'main', ) -> None: """Initialize new :class:`~CheckKnownRoots` instance.""" self.__content: 'typing_extensions.Final[typing.Tuple[str, ...]]' = tuple(collect_str(args)) self.__configuration: 'typing_extensions.Final[user_config.UserConfig]' = configuration self.__section: 'typing_extensions.Final[str]' = section def __call__(self) -> typing.Iterator[DetectedApplication]: """Iterate through detected applications.""" setting: str for setting in self.__content: root: str = self.__configuration.get(self.__section, setting, '') if not root: continue root = os.path.abspath(root) if os.path.exists(root): yield DetectedApplication(root=root) class CheckKnownRoots: """ Iterate through known roots, selecting only existing ones. Some of root options might be :code:`None` - they will be skipped. """ __slots__ = ('__content',) def __init__(self, *args: typing.Union[None, str, typing.Iterable[typing.Optional[str]]]) -> None: """Initialize new :class:`~CheckKnownRoots` instance.""" self.__content: 'typing_extensions.Final[typing.Tuple[str, ...]]' = tuple(collect_str(args)) def __call__(self) -> typing.Iterator[DetectedApplication]: """Iterate through detected applications.""" root: str for root in self.__content: root = os.path.abspath(root) if os.path.exists(root): yield DetectedApplication(root=root) def check_known_mac_roots(*args: typing.Union[None, str, typing.Iterable[typing.Optional[str]]]) -> CheckKnownRoots: """ Shortcut for checking application roots of OS X applications. Unlike :class:`~CheckKnownRoots` - you need to provide application names instead of directory paths. """ return CheckKnownRoots(*( join(root, arg) for root in (Mac.user_applications, Mac.applications) for arg in collect_str(args) )) class StepIntoRoot: """ Iterate through children of application root. Each child might be checked that it `starts_with`, `ends_with` or `equals` to one or multiple values. Iteration results are sorted. If you want descending order of the results - set `reverse` to :code:`True`. """ __slots__ = ('__equals', '__starts_with', '__ends_with', '__reverse') def __init__( self, *, equals: typing.Union[str, typing.Iterable[str]] = (), starts_with: typing.Union[str, typing.Iterable[str]] = (), ends_with: typing.Union[str, typing.Iterable[str]] = (), reverse: bool = False, ) -> None: """Initialize new :class:`~StepIntoRoot` instance.""" if isinstance(equals, str): equals = {equals} else: equals = set(equals) if isinstance(starts_with, str): starts_with = (starts_with,) else: starts_with = tuple(starts_with) if isinstance(ends_with, str): ends_with = (ends_with,) else: ends_with = tuple(ends_with) self.__equals: 'typing_extensions.Final[typing.Set[str]]' = equals self.__starts_with: 'typing_extensions.Final[typing.Tuple[str, ...]]' = starts_with self.__ends_with: 'typing_extensions.Final[typing.Tuple[str, ...]]' = ends_with self.__reverse: 'typing_extensions.Final[bool]' = reverse def __call__(self, parent: typing.Iterator[DetectedApplication]) -> typing.Iterator[DetectedApplication]: """Iterate through detected applications.""" application: DetectedApplication for application in parent: if not application.root: continue children: typing.List[str] try: children = sorted(os.listdir(application.root), reverse=self.__reverse) except OSError: continue child: str for child in children: if self.__equals and (child not in self.__equals): continue if self.__starts_with and not any(child.startswith(item) for item in self.__starts_with): continue if self.__ends_with and not any(child.endswith(item) for item in self.__ends_with): continue yield application.replace(root=os.path.join(application.root, child)) class AppendExecutable: """ Check multiple options of executables and iterate through existing ones. Each option of the executable must be a path relative to the application root. """ __slots__ = ('__content',) def __init__(self, *args: typing.Union[None, str, typing.Iterable[typing.Optional[str]]]) -> None: """Initialize new :class:`~AppendExecutable` instance.""" self.__content: 'typing_extensions.Final[typing.Tuple[str, ...]]' = tuple(collect_str(args)) def __call__(self, parent: typing.Iterator[DetectedApplication]) -> typing.Iterator[DetectedApplication]: """Iterate through detected applications.""" application: DetectedApplication executable: str for application in parent: for executable in self.__content: executable = os.path.abspath(os.path.join(application.root, executable)) if os.path.exists(executable): yield application.replace(executable=executable) class If: """Iterate through application only if a `condition` is met.""" __slots__ = ('__condition',) def __init__(self, condition: bool) -> None: """Initialize new :class:`~If` instance.""" self.__condition: 'typing_extensions.Final[bool]' = condition def __call__(self, parent: typing.Iterator[DetectedApplication]) -> typing.Iterator[DetectedApplication]: """Iterate through detected applications.""" if self.__condition: return parent return iter(()) def linux_only() -> If: """Iterate through applications only if on a linux machine.""" return If(navigator_config.LINUX) def mac_only() -> If: """Iterate through applications only if on an os x machine.""" return If(navigator_config.MAC) def win_only() -> If: """Iterate through applications only if on a windows machine.""" return If(navigator_config.WIN)