from fsspec import AbstractFileSystem
from fsspec.utils import tokenize


class AbstractArchiveFileSystem(AbstractFileSystem):
    """
    A generic superclass for implementing Archive-based filesystems.

    Currently, it is shared amongst `ZipFileSystem`, `LibArchiveFileSystem`
    and `TarFileSystem`.

    The methods below read ``self.dir_cache`` (a mapping of archive path ->
    info dict) after calling ``self._get_dirs()``. Neither is defined in this
    class; presumably each concrete subclass provides ``_get_dirs`` to
    populate ``dir_cache`` -- confirm against the subclasses.
    """

    def __str__(self):
        """Return a short description naming the concrete class and object id."""
        # The previous format string was empty, so the ``%`` operation raised
        # ``TypeError: not all arguments converted during string formatting``.
        return f"<Archive-like object {type(self).__name__} at {id(self)}>"

    __repr__ = __str__

    def ukey(self, path):
        """Return a token computed from ``path``, ``self.fo`` and
        ``self.protocol``.
        """
        return tokenize(path, self.fo, self.protocol)

    def _all_dirnames(self, paths):
        """Returns *all* directory names for each path in paths, including
        intermediate ones.

        Parameters
        ----------
        paths: Iterable of path strings

        Returns
        -------
        set of directory names; ``self.root_marker`` is never included.
        """
        if not paths:
            return set()

        # Direct parents first; the recursive call then climbs one level per
        # step and stops once only the root marker would remain.
        dirnames = {self._parent(path) for path in paths} - {self.root_marker}
        return dirnames | self._all_dirnames(dirnames)

    def info(self, path, **kwargs):
        """Return the cached info entry for ``path``.

        The entry is looked up under ``path`` (after ``_strip_protocol``) and,
        failing that, under ``path + "/"``.

        Raises
        ------
        FileNotFoundError
            If neither key is present in ``self.dir_cache``.
        """
        self._get_dirs()
        path = self._strip_protocol(path)
        for key in (path, path + "/"):
            if key in self.dir_cache:
                return self.dir_cache[key]
        raise FileNotFoundError(path)

    def ls(self, path, detail=False, **kwargs):
        """List the entries of ``self.dir_cache`` located directly under ``path``.

        Parameters
        ----------
        path: str
            Directory to list. It is used as given (no protocol stripping);
            trailing slashes are ignored when comparing parents.
        detail: bool
            If True, return the list of info dicts; otherwise return the
            sorted list of their ``"name"`` values.
        """
        self._get_dirs()
        # Both values are loop-invariant, so compute them once.
        target = path.rstrip("/")
        path_parts = path.split("/")

        paths = {}
        for p, f in self.dir_cache.items():
            p = p.rstrip("/")
            root = p.rsplit("/", 1)[0] if "/" in p else ""
            if root == target:
                paths[p] = f
            elif all(
                a == b
                for a, b in zip(path_parts, [""] + p.strip("/").split("/"))
            ):
                # root directory entry: the cache holds a nested path, so
                # synthesize an entry for its top-level component unless one
                # has already been recorded.
                ppath = p.rstrip("/").split("/", 1)[0]
                if ppath not in paths:
                    paths[ppath] = {
                        "name": ppath + "/",
                        "size": 0,
                        "type": "directory",
                    }

        out = list(paths.values())
        if detail:
            return out
        return sorted(f["name"] for f in out)