View file File name : polkit.py Content :# -*- coding: utf-8 -*- # slip.dbus.polkit -- convenience decorators and functions for using PolicyKit # with dbus services and clients # # Copyright © 2008, 2009, 2012, 2013, 2015 Red Hat, Inc. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. # # Authors: # Nils Philippsen <nils@redhat.com> """This module contains convenience decorators and functions for using PolicyKit with dbus services and clients.""" from __future__ import absolute_import import collections import dbus from decorator import decorator from functools import reduce from .constants import method_call_no_timeout __all__ = ["require_auth", "enable_proxy", "AUTHFAIL_DONTCATCH", "NotAuthorizedException", "AreAuthorizationsObtainable", "IsSystemBusNameAuthorizedAsync"] def require_auth(polkit_auth): """Decorator for DBus service methods. Specify that a user needs a specific PolicyKit authorization `polkit_auth´ to execute it.""" def require_auth_decorator(method): assert hasattr(method, "_dbus_is_method") setattr(method, "_slip_polkit_auth_required", polkit_auth) return method return require_auth_decorator AUTH_EXC_PREFIX = \ "org.fedoraproject.slip.dbus.service.PolKit.NotAuthorizedException." class AUTHFAIL_DONTCATCH(object): pass def enable_proxy( func=None, authfail_result=AUTHFAIL_DONTCATCH, authfail_exception=None, authfail_callback=None): """Decorator for DBus proxy methods. Let's you (optionally) specify either a result value or an exception type and a callback which is returned, thrown or called respectively if a PolicyKit authorization doesn't exist or can't be obtained in the DBus mechanism, i.e. an appropriate DBus exception is thrown. An exception constructor may and a callback must accept an `action_id´ parameter which will be set to the id of the PolicyKit action for which authorization could not be obtained. Examples: 1) Return `False´ in the event of an authorization problem, and call `error_handler´: def error_handler(action_id=None): print "Authorization problem:", action_id class MyProxy(object): @polkit.enable_proxy(authfail_result=False, authfail_callback=error_handler) def some_method(self, ...): ... 2) Throw a `MyAuthError´ instance in the event of an authorization problem: class MyAuthError(Exception): def __init__(self, *args, **kwargs): action_id = kwargs.pop("action_id") super(MyAuthError, self).__init__(*args, **kwargs) self.action_id = action_id class MyProxy(object): @polkit.enable_proxy(authfail_exception=MyAuthError) def some_method(self, ...): ...""" assert(func is None or isinstance(func, collections.Callable)) assert( authfail_result in (None, AUTHFAIL_DONTCATCH) or authfail_exception is None) assert( authfail_callback is None or isinstance(authfail_callback, collections.Callable)) assert( authfail_exception is None or issubclass(authfail_exception, Exception)) def _enable_proxy(func, *p, **k): try: return func(*p, **k) except dbus.DBusException as e: exc_name = e.get_dbus_name() if not exc_name.startswith(AUTH_EXC_PREFIX): raise action_id = exc_name[len(AUTH_EXC_PREFIX):] if authfail_callback is not None: authfail_callback(action_id=action_id) if authfail_exception is not None: try: af_exc = authfail_exception(action_id=action_id) except: af_exc = authfail_exception() raise af_exc if authfail_result is AUTHFAIL_DONTCATCH: raise return authfail_result if func is not None: return decorator(_enable_proxy, func) else: def decorate(func): return decorator(_enable_proxy, func) return decorate class NotAuthorizedException(dbus.DBusException): """Exception which a DBus service method throws if an authorization required for executing it can't be obtained.""" _dbus_error_name = \ "org.fedoraproject.slip.dbus.service.PolKit.NotAuthorizedException" def __init__(self, action_id, *p, **k): self._dbus_error_name = self.__class__._dbus_error_name + "." +\ action_id super(NotAuthorizedException, self).__init__(*p, **k) class PolKit(object): """Convenience wrapper around polkit.""" _dbus_name = 'org.freedesktop.PolicyKit1' _dbus_path = '/org/freedesktop/PolicyKit1/Authority' _dbus_interface = 'org.freedesktop.PolicyKit1.Authority' __interface = None __bus = None __bus_name = None __signal_receiver = None @classmethod def _on_name_owner_changed(cls, name, old_owner, new_owner): if name == cls._dbus_name and PolKit.__bus: PolKit.__bus.remove_signal_receiver(PolKit.__signal_receiver) PolKit.__bus = None PolKit.__signal_receiver = None PolKit.__interface = None @property def _bus(self): if not PolKit.__bus: PolKit.__bus = dbus.SystemBus() PolKit.__signal_receiver = PolKit.__bus.add_signal_receiver( handler_function=self._on_name_owner_changed, signal_name='NameOwnerChanged', dbus_interface='org.freedesktop.DBus', arg0=self._dbus_name) return PolKit.__bus @property def _bus_name(self): if not PolKit.__bus_name: PolKit.__bus_name = self._bus.get_unique_name() return PolKit.__bus_name @property def _interface(self): if not PolKit.__interface: try: PolKit.__interface = dbus.Interface(self._bus.get_object( self._dbus_name, self._dbus_path), self._dbus_interface) except dbus.DBusException: pass return PolKit.__interface @property def _polkit_present(self): return bool(self._interface) def __dbus_system_bus_name_uid(self, system_bus_name): bus_object = self._bus.get_object( 'org.freedesktop.DBus', '/org/freedesktop/DBus') bus_interface = dbus.Interface(bus_object, 'org.freedesktop.DBus') try: uid = bus_interface.GetConnectionUnixUser(system_bus_name) except: uid = None return uid def __authorization_is_obtainable(self, authorization): if not self._polkit_present: return True (is_authorized, is_challenge, details) = \ self._interface.CheckAuthorization( ("system-bus-name", {"name": self._bus_name}), authorization, {}, 0, "") return is_authorized or is_challenge def AreAuthorizationsObtainable(self, authorizations): if not self._polkit_present: return True if not isinstance(authorizations, (tuple, list, set)): authorizations = (authorizations,) obtainable = \ reduce( lambda x, y: x and self.__authorization_is_obtainable(y), authorizations, True) return obtainable def IsSystemBusNameAuthorizedAsync( self, system_bus_name, action_id, reply_handler, error_handler, challenge=True, details={}): if not self._polkit_present: return reply_handler(action_id is None or self.__dbus_system_bus_name_uid(system_bus_name) == 0) flags = 0 if challenge: flags |= 0x1 def reply_cb(args): (is_authorized, is_challenge, details) = args reply_handler(is_authorized) self._interface.CheckAuthorization( ("system-bus-name", {"name": system_bus_name}), action_id, details, flags, "", reply_handler=reply_cb, error_handler=error_handler, timeout=method_call_no_timeout) __polkit = PolKit() def AreAuthorizationsObtainable(authorizations): return __polkit.AreAuthorizationsObtainable(authorizations) def IsSystemBusNameAuthorizedAsync( system_bus_name, action_id, reply_handler, error_handler, challenge=True, details={}): return __polkit.IsSystemBusNameAuthorizedAsync( system_bus_name, action_id, reply_handler, error_handler, challenge, details)