mirror of
https://git.kernel.org/pub/scm/bluetooth/bluez.git
synced 2025-01-22 12:23:30 +08:00
7adf8d09b4
Some test scripts use "from gi.repository import GObject" whereas others use "import gobject". gi.repository is not always available on embedded systems, so convert all instances to this format:

    try:
        from gi.repository import GObject
    except ImportError:
        import gobject as GObject

Also, sort the imports in this order: system, dbus, gobject, bluezutils.
88 lines
2.0 KiB
Python
Executable File
88 lines
2.0 KiB
Python
Executable File
#!/usr/bin/python
|
|
|
|
from __future__ import absolute_import, print_function, unicode_literals
|
|
|
|
import sys
|
|
import dbus
|
|
import dbus.service
|
|
import dbus.mainloop.glib
|
|
try:
|
|
from gi.repository import GObject
|
|
except ImportError:
|
|
import gobject as GObject
|
|
|
|
# Bus name and object path passed to bus.get_object() to reach the
# agent manager.
BUS_NAME = 'org.bluez.obex'
PATH = '/org/bluez/obex'

# D-Bus interface names used by this script.
AGENT_MANAGER_INTERFACE = 'org.bluez.obex.AgentManager1'
AGENT_INTERFACE = 'org.bluez.obex.Agent1'
TRANSFER_INTERFACE = 'org.bluez.obex.Transfer1'
|
|
|
|
def ask(prompt):
    """Show *prompt* on the terminal and return the line the user types.

    ``raw_input`` is used where it exists (Python 2); otherwise the
    built-in ``input`` is used (Python 3, which has no ``raw_input``).

    Only the missing-name case selects the fallback.  Catching every
    exception here would also swallow a KeyboardInterrupt or EOFError
    raised while the user is being prompted and then prompt a second
    time through ``input``, which on Python 2 evaluates the typed text.
    """
    # Keep the try body to the name lookup so errors raised while
    # actually reading the answer propagate to the caller untouched.
    try:
        read_line = raw_input
    except NameError:
        read_line = input
    return read_line(prompt)
|
|
|
|
class Agent(dbus.service.Object):
    """D-Bus object implementing AGENT_INTERFACE with a terminal prompt.

    ``pending_auth`` is True only while AuthorizePush is waiting for the
    user's answer; the script's Ctrl-C handler reads it to decide whether
    to call Cancel().
    """

    def __init__(self, conn=None, obj_path=None):
        """Initialise the dbus.service.Object base with *conn*/*obj_path*."""
        dbus.service.Object.__init__(self, conn, obj_path)
        self.pending_auth = False

    @dbus.service.method(AGENT_INTERFACE, in_signature="o",
                         out_signature="s")
    def AuthorizePush(self, path):
        """Ask the user whether the transfer at object *path* is allowed.

        Returns the transfer's 'Name' property when the user accepts
        (any answer other than "n"/"N").  Raises dbus.DBusException when
        the user declines.
        """
        # `bus` is the module-level session bus created in the __main__
        # section of this script.
        transfer_object = bus.get_object(BUS_NAME, path)
        transfer = dbus.Interface(transfer_object,
                                  'org.freedesktop.DBus.Properties')
        properties = transfer.GetAll(TRANSFER_INTERFACE)

        # The flag stays set for the whole time the prompt is on screen.
        self.pending_auth = True
        question = "Authorize (%s, %s) (Y/n):" % (path, properties['Name'])
        auth = ask(question)
        self.pending_auth = False

        if auth in ("n", "N"):
            raise dbus.DBusException(
                "org.bluez.obex.Error.Rejected: "
                "Not Authorized")

        return properties['Name']

    @dbus.service.method(AGENT_INTERFACE, in_signature="",
                         out_signature="")
    def Cancel(self):
        """Report the cancellation and clear the pending-prompt flag."""
        print("Authorization Canceled")
        self.pending_auth = False
|
|
|
|
if __name__ == '__main__':
    # Script entry point: create an Agent on the session bus, register it
    # with the agent manager and run the GLib main loop until interrupted.
    #
    # The names below are kept at module level on purpose:
    # Agent.AuthorizePush looks up `bus` as a global.
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

    bus = dbus.SessionBus()
    manager = dbus.Interface(bus.get_object(BUS_NAME, PATH),
                             AGENT_MANAGER_INTERFACE)

    path = "/test/agent"
    agent = Agent(bus, path)

    mainloop = GObject.MainLoop()

    manager.RegisterAgent(path)
    print("Agent registered")

    # Ctrl-C while an authorization prompt is pending cancels that prompt
    # and keeps serving; Ctrl-C at any other time ends the script.
    #
    # An earlier `elif len(transfers) > 0:` branch was removed: nothing in
    # this file defines `transfers`, so reaching it raised NameError
    # instead of letting the script exit.
    cont = True
    while cont:
        try:
            mainloop.run()
        except KeyboardInterrupt:
            if agent.pending_auth:
                agent.Cancel()
            else:
                cont = False

    # Unregistration is left disabled, as in the original script:
    # manager.UnregisterAgent(path)
    # print("Agent unregistered")
|