From f95f762c7305c41d1a69b88e4e575bfc47d621f8 Mon Sep 17 00:00:00 2001 From: Arman Uguray Date: Wed, 1 Apr 2015 17:40:00 -0700 Subject: [PATCH] test: Add Python GATT client example This patch introduces test/example-gatt-client which implements a simple D-Bus client application for a remote Heart Rate service. --- test/example-gatt-client | 218 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 218 insertions(+) create mode 100755 test/example-gatt-client diff --git a/test/example-gatt-client b/test/example-gatt-client new file mode 100755 index 000000000..724a45d89 --- /dev/null +++ b/test/example-gatt-client @@ -0,0 +1,218 @@ +#!/usr/bin/python + +import argparse +import dbus +import gobject +import sys + +from dbus.mainloop.glib import DBusGMainLoop + +bus = None +mainloop = None + +BLUEZ_SERVICE_NAME = 'org.bluez' +DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager' +DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties' + +GATT_SERVICE_IFACE = 'org.bluez.GattService1' +GATT_CHRC_IFACE = 'org.bluez.GattCharacteristic1' + +HR_SVC_UUID = '0000180d-0000-1000-8000-00805f9b34fb' +HR_MSRMT_UUID = '00002a37-0000-1000-8000-00805f9b34fb' +BODY_SNSR_LOC_UUID = '00002a38-0000-1000-8000-00805f9b34fb' +HR_CTRL_PT_UUID = '00002a39-0000-1000-8000-00805f9b34fb' + +# The objects that we interact with. +hr_service = None +hr_msrmt_chrc = None +body_snsr_loc_chrc = None +hr_ctrl_pt_chrc = None + + +def generic_error_cb(error): + print('D-Bus call failed: ' + str(error)) + mainloop.quit() + + +def body_sensor_val_to_str(val): + if val == 0: + return 'Other' + if val == 1: + return 'Chest' + if val == 2: + return 'Wrist' + if val == 3: + return 'Finger' + if val == 4: + return 'Hand' + if val == 5: + return 'Ear Lobe' + if val == 6: + return 'Foot' + + return 'Reserved value' + + +def sensor_contact_val_to_str(val): + if val == 0 or val == 1: + return 'not supported' + if val == 2: + return 'no contact detected' + if val == 3: + return 'contact detected' + + return 'invalid value' + + +def body_sensor_val_cb(value): + if len(value) != 1: + print('Invalid body sensor location value: ' + repr(value)) + return + + print('Body sensor location value: ' + body_sensor_val_to_str(value[0])) + + +def hr_msrmt_start_notify_cb(): + print('HR Measurement notifications enabled') + + +def hr_msrmt_changed_cb(iface, changed_props, invalidated_props): + if iface != GATT_CHRC_IFACE: + return + + if not len(changed_props): + return + + value = changed_props.get('Value', None) + if not value: + return + + print('New HR Measurement') + + flags = value[0] + value_format = flags & 0x01 + sc_status = (flags >> 1) & 0x03 + ee_status = flags & 0x08 + + if value_format == 0x00: + hr_msrmt = value[1] + next_ind = 2 + else: + hr_msrmt = value[1] | (value[2] << 8) + next_ind = 3 + + print('\tHR: ' + str(int(hr_msrmt))) + print('\tSensor Contact status: ' + + sensor_contact_val_to_str(sc_status)) + + if ee_status: + print('\tEnergy Expended: ' + str(int(value[next_ind]))) + + +def start_client(): + # Read the Body Sensor Location value and print it asynchronously. + body_snsr_loc_chrc[0].ReadValue(reply_handler=body_sensor_val_cb, + error_handler=generic_error_cb, + dbus_interface=GATT_CHRC_IFACE) + + # Listen to PropertiesChanged signals from the Heart Measurement + # Characteristic. + hr_msrmt_prop_iface = dbus.Interface(hr_msrmt_chrc[0], DBUS_PROP_IFACE) + hr_msrmt_prop_iface.connect_to_signal("PropertiesChanged", + hr_msrmt_changed_cb) + + # Subscribe to Heart Rate Measurement notifications. + hr_msrmt_chrc[0].StartNotify(reply_handler=hr_msrmt_start_notify_cb, + error_handler=generic_error_cb, + dbus_interface=GATT_CHRC_IFACE) + + +def process_chrc(chrc_path): + chrc = bus.get_object(BLUEZ_SERVICE_NAME, chrc_path) + chrc_props = chrc.GetAll(GATT_CHRC_IFACE, + dbus_interface=DBUS_PROP_IFACE) + + uuid = chrc_props['UUID'] + + if uuid == HR_MSRMT_UUID: + global hr_msrmt_chrc + hr_msrmt_chrc = (chrc, chrc_props) + elif uuid == BODY_SNSR_LOC_UUID: + global body_snsr_loc_chrc + body_snsr_loc_chrc = (chrc, chrc_props) + elif uuid == HR_CTRL_PT_UUID: + global hr_ctrl_pt_chrc + hr_ctrl_pt_chrc = (chrc, chrc_props) + else: + print('Unrecognized characteristic: ' + uuid) + + return True + + +def process_hr_service(service_path): + service = bus.get_object(BLUEZ_SERVICE_NAME, service_path) + service_props = service.GetAll(GATT_SERVICE_IFACE, + dbus_interface=DBUS_PROP_IFACE) + + uuid = service_props['UUID'] + + if uuid != HR_SVC_UUID: + print('Service is not a Heart Rate Service: ' + uuid) + return False + + # Process the characteristics. + chrc_paths = service_props['Characteristics'] + for chrc_path in chrc_paths: + process_chrc(chrc_path) + + global hr_service + hr_service = (service, service_props, service_path) + + return True + + +def interfaces_removed_cb(object_path, interfaces): + if not hr_service: + return + + if object_path == hr_service[2]: + print('Service was removed') + mainloop.quit() + + +def main(): + # Prase the service path from the arguments. + parser = argparse.ArgumentParser( + description='D-Bus Heart Rate Service client example') + parser.add_argument('service_path', metavar='', + type=dbus.ObjectPath, nargs=1, + help='GATT service object path') + args = parser.parse_args() + service_path = args.service_path[0] + + # Set up the main loop. + DBusGMainLoop(set_as_default=True) + global bus + bus = dbus.SystemBus() + global mainloop + mainloop = gobject.MainLoop() + + om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, '/'), DBUS_OM_IFACE) + om.connect_to_signal('InterfacesRemoved', interfaces_removed_cb) + + try: + if not process_hr_service(service_path): + sys.exit(1) + except dbus.DBusException as e: + print e.message + sys.exit(1) + + print 'Heart Rate Service ready' + + start_client() + + mainloop.run() + + +if __name__ == '__main__': + main()