| from __future__ import absolute_import |
| |
| import csv |
| import functools |
| import logging |
| import os |
| import sys |
| import sysconfig |
| |
| from pip._vendor import pkg_resources |
| |
| from pip._internal.exceptions import UninstallationError |
| from pip._internal.locations import bin_py, bin_user |
| from pip._internal.utils.compat import WINDOWS, cache_from_source, uses_pycache |
| from pip._internal.utils.logging import indent_log |
| from pip._internal.utils.misc import ( |
| FakeFile, |
| ask, |
| dist_in_usersite, |
| dist_is_local, |
| egg_link_path, |
| is_local, |
| normalize_path, |
| renames, |
| rmtree, |
| ) |
| from pip._internal.utils.temp_dir import AdjacentTempDirectory, TempDirectory |
| from pip._internal.utils.typing import MYPY_CHECK_RUNNING |
| |
| if MYPY_CHECK_RUNNING: |
| from typing import ( |
| Any, Callable, Dict, Iterable, Iterator, List, Optional, Set, Tuple, |
| ) |
| from pip._vendor.pkg_resources import Distribution |
| |
| logger = logging.getLogger(__name__) |
| |
| |
def _script_names(dist, script_name, is_gui):
    # type: (Distribution, str, bool) -> List[str]
    """Return every path an entry-point script of ``dist`` may occupy.

    The script is looked for in the user bin directory when ``dist`` is
    installed in the user site, and in the regular bin directory otherwise.
    On Windows the launcher ``.exe``, its ``.exe.manifest`` and the
    ``-script.py`` companion (``-script.pyw`` when ``is_gui`` is true) are
    listed in addition to the bare script name.
    """
    bin_dir = bin_user if dist_in_usersite(dist) else bin_py
    exe_name = os.path.join(bin_dir, script_name)
    if not WINDOWS:
        return [exe_name]

    script_suffix = '-script.pyw' if is_gui else '-script.py'
    return [
        exe_name,
        exe_name + '.exe',
        exe_name + '.exe.manifest',
        exe_name + script_suffix,
    ]
| |
| |
| def _unique(fn): |
| # type: (Callable[..., Iterator[Any]]) -> Callable[..., Iterator[Any]] |
| @functools.wraps(fn) |
| def unique(*args, **kw): |
| # type: (Any, Any) -> Iterator[Any] |
| seen = set() # type: Set[Any] |
| for item in fn(*args, **kw): |
| if item not in seen: |
| seen.add(item) |
| yield item |
| return unique |
| |
| |
@_unique
def uninstallation_paths(dist):
    # type: (Distribution) -> Iterator[str]
    """
    Yield the paths to uninstall for dist, derived from its RECORD file.

    Every file listed in RECORD is yielded, joined onto ``dist.location``.
    Each ``.py`` file is additionally followed by the ``.pyc`` and ``.pyo``
    names next to it, since RECORD may not list them.

    UninstallPathSet.add() takes care of the __pycache__ .py[co].
    """
    record = csv.reader(FakeFile(dist.get_metadata_lines('RECORD')))
    for entry in record:
        recorded = os.path.join(dist.location, entry[0])
        yield recorded
        if not recorded.endswith('.py'):
            continue
        directory, filename = os.path.split(recorded)
        stem = filename[:-3]  # drop the trailing '.py'
        for bytecode_ext in ('.pyc', '.pyo'):
            yield os.path.join(directory, stem + bytecode_ext)
| |
| |
def compact(paths):
    # type: (Iterable[str]) -> Set[str]
    """Reduce ``paths`` to the smallest set that still covers all of them.

    A path is dropped when a shorter, already kept path is one of its
    ancestors, e.g. when both /a/path/ and /a/path/to/a/file.txt are given
    only /a/path/ is kept. Trailing ``*`` wildcards on the kept path are
    ignored for the comparison.
    """
    sep = os.path.sep
    kept = set()  # type: Set[str]

    def is_covered(candidate, ancestor):
        # type: (str, str) -> bool
        prefix = ancestor.rstrip("*")
        if not candidate.startswith(prefix):
            return False
        # The match must end on a path separator, so that "/a/path" does
        # not swallow "/a/pathological".
        return candidate[len(prefix.rstrip(sep))] == sep

    # Shortest first, so ancestors are always seen before descendants.
    for candidate in sorted(paths, key=len):
        for ancestor in kept:
            if is_covered(candidate, ancestor):
                break
        else:
            kept.add(candidate)
    return kept
| |
| |
def compress_for_rename(paths):
    # type: (Iterable[str]) -> Set[str]
    """Return the set of paths that have to be renamed.

    Whenever every file found on disk below a directory is part of
    ``paths``, those files are replaced in the result by a single entry for
    the directory itself (the directory path with a trailing separator).
    """
    # Work on case-normalized paths, remembering the original spelling.
    case_map = {os.path.normcase(p): p for p in paths}
    remaining = set(case_map)
    parent_dirs = {os.path.split(p)[0] for p in case_map.values()}
    unchecked = sorted(parent_dirs, key=len)
    wildcards = set()  # type: Set[str]

    def norm_join(*a):
        # type: (str) -> str
        return os.path.normcase(os.path.join(*a))

    for root in unchecked:
        normalized_root = os.path.normcase(root)
        if any(normalized_root.startswith(w) for w in wildcards):
            # An enclosing directory has already been handled.
            continue

        all_files = set()  # type: Set[str]
        all_subdirs = set()  # type: Set[str]
        for dirname, subdirs, files in os.walk(root):
            for d in subdirs:
                all_subdirs.add(norm_join(root, dirname, d))
            for f in files:
                all_files.add(norm_join(root, dirname, f))

        # Only when nothing on disk would be left behind can the whole
        # directory stand in for its files.
        if all_files.issubset(remaining):
            remaining -= all_files
            wildcards.add(root + os.sep)

    return {case_map[p] for p in remaining} | wildcards
| |
| |
def compress_for_output_listing(paths):
    # type: (Iterable[str]) -> Tuple[Set[str], Set[str]]
    """Return two sets describing what to display to the user.

    The first set holds the paths that would be deleted: every given path
    that is not a ``.pyc`` file, plus a ``<folder>/*`` entry for each package
    or ``.dist-info`` folder - to signify that all its contents are removed.

    The second set holds files found on disk inside those folders that are
    not among the given paths and would therefore be skipped.
    """
    candidates = set(paths)

    # Determine folders and files; compiled files are never listed.
    files = {p for p in candidates if not p.endswith(".pyc")}
    folders = compact({
        os.path.dirname(p)
        for p in files
        if p.endswith("__init__.py") or ".dist-info" in p
    })
    known_files = {os.path.normcase(p) for p in files}

    # This walks the tree using os.walk to not miss extra folders
    # that might get added.
    will_skip = set()
    for folder in folders:
        for dirpath, _, dirfiles in os.walk(folder):
            for fname in dirfiles:
                if fname.endswith(".pyc"):
                    continue

                on_disk = os.path.join(dirpath, fname)
                if not os.path.isfile(on_disk):
                    continue
                if os.path.normcase(on_disk) in known_files:
                    continue
                # Present on disk but not ours to remove: it is skipped.
                will_skip.add(on_disk)

    will_remove = files.union(
        os.path.join(folder, "*") for folder in folders
    )

    return will_remove, will_skip
| |
| |
class StashedUninstallPathSet(object):
    """Tracks the renames used to move files aside while an uninstall is
    still tentative, so they can be either discarded or moved back."""
    def __init__(self):
        # type: () -> None
        # Normalized source root -> [Adjacent]TempDirectory holding the
        # files stashed from under that root.
        self._save_dirs = {}  # type: Dict[str, TempDirectory]
        # (original path, stashed path) for every move performed, kept so
        # the move can be undone.
        self._moves = []  # type: List[Tuple[str, str]]

    def _get_directory_stash(self, path):
        # type: (str) -> str
        """Pick the stash location for the directory ``path``.

        A temporary directory adjacent to ``path`` is preferred; if creating
        it raises OSError a regular "uninstall" temp directory is used."""
        try:
            stash_dir = AdjacentTempDirectory(path)  # type: TempDirectory
        except OSError:
            stash_dir = TempDirectory(kind="uninstall")
        self._save_dirs[os.path.normcase(path)] = stash_dir

        return stash_dir.path

    def _get_file_stash(self, path):
        # type: (str) -> str
        """Pick the stash location for the file ``path``.

        The nearest ancestor directory that already has a stash is reused;
        when there is none, a new "uninstall" temp directory is registered
        for the file's parent directory."""
        path = os.path.normcase(path)
        root = os.path.dirname(path)
        previous = None
        stash_dir = None

        # Climb towards the filesystem root until dirname() stops changing.
        while root != previous:
            if root in self._save_dirs:
                stash_dir = self._save_dirs[root]
                break
            previous = root
            root = os.path.dirname(root)
        else:
            # No ancestor is being stashed yet.
            root = os.path.dirname(path)
            stash_dir = TempDirectory(kind='uninstall')
            self._save_dirs[root] = stash_dir

        relpath = os.path.relpath(path, root)
        if not relpath or relpath == os.path.curdir:
            return stash_dir.path
        return os.path.join(stash_dir.path, relpath)

    def stash(self, path):
        # type: (str) -> str
        """Move the directory or file at ``path`` into its stash and return
        the new location.

        Symlinks are handled as files so their targets are left untouched.
        """
        is_real_dir = os.path.isdir(path) and not os.path.islink(path)
        new_path = (
            self._get_directory_stash(path) if is_real_dir
            else self._get_file_stash(path)
        )

        self._moves.append((path, new_path))
        if is_real_dir and os.path.isdir(new_path):
            # The destination was just created by us and must go first,
            # otherwise the directory would be moved *inside* it instead of
            # taking its place.
            os.rmdir(new_path)
        renames(path, new_path)
        return new_path

    def commit(self):
        # type: () -> None
        """Make the uninstall final by cleaning up every stash directory."""
        for stash_dir in self._save_dirs.values():
            stash_dir.cleanup()
        self._moves = []
        self._save_dirs = {}

    def rollback(self):
        # type: () -> None
        """Undo the uninstall by moving every stashed path back."""
        for move in self._moves:
            logger.info("Moving to %s\n from %s", *move)

        for original, stashed in self._moves:
            try:
                logger.debug('Replacing %s from %s', original, stashed)
                # Clear whatever now sits at the original location.
                if os.path.isfile(original) or os.path.islink(original):
                    os.unlink(original)
                elif os.path.isdir(original):
                    rmtree(original)
                renames(stashed, original)
            except OSError as ex:
                logger.error("Failed to restore %s", original)
                logger.debug("Exception: %s", ex)

        self.commit()

    @property
    def can_rollback(self):
        # type: () -> bool
        """True while at least one move has been recorded."""
        return bool(self._moves)
| |
| |
class UninstallPathSet(object):
    """A set of file paths to be removed in the uninstallation of a
    requirement."""
    def __init__(self, dist):
        # type: (Distribution) -> None
        # Existing paths we are permitted to remove.
        self.paths = set()  # type: Set[str]
        # Paths we are not permitted to remove or modify.
        self._refuse = set()  # type: Set[str]
        # .pth file path -> entries to strip from that file.
        self.pth = {}  # type: Dict[str, UninstallPthEntries]
        self.dist = dist
        # Removed paths are stashed here so remove() can be rolled back
        # until commit() is called.
        self._moved_paths = StashedUninstallPathSet()

    def _permitted(self, path):
        # type: (str) -> bool
        """
        Return True if the given path is one we are permitted to
        remove/modify, False otherwise.

        """
        return is_local(path)

    def add(self, path):
        # type: (str) -> None
        """Register ``path`` for removal if it exists.

        Permitted paths go to ``self.paths``, the others to the refused set.
        For a ``.py`` file the matching ``__pycache__`` file is registered as
        well when the interpreter uses ``__pycache__``.
        """
        head, tail = os.path.split(path)

        # we normalize the head to resolve parent directory symlinks, but not
        # the tail, since we only want to uninstall symlinks, not their targets
        path = os.path.join(normalize_path(head), os.path.normcase(tail))

        if not os.path.exists(path):
            return
        if self._permitted(path):
            self.paths.add(path)
        else:
            self._refuse.add(path)

        # __pycache__ files can show up after 'installed-files.txt' is created,
        # due to imports
        if os.path.splitext(path)[1] == '.py' and uses_pycache:
            self.add(cache_from_source(path))

    def add_pth(self, pth_file, entry):
        # type: (str, str) -> None
        """Register ``entry`` for removal from the ``.pth`` file ``pth_file``.

        The file is recorded as refused instead when it is not permitted.
        """
        pth_file = normalize_path(pth_file)
        if self._permitted(pth_file):
            if pth_file not in self.pth:
                self.pth[pth_file] = UninstallPthEntries(pth_file)
            self.pth[pth_file].add(entry)
        else:
            self._refuse.add(pth_file)

    def remove(self, auto_confirm=False, verbose=False):
        # type: (bool, bool) -> None
        """Remove paths in ``self.paths`` with confirmation (unless
        ``auto_confirm`` is True)."""

        if not self.paths:
            logger.info(
                "Can't uninstall '%s'. No files were found to uninstall.",
                self.dist.project_name,
            )
            return

        dist_name_version = (
            self.dist.project_name + "-" + self.dist.version
        )
        logger.info('Uninstalling %s:', dist_name_version)

        with indent_log():
            if auto_confirm or self._allowed_to_proceed(verbose):
                moved = self._moved_paths

                for_rename = compress_for_rename(self.paths)

                # Files are stashed rather than deleted, so that rollback()
                # can restore them.
                for path in sorted(compact(for_rename)):
                    moved.stash(path)
                    logger.debug('Removing file or directory %s', path)

                for pth in self.pth.values():
                    pth.remove()

                logger.info('Successfully uninstalled %s', dist_name_version)

    def _allowed_to_proceed(self, verbose):
        # type: (bool) -> bool
        """Display which files would be deleted and prompt for confirmation
        """

        def _display(msg, paths):
            # type: (str, Iterable[str]) -> None
            if not paths:
                return

            logger.info(msg)
            with indent_log():
                for path in sorted(compact(paths)):
                    logger.info(path)

        if not verbose:
            will_remove, will_skip = compress_for_output_listing(self.paths)
        else:
            # In verbose mode, display all the files that are going to be
            # deleted.
            will_remove = set(self.paths)
            will_skip = set()

        _display('Would remove:', will_remove)
        _display('Would not remove (might be manually added):', will_skip)
        _display('Would not remove (outside of prefix):', self._refuse)
        if verbose:
            _display('Will actually move:', compress_for_rename(self.paths))

        return ask('Proceed (y/n)? ', ('y', 'n')) == 'y'

    def rollback(self):
        # type: () -> None
        """Rollback the changes previously made by remove()."""
        if not self._moved_paths.can_rollback:
            logger.error(
                "Can't roll back %s; was not uninstalled",
                self.dist.project_name,
            )
            return
        logger.info('Rolling back uninstall of %s', self.dist.project_name)
        self._moved_paths.rollback()
        for pth in self.pth.values():
            pth.rollback()

    def commit(self):
        # type: () -> None
        """Remove temporary save dir: rollback will no longer be possible."""
        self._moved_paths.commit()

    @classmethod
    def from_dist(cls, dist):
        # type: (Distribution) -> UninstallPathSet
        """Build the set of paths to remove in order to uninstall ``dist``.

        An empty set is returned (after logging why) when ``dist`` is not
        local to the environment or is located in the standard library.

        Raises UninstallationError for a distutils-installed project, and
        AssertionError when a develop egg-link does not point at
        ``dist.location``.
        """
        dist_path = normalize_path(dist.location)
        if not dist_is_local(dist):
            logger.info(
                "Not uninstalling %s at %s, outside environment %s",
                dist.key,
                dist_path,
                sys.prefix,
            )
            return cls(dist)

        if dist_path in {p for p in {sysconfig.get_path("stdlib"),
                                     sysconfig.get_path("platstdlib")}
                         if p}:
            logger.info(
                "Not uninstalling %s at %s, as it is in the standard library.",
                dist.key,
                dist_path,
            )
            return cls(dist)

        paths_to_remove = cls(dist)
        develop_egg_link = egg_link_path(dist)
        develop_egg_link_egg_info = '{}.egg-info'.format(
            pkg_resources.to_filename(dist.project_name))
        egg_info_exists = dist.egg_info and os.path.exists(dist.egg_info)
        # Special case for distutils installed package
        distutils_egg_info = getattr(dist._provider, 'path', None)

        # Uninstall cases order do matter as in the case of 2 installs of the
        # same package, pip needs to uninstall the currently detected version
        if (egg_info_exists and dist.egg_info.endswith('.egg-info') and
                not dist.egg_info.endswith(develop_egg_link_egg_info)):
            # if dist.egg_info.endswith(develop_egg_link_egg_info), we
            # are in fact in the develop_egg_link case
            paths_to_remove.add(dist.egg_info)
            if dist.has_metadata('installed-files.txt'):
                for installed_file in dist.get_metadata(
                        'installed-files.txt').splitlines():
                    path = os.path.normpath(
                        os.path.join(dist.egg_info, installed_file)
                    )
                    paths_to_remove.add(path)
            # FIXME: need a test for this elif block
            # occurs with --single-version-externally-managed/--record outside
            # of pip
            elif dist.has_metadata('top_level.txt'):
                if dist.has_metadata('namespace_packages.txt'):
                    # Split into whole names: testing membership against the
                    # raw metadata string would be a substring match and
                    # could wrongly skip a top-level package whose name is
                    # merely contained in a namespace package name.
                    namespaces = dist.get_metadata(
                        'namespace_packages.txt').splitlines()
                else:
                    namespaces = []
                for top_level_pkg in [
                        p for p
                        in dist.get_metadata('top_level.txt').splitlines()
                        if p and p not in namespaces]:
                    path = os.path.join(dist.location, top_level_pkg)
                    # add() ignores whichever of these do not exist.
                    paths_to_remove.add(path)
                    paths_to_remove.add(path + '.py')
                    paths_to_remove.add(path + '.pyc')
                    paths_to_remove.add(path + '.pyo')

        elif distutils_egg_info:
            raise UninstallationError(
                "Cannot uninstall {!r}. It is a distutils installed project "
                "and thus we cannot accurately determine which files belong "
                "to it which would lead to only a partial uninstall.".format(
                    dist.project_name,
                )
            )

        elif dist.location.endswith('.egg'):
            # package installed by easy_install
            # We cannot match on dist.egg_name because it can slightly vary
            # i.e. setuptools-0.6c11-py2.6.egg vs setuptools-0.6rc11-py2.6.egg
            paths_to_remove.add(dist.location)
            easy_install_egg = os.path.split(dist.location)[1]
            easy_install_pth = os.path.join(os.path.dirname(dist.location),
                                            'easy-install.pth')
            paths_to_remove.add_pth(easy_install_pth, './' + easy_install_egg)

        elif egg_info_exists and dist.egg_info.endswith('.dist-info'):
            for path in uninstallation_paths(dist):
                paths_to_remove.add(path)

        elif develop_egg_link:
            # develop egg
            with open(develop_egg_link, 'r') as fh:
                link_pointer = os.path.normcase(fh.readline().strip())
            assert (link_pointer == dist.location), (
                'Egg-link {} does not match installed location of {} '
                '(at {})'.format(
                    link_pointer, dist.project_name, dist.location)
            )
            paths_to_remove.add(develop_egg_link)
            easy_install_pth = os.path.join(os.path.dirname(develop_egg_link),
                                            'easy-install.pth')
            paths_to_remove.add_pth(easy_install_pth, dist.location)

        else:
            logger.debug(
                'Not sure how to uninstall: %s - Check: %s',
                dist, dist.location,
            )

        # find distutils scripts= scripts
        if dist.has_metadata('scripts') and dist.metadata_isdir('scripts'):
            for script in dist.metadata_listdir('scripts'):
                if dist_in_usersite(dist):
                    bin_dir = bin_user
                else:
                    bin_dir = bin_py
                paths_to_remove.add(os.path.join(bin_dir, script))
                if WINDOWS:
                    paths_to_remove.add(os.path.join(bin_dir, script) + '.bat')

        # find console_scripts
        _scripts_to_remove = []
        console_scripts = dist.get_entry_map(group='console_scripts')
        for name in console_scripts.keys():
            _scripts_to_remove.extend(_script_names(dist, name, False))
        # find gui_scripts
        gui_scripts = dist.get_entry_map(group='gui_scripts')
        for name in gui_scripts.keys():
            _scripts_to_remove.extend(_script_names(dist, name, True))

        for s in _scripts_to_remove:
            paths_to_remove.add(s)

        return paths_to_remove
| |
| |
class UninstallPthEntries(object):
    """Entries to strip from one ``.pth`` file, with support for restoring
    the file's previous content."""
    def __init__(self, pth_file):
        # type: (str) -> None
        self.file = pth_file
        self.entries = set()  # type: Set[str]
        # Raw lines of the file as they were before remove() edited it;
        # None until remove() has read the file.
        self._saved_lines = None  # type: Optional[List[bytes]]

    def add(self, entry):
        # type: (str) -> None
        """Register ``entry`` as a line to strip from the file."""
        entry = os.path.normcase(entry)
        # On Windows, os.path.normcase converts the entry to use
        # backslashes.  This is correct for entries that describe absolute
        # paths outside of site-packages, but all the others use forward
        # slashes.
        # os.path.splitdrive is used instead of os.path.isabs because isabs
        # treats non-absolute paths with drive letter markings like c:foo\bar
        # as absolute paths. It also does not recognize UNC paths if they don't
        # have more than "\\server\share". Valid examples: "\\server\share\" or
        # "\\server\share\folder". Python 2.7.8+ support UNC in splitdrive.
        if WINDOWS and not os.path.splitdrive(entry)[0]:
            entry = entry.replace('\\', '/')
        self.entries.add(entry)

    def remove(self):
        # type: () -> None
        """Rewrite the file without the registered entries.

        The original content is remembered so rollback() can restore it.
        A missing file only produces a warning.
        """
        logger.debug('Removing pth entries from %s:', self.file)

        # If the file doesn't exist, log a warning and return
        if not os.path.isfile(self.file):
            logger.warning(
                "Cannot remove entries from nonexistent file %s", self.file
            )
            return
        with open(self.file, 'rb') as fh:
            # windows uses '\r\n' with py3k, but uses '\n' with py2.x
            lines = fh.readlines()
        # Save an independent copy: ``lines`` is edited in place below, and
        # sharing the list would make rollback() write the edited content
        # back instead of the original.
        self._saved_lines = list(lines)
        if any(b'\r\n' in line for line in lines):
            endline = '\r\n'
        else:
            endline = '\n'
        # handle missing trailing newline
        if lines and not lines[-1].endswith(endline.encode("utf-8")):
            lines[-1] = lines[-1] + endline.encode("utf-8")
        for entry in self.entries:
            try:
                logger.debug('Removing entry: %s', entry)
                lines.remove((entry + endline).encode("utf-8"))
            except ValueError:
                # The entry is not in the file; nothing to strip.
                pass
        with open(self.file, 'wb') as fh:
            fh.writelines(lines)

    def rollback(self):
        # type: () -> bool
        """Restore the content the file had before remove().

        Returns False (after logging an error) when remove() never read the
        file, True once the saved content has been written back.
        """
        if self._saved_lines is None:
            logger.error(
                'Cannot roll back changes to %s, none were made', self.file
            )
            return False
        logger.debug('Rolling %s back to previous state', self.file)
        with open(self.file, 'wb') as fh:
            fh.writelines(self._saved_lines)
        return True