mirror of
https://git.kernel.org/pub/scm/bluetooth/bluez.git
synced 2024-11-16 00:34:39 +08:00
7fe27bbf7d
This patch adds SPDX License Identifier and removes the license text. ------------------------------------- License COUNT ------------------------------------- LGPL-2.1-or-later : 35 License: LGPL-2.1-or-later test/agent.py test/bluezutils.py test/dbusdef.py test/example-advertisement test/example-endpoint test/example-gatt-client test/example-gatt-server test/example-player test/exchange-business-cards test/ftp-client test/get-managed-objects test/get-obex-capabilities test/list-devices test/list-folders test/map-client test/monitor-bluetooth test/opp-client test/pbap-client test/sap_client.py test/simple-endpoint test/simple-obex-agent test/simple-player test/test-adapter test/test-device test/test-discovery test/test-gatt-profile test/test-health test/test-health-sink test/test-hfp test/test-manager test/test-mesh test/test-nap test/test-network test/test-profile test/test-sap-server
437 lines
10 KiB
Python
437 lines
10 KiB
Python
#!/usr/bin/env python3
|
|
# SPDX-License-Identifier: LGPL-2.1-or-later
|
|
|
|
import sys
|
|
import struct
|
|
import numpy
|
|
import dbus
|
|
import dbus.service
|
|
import dbus.exceptions
|
|
|
|
from threading import Timer
|
|
import time
|
|
|
|
try:
|
|
from gi.repository import GObject
|
|
except ImportError:
|
|
import gobject as GObject
|
|
from dbus.mainloop.glib import DBusGMainLoop
|
|
|
|
import agent
|
|
|
|
# Bus name of the mesh daemon and the generic D-Bus interfaces.
MESH_SERVICE_NAME = 'org.bluez.mesh'
DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'
DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'

# Mesh-specific D-Bus interface names.
MESH_NETWORK_IFACE = 'org.bluez.mesh.Network1'
MESH_NODE_IFACE = 'org.bluez.mesh.Node1'
MESH_APPLICATION_IFACE = 'org.bluez.mesh.Application1'
MESH_ELEMENT_IFACE = 'org.bluez.mesh.Element1'

# Identification values reported through Application.get_properties().
APP_COMPANY_ID = 0x05F1
APP_PRODUCT_ID = 0x0001
APP_VERSION_ID = 0x0001

# Vendor value carried by models that are listed as SIG (non-vendor) models.
VENDOR_ID_NONE = 0xFFFF

# Shared state; mesh_net, app, bus and mainloop are assigned in main(),
# node in attach_app_cb().
mesh_net = None
app = None
bus = None
mainloop = None
node = None

# Token received through Application.JoinComplete().
token = None
|
|
|
|
def generic_error_cb(error):
    """Print the error of a failed asynchronous D-Bus call.

    Passed as ``error_handler`` by send_response() and send_publication().
    """
    message = 'D-Bus call failed: ' + str(error)
    print(message)
|
|
|
|
def generic_reply_cb():
    """Print a confirmation once an asynchronous D-Bus call has completed.

    Passed as ``reply_handler`` by send_response() and send_publication().
    """
    done_message = 'D-Bus call done'
    print(done_message)
|
|
|
|
def unwrap(item):
    """Recursively convert a dbus-python value into plain Python objects.

    Conversions performed:
      * dbus.Boolean               -> bool
      * dbus 16/32/64-bit integers -> int
      * dbus.Byte                  -> bytes object of length one
      * dbus.String                -> returned unchanged
      * arrays, lists, tuples      -> list of unwrapped members
      * dictionaries               -> dict with unwrapped keys and values

    Any other value is reported on standard output and returned unchanged.
    """
    integer_types = (dbus.UInt16, dbus.Int16,
                     dbus.UInt32, dbus.Int32,
                     dbus.UInt64, dbus.Int64)

    # dbus.Boolean is tested before the integer types, as in the original
    # ordering of the checks.
    if isinstance(item, dbus.Boolean):
        return bool(item)
    if isinstance(item, integer_types):
        return int(item)
    if isinstance(item, dbus.Byte):
        return bytes([int(item)])
    if isinstance(item, dbus.String):
        return item
    if isinstance(item, (dbus.Array, list, tuple)):
        return [unwrap(member) for member in item]
    if isinstance(item, (dbus.Dictionary, dict)):
        return {unwrap(key): unwrap(value) for key, value in item.items()}

    print('Dictionary item not handled')
    print(type(item))
    return item
|
|
|
|
def join_cb():
    """Reply handler of ``Join``: report that the procedure has started."""
    started_message = 'Join procedure started'
    print(started_message)
|
|
|
|
def join_error_cb(reason):
    """Error handler of ``Join``: print the failure reason."""
    prefix = 'Join procedure failed: '
    print(prefix, reason)
|
|
|
|
def attach_app_cb(node_path, dict_array):
    """Reply handler of ``Attach``: bind the node and apply its configuration.

    Stores a ``Node1`` interface proxy for ``node_path`` in the global
    ``node`` and forwards the per-element model configuration to the
    matching local Element objects.

    Args:
        node_path: object path of the attached node.
        dict_array: configuration passed to the reply handler; after
            unwrap() every entry is read as ``entry[0]`` = element index
            (one byte) and ``entry[1]`` = list of model configurations.
    """
    global node

    print('Mesh application registered ', node_path)

    obj = bus.get_object(MESH_SERVICE_NAME, node_path)
    node = dbus.Interface(obj, MESH_NODE_IFACE)

    els = unwrap(dict_array)
    print("Get Elements")

    for el in els:
        # unwrap() turns a dbus.Byte into a one-byte bytes object.  The
        # index is an unsigned byte, so decode it with 'B'; signed 'b'
        # would yield negative indices for values >= 0x80.
        idx = struct.unpack('B', el[0])[0]
        print('Configuration for Element ', end='')
        print(idx)

        element = app.get_element(idx)
        if element is None:
            # No local element with this index: skip it instead of
            # raising AttributeError inside the reply handler.
            print('Element not found, configuration ignored')
            continue

        element.set_model_config(el[1])
|
|
|
|
def attach_app_error_cb(error):
    """Error handler of ``Attach``: print the error and stop the main loop."""
    failure = 'Failed to register application: ' + str(error)
    print(failure)
    mainloop.quit()
|
|
|
|
def attach(token):
    """Call ``Attach`` on the mesh network interface for this application.

    Args:
        token: value passed through to ``Attach`` unchanged.

    The call is asynchronous; the outcome is delivered to attach_app_cb()
    or attach_app_error_cb().
    """
    print('Attach')
    handlers = {
        'reply_handler': attach_app_cb,
        'error_handler': attach_app_error_cb,
    }
    mesh_net.Attach(app.get_path(), token, **handlers)
|
|
|
|
def interfaces_removed_cb(object_path, interfaces):
    """Signal handler: quit the main loop when the mesh object goes away.

    Args:
        object_path: path of the object whose interfaces were removed.
        interfaces: names of the removed interfaces (not inspected).
    """
    if mesh_net is None:
        return

    # mesh_net is a dbus.Interface proxy, which cannot be subscripted
    # (the former ``mesh_net[2]`` raised TypeError); its object path is
    # available through the ``object_path`` property.
    if object_path == mesh_net.object_path:
        print('Service was removed')
        mainloop.quit()
|
|
|
|
def send_response(path, dest, key, data):
    """Print ``data`` and pass it to ``node.Send`` asynchronously.

    Args:
        path: first argument of ``Send`` (callers pass the model's path).
        dest: second argument of ``Send``.
        key: third argument of ``Send``.
        data: payload handed to ``Send``.
    """
    print('send response ', data, sep='')
    callbacks = {
        'reply_handler': generic_reply_cb,
        'error_handler': generic_error_cb,
    }
    node.Send(path, dest, key, data, **callbacks)
|
|
|
|
def send_publication(path, model_id, data):
    """Print ``data`` and pass it to ``node.Publish`` asynchronously.

    Args:
        path: first argument of ``Publish`` (callers pass the model's path).
        model_id: identifier of the publishing model.
        data: payload handed to ``Publish``.
    """
    print('send publication ', data, sep='')
    callbacks = {
        'reply_handler': generic_reply_cb,
        'error_handler': generic_error_cb,
    }
    node.Publish(path, model_id, data, **callbacks)
|
|
|
|
class PubTimer:
    """Repeating timer built from one-shot ``threading.Timer`` objects.

    After start(), ``func`` is called every ``seconds`` seconds from a timer
    thread; each expiry re-arms a fresh one-shot timer.
    """

    def __init__(self):
        self.seconds = None   # period between calls, in seconds
        self.func = None      # callable invoked on every expiry
        self.thread = None    # currently armed threading.Timer, if any
        self.busy = False     # True while _timeout_cb() is re-arming

    def _timeout_cb(self):
        """Run the callback, then arm the timer for the next period."""
        self.func()
        self.busy = True
        self._schedule_timer()
        self.busy = False

    def _schedule_timer(self):
        """Create and start a one-shot timer for the current period."""
        self.thread = Timer(self.seconds, self._timeout_cb)
        self.thread.start()

    def start(self, seconds, func):
        """Start, or restart with new settings, the periodic callback.

        Args:
            seconds: period between calls.
            func: callable taking no arguments.
        """
        self.func = func
        self.seconds = seconds
        if self.busy:
            # _timeout_cb() is re-arming right now and picks up the new
            # settings on its own.
            return

        # Cancel a timer that is still pending; otherwise both the old and
        # the new timer would fire and the callback would run twice per
        # period from then on.
        if self.thread is not None:
            self.thread.cancel()
        self._schedule_timer()

    def cancel(self):
        """Cancel the pending timer, if there is one."""
        print('Cancel timer')
        if self.thread is not None:
            print('Cancel thread')
            self.thread.cancel()
            self.thread = None
|
|
|
|
class Application(dbus.service.Object):
    """Root D-Bus object of the example application, exported at '/example'.

    Holds the provisioning agent and the list of elements, and implements
    the ObjectManager ``GetManagedObjects`` method plus the
    ``JoinComplete`` / ``JoinFailed`` methods of the application interface.
    """

    def __init__(self, bus):
        self.path = '/example'
        self.agent = None
        self.elements = []
        dbus.service.Object.__init__(self, bus, self.path)

    def set_agent(self, agent):
        """Remember the agent reported by GetManagedObjects()."""
        self.agent = agent

    def get_path(self):
        """Return the object path of this application."""
        return dbus.ObjectPath(self.path)

    def add_element(self, element):
        """Append an element to the application."""
        self.elements.append(element)

    def get_element(self, idx):
        """Return the element whose index equals ``idx``, or None."""
        for ele in self.elements:
            if ele.get_index() == idx:
                return ele
        return None

    def get_properties(self):
        """Return the application properties keyed by interface name."""
        return {
            MESH_APPLICATION_IFACE: {
                'CompanyID': dbus.UInt16(APP_COMPANY_ID),
                'ProductID': dbus.UInt16(APP_PRODUCT_ID),
                'VersionID': dbus.UInt16(APP_VERSION_ID),
            }
        }

    @dbus.service.method(DBUS_OM_IFACE, out_signature='a{oa{sa{sv}}}')
    def GetManagedObjects(self):
        """Return the properties of the application, agent and elements."""
        response = {}
        print('GetManagedObjects')
        response[self.path] = self.get_properties()
        # The agent is optional until set_agent() has been called; skip it
        # rather than failing the whole call on a None agent.
        if self.agent is not None:
            response[self.agent.get_path()] = self.agent.get_properties()
        for element in self.elements:
            response[element.get_path()] = element.get_properties()
        return response

    @dbus.service.method(MESH_APPLICATION_IFACE,
                         in_signature="t", out_signature="")
    def JoinComplete(self, value):
        """Store the received token globally and attach with it."""
        global token
        print('JoinComplete ', value)

        token = value
        attach(token)

    @dbus.service.method(MESH_APPLICATION_IFACE,
                         in_signature="s", out_signature="")
    def JoinFailed(self, value):
        """Print the failure reason received from the daemon."""
        # The former ``token = value`` here only bound an unused local (no
        # ``global`` statement), so it has been removed.
        print('JoinFailed ', value)
|
|
|
|
class Element(dbus.service.Object):
    """D-Bus object for one element, exported at PATH_BASE + 2-digit hex index.

    Owns a list of models and dispatches received messages and configuration
    updates to them.
    """

    PATH_BASE = '/example/ele'

    def __init__(self, bus, index):
        self.path = self.PATH_BASE + format(index, '02x')
        print(self.path)
        self.models = []
        self.bus = bus
        self.index = index
        dbus.service.Object.__init__(self, bus, self.path)

    def _get_sig_models(self):
        """Return the ids of all models whose vendor is VENDOR_ID_NONE."""
        return [model.get_id() for model in self.models
                if model.get_vendor() == VENDOR_ID_NONE]

    def _get_v_models(self):
        """Return (vendor, id) pairs of all models that have a vendor set."""
        pairs = []
        for model in self.models:
            vendor = model.get_vendor()
            if vendor != VENDOR_ID_NONE:
                pairs.append((vendor, model.get_id()))
        return pairs

    def get_properties(self):
        """Return the element properties keyed by interface name."""
        vendor_models = self._get_v_models()
        sig_models = self._get_sig_models()

        return {
            MESH_ELEMENT_IFACE: {
                'Index': dbus.Byte(self.index),
                'Models': dbus.Array(sig_models, 'q'),
                'VendorModels': dbus.Array(vendor_models, '(qq)'),
            }
        }

    def add_model(self, model):
        """Attach ``model`` to this element and hand it the element path."""
        model.set_path(self.path)
        self.models.append(model)

    def get_index(self):
        """Return the index of this element."""
        return self.index

    def set_model_config(self, configs):
        """Apply a sequence of (model id, configuration) entries."""
        print('Set element models config')
        for config in configs:
            self.UpdateModelConfiguration(config[0], config[1])

    @dbus.service.method(MESH_ELEMENT_IFACE,
                         in_signature="qqvay", out_signature="")
    def MessageReceived(self, source, key, destination, data):
        """Log an incoming message and pass it to every model."""
        # The three values must be passed as one tuple; applying '%' to
        # self.index alone raised "not enough arguments for format string".
        print('Message Received on Element %d, src=%04x, dst=%s' %
              (self.index, source, destination))
        for model in self.models:
            model.process_message(source, key, data)

    @dbus.service.method(MESH_ELEMENT_IFACE,
                         in_signature="qa{sv}", out_signature="")
    def UpdateModelConfiguration(self, model_id, config):
        """Forward ``config`` to the first model whose id is ``model_id``."""
        print('UpdateModelConfig ', end='')
        print(hex(model_id))
        for model in self.models:
            if model_id == model.get_id():
                model.set_config(config)
                return

    # NOTE(review): this decorator exports get_path() on the element
    # interface with an empty out_signature although the method returns an
    # object path; it looks like a leftover -- confirm before removing.
    @dbus.service.method(MESH_ELEMENT_IFACE,
                         in_signature="", out_signature="")
    def get_path(self):
        """Return the object path of this element."""
        return dbus.ObjectPath(self.path)
|
|
|
|
class Model:
    """Base class of a mesh model hosted on an Element.

    Keeps the model id, vendor, bindings and publication period; subclasses
    override process_message() and set_publication().
    """

    def __init__(self, model_id):
        self.cmd_ops = []
        self.model_id = model_id
        self.vendor = VENDOR_ID_NONE
        self.bindings = []
        self.pub_period = 0
        self.pub_id = 0
        self.path = None   # set by Element.add_model() via set_path()

    def set_path(self, path):
        """Store the path of the owning element."""
        self.path = path

    def get_id(self):
        """Return the model id."""
        return self.model_id

    def get_vendor(self):
        """Return the vendor id (VENDOR_ID_NONE unless changed)."""
        return self.vendor

    def process_message(self, source, key, data):
        """Handle a received message; the base class only logs it."""
        print('Model process message')

    def set_publication(self, period):
        """Store the publication period."""
        self.pub_period = period

    def set_config(self, config):
        """Apply the 'Bindings', 'PublicationPeriod' and 'Subscriptions'
        entries of ``config``; keys that are absent are ignored."""
        if 'Bindings' in config:
            self.bindings = config['Bindings']
            print('Bindings: ', end='')
            print(self.bindings)
        if 'PublicationPeriod' in config:
            self.set_publication(config['PublicationPeriod'])
            print('Model publication period ', end='')
            print(self.pub_period, end='')
            print(' ms')
        if 'Subscriptions' in config:
            self.print_subscriptions(config['Subscriptions'])

    def print_subscriptions(self, subscriptions):
        """Print every subscription on one line.

        Integer entries are printed as 4-digit hex; list entries are
        treated as the 16 octets of a label and printed as a UUID.
        """
        # Imported locally because the file's import block does not provide
        # ``uuid``; the former bare ``uuid.UUID`` raised NameError.
        import uuid

        print('Model subscriptions ', end='')
        for sub in subscriptions:
            if isinstance(sub, int):
                print('%04x' % sub, end=' ')

            if isinstance(sub, list):
                # Octets may be one-byte bytes objects (after unwrap()) or
                # integer-like values (raw D-Bus bytes); accept both.
                raw = b''.join(
                    octet if isinstance(octet, bytes) else bytes([int(octet)])
                    for octet in sub)
                label = uuid.UUID(bytes=raw)
                print(label, end=' ')
        print()
|
|
|
|
class OnOffServer(Model):
    """On/off server model: keeps one state byte and can publish it
    periodically through a PubTimer."""

    def __init__(self, model_id):
        Model.__init__(self, model_id)
        self.cmd_ops = {0x8201,   # get
                        0x8202,   # set
                        0x8203}   # set unacknowledged

        print("OnOff Server ", end="")
        self.state = 0
        print('State ', end='')
        self.timer = PubTimer()

    def process_message(self, source, key, data):
        """Handle a get (2 bytes) or set (3 bytes) message and reply with
        opcode 0x8204 followed by the current state.

        Messages of any other length, or with an unexpected opcode, are
        ignored without touching the state.
        """
        datalen = len(data)
        print('OnOff Server process message len ', datalen)

        # NOTE(review): opcodes are decoded little-endian here; confirm the
        # byte order against what the daemon delivers.
        if datalen == 2:
            (opcode,) = struct.unpack('<H', bytes(data))
            if opcode != 0x8201:
                print(hex(opcode))
                return
            print('Get state')
        elif datalen == 3:
            # Decode into a local first: the state must change only after
            # the opcode has been validated.
            opcode, new_state = struct.unpack('<HB', bytes(data))
            if opcode not in (0x8202, 0x8203):
                print(hex(opcode))
                return
            self.state = new_state
            print('Set state: ', end='')
            print(self.state)
        else:
            return

        # NOTE(review): a reply is also sent for 0x8203 ("set
        # unacknowledged"); confirm whether that is intended.
        rsp_data = struct.pack('<HB', 0x8204, self.state)
        send_response(self.path, source, key, rsp_data)

    def publish(self):
        """Publish the current state as a single byte."""
        print('Publish')
        data = struct.pack('B', self.state)
        send_publication(self.path, self.model_id, data)

    def set_publication(self, period):
        """Configure periodic publication; ``period`` is in milliseconds.

        A period of 0 cancels publication. Periods below 1000 ms are
        ignored, since this example does not handle sub-second periods.
        """
        if period == 0:
            self.pub_period = 0
            self.timer.cancel()
            return

        # We do not handle ms in this example
        if period < 1000:
            return

        self.pub_period = period
        self.timer.start(period / 1000, self.publish)
|
|
|
|
def main():
    """Set up the D-Bus objects of the example, request ``Join`` and run
    the GLib main loop until it is quit."""
    global bus
    global mainloop
    global app
    global mesh_net

    DBusGMainLoop(set_as_default=True)

    bus = dbus.SystemBus()

    mesh_net = dbus.Interface(bus.get_object(MESH_SERVICE_NAME,
                                             "/org/bluez/mesh"),
                              MESH_NETWORK_IFACE)
    # NOTE(review): 'InterfacesRemoved' is an ObjectManager signal
    # (DBUS_OM_IFACE); confirm that subscribing through the Network1
    # interface proxy actually delivers it.
    mesh_net.connect_to_signal('InterfacesRemoved', interfaces_removed_cb)

    app = Application(bus)
    prov_agent = agent.Agent(bus)
    app.set_agent(prov_agent)
    first_ele = Element(bus, 0x00)
    first_ele.add_model(OnOffServer(0x1000))
    app.add_element(first_ele)

    mainloop = GObject.MainLoop()

    print('Join')
    # Fixed 16-byte device UUID used by this example.  The local is not
    # called ``uuid`` so that it cannot shadow the stdlib module name.
    dev_uuid = bytearray.fromhex("0a0102030405060708090A0B0C0D0E0F")
    print(dev_uuid)
    mesh_net.Join(app.get_path(), dev_uuid,
                  reply_handler=join_cb,
                  error_handler=join_error_cb)

    mainloop.run()
|
|
|
|
# Run the example only when the file is executed as a script.
if __name__ == '__main__':
    main()
|