mirror of
https://git.kernel.org/pub/scm/bluetooth/bluez.git
synced 2024-11-23 12:14:26 +08:00
4e54b08c94
The python test app simulates an application registering to BlueZ as a Battery Provider providing three fake batteries drained periodically. Reviewed-by: Miao-chen Chou <mcchou@chromium.org>
233 lines
6.8 KiB
Python
Executable File
233 lines
6.8 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
# SPDX-License-Identifier: LGPL-2.1-or-later
|
|
|
|
import dbus
|
|
import dbus.exceptions
|
|
import dbus.mainloop.glib
|
|
import dbus.service
|
|
|
|
try:
|
|
from gi.repository import GObject
|
|
except ImportError:
|
|
import gobject as GObject
|
|
import sys
|
|
|
|
# Process-wide handles. They start out unset and are assigned by main()
# (declared there with `global`) so that the timer/reply callbacks can
# reach them.
mainloop = None
app = None
bus = None

# Bus name of the BlueZ daemon and the standard D-Bus interfaces used here.
BLUEZ_SERVICE_NAME = 'org.bluez'
DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'

# BlueZ battery provider interfaces, plus the object path under which this
# application (and, below it, every fake battery) is exported.
BATTERY_PROVIDER_MANAGER_IFACE = 'org.bluez.BatteryProviderManager1'
BATTERY_PROVIDER_IFACE = 'org.bluez.BatteryProvider1'
BATTERY_PROVIDER_PATH = '/path/to/provider'

# Despite the *_PATH names these are device-address fragments: Battery
# appends them to a 'dev_' prefix to build its object and device paths.
BATTERY_PATH1 = '11_11_11_11_11_11'
BATTERY_PATH2 = '22_22_22_22_22_22'
BATTERY_PATH3 = '33_33_33_33_33_33'
|
|
|
|
class InvalidArgsException(dbus.exceptions.DBusException):
    """
    D-Bus exception reported as org.freedesktop.DBus.Error.InvalidArgs
    """

    _dbus_error_name = 'org.freedesktop.DBus.Error.InvalidArgs'
|
|
|
|
|
|
class Application(dbus.service.Object):
    """
    org.freedesktop.DBus.ObjectManager implementation exported at
    BATTERY_PROVIDER_PATH; it tracks the Battery objects of this provider
    and announces their addition and removal.
    """

    def __init__(self, bus):
        """Export the application object on *bus* at BATTERY_PROVIDER_PATH."""
        self.path = BATTERY_PROVIDER_PATH
        self.services = []
        self.batteries = []
        dbus.service.Object.__init__(self, bus, self.path)

    def get_path(self):
        """Return this object's path as a dbus.ObjectPath."""
        return dbus.ObjectPath(self.path)

    def add_battery(self, battery):
        """Track *battery*, emit InterfacesAdded and start draining it.

        A 1000 ms periodic timer is installed that lowers the battery
        charge for as long as the battery stays registered here.
        """
        self.batteries.append(battery)
        self.InterfacesAdded(battery.get_path(), battery.get_properties())
        GObject.timeout_add(1000, self._drain_tick, battery)

    def _drain_tick(self, battery):
        """Timer callback: drain *battery* unless it has been removed.

        Returning False cancels the timer, so a battery that was removed
        through remove_battery() stops emitting PropertiesChanged instead
        of being drained (and signalled) after InterfacesRemoved.
        """
        if battery not in self.batteries:
            return False
        return drain_battery(battery)

    def remove_battery(self, battery):
        """Stop tracking *battery* and emit InterfacesRemoved for it.

        Raises ValueError (from list.remove) if *battery* is not tracked.
        """
        self.batteries.remove(battery)
        self.InterfacesRemoved(battery.get_path(), [BATTERY_PROVIDER_IFACE])

    @dbus.service.method(DBUS_OM_IFACE, out_signature='a{oa{sa{sv}}}')
    def GetManagedObjects(self):
        """Return {object path: {interface: properties}} for every battery."""
        print('GetManagedObjects called')

        return {battery.get_path(): battery.get_properties()
                for battery in self.batteries}

    @dbus.service.signal(DBUS_OM_IFACE, signature='oa{sa{sv}}')
    def InterfacesAdded(self, object_path, interfaces_and_properties):
        """Signal stub; the dbus.service.signal decorator does the emission."""
        return

    @dbus.service.signal(DBUS_OM_IFACE, signature='oas')
    def InterfacesRemoved(self, object_path, interfaces):
        """Signal stub; the dbus.service.signal decorator does the emission."""
        return
|
|
|
|
|
|
class Battery(dbus.service.Object):
    """
    org.bluez.BatteryProvider1 interface implementation
    """

    def __init__(self, bus, dev, percentage, source=None):
        """Export a fake battery for device fragment *dev* on *bus*.

        dev        -- address fragment appended to the 'dev_' path prefixes
        percentage -- initial charge, or None when not known yet
        source     -- optional value for the 'Source' property
        """
        self.path = BATTERY_PROVIDER_PATH + '/dev_' + dev
        self.dev_path = '/org/bluez/hci0/dev_' + dev
        self.bus = bus
        self.percentage = percentage
        self.source = source
        dbus.service.Object.__init__(self, bus, self.path)

    def get_battery_properties(self):
        """Return the BatteryProvider1 property dict of this battery.

        'Percentage' and 'Source' are only included when they are set;
        'Device' is always present.
        """
        properties = {}
        if self.percentage is not None:
            properties['Percentage'] = dbus.Byte(self.percentage)
        if self.source is not None:
            properties['Source'] = self.source
        properties['Device'] = dbus.ObjectPath(self.dev_path)
        return properties

    def get_properties(self):
        """Return the properties keyed by interface name (ObjectManager form)."""
        return {BATTERY_PROVIDER_IFACE: self.get_battery_properties()}

    def get_path(self):
        """Return this battery's object path as a dbus.ObjectPath."""
        return dbus.ObjectPath(self.path)

    def set_percentage(self, percentage):
        """Store a new charge level and emit PropertiesChanged.

        Values outside 0..100 are rejected with a message and leave the
        battery untouched.
        """
        if not 0 <= percentage <= 100:
            print('percentage not valid')
            return

        self.percentage = percentage
        print('battery %s percentage %d' % (self.path, self.percentage))
        # The full property set is sent, not only the changed entry.
        self.PropertiesChanged(
                BATTERY_PROVIDER_IFACE, self.get_battery_properties())

    @dbus.service.method(DBUS_PROP_IFACE,
                         in_signature='s',
                         out_signature='a{sv}')
    def GetAll(self, interface):
        """Return all properties of *interface*.

        Raises InvalidArgsException for any interface other than
        BATTERY_PROVIDER_IFACE.
        """
        if interface != BATTERY_PROVIDER_IFACE:
            raise InvalidArgsException()

        return self.get_properties()[BATTERY_PROVIDER_IFACE]

    # NOTE(review): the D-Bus specification declares PropertiesChanged as
    # 'sa{sv}as' (with a trailing invalidated_properties array); this stub
    # emits only the first two arguments. Kept as is to preserve the method
    # signature -- confirm the receiving side tolerates the short form.
    @dbus.service.signal(DBUS_PROP_IFACE, signature='sa{sv}')
    def PropertiesChanged(self, interface, properties):
        """Signal stub; the dbus.service.signal decorator does the emission."""
        return
|
|
|
|
|
|
def add_late_battery():
    """Create the BATTERY_PATH3 battery (70%, 'Protocol 2') and add it to app.

    Used as a timer callback; it returns None.
    """
    late_battery = Battery(bus, BATTERY_PATH3, 70, 'Protocol 2')
    app.add_battery(late_battery)
|
|
|
|
|
|
def drain_battery(battery):
    """Lower the charge of *battery* by one 5% step (timer callback).

    A battery whose percentage is still unknown (None) is reported as 100%
    instead of being decreased. The new value never goes below 0 and is
    applied through battery.set_percentage().

    Returns True while charge remains, so a periodic timer keeps calling,
    and False once the battery has reached 0%.
    """
    if battery.percentage is None:
        new_percentage = 100
    else:
        # Clamp so the last step lands exactly on 0 rather than below it.
        new_percentage = max(battery.percentage - 5, 0)

    battery.set_percentage(new_percentage)

    return new_percentage > 0
|
|
|
|
def register_provider_cb():
    """Reply handler for RegisterBatteryProvider.

    Adds the BATTERY_PATH2 battery (no percentage yet) immediately and
    schedules add_late_battery to run 5000 ms later.
    """
    print('Battery Provider registered')

    # Battery added early right after RegisterBatteryProvider succeeds
    early_battery = Battery(bus, BATTERY_PATH2, None)
    app.add_battery(early_battery)

    # Battery added later
    GObject.timeout_add(5000, add_late_battery)
|
|
|
|
|
|
def register_provider_error_cb(error):
    """Error handler for RegisterBatteryProvider: report and quit the loop."""
    message = 'Failed to register Battery Provider: ' + str(error)
    print(message)
    mainloop.quit()
|
|
|
|
|
|
def find_manager(bus):
    """Locate the object implementing BatteryProviderManager1.

    Queries the ObjectManager at '/' of BLUEZ_SERVICE_NAME on *bus* and
    returns the path of the first object that exposes
    BATTERY_PROVIDER_MANAGER_IFACE, or None when no object does.
    """
    remote_om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, '/'),
                               DBUS_OM_IFACE)
    objects = remote_om.GetManagedObjects()

    for path, interfaces in objects.items():
        # Membership on the mapping itself tests its keys.
        if BATTERY_PROVIDER_MANAGER_IFACE in interfaces:
            return path

    return None
|
|
|
|
|
|
def unregister_provider_cb():
    """Reply handler for UnregisterBatteryProvider: just report success."""
    notice = 'Battery Provider unregistered'
    print(notice)
|
|
|
|
|
|
def unregister_provider_error_cb(error):
    """Error handler for UnregisterBatteryProvider: report the failure."""
    message = 'Failed to unregister Battery Provider: ' + str(error)
    print(message)
|
|
|
|
|
|
def unregister_battery_provider(battery_provider_manager):
    """Ask *battery_provider_manager* to unregister this provider.

    The call is asynchronous: the outcome is delivered to
    unregister_provider_cb or unregister_provider_error_cb.
    """
    handlers = {
        'reply_handler': unregister_provider_cb,
        'error_handler': unregister_provider_error_cb,
    }
    battery_provider_manager.UnregisterBatteryProvider(
        BATTERY_PROVIDER_PATH, **handlers)
|
|
|
|
|
|
def remove_battery(app, battery):
    """Timer-friendly wrapper that forwards to app.remove_battery(battery).

    Returns None; the result of the forwarded call is discarded.
    """
    provider = app
    provider.remove_battery(battery)
|
|
|
|
|
|
"""
|
|
Simulates an application registering to BlueZ as a Battery Provider providing
|
|
fake batteries drained periodically.
|
|
"""
|
|
def main():
    """Run the fake battery provider until the main loop is quit.

    Steps: connect to the system bus, find the BatteryProviderManager1
    object, export the Application with one pre-added battery, register
    it as a battery provider, schedule the battery removal (8000 ms) and
    the provider unregistration (12000 ms), then run the main loop.
    Returns early, after printing a message, when no manager is found.
    """
    global mainloop, bus, app

    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    bus = dbus.SystemBus()

    manager_path = find_manager(bus)
    if not manager_path:
        print('BatteryProviderManager1 interface not found')
        return
    print('BatteryProviderManager1 path = ', manager_path)

    manager_object = bus.get_object(BLUEZ_SERVICE_NAME, manager_path)
    battery_provider_manager = dbus.Interface(
        manager_object, BATTERY_PROVIDER_MANAGER_IFACE)

    app = Application(bus)

    # Battery pre-added before RegisterBatteryProvider
    first_battery = Battery(bus, BATTERY_PATH1, 87, 'Protocol 1')
    app.add_battery(first_battery)

    mainloop = GObject.MainLoop()

    print('Registering Battery Provider...')
    battery_provider_manager.RegisterBatteryProvider(
        BATTERY_PROVIDER_PATH,
        reply_handler=register_provider_cb,
        error_handler=register_provider_error_cb)

    # Unregister the Battery Provider after an arbitrary amount of time
    GObject.timeout_add(
        12000, unregister_battery_provider, battery_provider_manager)
    # Simulate battery removal by a provider
    GObject.timeout_add(8000, remove_battery, app, first_battery)

    mainloop.run()
|
|
|
|
|
|
# Script entry point: run the example only when executed directly.
if __name__ == '__main__':
    main()
|