Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 127 additions & 0 deletions pydbus/authorization.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright (c) 2016 Anselm Kruis
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
#

"""
A decorator for Polkit authorization
"""

from __future__ import print_function, absolute_import
import gi
gi.require_version('Polkit', '1.0')
from gi.repository import GLib, Polkit, Gio
import functools
from pydbus import registration, generic
# import traceback

__all__ = ['PolkitAuthorization']


class PolkitAuthorization(object):
@staticmethod
def NotAuthorizedError(message="Not authorized to perform operation"):
return GLib.Error.new_literal(Polkit.error_quark(), message, Polkit.Error.NOT_AUTHORIZED)

def __init__(self, action_id, details=None, flags=Polkit.CheckAuthorizationFlags.ALLOW_USER_INTERACTION):
if callable(action_id):
raise TypeError("PolkitAuthorization needs an action id")
self.action_id = action_id
self.details = details
self.flags = flags

def __call__(self, func):
func_info = generic.inspect_function(func, flag_names=('async',), arg_names=('dbus_bus', 'dbus_method_invocation', "polkit_is_authorized"))
is_async = func_info['async']
needs_dbus_method_invocation = func_info["dbus_method_invocation"]
needs_dbus_bus = func_info["dbus_bus"]
needs_polkit_is_authorized = func_info["polkit_is_authorized"]

@functools.wraps(func)
def wrapper(*args, **kw):
bus = kw['dbus_bus'] if needs_dbus_bus else kw.pop('dbus_bus')
method_invocation = kw['dbus_method_invocation'] if needs_dbus_method_invocation else kw.pop('dbus_method_invocation')
try:
cancellable = Gio.Cancellable()
state = dict(bus=bus,
method_invocation=method_invocation,
func=func,
is_async=is_async,
needs_polkit_is_authorized=needs_polkit_is_authorized,
cancellable=cancellable,
args=args,
kw=kw)
timeout = bus.timeout
if timeout == -1:
# -1 indicates a default value
timeout = 25000 # DBus default

GLib.timeout_add(timeout, cancellable.cancel)
Polkit.Authority.get_async(cancellable, self._cb_get_authority_ready, state)
except Exception as e:
# traceback.print_exc() # FIXME: better error reporting. logging?
method_invocation.return_exception(e)

wrapper.async = True
wrapper.arg_dbus_bus = True
wrapper.arg_dbus_method_invocation = True
return wrapper

def _cb_get_authority_ready(self, source_object, res, state):
# source_object is None
method_invocation = state['method_invocation']
try:
authority = Polkit.Authority.get_finish(res)
assert authority is not None

sender = method_invocation.get_sender()
dbus_service = state['bus'].get('.DBus')['']
sender_pid = dbus_service.GetConnectionUnixProcessID(sender) # synchronous call for now
subject = Polkit.UnixProcess.new(sender_pid)

authority.check_authorization(subject,
self.action_id,
self.details,
self.flags,
state['cancellable'],
self._cb_check_authorization_ready,
state)
except Exception as e:
# traceback.print_exc() # FIXME: better error reporting. logging?
method_invocation.return_exception(e)

def _cb_check_authorization_ready(self, authority, res, state):
method_invocation = state['method_invocation']
needs_polkit_is_authorized = state['needs_polkit_is_authorized']
try:
result = authority.check_authorization_finish(res)
is_authorized = result.get_is_authorized()
if not needs_polkit_is_authorized and not is_authorized:
method_invocation.return_exception(self.NotAuthorizedError())
return

if needs_polkit_is_authorized:
state['kw']['polkit_is_authorized'] = is_authorized

result = state['func'](*state['args'], **state['kw'])
if not (state['is_async'] or result is registration.METHOD_IS_ASYNC):
method_invocation.return_value(result)

except Exception as e:
# traceback.print_exc() # FIXME: better error reporting. logging?
method_invocation.return_exception(e)
8 changes: 4 additions & 4 deletions pydbus/bus.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ def __enter__(self):
def __exit__(self, exc_type, exc_value, traceback):
self.con = None

def SystemBus():
return Bus(Bus.Type.SYSTEM)
def SystemBus(timeout=1000):
return Bus(Bus.Type.SYSTEM, timeout=timeout)

def SessionBus():
return Bus(Bus.Type.SESSION)
def SessionBus(timeout=1000):
return Bus(Bus.Type.SESSION, timeout=timeout)

if __name__ == "__main__":
import sys
Expand Down
53 changes: 53 additions & 0 deletions pydbus/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
on dbus, they can be used everywhere.
"""

import inspect


class subscription(object):
__slots__ = ("callback_list", "callback")

Expand Down Expand Up @@ -103,3 +106,53 @@ def __repr__(self):
return "<signal " + self.__qualname__ + " at 0x" + format(id(self), "x") + ">"

bound_method = type(signal().emit) # TODO find a prettier way to get this type


def inspect_function(func, flag_names, arg_names):
"""Inspect a function or method.

This function inspects *func* and returns
boolean flags and information, whether *func* wants a partiucular
arguments.

The flag *NAME* is set, if *func* has an attribute *NAME*, whose
value is true in a boolean context.

The function wants the argument *NAME*, it *NAME* is in the list of
named arguments or if *func* has an attribute ``arg_``*NAME*, whose
value is true in a boolean context.

:parameter func: a callable object.
:parameter flag_names: an iterable, that yields flag names (strings)
:type flag_names: :class:`~collections.Iterable`
:parameter arg_names: an iterable, that yields potential argument names.
:type arg_names: :class:`~collections.Iterable`
:returns: a dictionary, that contains a boolean value for each flag-name
and arg-name.
:rtype: dict
"""
result = {}
for name in flag_names:
try:
value = bool(getattr(func, name)) # be careful, func can be anything
except Exception:
value = False
result[name] = value

if arg_names:
try:
func_args = inspect.getargspec(func)[0]
except TypeError:
# not a function
func_args = ()
for name in arg_names:
if name in func_args:
result[name] = True
continue
try:
value = bool(getattr(func, "arg_" + name))
except Exception:
value = False
result[name] = value

return result
65 changes: 48 additions & 17 deletions pydbus/registration.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,15 @@
from . import generic
from .exitable import ExitableWithAliases

METHOD_IS_ASYNC = object()


class ObjectWrapper(ExitableWithAliases("unwrap")):
__slots__ = ["object", "outargs", "property_types"]
__slots__ = ["object", "outargs", "property_types", "bus"]

def __init__(self, object, interfaces):
def __init__(self, object, interfaces, bus):
self.object = object
self.bus = bus

self.outargs = {}
for iface in interfaces:
Expand Down Expand Up @@ -40,30 +44,57 @@ def onPropertiesChanged(iface, changed, invalidated):
SignalEmitted = generic.signal()

def call_method(self, connection, sender, object_path, interface_name, method_name, parameters, invocation):
def return_exception(exc):
if isinstance(exc, GLib.GError):
# the much simpler invocation.return_gerror(exc) raises TypeError.
invocation.return_error_literal(GLib.quark_from_string(exc.domain), exc.code, exc.message)
else:
# TODO Think of a better way to translate Python exception types to DBus error types.
e_type = type(exc).__name__
if "." not in e_type:
e_type = "unknown." + e_type
invocation.return_dbus_error(e_type, str(exc))

try:
outargs = self.outargs[interface_name + "." + method_name]
loutargs = len(outargs)
soutargs = "(" + "".join(outargs) + ")"

invocation.return_value_raw = return_value_raw = invocation.return_value

def return_value(result):
if loutargs == 0:
return_value_raw(None)
elif loutargs == 1:
return_value_raw(GLib.Variant(soutargs, (result,)))
else:
return_value_raw(GLib.Variant(soutargs, result))
return False

invocation.return_value = return_value
invocation.return_exception = return_exception

method = getattr(self.object, method_name)
method_info = generic.inspect_function(method, flag_names=('async',), arg_names=('dbus_bus', 'dbus_method_invocation'))

result = method(*parameters)
if method_info["async"] and not method_info["dbus_method_invocation"]:
# for now, we require a asynchronous method to accept the argument 'dbus_method_invocation'
# and to return its result itself.
raise TypeError("an asynchronous method must accept the argument 'dbus_method_invocation'")

#if len(outargs) == 1:
# result = (result,)
kw = {}
if method_info["dbus_bus"]:
kw["dbus_bus"] = self.bus
if method_info["dbus_method_invocation"]:
kw["dbus_method_invocation"] = invocation

if len(outargs) == 0:
invocation.return_value(None)
elif len(outargs) == 1:
invocation.return_value(GLib.Variant(soutargs, (result,)))
else:
invocation.return_value(GLib.Variant(soutargs, result))
result = method(*parameters, **kw)
if not (result is METHOD_IS_ASYNC or method_info["async"]):
# func is synchronous and returned its result. Send it back to the remote caller.
return_value(result)

except Exception as e:
#TODO Think of a better way to translate Python exception types to DBus error types.
e_type = type(e).__name__
if not "." in e_type:
e_type = "unknown." + e_type
invocation.return_dbus_error(e_type, str(e))
return_exception(e)

def get_property(self, connection, sender, object_path, interface_name, property_name):
# Note: It's impossible to correctly return an exception, as
Expand Down Expand Up @@ -119,5 +150,5 @@ def register_object(self, path, object, node_info):
node_info = [Gio.DBusNodeInfo.new_for_xml(ni) for ni in node_info]
interfaces = sum((ni.interfaces for ni in node_info), [])

wrapper = ObjectWrapper(object, interfaces)
wrapper = ObjectWrapper(object, interfaces, self)
return ObjectRegistration(self.con, path, interfaces, wrapper, own_wrapper=True)
Loading