test: Update GATT examples with the new API

This commit is contained in:
Luiz Augusto von Dentz 2016-05-06 11:36:28 +03:00
parent bf370f3bd6
commit 206c0c88ed
3 changed files with 172 additions and 50 deletions

View File

@ -113,7 +113,7 @@ def hr_msrmt_changed_cb(iface, changed_props, invalidated_props):
def start_client():
# Read the Body Sensor Location value and print it asynchronously.
body_snsr_loc_chrc[0].ReadValue(reply_handler=body_sensor_val_cb,
body_snsr_loc_chrc[0].ReadValue({}, reply_handler=body_sensor_val_cb,
error_handler=generic_error_cb,
dbus_interface=GATT_CHRC_IFACE)

View File

@ -166,13 +166,15 @@ class Characteristic(dbus.service.Object):
return self.get_properties[GATT_CHRC_IFACE]
@dbus.service.method(GATT_CHRC_IFACE, out_signature='ay')
def ReadValue(self):
@dbus.service.method(GATT_CHRC_IFACE,
in_signature='a{sv}',
out_signature='ay')
def ReadValue(self, options):
print('Default ReadValue called, returning error')
raise NotSupportedException()
@dbus.service.method(GATT_CHRC_IFACE, in_signature='ay')
def WriteValue(self, value):
@dbus.service.method(GATT_CHRC_IFACE, in_signature='aya{sv}')
def WriteValue(self, value, options):
print('Default WriteValue called, returning error')
raise NotSupportedException()
@ -222,13 +224,15 @@ class Descriptor(dbus.service.Object):
return self.get_properties[GATT_CHRC_IFACE]
@dbus.service.method(GATT_DESC_IFACE, out_signature='ay')
def ReadValue(self):
@dbus.service.method(GATT_DESC_IFACE,
in_signature='a{sv}',
out_signature='ay')
def ReadValue(self, options):
print ('Default ReadValue called, returning error')
raise NotSupportedException()
@dbus.service.method(GATT_DESC_IFACE, in_signature='ay')
def WriteValue(self, value):
@dbus.service.method(GATT_DESC_IFACE, in_signature='aya{sv}')
def WriteValue(self, value, options):
print('Default WriteValue called, returning error')
raise NotSupportedException()
@ -317,7 +321,7 @@ class BodySensorLocationChrc(Characteristic):
['read'],
service)
def ReadValue(self):
def ReadValue(self, options):
# Return 'Chest' as the sensor location.
return [ 0x01 ]
@ -331,7 +335,7 @@ class HeartRateControlPointChrc(Characteristic):
['write'],
service)
def WriteValue(self, value):
def WriteValue(self, value, options):
print('Heart Rate Control Point WriteValue called')
if len(value) != 1:
@ -393,7 +397,7 @@ class BatteryLevelCharacteristic(Characteristic):
self.notify_battery_level()
return True
def ReadValue(self):
def ReadValue(self, options):
print('Battery Level read: ' + repr(self.battery_lvl))
return [dbus.Byte(self.battery_lvl)]
@ -425,6 +429,7 @@ class TestService(Service):
Service.__init__(self, bus, index, self.TEST_SVC_UUID, False)
self.add_characteristic(TestCharacteristic(bus, 0, self))
self.add_characteristic(TestEncryptCharacteristic(bus, 1, self))
self.add_characteristic(TestSecureCharacteristic(bus, 2, self))
class TestCharacteristic(Characteristic):
"""
@ -445,11 +450,11 @@ class TestCharacteristic(Characteristic):
self.add_descriptor(
CharacteristicUserDescriptionDescriptor(bus, 1, self))
def ReadValue(self):
def ReadValue(self, options):
print('TestCharacteristic Read: ' + repr(self.value))
return self.value
def WriteValue(self, value):
def WriteValue(self, value, options):
print('TestCharacteristic Write: ' + repr(value))
self.value = value
@ -468,7 +473,7 @@ class TestDescriptor(Descriptor):
['read', 'write'],
characteristic)
def ReadValue(self):
def ReadValue(self, options):
return [
dbus.Byte('T'), dbus.Byte('e'), dbus.Byte('s'), dbus.Byte('t')
]
@ -491,10 +496,10 @@ class CharacteristicUserDescriptionDescriptor(Descriptor):
['read', 'write'],
characteristic)
def ReadValue(self):
def ReadValue(self, options):
return self.value
def WriteValue(self, value):
def WriteValue(self, value, options):
if not self.writable:
raise NotPermittedException()
self.value = value
@ -517,11 +522,11 @@ class TestEncryptCharacteristic(Characteristic):
self.add_descriptor(
CharacteristicUserDescriptionDescriptor(bus, 3, self))
def ReadValue(self):
def ReadValue(self, options):
print('TestCharacteristic Read: ' + repr(self.value))
return self.value
def WriteValue(self, value):
def WriteValue(self, value, options):
print('TestCharacteristic Write: ' + repr(value))
self.value = value
@ -539,7 +544,54 @@ class TestEncryptDescriptor(Descriptor):
['encrypt-read', 'encrypt-write'],
characteristic)
def ReadValue(self):
def ReadValue(self, options):
return [
dbus.Byte('T'), dbus.Byte('e'), dbus.Byte('s'), dbus.Byte('t')
]
class TestSecureCharacteristic(Characteristic):
"""
Dummy test characteristic requiring secure connection.
"""
TEST_CHRC_UUID = '12345678-1234-5678-1234-56789abcdef5'
def __init__(self, bus, index, service):
Characteristic.__init__(
self, bus, index,
self.TEST_CHRC_UUID,
['secure-read', 'secure-write'],
service)
self.value = []
self.add_descriptor(TestEncryptDescriptor(bus, 2, self))
self.add_descriptor(
CharacteristicUserDescriptionDescriptor(bus, 3, self))
def ReadValue(self, options):
print('TestCharacteristic Read: ' + repr(self.value))
return self.value
def WriteValue(self, value, options):
print('TestCharacteristic Write: ' + repr(value))
self.value = value
class TestSecureDescriptor(Descriptor):
"""
Dummy test descriptor requiring secure connection. Returns a static value.
"""
TEST_DESC_UUID = '12345678-1234-5678-1234-56789abcdef6'
def __init__(self, bus, index, characteristic):
Descriptor.__init__(
self, bus, index,
self.TEST_DESC_UUID,
['secure-read', 'secure-write'],
characteristic)
def ReadValue(self, options):
return [
dbus.Byte('T'), dbus.Byte('e'), dbus.Byte('s'), dbus.Byte('t')
]

View File

@ -15,46 +15,116 @@ except ImportError:
import gobject as GObject
import bluezutils
class GattProfile(dbus.service.Object):
@dbus.service.method("org.bluez.GattProfile1",
in_signature="", out_signature="")
def Release(self):
print("Release")
mainloop.quit()
BLUEZ_SERVICE_NAME = 'org.bluez'
GATT_MANAGER_IFACE = 'org.bluez.GattManager1'
DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'
GATT_PROFILE_IFACE = 'org.bluez.GattProfile1'
class InvalidArgsException(dbus.exceptions.DBusException):
_dbus_error_name = 'org.freedesktop.DBus.Error.InvalidArgs'
class Application(dbus.service.Object):
def __init__(self, bus):
self.path = '/'
self.profiles = []
dbus.service.Object.__init__(self, bus, self.path)
def get_path(self):
return dbus.ObjectPath(self.path)
def add_profile(self, profile):
self.profiles.append(profile)
@dbus.service.method(DBUS_OM_IFACE, out_signature='a{oa{sa{sv}}}')
def GetManagedObjects(self):
response = {}
print('GetManagedObjects')
for profile in self.profiles:
response[profile.get_path()] = profile.get_properties()
return response
class Profile(dbus.service.Object):
PATH_BASE = '/org/bluez/example/profile'
def __init__(self, bus, uuids):
self.path = self.PATH_BASE
self.bus = bus
self.uuids = uuids
dbus.service.Object.__init__(self, bus, self.path)
def get_properties(self):
return {
GATT_PROFILE_IFACE: {
'UUIDs': self.uuids,
}
}
def get_path(self):
return dbus.ObjectPath(self.path)
@dbus.service.method(GATT_PROFILE_IFACE,
in_signature="",
out_signature="")
def Release(self):
print("Release")
mainloop.quit()
@dbus.service.method(DBUS_PROP_IFACE,
in_signature='s',
out_signature='a{sv}')
def GetAll(self, interface):
if interface != GATT_PROFILE_IFACE:
raise InvalidArgsException()
return self.get_properties[GATT_PROFILE_IFACE]
def register_app_cb():
print('GATT application registered')
def register_app_error_cb(error):
print('Failed to register application: ' + str(error))
mainloop.quit()
if __name__ == '__main__':
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
bus = dbus.SystemBus()
bus = dbus.SystemBus()
path = bluezutils.find_adapter().object_path
path = bluezutils.find_adapter().object_path
manager = dbus.Interface(bus.get_object("org.bluez", path),
"org.bluez.GattManager1")
manager = dbus.Interface(bus.get_object("org.bluez", path),
GATT_MANAGER_IFACE)
option_list = [
make_option("-u", "--uuid", action="store",
type="string", dest="uuid",
default=None),
make_option("-p", "--path", action="store",
type="string", dest="path",
default="/foo/bar/profile"),
]
option_list = [make_option("-u", "--uuid", action="store",
type="string", dest="uuid",
default=None),
]
opts = dbus.Dictionary({ }, signature='sv')
opts = dbus.Dictionary({}, signature='sv')
parser = OptionParser(option_list=option_list)
parser = OptionParser(option_list=option_list)
(options, args) = parser.parse_args()
(options, args) = parser.parse_args()
profile = GattProfile(bus, options.path)
mainloop = GObject.MainLoop()
mainloop = GObject.MainLoop()
if not options.uuid:
options.uuid = str(uuid.uuid4())
if not options.uuid:
options.uuid = str(uuid.uuid4())
app = Application(bus)
profile = Profile(bus, [options.uuid])
app.add_profile(profile)
manager.RegisterApplication(app.get_path(), {},
reply_handler=register_app_cb,
error_handler=register_app_error_cb)
uuids = { options.uuid }
manager.RegisterProfile(options.path, uuids, opts)
mainloop.run()
mainloop.run()