Edit file File name : debug.py Content :# # Copyright (C) 2015 Red Hat, Inc. # # This copyrighted material is made available to anyone wishing to use, # modify, copy, or redistribute it subject to the terms and conditions of # the GNU General Public License v.2, or (at your option) any later version. # This program is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY expressed or implied, including the implied warranties of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General # Public License for more details. You should have received a copy of the # GNU General Public License along with this program; if not, write to the # Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA # 02110-1301, USA. Any Red Hat trademarks that are incorporated in the # source code or documentation are not subject to the GNU General Public # License and may only be used or replicated with the express permission of # Red Hat, Inc. # from __future__ import absolute_import from __future__ import unicode_literals from dnf.i18n import ucd from dnfpluginscore import _, logger import dnf import dnf.cli import gzip import hawkey import os import rpm import sys import time DEBUG_VERSION = "dnf-debug-dump version 1\n" class Debug(dnf.Plugin): name = 'debug' def __init__(self, base, cli): super(Debug, self).__init__(base, cli) self.base = base self.cli = cli if self.cli is not None: self.cli.register_command(DebugDumpCommand) self.cli.register_command(DebugRestoreCommand) class DebugDumpCommand(dnf.cli.Command): aliases = ("debug-dump",) summary = _("dump information about installed rpm packages to file") def __init__(self, cli): super(DebugDumpCommand, self).__init__(cli) self.dump_file = None def configure(self): self.cli.demands.sack_activation = True self.cli.demands.available_repos = True @staticmethod def set_argparser(parser): parser.add_argument( "--norepos", action="store_true", default=False, help=_("do not attempt to dump the repository contents.")) parser.add_argument( "filename", nargs="?", help=_("optional name of dump file")) def run(self): """create debug txt file and compress it, if no filename specified use dnf_debug_dump-<timestamp>.txt.gz by default""" filename = self.opts.filename if not filename: now = time.strftime("%Y-%m-%d_%T", time.localtime(time.time())) filename = "dnf_debug_dump-%s-%s.txt.gz" % (os.uname()[1], now) filename = os.path.abspath(filename) if filename.endswith(".gz"): self.dump_file = gzip.GzipFile(filename, "w") else: self.dump_file = open(filename, "w") self.write(DEBUG_VERSION) self.dump_system_info() self.dump_dnf_config_info() self.dump_rpm_problems() self.dump_packages(not self.opts.norepos) self.dump_rpmdb_versions() self.dump_file.close() print(_("Output written to: %s") % filename) def write(self, msg): if dnf.pycomp.PY3 and isinstance(self.dump_file, gzip.GzipFile): msg = bytes(msg, "utf8") dnf.pycomp.write_to_file(self.dump_file, msg) def dump_system_info(self): self.write("%%%%SYSTEM INFO\n") uname = os.uname() self.write(" uname: %s, %s\n" % (uname[2], uname[4])) self.write(" rpm ver: %s\n" % rpm.__version__) self.write(" python ver: %s\n" % sys.version.replace("\n", "")) return def dump_dnf_config_info(self): var = self.base.conf.substitutions plugins = ",".join([p.name for p in self.base._plugins.plugins]) self.write("%%%%DNF INFO\n") self.write(" arch: %s\n" % var["arch"]) self.write(" basearch: %s\n" % var["basearch"]) self.write(" releasever: %s\n" % var["releasever"]) self.write(" dnf ver: %s\n" % dnf.const.VERSION) self.write(" enabled plugins: %s\n" % plugins) self.write(" global excludes: %s\n" % ",".join(self.base.conf.excludepkgs)) return def dump_rpm_problems(self): self.write("%%%%RPMDB PROBLEMS\n") (missing, conflicts) = rpm_problems(self.base) self.write("".join(["Package %s requires %s\n" % (ucd(pkg), ucd(req)) for (req, pkg) in missing])) self.write("".join(["Package %s conflicts with %s\n" % (ucd(pkg), ucd(conf)) for (conf, pkg) in conflicts])) def dump_packages(self, load_repos): q = self.base.sack.query() # packages from rpmdb self.write("%%%%RPMDB\n") for p in sorted(q.installed()): self.write(" %s\n" % pkgspec(p)) if not load_repos: return self.write("%%%%REPOS\n") available = q.available() for repo in sorted(self.base.repos.iter_enabled(), key=lambda x: x.id): try: url = None if repo.metalink is not None: url = repo.metalink elif repo.mirrorlist is not None: url = repo.mirrorlist elif len(repo.baseurl) > 0: url = repo.baseurl[0] self.write("%%%s - %s\n" % (repo.id, url)) self.write(" excludes: %s\n" % ",".join(repo.excludepkgs)) for po in sorted(available.filter(reponame=repo.id)): self.write(" %s\n" % pkgspec(po)) except dnf.exceptions.Error as e: self.write("Error accessing repo %s: %s\n" % (repo, str(e))) continue return def dump_rpmdb_versions(self): self.write("%%%%RPMDB VERSIONS\n") version = self.base.sack._rpmdb_version() self.write(" all: %s\n" % version) return class DebugRestoreCommand(dnf.cli.Command): aliases = ("debug-restore",) summary = _("restore packages recorded in debug-dump file") def configure(self): self.cli.demands.sack_activation = True self.cli.demands.available_repos = True self.cli.demands.root_user = True if not self.opts.output: self.cli.demands.resolving = True @staticmethod def set_argparser(parser): parser.add_argument( "--output", action="store_true", help=_("output commands that would be run to stdout.")) parser.add_argument( "--install-latest", action="store_true", help=_("Install the latest version of recorded packages.")) parser.add_argument( "--ignore-arch", action="store_true", help=_("Ignore architecture and install missing packages matching " "the name, epoch, version and release.")) parser.add_argument( "--filter-types", metavar="[install, remove, replace]", default="install, remove, replace", help=_("limit to specified type")) parser.add_argument( "--remove-installonly", action="store_true", help=_('Allow removing of install-only packages. Using this option may ' 'result in an attempt to remove the running kernel.')) parser.add_argument( "filename", nargs=1, help=_("name of dump file")) def run(self): """Execute the command action here.""" if self.opts.filter_types: self.opts.filter_types = set( self.opts.filter_types.replace(",", " ").split()) dump_pkgs = self.read_dump_file(self.opts.filename[0]) self.process_installed(dump_pkgs, self.opts) self.process_dump(dump_pkgs, self.opts) def process_installed(self, dump_pkgs, opts): installed = self.base.sack.query().installed() installonly_pkgs = self.base._get_installonly_query(installed) for pkg in installed: pkg_remove = False spec = pkgspec(pkg) dumped_versions = dump_pkgs.get((pkg.name, pkg.arch), None) if dumped_versions is not None: evr = (pkg.epoch, pkg.version, pkg.release) if evr in dumped_versions: # the correct version is already installed dumped_versions[evr] = 'skip' else: # other version is currently installed if pkg in installonly_pkgs: # package is install-only, should be removed pkg_remove = True else: # package should be upgraded / downgraded if "replace" in opts.filter_types: action = 'replace' else: action = 'skip' for d_evr in dumped_versions.keys(): dumped_versions[d_evr] = action else: # package should not be installed pkg_remove = True if pkg_remove and "remove" in opts.filter_types: if pkg not in installonly_pkgs or opts.remove_installonly: if opts.output: print("remove %s" % spec) else: self.base.package_remove(pkg) def process_dump(self, dump_pkgs, opts): for (n, a) in sorted(dump_pkgs.keys()): dumped_versions = dump_pkgs[(n, a)] for (e, v, r) in sorted(dumped_versions.keys()): action = dumped_versions[(e, v, r)] if action == 'skip': continue if opts.ignore_arch: arch = "" else: arch = "." + a if opts.install_latest and action == "install": pkg_spec = "%s%s" % (n, arch) else: pkg_spec = pkgtup2spec(n, arch, e, v, r) if action in opts.filter_types: if opts.output: print("%s %s" % (action, pkg_spec)) else: try: self.base.install(pkg_spec) except dnf.exceptions.MarkingError: logger.error(_("Package %s is not available"), pkg_spec) @staticmethod def read_dump_file(filename): if filename.endswith(".gz"): fobj = gzip.GzipFile(filename) else: fobj = open(filename) if ucd(fobj.readline()) != DEBUG_VERSION: logger.error(_("Bad dnf debug file: %s"), filename) raise dnf.exceptions.Error skip = True pkgs = {} for line in fobj: line = ucd(line) if skip: if line == "%%%%RPMDB\n": skip = False continue if not line or line[0] != " ": break pkg_spec = line.strip() nevra = hawkey.split_nevra(pkg_spec) # {(name, arch): {(epoch, version, release): action}} pkgs.setdefault((nevra.name, nevra.arch), {})[ (nevra.epoch, nevra.version, nevra.release)] = "install" return pkgs def rpm_problems(base): rpmdb = dnf.sack._rpmdb_sack(base) allpkgs = rpmdb.query().installed() requires = set() conflicts = set() for pkg in allpkgs: requires.update([(req, pkg) for req in pkg.requires if not str(req) == "solvable:prereqmarker" and not str(req).startswith("rpmlib(")]) conflicts.update([(conf, pkg) for conf in pkg.conflicts]) missing_requires = [(req, pkg) for (req, pkg) in requires if not allpkgs.filter(provides=req)] existing_conflicts = [(conf, pkg) for (conf, pkg) in conflicts if allpkgs.filter(provides=conf)] return missing_requires, existing_conflicts def pkgspec(pkg): return pkgtup2spec(pkg.name, pkg.arch, pkg.epoch, pkg.version, pkg.release) def pkgtup2spec(name, arch, epoch, version, release): a = "" if not arch else ".%s" % arch.lstrip('.') e = "" if epoch in (None, "") else "%s:" % epoch return "%s-%s%s-%s%s" % (name, e, version, release, a) Save