mirror of
https://git.kernel.org/pub/scm/bluetooth/bluez.git
synced 2024-11-23 12:14:26 +08:00
d31f04aa92
Currently we have a mix of /usr/bin/python, /usr/bin/python3 and /usr/bin/env python3. Use the latter since is the more common way of handling this, plus it allows people to override the system python (for what ever reason). Inspired by a Debian patch, doing a mass /usr/bin/python{,3} conversion. Cc: Nobuhiro Iwamatsu <iwamatsu@debian.org>
404 lines
13 KiB
Python
404 lines
13 KiB
Python
#!/usr/bin/env python3
|
|
# SPDX-License-Identifier: LGPL-2.1-or-later
|
|
|
|
import argparse
|
|
import dbus
|
|
import dbus.mainloop.glib
|
|
import dbus.service
|
|
import json
|
|
import time
|
|
|
|
from threading import Thread
|
|
|
|
try:
|
|
from gi.repository import GObject # python3
|
|
except ImportError:
|
|
import gobject as GObject # python2
|
|
|
|
DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
|
|
DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'
|
|
|
|
BLUEZ_SERVICE_NAME = 'org.bluez'
|
|
|
|
ADV_MONITOR_MANAGER_IFACE = 'org.bluez.AdvertisementMonitorManager1'
|
|
ADV_MONITOR_IFACE = 'org.bluez.AdvertisementMonitor1'
|
|
ADV_MONITOR_APP_BASE_PATH = '/org/bluez/example/adv_monitor_app'
|
|
|
|
|
|
class AdvMonitor(dbus.service.Object):
|
|
|
|
# Indexes of the Monitor object parameters in a monitor data list.
|
|
MONITOR_TYPE = 0
|
|
RSSI_FILTER = 1
|
|
PATTERNS = 2
|
|
|
|
# Indexes of the RSSI filter parameters in a monitor data list.
|
|
RSSI_H_THRESH = 0
|
|
RSSI_H_TIMEOUT = 1
|
|
RSSI_L_THRESH = 2
|
|
RSSI_L_TIMEOUT = 3
|
|
|
|
# Indexes of the Patterns filter parameters in a monitor data list.
|
|
PATTERN_START_POS = 0
|
|
PATTERN_AD_TYPE = 1
|
|
PATTERN_DATA = 2
|
|
|
|
def __init__(self, bus, app_path, monitor_id, monitor_data):
|
|
self.path = app_path + '/monitor' + str(monitor_id)
|
|
self.bus = bus
|
|
|
|
self._set_type(monitor_data[self.MONITOR_TYPE])
|
|
self._set_rssi(monitor_data[self.RSSI_FILTER])
|
|
self._set_patterns(monitor_data[self.PATTERNS])
|
|
|
|
super(AdvMonitor, self).__init__(self.bus, self.path)
|
|
|
|
|
|
def get_path(self):
|
|
return dbus.ObjectPath(self.path)
|
|
|
|
|
|
def get_properties(self):
|
|
properties = dict()
|
|
properties['Type'] = dbus.String(self.monitor_type)
|
|
properties['RSSIHighThreshold'] = dbus.Int16(self.rssi_h_thresh)
|
|
properties['RSSIHighTimeout'] = dbus.UInt16(self.rssi_h_timeout)
|
|
properties['RSSILowThreshold'] = dbus.Int16(self.rssi_l_thresh)
|
|
properties['RSSILowTimeout'] = dbus.UInt16(self.rssi_l_timeout)
|
|
properties['Patterns'] = dbus.Array(self.patterns, signature='(yyay)')
|
|
return {ADV_MONITOR_IFACE: properties}
|
|
|
|
|
|
def _set_type(self, monitor_type):
|
|
self.monitor_type = monitor_type
|
|
|
|
|
|
def _set_rssi(self, rssi):
|
|
self.rssi_h_thresh = rssi[self.RSSI_H_THRESH]
|
|
self.rssi_h_timeout = rssi[self.RSSI_H_TIMEOUT]
|
|
self.rssi_l_thresh = rssi[self.RSSI_L_THRESH]
|
|
self.rssi_l_timeout = rssi[self.RSSI_L_TIMEOUT]
|
|
|
|
|
|
def _set_patterns(self, patterns):
|
|
self.patterns = []
|
|
for pattern in patterns:
|
|
start_pos = dbus.Byte(pattern[self.PATTERN_START_POS])
|
|
ad_type = dbus.Byte(pattern[self.PATTERN_AD_TYPE])
|
|
ad_data = []
|
|
for byte in pattern[self.PATTERN_DATA]:
|
|
ad_data.append(dbus.Byte(byte))
|
|
adv_pattern = dbus.Struct((start_pos, ad_type, ad_data),
|
|
signature='yyay')
|
|
self.patterns.append(adv_pattern)
|
|
|
|
|
|
def remove_monitor(self):
|
|
self.remove_from_connection()
|
|
|
|
|
|
@dbus.service.method(DBUS_PROP_IFACE,
|
|
in_signature='s',
|
|
out_signature='a{sv}')
|
|
def GetAll(self, interface):
|
|
print('{}: {} GetAll'.format(self.path, interface))
|
|
if interface != ADV_MONITOR_IFACE:
|
|
print('{}: GetAll: Invalid arg {}'.format(self.path, interface))
|
|
return {}
|
|
|
|
return self.get_properties()[ADV_MONITOR_IFACE]
|
|
|
|
|
|
@dbus.service.method(ADV_MONITOR_IFACE,
|
|
in_signature='',
|
|
out_signature='')
|
|
def Activate(self):
|
|
print('{}: Monitor Activated'.format(self.path))
|
|
|
|
|
|
@dbus.service.method(ADV_MONITOR_IFACE,
|
|
in_signature='',
|
|
out_signature='')
|
|
def Release(self):
|
|
print('{}: Monitor Released'.format(self.path))
|
|
|
|
|
|
@dbus.service.method(ADV_MONITOR_IFACE,
|
|
in_signature='o',
|
|
out_signature='')
|
|
def DeviceFound(self, device):
|
|
print('{}: {} Device Found'.format(self.path, device))
|
|
|
|
|
|
@dbus.service.method(ADV_MONITOR_IFACE,
|
|
in_signature='o',
|
|
out_signature='')
|
|
def DeviceLost(self, device):
|
|
print('{}: {} Device Lost'.format(self.path, device))
|
|
|
|
|
|
class AdvMonitorApp(dbus.service.Object):
|
|
|
|
def __init__(self, bus, advmon_manager, app_id):
|
|
self.bus = bus
|
|
self.advmon_mgr = advmon_manager
|
|
self.app_path = ADV_MONITOR_APP_BASE_PATH + str(app_id)
|
|
|
|
self.monitors = dict()
|
|
|
|
super(AdvMonitorApp, self).__init__(self.bus, self.app_path)
|
|
|
|
|
|
def get_app_path(self):
|
|
return dbus.ObjectPath(self.app_path)
|
|
|
|
|
|
def add_monitor(self, monitor_data):
|
|
monitor_id = 0
|
|
while monitor_id in self.monitors:
|
|
monitor_id += 1
|
|
|
|
monitor = AdvMonitor(self.bus, self.app_path, monitor_id, monitor_data)
|
|
|
|
# Emit the InterfacesAdded signal once the Monitor object is created.
|
|
self.InterfacesAdded(monitor.get_path(), monitor.get_properties())
|
|
|
|
self.monitors[monitor_id] = monitor
|
|
|
|
return monitor_id
|
|
|
|
|
|
def remove_monitor(self, monitor_id):
|
|
monitor = self.monitors.pop(monitor_id, None)
|
|
if not monitor:
|
|
return False
|
|
|
|
# Emit the InterfacesRemoved signal before removing the Monitor object.
|
|
self.InterfacesRemoved(monitor.get_path(),
|
|
monitor.get_properties().keys())
|
|
|
|
monitor.remove_monitor()
|
|
|
|
return True
|
|
|
|
|
|
def register_app(self):
|
|
self.register_successful = None
|
|
|
|
def register_cb():
|
|
print('{}: RegisterMonitor successful'.format(self.app_path))
|
|
self.register_successful = True
|
|
|
|
def register_error_cb(error):
|
|
print('{}: RegisterMonitor failed: {}'.format(self.app_path,
|
|
str(error)))
|
|
self.register_successful = False
|
|
|
|
self.advmon_mgr.RegisterMonitor(self.get_app_path(),
|
|
reply_handler=register_cb,
|
|
error_handler=register_error_cb)
|
|
|
|
# Wait for the reply.
|
|
while self.register_successful is None:
|
|
pass
|
|
|
|
return self.register_successful
|
|
|
|
|
|
def unregister_app(self):
|
|
self.unregister_successful = None
|
|
|
|
def unregister_cb():
|
|
print('{}: UnregisterMonitor successful'.format(self.app_path))
|
|
self.unregister_successful = True
|
|
|
|
def unregister_error_cb(error):
|
|
print('{}: UnregisterMonitor failed: {}'.format(self.app_path,
|
|
str(error)))
|
|
self.unregister_successful = False
|
|
|
|
self.advmon_mgr.UnregisterMonitor(self.get_app_path(),
|
|
reply_handler=unregister_cb,
|
|
error_handler=unregister_error_cb)
|
|
|
|
# Wait for the reply.
|
|
while self.unregister_successful is None:
|
|
pass
|
|
|
|
return self.unregister_successful
|
|
|
|
|
|
@dbus.service.method(DBUS_OM_IFACE, out_signature='a{oa{sa{sv}}}')
|
|
def GetManagedObjects(self):
|
|
print('{}: GetManagedObjects'.format(self.app_path))
|
|
objects = dict()
|
|
for monitor_id in self.monitors:
|
|
monitor = self.monitors[monitor_id]
|
|
objects[monitor.get_path()] = monitor.get_properties()
|
|
|
|
return objects
|
|
|
|
|
|
@dbus.service.signal(DBUS_OM_IFACE, signature='oa{sa{sv}}')
|
|
def InterfacesAdded(self, object_path, interfaces_and_properties):
|
|
# Invoking this method emits the InterfacesAdded signal,
|
|
# nothing needs to be done here.
|
|
return
|
|
|
|
|
|
@dbus.service.signal(DBUS_OM_IFACE, signature='oas')
|
|
def InterfacesRemoved(self, object_path, interfaces):
|
|
# Invoking this method emits the InterfacesRemoved signal,
|
|
# nothing needs to be done here.
|
|
return
|
|
|
|
|
|
def read_adapter_supported_monitor_types(adapter_props):
|
|
types = json.dumps(adapter_props.Get(ADV_MONITOR_MANAGER_IFACE,
|
|
'SupportedMonitorTypes',
|
|
dbus_interface=DBUS_PROP_IFACE))
|
|
return json.loads(types)
|
|
|
|
|
|
def read_adapter_supported_monitor_features(adapter_props):
|
|
features = json.dumps(adapter_props.Get(ADV_MONITOR_MANAGER_IFACE,
|
|
'SupportedFeatures',
|
|
dbus_interface=DBUS_PROP_IFACE))
|
|
return json.loads(features)
|
|
|
|
|
|
def print_supported_types_and_features(adapter_props):
|
|
supported_types = read_adapter_supported_monitor_types(adapter_props)
|
|
for supported_type in supported_types:
|
|
print(supported_type)
|
|
|
|
supported_features = read_adapter_supported_monitor_features(adapter_props)
|
|
for supported_feature in supported_features:
|
|
print(supported_feature)
|
|
|
|
|
|
def find_advmon_mgr(bus, adapter):
|
|
return dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, adapter),
|
|
ADV_MONITOR_MANAGER_IFACE)
|
|
|
|
|
|
def find_adapter(bus):
|
|
remote_om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, '/'),
|
|
DBUS_OM_IFACE)
|
|
objects = remote_om.GetManagedObjects()
|
|
|
|
adapter = None
|
|
adapter_props = None
|
|
|
|
for o, props in objects.items():
|
|
if ADV_MONITOR_MANAGER_IFACE in props:
|
|
adapter = o
|
|
break
|
|
|
|
if adapter:
|
|
# Turn on the bluetooth adapter.
|
|
adapter_props = dbus.Interface(
|
|
bus.get_object(BLUEZ_SERVICE_NAME, adapter),
|
|
DBUS_PROP_IFACE)
|
|
adapter_props.Set('org.bluez.Adapter1', 'Powered', dbus.Boolean(1))
|
|
|
|
return adapter, adapter_props
|
|
|
|
|
|
def test(bus, mainloop, advmon_mgr, app_id):
|
|
# Create an App instance.
|
|
app = AdvMonitorApp(bus, advmon_mgr, app_id)
|
|
|
|
# Create two monitor objects before registering the app. No Activate() or
|
|
# Release() should get called yet as the app is not registered.
|
|
data0 = [
|
|
'invalid_patterns',
|
|
[-50, 1, -70, 1],
|
|
[[0, 0x03, [0x12, 0x18]]] # Service Class UUID is 0x1812 (HOG)
|
|
]
|
|
data1 = [
|
|
'or_patterns',
|
|
[127, 0, 127, 0],
|
|
[[5, 0x09, [ord('_')]]] # 5th character of the Local Name is '_'
|
|
]
|
|
monitor0 = app.add_monitor(data0)
|
|
monitor1 = app.add_monitor(data1)
|
|
|
|
# Register the app root path to expose advertisement monitors.
|
|
# Release() should get called on monitor0 - incorrect monitor type.
|
|
# Activate() should get called on monitor1.
|
|
ret = app.register_app()
|
|
if not ret:
|
|
print('RegisterMonitor failed.')
|
|
mainloop.quit()
|
|
exit(-1)
|
|
|
|
# Create two more monitor objects.
|
|
# Release() should get called on monitor2 - incorrect RSSI Filter values.
|
|
# Activate() should get called on monitor3.
|
|
data2 = [
|
|
'or_patterns',
|
|
[-50, 1, -30, 1],
|
|
[[0, 0x19, [0xC2, 0x03]]] # Appearance is 0xC203 (Mouse)
|
|
]
|
|
data3 = [
|
|
'or_patterns',
|
|
[-50, 1, -70, 1],
|
|
[[0, 0x03, [0x12, 0x18]], [0, 0x19, [0xC2, 0x03]]]
|
|
]
|
|
monitor2 = app.add_monitor(data2)
|
|
monitor3 = app.add_monitor(data3)
|
|
|
|
# Run until user hits the 'Enter' key. If any peer device is advertising
|
|
# during this time, DeviceFound() should get triggered for monitors
|
|
# matching the advertisements.
|
|
raw_input('Press "Enter" key to quit...\n')
|
|
|
|
# Remove a monitor. DeviceFound() for this monitor should not get
|
|
# triggered any more.
|
|
app.remove_monitor(monitor1)
|
|
|
|
# Unregister the app. Release() should get invoked on active monitors,
|
|
# monitor3 in this case.
|
|
app.unregister_app()
|
|
|
|
mainloop.quit()
|
|
|
|
|
|
def main(app_id):
|
|
# Initialize threads in gobject/dbus-glib before creating local threads.
|
|
GObject.threads_init()
|
|
dbus.mainloop.glib.threads_init()
|
|
|
|
# Arrange for the GLib main loop to be the default.
|
|
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
|
|
|
|
bus = dbus.SystemBus()
|
|
mainloop = GObject.MainLoop()
|
|
|
|
# Find bluetooth adapter and power it on.
|
|
adapter, adapter_props = find_adapter(bus)
|
|
if not adapter or not adapter_props:
|
|
print('Bluetooth adapter not found.')
|
|
exit(-1)
|
|
|
|
# Read supported types and find AdvertisementMonitorManager1 interface.
|
|
print_supported_types_and_features(adapter_props)
|
|
advmon_mgr = find_advmon_mgr(bus, adapter)
|
|
if not advmon_mgr :
|
|
print('AdvertisementMonitorManager1 interface not found.')
|
|
exit(-1)
|
|
|
|
Thread(target=test, args=(bus, mainloop, advmon_mgr, app_id)).start()
|
|
|
|
mainloop.run() # blocks until mainloop.quit() is called
|
|
|
|
|
|
if __name__ == '__main__':
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument('--app_id', default=0, type=int, help='use this App-ID '
|
|
'for creating dbus objects (default: 0)')
|
|
args = parser.parse_args()
|
|
|
|
main(args.app_id)
|