mirror of
https://git.kernel.org/pub/scm/bluetooth/bluez.git
synced 2024-11-16 08:44:38 +08:00
test: Add Python GATT client example
This patch introduces test/example-gatt-client which implements a simple D-Bus client application for a remote Heart Rate service.
This commit is contained in:
parent
156df086a3
commit
f95f762c73
218
test/example-gatt-client
Executable file
218
test/example-gatt-client
Executable file
@ -0,0 +1,218 @@
|
||||
#!/usr/bin/python
|
||||
|
||||
import argparse
|
||||
import dbus
|
||||
import gobject
|
||||
import sys
|
||||
|
||||
from dbus.mainloop.glib import DBusGMainLoop
|
||||
|
||||
bus = None
|
||||
mainloop = None
|
||||
|
||||
BLUEZ_SERVICE_NAME = 'org.bluez'
|
||||
DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
|
||||
DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'
|
||||
|
||||
GATT_SERVICE_IFACE = 'org.bluez.GattService1'
|
||||
GATT_CHRC_IFACE = 'org.bluez.GattCharacteristic1'
|
||||
|
||||
HR_SVC_UUID = '0000180d-0000-1000-8000-00805f9b34fb'
|
||||
HR_MSRMT_UUID = '00002a37-0000-1000-8000-00805f9b34fb'
|
||||
BODY_SNSR_LOC_UUID = '00002a38-0000-1000-8000-00805f9b34fb'
|
||||
HR_CTRL_PT_UUID = '00002a39-0000-1000-8000-00805f9b34fb'
|
||||
|
||||
# The objects that we interact with.
|
||||
hr_service = None
|
||||
hr_msrmt_chrc = None
|
||||
body_snsr_loc_chrc = None
|
||||
hr_ctrl_pt_chrc = None
|
||||
|
||||
|
||||
def generic_error_cb(error):
|
||||
print('D-Bus call failed: ' + str(error))
|
||||
mainloop.quit()
|
||||
|
||||
|
||||
def body_sensor_val_to_str(val):
|
||||
if val == 0:
|
||||
return 'Other'
|
||||
if val == 1:
|
||||
return 'Chest'
|
||||
if val == 2:
|
||||
return 'Wrist'
|
||||
if val == 3:
|
||||
return 'Finger'
|
||||
if val == 4:
|
||||
return 'Hand'
|
||||
if val == 5:
|
||||
return 'Ear Lobe'
|
||||
if val == 6:
|
||||
return 'Foot'
|
||||
|
||||
return 'Reserved value'
|
||||
|
||||
|
||||
def sensor_contact_val_to_str(val):
|
||||
if val == 0 or val == 1:
|
||||
return 'not supported'
|
||||
if val == 2:
|
||||
return 'no contact detected'
|
||||
if val == 3:
|
||||
return 'contact detected'
|
||||
|
||||
return 'invalid value'
|
||||
|
||||
|
||||
def body_sensor_val_cb(value):
|
||||
if len(value) != 1:
|
||||
print('Invalid body sensor location value: ' + repr(value))
|
||||
return
|
||||
|
||||
print('Body sensor location value: ' + body_sensor_val_to_str(value[0]))
|
||||
|
||||
|
||||
def hr_msrmt_start_notify_cb():
|
||||
print('HR Measurement notifications enabled')
|
||||
|
||||
|
||||
def hr_msrmt_changed_cb(iface, changed_props, invalidated_props):
|
||||
if iface != GATT_CHRC_IFACE:
|
||||
return
|
||||
|
||||
if not len(changed_props):
|
||||
return
|
||||
|
||||
value = changed_props.get('Value', None)
|
||||
if not value:
|
||||
return
|
||||
|
||||
print('New HR Measurement')
|
||||
|
||||
flags = value[0]
|
||||
value_format = flags & 0x01
|
||||
sc_status = (flags >> 1) & 0x03
|
||||
ee_status = flags & 0x08
|
||||
|
||||
if value_format == 0x00:
|
||||
hr_msrmt = value[1]
|
||||
next_ind = 2
|
||||
else:
|
||||
hr_msrmt = value[1] | (value[2] << 8)
|
||||
next_ind = 3
|
||||
|
||||
print('\tHR: ' + str(int(hr_msrmt)))
|
||||
print('\tSensor Contact status: ' +
|
||||
sensor_contact_val_to_str(sc_status))
|
||||
|
||||
if ee_status:
|
||||
print('\tEnergy Expended: ' + str(int(value[next_ind])))
|
||||
|
||||
|
||||
def start_client():
|
||||
# Read the Body Sensor Location value and print it asynchronously.
|
||||
body_snsr_loc_chrc[0].ReadValue(reply_handler=body_sensor_val_cb,
|
||||
error_handler=generic_error_cb,
|
||||
dbus_interface=GATT_CHRC_IFACE)
|
||||
|
||||
# Listen to PropertiesChanged signals from the Heart Measurement
|
||||
# Characteristic.
|
||||
hr_msrmt_prop_iface = dbus.Interface(hr_msrmt_chrc[0], DBUS_PROP_IFACE)
|
||||
hr_msrmt_prop_iface.connect_to_signal("PropertiesChanged",
|
||||
hr_msrmt_changed_cb)
|
||||
|
||||
# Subscribe to Heart Rate Measurement notifications.
|
||||
hr_msrmt_chrc[0].StartNotify(reply_handler=hr_msrmt_start_notify_cb,
|
||||
error_handler=generic_error_cb,
|
||||
dbus_interface=GATT_CHRC_IFACE)
|
||||
|
||||
|
||||
def process_chrc(chrc_path):
|
||||
chrc = bus.get_object(BLUEZ_SERVICE_NAME, chrc_path)
|
||||
chrc_props = chrc.GetAll(GATT_CHRC_IFACE,
|
||||
dbus_interface=DBUS_PROP_IFACE)
|
||||
|
||||
uuid = chrc_props['UUID']
|
||||
|
||||
if uuid == HR_MSRMT_UUID:
|
||||
global hr_msrmt_chrc
|
||||
hr_msrmt_chrc = (chrc, chrc_props)
|
||||
elif uuid == BODY_SNSR_LOC_UUID:
|
||||
global body_snsr_loc_chrc
|
||||
body_snsr_loc_chrc = (chrc, chrc_props)
|
||||
elif uuid == HR_CTRL_PT_UUID:
|
||||
global hr_ctrl_pt_chrc
|
||||
hr_ctrl_pt_chrc = (chrc, chrc_props)
|
||||
else:
|
||||
print('Unrecognized characteristic: ' + uuid)
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def process_hr_service(service_path):
|
||||
service = bus.get_object(BLUEZ_SERVICE_NAME, service_path)
|
||||
service_props = service.GetAll(GATT_SERVICE_IFACE,
|
||||
dbus_interface=DBUS_PROP_IFACE)
|
||||
|
||||
uuid = service_props['UUID']
|
||||
|
||||
if uuid != HR_SVC_UUID:
|
||||
print('Service is not a Heart Rate Service: ' + uuid)
|
||||
return False
|
||||
|
||||
# Process the characteristics.
|
||||
chrc_paths = service_props['Characteristics']
|
||||
for chrc_path in chrc_paths:
|
||||
process_chrc(chrc_path)
|
||||
|
||||
global hr_service
|
||||
hr_service = (service, service_props, service_path)
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def interfaces_removed_cb(object_path, interfaces):
|
||||
if not hr_service:
|
||||
return
|
||||
|
||||
if object_path == hr_service[2]:
|
||||
print('Service was removed')
|
||||
mainloop.quit()
|
||||
|
||||
|
||||
def main():
|
||||
# Prase the service path from the arguments.
|
||||
parser = argparse.ArgumentParser(
|
||||
description='D-Bus Heart Rate Service client example')
|
||||
parser.add_argument('service_path', metavar='<service-path>',
|
||||
type=dbus.ObjectPath, nargs=1,
|
||||
help='GATT service object path')
|
||||
args = parser.parse_args()
|
||||
service_path = args.service_path[0]
|
||||
|
||||
# Set up the main loop.
|
||||
DBusGMainLoop(set_as_default=True)
|
||||
global bus
|
||||
bus = dbus.SystemBus()
|
||||
global mainloop
|
||||
mainloop = gobject.MainLoop()
|
||||
|
||||
om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, '/'), DBUS_OM_IFACE)
|
||||
om.connect_to_signal('InterfacesRemoved', interfaces_removed_cb)
|
||||
|
||||
try:
|
||||
if not process_hr_service(service_path):
|
||||
sys.exit(1)
|
||||
except dbus.DBusException as e:
|
||||
print e.message
|
||||
sys.exit(1)
|
||||
|
||||
print 'Heart Rate Service ready'
|
||||
|
||||
start_client()
|
||||
|
||||
mainloop.run()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
Loading…
Reference in New Issue
Block a user