mirror of
https://git.kernel.org/pub/scm/bluetooth/bluez.git
synced 2024-11-25 13:14:14 +08:00
449 lines
18 KiB
Python
Executable File
449 lines
18 KiB
Python
Executable File
#!/usr/bin/env python
|
|
|
|
import dbus
|
|
import dbus.decorators
|
|
import dbus.glib
|
|
import gobject
|
|
import sys
|
|
import getopt
|
|
from signal import *
|
|
|
|
mgr_cmds = [ "InterfaceVersion", "ListAdapters", "DefaultAdapter" ]
|
|
mgr_signals = [ "AdapterAdded", "AdapterRemoved" ]
|
|
|
|
dev_cmds = [ "GetAddress",
|
|
"GetVersion",
|
|
"GetRevision",
|
|
"GetManufacturer",
|
|
"GetCompany",
|
|
"GetMode",
|
|
"SetMode",
|
|
"GetDiscoverableTimeout",
|
|
"SetDiscoverableTimeout",
|
|
"IsConnectable",
|
|
"IsDiscoverable",
|
|
"IsConnected",
|
|
"ListConnections",
|
|
"GetMajorClass",
|
|
"ListAvailableMinorClasses",
|
|
"GetMinorClass",
|
|
"SetMinorClass",
|
|
"GetServiceClasses",
|
|
"GetName",
|
|
"SetName",
|
|
"GetRemoteVersion",
|
|
"GetRemoteRevision",
|
|
"GetRemoteManufacturer",
|
|
"GetRemoteCompany",
|
|
"GetRemoteMajorClass",
|
|
"GetRemoteMinorClass",
|
|
"GetRemoteServiceClasses",
|
|
"GetRemoteClass",
|
|
"GetRemoteName",
|
|
"GetRemoteAlias",
|
|
"SetRemoteAlias",
|
|
"ClearRemoteAlias",
|
|
"LastSeen",
|
|
"LastUsed",
|
|
"DisconnectRemoteDevice",
|
|
"CreateBonding",
|
|
"CancelBondingProcess",
|
|
"RemoveBonding",
|
|
"HasBonding",
|
|
"ListBondings",
|
|
"GetPinCodeLength",
|
|
"GetEncryptionKeySize",
|
|
"DiscoverDevices",
|
|
"DiscoverDevicesWithoutNameResolving",
|
|
"CancelDiscovery",
|
|
"ListRemoteDevices",
|
|
"ListRecentRemoteDevices" ]
|
|
dev_signals = [ "ModeChanged",
|
|
"NameChanged",
|
|
"MinorClassChanged",
|
|
"DiscoveryStarted",
|
|
"DiscoveryCompleted",
|
|
"RemoteDeviceFound",
|
|
"RemoteNameUpdated",
|
|
"RemoteNameFailed",
|
|
"RemoteAliasChanged"
|
|
"RemoteAliasCleared",
|
|
"RemoteDeviceConnected",
|
|
"RemoteDeviceDisconnectRequested",
|
|
"RemoteDeviceDisconnected",
|
|
"BondingCreated",
|
|
"BondingRemoved" ]
|
|
|
|
dev_signals_filter = [ "/org/bluez/hci0", "/org/bluez/hci1",
|
|
"/org/bluez/hci2", "/org/bluez/hci3",
|
|
"/org/bluez/hci4", "/org/bluez/hci5",
|
|
"/org/bluez/hci6", "/org/bluez/hci7" ]
|
|
|
|
class Tester:
|
|
exit_events = []
|
|
dev_path = None
|
|
need_dev = False
|
|
listen = False
|
|
at_interrupt = None
|
|
|
|
def __init__(self, argv):
|
|
self.name = argv[0]
|
|
|
|
self.parse_args(argv[1:])
|
|
|
|
try:
|
|
self.dbus_setup()
|
|
except dbus.DBusException, e:
|
|
print 'Failed to do D-Bus setup: %s' % e
|
|
sys.exit(1)
|
|
|
|
def parse_args(self, argv):
|
|
try:
|
|
opts, args = getopt.getopt(argv, "hli:")
|
|
except getopt.GetoptError:
|
|
self.usage()
|
|
sys.exit(1)
|
|
|
|
for o, a in opts:
|
|
if o == "-h":
|
|
self.usage()
|
|
sys.exit()
|
|
elif o == "-l":
|
|
self.listen = True
|
|
elif o == "-i":
|
|
if a[0] == '/':
|
|
self.dev_path = a
|
|
else:
|
|
self.dev_path = '/org/bluez/%s' % a
|
|
|
|
if not (args or self.listen):
|
|
self.usage()
|
|
sys.exit(1)
|
|
|
|
if args:
|
|
self.cmd = args[0]
|
|
self.cmd_args = args[1:]
|
|
|
|
def dbus_dev_setup(self):
|
|
if not self.dev_path:
|
|
try:
|
|
self.dbus_mgr_setup()
|
|
self.dev_path = self.manager.DefaultAdapter()
|
|
except dbus.DBusException, e:
|
|
print 'Failed to get default device: %s' % e
|
|
sys.exit(1)
|
|
try:
|
|
obj = self.bus.get_object('org.bluez', self.dev_path)
|
|
self.device = dbus.Interface(obj, 'org.bluez.Adapter')
|
|
except dbus.DBusException, e:
|
|
print 'Failed to setup device path: %s' % e
|
|
sys.exit(1)
|
|
|
|
def dbus_dev_sig_setup(self):
|
|
try:
|
|
for signal in dev_signals:
|
|
for path in dev_signals_filter:
|
|
self.bus.add_signal_receiver(self.dev_signal_handler,
|
|
signal, 'org.bluez.Adapter',
|
|
'org.bluez', path,
|
|
message_keyword='dbus_message')
|
|
except dbus.DBusException, e:
|
|
print 'Failed to setup signal handler for device path: %s' % e
|
|
sys.exit(1)
|
|
|
|
def dbus_mgr_sig_setup(self):
|
|
try:
|
|
for signal in mgr_signals:
|
|
self.bus.add_signal_receiver(self.mgr_signal_handler,
|
|
signal,'org.bluez.Manager',
|
|
'org.bluez', '/org/bluez')
|
|
except dbus.DBusException, e:
|
|
print 'Failed to setup signal handler for manager path: %s' % e
|
|
sys.exit(1)
|
|
|
|
def dbus_mgr_setup(self):
|
|
self.manager_obj = self.bus.get_object('org.bluez', '/org/bluez')
|
|
self.manager = dbus.Interface(self.manager_obj, 'org.bluez.Manager')
|
|
|
|
def dbus_setup(self):
|
|
self.bus = dbus.SystemBus()
|
|
|
|
def usage(self):
|
|
print 'Usage: %s [-i <dev>] [-l] [-h] <cmd> [arg1..]' % self.name
|
|
print ' -i <dev> Specify device (e.g. "hci0" or "/org/bluez/hci0")'
|
|
print ' -l Listen for events (no command required)'
|
|
print ' -h Show this help'
|
|
print 'Manager commands:'
|
|
for cmd in mgr_cmds:
|
|
print '\t%s' % cmd
|
|
print 'Adapter commands:'
|
|
for cmd in dev_cmds:
|
|
print '\t%s' % cmd
|
|
|
|
#@dbus.decorators.explicitly_pass_message
|
|
def dev_signal_handler(*args, **keywords):
|
|
dbus_message = keywords["dbus_message"]
|
|
print '%s - %s: ' % (dbus_message.get_member(), dbus_message.get_path()),
|
|
for arg in args[1:]:
|
|
print '%s ' % arg,
|
|
print
|
|
|
|
#@dbus.decorators.explicitly_pass_message
|
|
def mgr_signal_handler(*args, **keywords):
|
|
dbus_message = keywords["dbus_message"]
|
|
print '%s: ' % dbus_message.get_member()
|
|
for arg in args[1:]:
|
|
print '%s ' % arg,
|
|
print
|
|
|
|
def signal_cb(self, sig, frame):
|
|
print 'Caught signal, exiting'
|
|
if self.at_interrupt:
|
|
self.at_interrupt()
|
|
self.main_loop.quit()
|
|
|
|
def call_mgr_dbus_func(self):
|
|
if self.cmd == 'InterfaceVersion':
|
|
try:
|
|
print self.manager.InterfaceVersion()
|
|
except dbus.DBusException, e:
|
|
print 'Sending %s failed: %s' % (self.cmd, e)
|
|
if self.cmd == 'ListAdapters':
|
|
try:
|
|
devices = self.manager.ListAdapters()
|
|
except dbus.DBusException, e:
|
|
print 'Sending %s failed: %s' % (self.cmd, e)
|
|
sys.exit(1)
|
|
for device in devices:
|
|
print device
|
|
elif self.cmd == 'DefaultAdapter':
|
|
try:
|
|
print self.manager.DefaultAdapter()
|
|
except dbus.DBusException, e:
|
|
print 'Sending %s failed: %s' % (self.cmd, e)
|
|
sys.exit(1)
|
|
|
|
def call_dev_dbus_func(self):
|
|
try:
|
|
if self.cmd == 'GetAddress':
|
|
print self.device.GetAddress()
|
|
elif self.cmd == 'GetManufacturer':
|
|
print self.device.GetManufacturer()
|
|
elif self.cmd == 'GetVersion':
|
|
print self.device.GetVersion()
|
|
elif self.cmd == 'GetRevision':
|
|
print self.device.GetRevision()
|
|
elif self.cmd == 'GetCompany':
|
|
print self.device.GetCompany()
|
|
elif self.cmd == 'GetMode':
|
|
print self.device.GetMode()
|
|
elif self.cmd == 'SetMode':
|
|
if len(self.cmd_args) == 1:
|
|
self.device.SetMode(self.cmd_args[0])
|
|
else:
|
|
print 'Usage: %s -i <dev> SetMode scan_mode' % self.name
|
|
elif self.cmd == 'GetDiscoverableTimeout':
|
|
print '%u' % (self.device.GetDiscoverableTimeout())
|
|
elif self.cmd == 'SetDiscoverableTimeout':
|
|
if len(self.cmd_args) == 1:
|
|
self.device.SetDiscoverableTimeout(dbus.UInt32(self.cmd_args[0]))
|
|
else:
|
|
print 'Usage: %s -i <dev> SetDiscoverableTimeout timeout' % self.name
|
|
elif self.cmd == 'IsConnectable':
|
|
print self.device.IsConnectable()
|
|
elif self.cmd == 'IsDiscoverable':
|
|
print self.device.IsDiscoverable()
|
|
elif self.cmd == 'IsConnected':
|
|
if len(self.cmd_args) == 1:
|
|
print self.device.IsConnected(self.cmd_args[0])
|
|
else:
|
|
print 'Usage: %s -i <dev> IsConnected address' % self.name
|
|
elif self.cmd == 'ListConnections':
|
|
print self.device.ListConnections()
|
|
elif self.cmd == 'GetMajorClass':
|
|
print self.device.GetMajorClass()
|
|
elif self.cmd == 'ListAvailableMinorClasses':
|
|
print self.device.ListAvailableMinorClasses()
|
|
elif self.cmd == 'GetMinorClass':
|
|
print self.device.GetMinorClass()
|
|
elif self.cmd == 'SetMinorClass':
|
|
if len(self.cmd_args) == 1:
|
|
self.device.SetMinorClass(self.cmd_args[0])
|
|
else:
|
|
print 'Usage: %s -i <dev> SetMinorClass minor' % self.name
|
|
elif self.cmd == 'GetServiceClasses':
|
|
classes = self.device.GetServiceClasses()
|
|
for clas in classes:
|
|
print clas,
|
|
elif self.cmd == 'GetName':
|
|
print self.device.GetName()
|
|
elif self.cmd == 'SetName':
|
|
if len(self.cmd_args) == 1:
|
|
self.device.SetName(self.cmd_args[0])
|
|
else:
|
|
print 'Usage: %s -i <dev> SetName newname' % self.name
|
|
elif self.cmd == 'GetRemoteName':
|
|
if len(self.cmd_args) == 1:
|
|
print self.device.GetRemoteName(self.cmd_args[0])
|
|
else:
|
|
print 'Usage: %s -i <dev> GetRemoteName address' % self.name
|
|
elif self.cmd == 'GetRemoteVersion':
|
|
if len(self.cmd_args) == 1:
|
|
print self.device.GetRemoteVersion(self.cmd_args[0])
|
|
else:
|
|
print 'Usage: %s -i <dev> GetRemoteVersion address' % self.name
|
|
elif self.cmd == 'GetRemoteRevision':
|
|
if len(self.cmd_args) == 1:
|
|
print self.device.GetRemoteRevision(self.cmd_args[0])
|
|
else:
|
|
print 'Usage: %s -i <dev> GetRemoteRevision address' % self.name
|
|
elif self.cmd == 'GetRemoteManufacturer':
|
|
if len(self.cmd_args) == 1:
|
|
print self.device.GetRemoteManufacturer(self.cmd_args[0])
|
|
else:
|
|
print 'Usage: %s -i <dev> GetRemoteManufacturer address' % self.name
|
|
elif self.cmd == 'GetRemoteCompany':
|
|
if len(self.cmd_args) == 1:
|
|
print self.device.GetRemoteCompany(self.cmd_args[0])
|
|
else:
|
|
print 'Usage: %s -i <dev> GetRemoteCompany address' % self.name
|
|
elif self.cmd == 'GetRemoteAlias':
|
|
if len(self.cmd_args) == 1:
|
|
print self.device.GetRemoteAlias(self.cmd_args[0])
|
|
else:
|
|
print 'Usage: %s -i <dev> GetRemoteAlias address' % self.name
|
|
elif self.cmd == 'GetRemoteMajorClass':
|
|
if len(self.cmd_args) == 1:
|
|
print self.device.GetRemoteMajorClass(self.cmd_args[0])
|
|
else:
|
|
print 'Usage: %s -i <dev> GetRemoteMajorClass address' % self.name
|
|
elif self.cmd == 'GetRemoteMinorClass':
|
|
if len(self.cmd_args) == 1:
|
|
print self.device.GetRemoteMinorClass(self.cmd_args[0])
|
|
else:
|
|
print 'Usage: %s -i <dev> GetRemoteMinorClass address' % self.name
|
|
elif self.cmd == 'GetRemoteServiceClasses':
|
|
if len(self.cmd_args) == 1:
|
|
print self.device.GetRemoteServiceClasses(self.cmd_args[0])
|
|
else:
|
|
print 'Usage: %s -i <dev> GetRemoteServiceClasses address' % self.name
|
|
elif self.cmd == 'SetRemoteAlias':
|
|
if len(self.cmd_args) == 2:
|
|
self.device.SetRemoteAlias(self.cmd_args[0], self.cmd_args[1])
|
|
else:
|
|
print 'Usage: %s -i <dev> SetRemoteAlias address alias' % self.name
|
|
elif self.cmd == 'ClearRemoteAlias':
|
|
if len(self.cmd_args) == 1:
|
|
print self.device.ClearRemoteAlias(self.cmd_args[0])
|
|
else:
|
|
print 'Usage: %s -i <dev> ClearRemoteAlias address' % self.name
|
|
elif self.cmd == 'LastSeen':
|
|
if len(self.cmd_args) == 1:
|
|
print self.device.LastSeen(self.cmd_args[0])
|
|
else:
|
|
print 'Usage: %s -i <dev> LastSeen address' % self.name
|
|
elif self.cmd == 'LastUsed':
|
|
if len(self.cmd_args) == 1:
|
|
print self.device.LastUsed(self.cmd_args[0])
|
|
else:
|
|
print 'Usage: %s -i <dev> LastUsed address' % self.name
|
|
elif self.cmd == 'DisconnectRemoteDevice':
|
|
if len(self.cmd_args) == 1:
|
|
print self.device.LastUsed(self.cmd_args[0])
|
|
else:
|
|
print 'Usage: %s -i <dev> DisconnectRemoteDevice address' % self.name
|
|
elif self.cmd == 'CreateBonding':
|
|
if len(self.cmd_args) == 1:
|
|
print self.device.CreateBonding(self.cmd_args[0])
|
|
else:
|
|
print 'Usage: %s -i <dev> CreateBonding address' % self.name
|
|
elif self.cmd == 'RemoveBonding':
|
|
if len(self.cmd_args) == 1:
|
|
print self.device.RemoveBonding(self.cmd_args[0])
|
|
else:
|
|
print 'Usage: %s -i <dev> RemoveBonding address' % self.name
|
|
elif self.cmd == 'CancelBondingProcess':
|
|
if len(self.cmd_args) == 1:
|
|
print self.device.CancelBondingProcess(self.cmd_args[0])
|
|
else:
|
|
print 'Usage: %s -i <dev> CancelBondingProcess address' % self.name
|
|
elif self.cmd == 'HasBonding':
|
|
if len(self.cmd_args) == 1:
|
|
print self.device.HasBonding(self.cmd_args[0])
|
|
else:
|
|
print 'Usage: %s -i <dev> HasBonding address' % self.name
|
|
elif self.cmd == 'ListBondings':
|
|
bondings = self.device.ListBondings()
|
|
for bond in bondings:
|
|
print bond,
|
|
elif self.cmd == 'GetPinCodeLength':
|
|
if len(self.cmd_args) == 1:
|
|
print self.device.GetPinCodeLength(self.cmd_args[0])
|
|
else:
|
|
print 'Usage: %s -i <dev> GetPinCodeLength address' % self.name
|
|
elif self.cmd == 'GetEncryptionKeySize':
|
|
if len(self.cmd_args) == 1:
|
|
print self.device.GetEncryptionKeySize(self.cmd_args[0])
|
|
else:
|
|
print 'Usage: %s -i <dev> GetEncryptionKeySize address' % self.name
|
|
elif self.cmd == 'DiscoverDevices':
|
|
print self.device.DiscoverDevices()
|
|
elif self.cmd == 'DiscoverDevicesWithoutNameResolving':
|
|
print self.device.DiscoverDevicesWithoutNameResolving()
|
|
elif self.cmd == 'ListRemoteDevices':
|
|
devices = self.device.ListRemoteDevices()
|
|
for device in devices:
|
|
print device,
|
|
elif self.cmd == 'ListRecentRemoteDevices':
|
|
if len(self.cmd_args) == 1:
|
|
devices = self.device.ListRecentRemoteDevices(self.cmd_args[0])
|
|
for device in devices:
|
|
print device,
|
|
else:
|
|
print 'Usage: %s -i <dev> ListRecentRemoteDevices date' % self.name
|
|
else:
|
|
# FIXME: remove at future version
|
|
print 'Script Error: Method %s not found. Maybe a mispelled word.' % (self.cmd_args)
|
|
except dbus.DBusException, e:
|
|
print '%s failed: %s' % (self.cmd, e)
|
|
sys.exit(1)
|
|
|
|
def run(self):
|
|
# Manager methods
|
|
if self.listen:
|
|
self.dbus_mgr_sig_setup()
|
|
self.dbus_dev_sig_setup()
|
|
print 'Listening for events...'
|
|
|
|
if self.cmd in mgr_cmds:
|
|
try:
|
|
self.dbus_mgr_setup()
|
|
except dbus.DBusException, e:
|
|
print 'Failed to setup manager interface: %s' % e
|
|
sys.exit(1)
|
|
self.call_mgr_dbus_func()
|
|
elif self.cmd in dev_cmds:
|
|
try:
|
|
self.dbus_dev_setup()
|
|
except dbus.DBusException, e:
|
|
print 'Failed to setup device interface: %s' % e
|
|
sys.exit(1)
|
|
self.call_dev_dbus_func()
|
|
elif not self.listen:
|
|
print 'Unknown command: %s' % self.cmd
|
|
self.usage()
|
|
sys.exit(1)
|
|
|
|
if self.listen:
|
|
signal(SIGINT, self.signal_cb)
|
|
signal(SIGTERM, self.signal_cb)
|
|
self.main_loop = gobject.MainLoop()
|
|
self.main_loop.run()
|
|
|
|
if __name__ == '__main__':
|
|
gobject.threads_init()
|
|
dbus.glib.init_threads()
|
|
|
|
tester = Tester(sys.argv)
|
|
tester.run()
|