# NOTE(review): removed three stray paste artifacts ("Spaces:" / "Runtime error")
# that preceded the module and were not valid Python.
| import itertools | |
| import os | |
| from collections import UserDict | |
| from dataclasses import dataclass, field | |
| from typing import Callable, Dict, Iterator, List, Optional, Union | |
| from installer import log | |
class Directory: # forward declaration
    """Forward declaration so the type aliases and annotations below can
    reference ``Directory`` before the real class is defined later in the file."""
    ...
# Shared type aliases for the module's public signatures.
FilePathList = List[str]
FilePathIterator = Iterator[str]
DirectoryPathList = List[str]
DirectoryPathIterator = Iterator[str]
DirectoryList = List[Directory]
DirectoryIterator = Iterator[Directory]
DirectoryCollection = Dict[str, Directory]
ExtensionFilter = Callable
ExtensionList = List[str]  # was builtin-generic list[str]; normalized to typing.List for consistency with the other aliases
RecursiveType = Union[bool, Callable]  # either a flag, or a predicate(path) deciding per-directory recursion
def real_path(directory_path:str) -> Union[str, None]:
    """Canonicalize a path: expand ``~`` and make it absolute.

    Best-effort: returns None for anything that cannot be normalized
    (e.g. a None argument).
    """
    try:
        expanded = os.path.expanduser(directory_path)
        return os.path.abspath(expanded)
    except Exception:
        return None
@dataclass
class Directory(Directory): # pylint: disable=E0102
    """Cached snapshot of a filesystem directory: its mtime, files and child directory paths.

    NOTE(review): the decorators were missing in the mangled source but the code
    requires them — ``field(...)`` defaults and the positional construction
    ``Directory(top, nondirs, walk_dirs)`` in ``_walk`` need ``@dataclass``;
    ``from_dict`` is invoked on the class; and every call site reads
    ``exists``/``is_directory``/``live_mtime``/``is_stale`` without calling them,
    so they must be properties.
    """
    path: str = field(default_factory=str)  # canonical directory path
    mtime: float = field(default_factory=float, init=False)  # on-disk mtime captured at scan time
    files: FilePathList = field(default_factory=list)  # file paths directly inside this directory
    directories: DirectoryPathList = field(default_factory=list)  # child directory paths

    def __post_init__(self):
        # Capture the live mtime at construction time. object.__setattr__ keeps
        # this working even if the dataclass is later made frozen.
        object.__setattr__(self, 'mtime', self.live_mtime)

    @classmethod
    def from_dict(cls, dict_object: dict) -> Directory:
        """Rehydrate a Directory from a plain dict, bypassing __init__/__post_init__."""
        directory = cls.__new__(cls)
        object.__setattr__(directory, 'path', dict_object.get('path'))
        object.__setattr__(directory, 'mtime', dict_object.get('mtime'))
        object.__setattr__(directory, 'files', dict_object.get('files'))
        object.__setattr__(directory, 'directories', dict_object.get('directories'))
        return directory

    def clear(self) -> None:
        """Reset to an empty state (child directories are uncached via _update)."""
        self._update(Directory.from_dict({
            'path': None,
            'mtime': float(),
            'files': [],
            'directories': []
        }))

    def update(self, source_directory: Directory) -> Directory:
        """Copy state from source_directory; no-op when updating from self. Returns self."""
        if source_directory is not self:
            self._update(source_directory)
        return self

    def _update(self, source:Directory) -> None:
        """In-place state transfer; source.path must be empty or equal to self.path."""
        assert not source.path or source.path == self.path, f'When updating a directory, the paths must match. Attemped to update Directory `{self.path}` with `{source.path}`'
        # Children that disappeared from source must also be dropped from the module cache.
        for dead_path in self.directories:
            if dead_path not in source.directories:
                delete_cached_directory(dead_path)
        # Slice assignment mutates in place, preserving list identity for aliases.
        self.directories[:] = source.directories
        self.files[:] = source.files
        object.__setattr__(self, 'mtime', source.mtime)

    @property
    def exists(self) -> bool:
        return self.path and os.path.exists(self.path)

    @property
    def is_directory(self) -> bool:
        return self.exists and os.path.isdir(self.path)

    @property
    def live_mtime(self) -> float:
        """Current on-disk mtime; 0 when the path is not (or no longer) a directory."""
        return os.path.getmtime(self.path) if self.is_directory else 0

    @property
    def is_stale(self) -> bool:
        """True when the cached mtime no longer matches the on-disk mtime."""
        return not self.is_directory or self.mtime != self.live_mtime
class DirectoryCache(UserDict, DirectoryCollection):
    """Mapping of canonical directory path -> Directory with recursive cleanup on deletion."""

    def __delitem__(self, directory_path: str) -> None:
        """Remove an entry and purge its cached children.

        BUGFIX: this was ``__delattr__``, which ``del cache[key]`` never invokes,
        so the cleanup below never ran; and the child purge used a bare ``map(...)``
        whose lazy iterator was never consumed, so children were never deleted.
        """
        directory: Directory = get_directory(directory_path, fetch=False)
        if directory:
            for child_path in directory.directories:
                delete_cached_directory(child_path)
            directory.clear()
        del self.data[directory_path]
def clean_directory(directory: Directory, /, recursive: RecursiveType=False) -> bool:
    """Validate a cached Directory against the filesystem.

    Returns True when the cache entry was already up to date ("clean").
    Side effects: refreshes stale entries and removes dead entries from the
    module cache.
    NOTE(review): the file's original indentation was lost; the nesting below is
    a best-effort reconstruction — verify against upstream before trusting edge cases.
    """
    if not directory.is_directory:  # path vanished or is no longer a directory
        is_clean = False
        delete_cached_directory(directory.path)
    else:
        is_clean = not directory.is_stale
        if not is_clean:
            # Stale: re-scan from disk and replace our contents wholesale.
            directory.update(fetch_directory(directory.path))
        else:
            # Fresh at this level: validate each cached child (copy: list may shrink).
            for directory_path in directory.directories[:]:
                try:
                    recurse = recursive and (not callable(recursive) or recursive(directory.path))
                    # NOTE(review): this rebinds `directory`, shadowing the argument —
                    # the purge below then operates on the child (or None), not the
                    # parent, and raises AttributeError (swallowed) when None.
                    # Looks like a latent bug; confirm intent against upstream.
                    directory = get_directory(directory_path, fetch=recurse)
                    if directory:
                        if directory.is_directory:
                            if recurse:
                                is_clean = clean_directory(directory, recursive=recurse) and is_clean
                            continue
                        delete_cached_directory(directory_path)
                    # If we had intended to fetch this directory, but didn't, that means it doesn't exist. Purge.
                    if recurse:
                        directory.directories.remove(directory_path)
                        is_clean = False
                except Exception:
                    # Best-effort sweep: a failing child never aborts the loop.
                    pass
    return is_clean
def get_directory(directory_or_path: str, /, fetch:bool=True) -> Union[Directory, None]:
    """Resolve a path (or Directory) to its cached Directory entry.

    A live Directory argument is returned as-is; otherwise the canonical path is
    looked up in the cache — validated via clean_directory when present, scanned
    and inserted when absent (and ``fetch`` is set). Returns None on a miss.
    """
    if isinstance(directory_or_path, Directory):
        if directory_or_path.is_directory:
            return directory_or_path
        # Dead Directory object: fall back to resolving its path.
        directory_or_path = directory_or_path.path
    directory_or_path = real_path(directory_or_path)
    cached = cache_folders.get(directory_or_path, None)
    if cached:
        # May refresh or even evict the entry, hence the re-lookup below.
        clean_directory(cached)
    elif fetch:
        fetched = fetch_directory(directory_path=directory_or_path)
        if fetched:
            cache_folders[directory_or_path] = fetched
    return cache_folders[directory_or_path] if directory_or_path in cache_folders else None
def fetch_directory(directory_path: str) -> Union[Directory, None]:
    """Scan directory_path once (non-recursively) and return its Directory, or None.

    ``_walk`` is a generator; we pull at most one item — the top-level entry itself.
    """
    return next(_walk(directory_path, recurse=False), None)
def _walk(top, recurse:RecursiveType=True) -> Directory:
    """Re-implementation of ``os.walk()`` that yields Directory objects.

    Silently stops if ``top`` cannot be scanned. Broken symlinks are logged and
    skipped. When ``recurse`` is callable it acts as a per-directory predicate.
    """
    files = []
    subdirectories = []
    try:
        scandir_it = os.scandir(top)
    except OSError:
        return
    with scandir_it:
        for entry in scandir_it:
            if not entry.is_dir():
                files.append(entry.path)
            elif entry.is_symlink() and not os.path.exists(entry.path):
                log.error(f'Files broken symlink: {entry.path}')
            else:
                subdirectories.append(entry.path)
    yield Directory(top, files, subdirectories)
    if not recurse:
        return
    for child_path in subdirectories:
        if callable(recurse) and not recurse(child_path):
            continue
        yield from _walk(child_path, recurse=recurse)
def _cached_walk(top, recurse:RecursiveType=True) -> Directory:
    """Like ``_walk``, but served from the directory cache via get_directory()."""
    directory = get_directory(top)
    if not directory:
        return
    yield directory
    if not recurse:
        return
    for child_path in directory.directories:
        # Skip huggingface-hub style snapshot folders ('models--*').
        if os.path.basename(child_path).startswith('models--'):
            continue
        if callable(recurse) and not recurse(child_path):
            continue
        yield from _cached_walk(child_path, recurse=recurse)
def walk(top, recurse:RecursiveType=True, cached=True) -> Directory:
    """Yield Directory objects under ``top``, from the cache or straight from disk."""
    walker = _cached_walk if cached else _walk
    yield from walker(top, recurse=recurse)
def delete_cached_directory(directory_path:str) -> bool:
    """Remove ``directory_path`` from the module cache if present.

    Returns True when an entry was removed, False otherwise — the original fell
    through returning None despite the ``bool`` annotation. The ``global``
    statement was dropped: the dict is mutated via item deletion, never rebound.
    """
    if directory_path in cache_folders:
        del cache_folders[directory_path]
        return True
    return False
def is_directory(dir_path:str) -> bool:
    """True when dir_path names an existing directory.

    BUGFIX: the original returned the falsy argument itself ('' or None) instead
    of a bool; the separate ``os.path.exists`` check was redundant, since
    ``os.path.isdir`` already returns False for missing paths.
    """
    return bool(dir_path) and os.path.isdir(dir_path)
def directory_mtime(directory_path:str, /, recursive:RecursiveType=True) -> float:
    """Newest mtime among the directory and (optionally) its descendants; 0.0 when none resolve.

    BUGFIX: the original ``max(0, *[...])`` raised TypeError when no directories
    resolved — star-unpacking an empty list leaves ``max(0)``, which is not a
    valid call. ``default=0`` handles the empty case.
    """
    return float(max((directory.mtime for directory in get_directories(directory_path, recursive=recursive)), default=0))
def unique_directories(directories:DirectoryPathList, /, recursive:RecursiveType=True) -> DirectoryPathIterator:
    '''Ensure no empty, or duplicates'''
    '''If we are going recursive, then directories that are children of other directories are redundant'''
    ''' @todo this is incredibly inneficient. the hit is small, but it is ugly, no? '''
    # NOTE(review): indentation was lost in this file; the nesting below is a
    # best-effort reconstruction. As nested here, `child_directory` is still None
    # whenever the inner while is entered, so everything from the
    # `child_directory = ...` assignment down is unreachable and every covered
    # child path is simply popped. Net observable behavior: yield each unique
    # canonical path in ascending order, skipping descendants of already-yielded
    # paths. Verify the callable-`recursive` branch against upstream.
    directories = sorted(unique_paths(directories), reverse=True)
    while directories:
        directory = directories.pop()  # smallest remaining path
        yield directory
        if not recursive:
            continue
        _directory = os.path.join(directory, '')  # trailing separator for a safe prefix test
        child_directory = None
        while directories and directories[-1].startswith(_directory):
            if not callable(recursive) or not child_directory:
                directories.pop()  # descendant of an already-yielded directory: redundant
                continue
            child_directory = directories[-1][len(directory):]
            if child_directory:
                next_directory = _directory
                if not callable(recursive):
                    _remove_directory = next_directory
                else:
                    # Walk down the relative components until the predicate accepts one.
                    for sub_directory in child_directory.split(os.path.sep):
                        next_directory = os.path.join(next_directory, sub_directory)
                        if recursive(next_directory):
                            _remove_directory = os.path.join(next_directory, '')
                            break
                # NOTE(review): `del _remove_directory` then re-testing it in the
                # while condition would raise NameError — further evidence this
                # reconstruction (or the original) is defective.
                while _remove_directory and directories:
                    _d = directories.pop()
                    if not directories[-1].startswith(_remove_directory):
                        del _remove_directory
def unique_paths(directory_paths:DirectoryPathList) -> DirectoryPathIterator:
    """Canonicalize paths, dropping empties and duplicates while preserving first-seen order."""
    resolved = (real_path(directory_path) for directory_path in directory_paths if directory_path)
    # dict keys act as an ordered set (insertion-ordered, unique).
    return dict.fromkeys(directory_path for directory_path in resolved if directory_path).keys()
def get_directories(*directory_paths: DirectoryPathList, fetch:bool=True, recursive:RecursiveType=True) -> DirectoryCollection:
    """Resolve the given paths to cached Directory objects, lazily skipping misses."""
    deduped_paths = unique_directories(directory_paths, recursive=recursive)
    resolved = (get_directory(directory_path, fetch=fetch) for directory_path in deduped_paths)
    return filter(bool, resolved)
def directory_files(*directories_or_paths: Union[DirectoryPathList, DirectoryList], recursive: RecursiveType=True) -> FilePathIterator:
    """Iterate file paths contained in the given directories.

    When ``recursive`` is truthy, child directories are descended into as well;
    a callable ``recursive`` acts as a per-child-path predicate.
    """
    for directory_object in map(get_directory, directories_or_paths):
        if not directory_object:
            continue
        yield from directory_object.files
        if not recursive:
            continue
        child_filter = recursive if callable(recursive) else bool
        children = filter(bool, map(get_directory, filter(child_filter, directory_object.directories)))
        for child in children:
            yield from directory_files(child, recursive=recursive)
def extension_filter(ext_filter: Optional['ExtensionList']=None, ext_blacklist: Optional['ExtensionList']=None) -> 'ExtensionFilter':
    """Build a predicate(filepath) -> bool from optional extension white/blacklists.

    Matching is a case-insensitive suffix test; a None/empty list disables that
    side of the check. Improvements over the original: the path is uppercased
    once per call instead of once per candidate extension, the candidate lists
    are pre-uppercased into tuples so ``str.endswith`` does the scan in a single
    C call, and the ``filter_functon`` typo is fixed.
    """
    allowed = tuple(ext.upper() for ext in ext_filter) if ext_filter else None
    blocked = tuple(ext.upper() for ext in ext_blacklist) if ext_blacklist else None
    def filter_function(fp:str) -> bool:
        upper = fp.upper()
        if allowed and not upper.endswith(allowed):
            return False
        return not (blocked and upper.endswith(blocked))
    return filter_function
def not_hidden(filepath: str) -> bool:
    """True unless the file's basename starts with a dot (Unix hidden-file convention)."""
    basename = os.path.basename(filepath)
    return basename[:1] != '.'
def filter_files(file_paths: FilePathList, ext_filter: Optional[ExtensionList]=None, ext_blacklist: Optional[ExtensionList]=None) -> FilePathIterator:
    """Lazily filter file_paths through an extension whitelist/blacklist predicate."""
    predicate = extension_filter(ext_filter, ext_blacklist)
    return (file_path for file_path in file_paths if predicate(file_path))
def list_files(*directory_paths:DirectoryPathList, ext_filter: Optional[ExtensionList]=None, ext_blacklist: Optional[ExtensionList]=None, recursive:RecursiveType=True) -> FilePathIterator:
    """List files under the given roots, optionally filtered by extension white/blacklists."""
    all_files = (
        file_path
        for directory in get_directories(*directory_paths, recursive=recursive)
        for file_path in directory_files(directory, recursive=recursive)
    )
    return filter_files(all_files, ext_filter, ext_blacklist)
| cache_folders = DirectoryCache({}) | |