"""Synchronous IO wrappers around jeepney """ import array from collections import deque from errno import ECONNRESET import functools from itertools import count import os from selectors import DefaultSelector, EVENT_READ import socket import time from typing import Optional from jeepney import Parser, Message, MessageType, HeaderFields from jeepney.auth import Authenticator, BEGIN from jeepney.bus import get_bus from jeepney.fds import FileDescriptor, fds_buf_size from jeepney.wrappers import ProxyBase, unwrap_msg from jeepney.routing import Router from jeepney.bus_messages import message_bus from .common import MessageFilters, FilterHandle, check_replyable __all__ = [ 'open_dbus_connection', 'DBusConnection', 'Proxy', ] class _Future: def __init__(self): self._result = None def done(self): return bool(self._result) def set_exception(self, exception): self._result = (False, exception) def set_result(self, result): self._result = (True, result) def result(self): success, value = self._result if success: return value raise value def timeout_to_deadline(timeout): if timeout is not None: return time.monotonic() + timeout return None def deadline_to_timeout(deadline): if deadline is not None: return max(deadline - time.monotonic(), 0.) return None class DBusConnectionBase: """Connection machinery shared by this module and threading""" def __init__(self, sock: socket.socket, enable_fds=False): self.sock = sock self.enable_fds = enable_fds self.parser = Parser() self.outgoing_serial = count(start=1) self.selector = DefaultSelector() self.select_key = self.selector.register(sock, EVENT_READ) self.unique_name = None def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): self.close() return False def _serialise(self, message: Message, serial) -> (bytes, Optional[array.array]): if serial is None: serial = next(self.outgoing_serial) fds = array.array('i') if self.enable_fds else None data = message.serialise(serial=serial, fds=fds) return data, fds def _send_with_fds(self, data, fds): bytes_sent = self.sock.sendmsg( [data], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fds)] ) # If sendmsg succeeds, I think ancillary data has been sent atomically? # So now we just need to send any leftover normal data. if bytes_sent < len(data): self.sock.sendall(data[bytes_sent:]) def _receive(self, deadline): while True: msg = self.parser.get_next_message() if msg is not None: return msg b, fds = self._read_some_data(timeout=deadline_to_timeout(deadline)) self.parser.add_data(b, fds=fds) def _read_some_data(self, timeout=None): for key, ev in self.selector.select(timeout): if key == self.select_key: if self.enable_fds: return self._read_with_fds() else: return unwrap_read(self.sock.recv(4096)), [] raise TimeoutError def _read_with_fds(self): nbytes = self.parser.bytes_desired() data, ancdata, flags, _ = self.sock.recvmsg(nbytes, fds_buf_size()) if flags & getattr(socket, 'MSG_CTRUNC', 0): self.close() raise RuntimeError("Unable to receive all file descriptors") return unwrap_read(data), FileDescriptor.from_ancdata(ancdata) def close(self): """Close the connection""" self.selector.close() self.sock.close() class DBusConnection(DBusConnectionBase): def __init__(self, sock: socket.socket, enable_fds=False): super().__init__(sock, enable_fds) self._unwrap_reply = False # Message routing machinery self.router = Router(_Future) # Old interface, for backwards compat self._filters = MessageFilters() # Say Hello, get our unique name self.bus_proxy = Proxy(message_bus, self) hello_reply = self.bus_proxy.Hello() self.unique_name = hello_reply[0] def send(self, message: Message, serial=None): """Serialise and send a :class:`~.Message` object""" data, fds = self._serialise(message, serial) if fds: self._send_with_fds(data, fds) else: self.sock.sendall(data) send_message = send # Backwards compatibility def receive(self, *, timeout=None) -> Message: """Return the next available message from the connection If the data is ready, this will return immediately, even if timeout<=0. Otherwise, it will wait for up to timeout seconds, or indefinitely if timeout is None. If no message comes in time, it raises TimeoutError. """ return self._receive(timeout_to_deadline(timeout)) def recv_messages(self, *, timeout=None): """Receive one message and apply filters See :meth:`filter`. Returns nothing. """ msg = self.receive(timeout=timeout) self.router.incoming(msg) for filter in self._filters.matches(msg): filter.queue.append(msg) def send_and_get_reply(self, message, *, timeout=None, unwrap=None): """Send a message, wait for the reply and return it Filters are applied to other messages received before the reply - see :meth:`add_filter`. """ check_replyable(message) deadline = timeout_to_deadline(timeout) if unwrap is None: unwrap = self._unwrap_reply serial = next(self.outgoing_serial) self.send_message(message, serial=serial) while True: msg_in = self.receive(timeout=deadline_to_timeout(deadline)) reply_to = msg_in.header.fields.get(HeaderFields.reply_serial, -1) if reply_to == serial: if unwrap: return unwrap_msg(msg_in) return msg_in # Not the reply self.router.incoming(msg_in) for filter in self._filters.matches(msg_in): filter.queue.append(msg_in) def filter(self, rule, *, queue: Optional[deque] =None, bufsize=1): """Create a filter for incoming messages Usage:: with conn.filter(rule) as matches: # matches is a deque containing matched messages matching_msg = conn.recv_until_filtered(matches) :param jeepney.MatchRule rule: Catch messages matching this rule :param collections.deque queue: Matched messages will be added to this :param int bufsize: If no deque is passed in, create one with this size """ if queue is None: queue = deque(maxlen=bufsize) return FilterHandle(self._filters, rule, queue) def recv_until_filtered(self, queue, *, timeout=None) -> Message: """Process incoming messages until one is filtered into queue Pops the message from queue and returns it, or raises TimeoutError if the optional timeout expires. Without a timeout, this is equivalent to:: while len(queue) == 0: conn.recv_messages() return queue.popleft() In the other I/O modules, there is no need for this, because messages are placed in queues by a separate task. :param collections.deque queue: A deque connected by :meth:`filter` :param float timeout: Maximum time to wait in seconds """ deadline = timeout_to_deadline(timeout) while len(queue) == 0: self.recv_messages(timeout=deadline_to_timeout(deadline)) return queue.popleft() class Proxy(ProxyBase): """A blocking proxy for calling D-Bus methods You can call methods on the proxy object, such as ``bus_proxy.Hello()`` to make a method call over D-Bus and wait for a reply. It will either return a tuple of returned data, or raise :exc:`.DBusErrorResponse`. The methods available are defined by the message generator you wrap. You can set a time limit on a call by passing ``_timeout=`` in the method call, or set a default when creating the proxy. The ``_timeout`` argument is not passed to the message generator. All timeouts are in seconds, and :exc:`TimeoutErrror` is raised if it expires before a reply arrives. :param msggen: A message generator object :param ~blocking.DBusConnection connection: Connection to send and receive messages :param float timeout: Default seconds to wait for a reply, or None for no limit """ def __init__(self, msggen, connection, *, timeout=None): super().__init__(msggen) self._connection = connection self._timeout = timeout def __repr__(self): extra = '' if (self._timeout is None) else f', timeout={self._timeout}' return f"Proxy({self._msggen}, {self._connection}{extra})" def _method_call(self, make_msg): @functools.wraps(make_msg) def inner(*args, **kwargs): timeout = kwargs.pop('_timeout', self._timeout) msg = make_msg(*args, **kwargs) assert msg.header.message_type is MessageType.method_call return self._connection.send_and_get_reply( msg, timeout=timeout, unwrap=True ) return inner def unwrap_read(b): """Raise ConnectionResetError from an empty read. Sometimes the socket raises an error itself, sometimes it gives no data. I haven't worked out when it behaves each way. """ if not b: raise ConnectionResetError(ECONNRESET, os.strerror(ECONNRESET)) return b def prep_socket(addr, enable_fds=False, timeout=2.0) -> socket.socket: """Create a socket and authenticate ready to send D-Bus messages""" sock = socket.socket(family=socket.AF_UNIX) # To impose the overall auth timeout, we'll update the timeout on the socket # before each send/receive. This is ugly, but we can't use the socket for # anything else until this has succeeded, so this should be safe. deadline = timeout_to_deadline(timeout) def with_sock_deadline(meth, *args): sock.settimeout(deadline_to_timeout(deadline)) return meth(*args) try: with_sock_deadline(sock.connect, addr) authr = Authenticator(enable_fds=enable_fds) for req_data in authr: with_sock_deadline(sock.sendall, req_data) authr.feed(unwrap_read(with_sock_deadline(sock.recv, 1024))) with_sock_deadline(sock.sendall, BEGIN) except socket.timeout as e: sock.close() raise TimeoutError(f"Did not authenticate in {timeout} seconds") from e except: sock.close() raise sock.settimeout(None) # Put the socket back in blocking mode return sock def open_dbus_connection( bus='SESSION', enable_fds=False, auth_timeout=1., ) -> DBusConnection: """Connect to a D-Bus message bus Pass ``enable_fds=True`` to allow sending & receiving file descriptors. An error will be raised if the bus does not allow this. For simplicity, it's advisable to leave this disabled unless you need it. D-Bus has an authentication step before sending or receiving messages. This takes < 1 ms in normal operation, but there is a timeout so that client code won't get stuck if the server doesn't reply. *auth_timeout* configures this timeout in seconds. """ bus_addr = get_bus(bus) sock = prep_socket(bus_addr, enable_fds, timeout=auth_timeout) conn = DBusConnection(sock, enable_fds) return conn if __name__ == '__main__': conn = open_dbus_connection() print("Unique name:", conn.unique_name)