bluez/test/test-join
Inga Stotland 4ed860a452 mesh: Sample Mesh Joiner (provision acceptor)
This implements a simple test to exercise the Join() method
of the org.bluez.mesh.Network interface.
2019-01-08 08:26:22 -08:00

409 lines
9.3 KiB
Python

#!/usr/bin/env python3
import sys
import struct
import numpy
import dbus
import dbus.service
import dbus.exceptions
from threading import Timer
import time
try:
from gi.repository import GObject
except ImportError:
import gobject as GObject
from dbus.mainloop.glib import DBusGMainLoop
import agent
# D-Bus bus name of the mesh service and D-Bus interface name constants.
MESH_SERVICE_NAME = 'org.bluez.mesh'
DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'
DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
MESH_NETWORK_IFACE = 'org.bluez.mesh.Network1'
MESH_NODE_IFACE = 'org.bluez.mesh.Node1'
MESH_APPLICATION_IFACE = 'org.bluez.mesh.Application1'
MESH_ELEMENT_IFACE = 'org.bluez.mesh.Element1'
# Values reported as CompanyID / ProductID / VersionID by
# Application.get_properties().
APP_COMPANY_ID = 0x05f1
APP_PRODUCT_ID = 0x0001
APP_VERSION_ID = 0x0001
# Default vendor value of every Model; Element._get_sig_models() only lists
# models whose vendor equals this value.
VENDOR_ID_NONE = 0xffff
# Module-level state shared between main() and the D-Bus callbacks.
# mesh_net, app, bus and mainloop are assigned in main(); node is assigned in
# attach_app_cb(); token is assigned in Application.JoinComplete().
mesh_net = None
app = None
bus = None
mainloop = None
node = None
token = None
def generic_error_cb(error):
    """Report a failed asynchronous D-Bus call on stdout."""
    message = 'D-Bus call failed: ' + str(error)
    print(message)
def generic_reply_cb():
    """Report on stdout that an asynchronous D-Bus call completed."""
    done_message = 'D-Bus call done'
    print(done_message)
def unwrap(item):
    """Recursively convert a dbus-python value into plain Python objects.

    dbus.Boolean becomes bool, the 16/32/64-bit dbus integer types become
    int, dbus.Byte becomes a one-byte ``bytes`` object, dbus.String is
    returned as is, arrays/lists/tuples become lists and dictionaries become
    dicts (keys and values are unwrapped too).  Any other type is reported
    on stdout and returned unchanged.
    """
    if isinstance(item, dbus.Boolean):
        return bool(item)

    integer_types = (dbus.UInt16, dbus.Int16,
                     dbus.UInt32, dbus.Int32,
                     dbus.UInt64, dbus.Int64)
    if isinstance(item, integer_types):
        return int(item)

    if isinstance(item, dbus.Byte):
        return bytes([int(item)])

    if isinstance(item, dbus.String):
        return item

    if isinstance(item, (dbus.Array, list, tuple)):
        return [unwrap(member) for member in item]

    if isinstance(item, (dbus.Dictionary, dict)):
        pairs = []
        for key, value in item.items():
            pairs.append((unwrap(key), unwrap(value)))
        return dict(pairs)

    # Fallback: unknown type, hand it back untouched after reporting it.
    print('Dictionary item not handled')
    print(type(item))
    return item
def join_cb():
    """Reply handler for Join(): report that the procedure has started."""
    started_message = 'Join procedure started'
    print(started_message)
def join_error_cb(reason):
    """Error handler for Join(): report the failure reason on stdout."""
    prefix = 'Join procedure failed: '
    print(prefix, reason)
def attach_app_cb(node_path, dict_array):
    """Reply handler for Attach(): bind the node and apply element configs.

    Stores a MESH_NODE_IFACE proxy for ``node_path`` in the global ``node``,
    then walks the unwrapped ``dict_array``.  Each entry is indexed as
    ``(element index, model configurations)`` and the configurations are
    handed to the matching local element.

    Args:
        node_path: object path of the attached node.
        dict_array: per-element configuration as received over D-Bus.
    """
    global node

    print('Mesh application registered ', node_path)
    obj = bus.get_object(MESH_SERVICE_NAME, node_path)
    node = dbus.Interface(obj, MESH_NODE_IFACE)

    els = unwrap(dict_array)
    print("Get Elements")
    for el in els:
        # unwrap() turns a dbus.Byte into a one-byte bytes object.  The
        # index is an unsigned byte, so decode it with 'B'; the signed 'b'
        # format would turn indexes >= 128 into negative numbers that never
        # match Element.get_index().
        idx = struct.unpack('B', el[0])[0]
        print('Configuration for Element ', end='')
        print(idx)

        element = app.get_element(idx)
        if element is None:
            # Configuration for an element this application does not
            # provide: skip it instead of crashing on None.
            print('Unknown element index, configuration ignored')
            continue
        element.set_model_config(el[1])
def attach_app_error_cb(error):
    """Error handler for Attach(): report the failure and stop the main loop."""
    message = 'Failed to register application: ' + str(error)
    print(message)
    mainloop.quit()
def attach(token):
    """Asynchronously call Attach() on the mesh network interface.

    Passes the application's object path and ``token``; the outcome is
    delivered to attach_app_cb / attach_app_error_cb.
    """
    print('Attach')
    handlers = {
        'reply_handler': attach_app_cb,
        'error_handler': attach_app_error_cb,
    }
    mesh_net.Attach(app.get_path(), token, **handlers)
def interfaces_removed_cb(object_path, interfaces):
    """Signal handler: quit the main loop when the mesh object disappears.

    Args:
        object_path: path of the object whose interfaces were removed.
        interfaces: names of the removed interfaces (unused).
    """
    if not mesh_net:
        return

    # mesh_net is a dbus.Interface proxy, which is not subscriptable, so the
    # former ``mesh_net[2]`` raised TypeError.  The proxy exposes the remote
    # object's path through its ``object_path`` attribute.
    if object_path == mesh_net.object_path:
        print('Service was removed')
        mainloop.quit()
def send_response(path, dest, key, data):
    """Log ``data`` and asynchronously pass it to Send() on the node proxy."""
    print('send response ', data, sep='')
    node.Send(path, dest, key, data,
              error_handler=generic_error_cb,
              reply_handler=generic_reply_cb)
def send_publication(path, model_id, data):
    """Log ``data`` and asynchronously pass it to Publish() on the node proxy."""
    print('send publication ', data, sep='')
    node.Publish(path, model_id, data,
                 error_handler=generic_error_cb,
                 reply_handler=generic_reply_cb)
class PubTimer():
    """Periodic timer that calls a function every ``seconds`` seconds.

    Built on threading.Timer: each expiry calls the function and then
    schedules the next one-shot timer.
    """

    def __init__(self):
        self.seconds = None   # period handed to threading.Timer
        self.func = None      # callable invoked on every expiry
        self.thread = None    # currently pending threading.Timer, if any
        self.busy = False     # True while _timeout_cb is rescheduling

    def _timeout_cb(self):
        """Run the callback, then schedule the next expiry."""
        self.func()
        self.busy = True
        self._schedule_timer()
        self.busy = False

    def _schedule_timer(self):
        """Create and start the next one-shot timer."""
        self.thread = Timer(self.seconds, self._timeout_cb)
        # Daemon thread: a pending (and endlessly rescheduled) timer must
        # not keep the process alive once the main loop has quit.
        self.thread.daemon = True
        self.thread.start()

    def start(self, seconds, func):
        """Start, or restart with new settings, the periodic timer.

        Args:
            seconds: period between two calls of ``func``.
            func: zero-argument callable to invoke.
        """
        self.func = func
        self.seconds = seconds
        if not self.busy:
            # Drop a still-pending timer first; otherwise every restart
            # would leave one more timer chain running in parallel.
            if self.thread is not None:
                self.thread.cancel()
            self._schedule_timer()

    def cancel(self):
        """Cancel the pending timer, if there is one."""
        print('Cancel timer')
        if self.thread is not None:
            print('Cancel thread')
            self.thread.cancel()
            self.thread = None
class Application(dbus.service.Object):
    """D-Bus object exported at '/example' representing the mesh application.

    Holds the provisioning agent and the list of elements, reports them via
    ObjectManager.GetManagedObjects and receives the JoinComplete /
    JoinFailed calls.
    """

    def __init__(self, bus):
        self.path = '/example'
        self.agent = None
        self.elements = []
        dbus.service.Object.__init__(self, bus, self.path)

    def set_agent(self, agent):
        """Remember the agent to report in GetManagedObjects."""
        self.agent = agent

    def get_path(self):
        """Return this object's path as a dbus.ObjectPath."""
        return dbus.ObjectPath(self.path)

    def add_element(self, element):
        """Append an element to the application."""
        self.elements.append(element)

    def get_element(self, idx):
        """Return the element whose index equals ``idx``, or None."""
        for ele in self.elements:
            if ele.get_index() == idx:
                return ele
        return None

    def get_properties(self):
        """Return the Application1 interface properties as a nested dict."""
        return {
            MESH_APPLICATION_IFACE: {
                'CompanyID': dbus.UInt16(APP_COMPANY_ID),
                'ProductID': dbus.UInt16(APP_PRODUCT_ID),
                'VersionID': dbus.UInt16(APP_VERSION_ID)
            }
        }

    @dbus.service.method(DBUS_OM_IFACE, out_signature='a{oa{sa{sv}}}')
    def GetManagedObjects(self):
        """Return the properties of the application, agent and elements."""
        response = {}
        print('GetManagedObjects')
        response[self.path] = self.get_properties()
        # The agent is optional (set_agent may not have been called);
        # only report it when present instead of failing on None.
        if self.agent is not None:
            response[self.agent.get_path()] = self.agent.get_properties()
        for element in self.elements:
            response[element.get_path()] = element.get_properties()
        return response

    @dbus.service.method(MESH_APPLICATION_IFACE,
                         in_signature="t", out_signature="")
    def JoinComplete(self, value):
        """Store the received token globally and attach with it."""
        global token
        print('JoinComplete ', value)
        token = value
        attach(token)

    @dbus.service.method(MESH_APPLICATION_IFACE,
                         in_signature="s", out_signature="")
    def JoinFailed(self, value):
        """Report the failure reason received from the daemon."""
        # The former ``token = value`` here only bound an unused local (no
        # ``global`` statement), so it has been dropped.
        print('JoinFailed ', value)
class Element(dbus.service.Object):
    """D-Bus object for one mesh element, exported at PATH_BASE + index.

    Owns a list of models, forwards received messages to all of them and
    applies model configuration updates.
    """

    PATH_BASE = '/example/ele'

    def __init__(self, bus, index):
        # The element index is appended as two hex digits, e.g. '/example/ele00'.
        self.path = self.PATH_BASE + format(index, '02x')
        print(self.path)
        self.models = []
        self.bus = bus
        self.index = index
        dbus.service.Object.__init__(self, bus, self.path)

    def _get_sig_models(self):
        """Return the ids of all models whose vendor is VENDOR_ID_NONE."""
        return [model.get_id() for model in self.models
                if model.get_vendor() == VENDOR_ID_NONE]

    def get_properties(self):
        """Return the Element1 interface properties as a nested dict."""
        return {
            MESH_ELEMENT_IFACE: {
                'Index': dbus.Byte(self.index),
                'Models': dbus.Array(self._get_sig_models(), 'q')
            }
        }

    def add_model(self, model):
        """Attach ``model`` to this element and tell it the element path."""
        model.set_path(self.path)
        self.models.append(model)

    def get_index(self):
        """Return the element index given at construction."""
        return self.index

    def set_model_config(self, configs):
        """Apply a sequence of ``(model id, configuration)`` entries."""
        print('Set element models config')
        for config in configs:
            mod_id = config[0]
            self.UpdateModelConfiguration(mod_id, config[1])

    @dbus.service.method(MESH_ELEMENT_IFACE,
                         in_signature="qqbay", out_signature="")
    def MessageReceived(self, source, key, is_sub, data):
        """Hand an incoming message to every model of this element."""
        print('Message Received on Element ', end='')
        print(self.index)
        for model in self.models:
            model.process_message(source, key, data)

    @dbus.service.method(MESH_ELEMENT_IFACE,
                         in_signature="qa{sv}", out_signature="")
    def UpdateModelConfiguration(self, model_id, config):
        """Pass ``config`` to every model whose id equals ``model_id``."""
        print('UpdateModelConfig ', end='')
        print(hex(model_id))
        for model in self.models:
            if model_id == model.get_id():
                model.set_config(config)

    # Plain local helper.  It used to carry a @dbus.service.method decorator
    # with an empty out-signature, which exported it on the bus although it
    # returns an object path; that decorator has been removed.
    def get_path(self):
        """Return this element's path as a dbus.ObjectPath."""
        return dbus.ObjectPath(self.path)
class Model():
    """Base class for a mesh model hosted on an element.

    Keeps the model id, vendor, key bindings and publication period, and
    offers hooks (process_message, set_publication) that subclasses override.
    """

    def __init__(self, model_id):
        self.model_id = model_id
        self.vendor = VENDOR_ID_NONE
        self.cmd_ops = []
        self.bindings = []
        self.pub_id = 0
        self.pub_period = 0
        self.path = None

    def set_path(self, path):
        """Record the path of the element this model belongs to."""
        self.path = path

    def get_id(self):
        """Return the model id."""
        return self.model_id

    def get_vendor(self):
        """Return the vendor value of this model."""
        return self.vendor

    def process_message(self, source, key, data):
        """Default message hook: only logs that a message arrived."""
        print('Model process message')

    def set_publication(self, period):
        """Store the publication period."""
        self.pub_period = period

    def set_config(self, config):
        """Apply the 'Bindings' and 'PublicationPeriod' entries of ``config``.

        Entries that are absent leave the corresponding state untouched.
        """
        if 'Bindings' in config:
            self.bindings = config['Bindings']
            print('Bindings: ', self.bindings, sep='')

        if 'PublicationPeriod' in config:
            self.set_publication(config['PublicationPeriod'])
            print('Model publication period ', self.pub_period, ' ms', sep='')
class OnOffServer(Model):
    """OnOff server model: keeps a single state byte.

    Handles the get / set / set-unacknowledged opcodes, answers with a
    0x8204 message carrying the current state, and can publish the state
    periodically through a PubTimer.
    """

    OP_GET = 0x8201
    OP_SET = 0x8202
    OP_SET_UNACK = 0x8203
    OP_STATUS = 0x8204

    def __init__(self, model_id):
        Model.__init__(self, model_id)
        self.cmd_ops = { self.OP_GET,
                         self.OP_SET,
                         self.OP_SET_UNACK }
        self.state = 0
        self.timer = PubTimer()

        # Finish the log line with the state value; previously the two
        # prints ended without a newline and ran into the next output.
        print("OnOff Server ", end="")
        print('State ', end='')
        print(self.state)

    def process_message(self, source, key, data):
        """Handle an incoming message for this model.

        Args:
            source: address of the sender; the response is sent back to it.
            key: key index forwarded to send_response.
            data: message bytes - a 2-byte opcode, optionally followed by
                one state byte.
        """
        datalen = len(data)
        print('OnOff Server process message len ', datalen)

        if datalen == 2:
            opcode = struct.unpack('<H', bytes(data))[0]
            if opcode != self.OP_GET:
                print(hex(opcode))
                return
            print('Get state')
        elif datalen == 3:
            # Decode into a local first so that a message with a foreign
            # opcode cannot overwrite the stored state.
            opcode, new_state = struct.unpack('<HB', bytes(data))
            if opcode not in (self.OP_SET, self.OP_SET_UNACK):
                print(hex(opcode))
                return
            self.state = new_state
            print('Set state: ', end='')
            print(self.state)
            if opcode == self.OP_SET_UNACK:
                # An unacknowledged set must not be answered.
                return
        else:
            return

        rsp_data = struct.pack('<HB', self.OP_STATUS, self.state)
        send_response(self.path, source, key, rsp_data)

    def publish(self):
        """Publish the current state (called from the publication timer)."""
        print('Publish')
        # NOTE(review): the payload is the bare state byte without an
        # opcode, unlike the response above - confirm this is intended.
        data = struct.pack('B', self.state)
        send_publication(self.path, self.model_id, data)

    def set_publication(self, period):
        """Start, update or stop periodic publication.

        Args:
            period: publication period in ms; 0 stops publication and
                values below 1000 are ignored.
        """
        if period == 0:
            self.pub_period = 0
            self.timer.cancel()
            return

        # We do not handle ms in this example
        if period < 1000:
            return

        self.pub_period = period
        self.timer.start(period/1000, self.publish)
def main():
    """Set up the D-Bus objects, request Join() and run the main loop.

    Creates the application with one element hosting an OnOffServer model
    (id 0x1000) and a provisioning agent, then asks the mesh network
    interface to join using a fixed 16-byte UUID.  Blocks in the GLib main
    loop until it is quit.
    """
    global bus, mainloop, app, mesh_net

    # Must be installed before the bus connection is created so that
    # asynchronous replies and signals are dispatched by the GLib main loop.
    DBusGMainLoop(set_as_default=True)
    bus = dbus.SystemBus()

    mesh_net = dbus.Interface(bus.get_object(MESH_SERVICE_NAME,
                                             "/org/bluez/mesh"),
                              MESH_NETWORK_IFACE)
    mesh_net.connect_to_signal('InterfacesRemoved', interfaces_removed_cb)

    app = Application(bus)
    prov_agent = agent.Agent(bus)
    app.set_agent(prov_agent)

    first_ele = Element(bus, 0x00)
    first_ele.add_model(OnOffServer(0x1000))
    app.add_element(first_ele)

    mainloop = GObject.MainLoop()

    print('Join')
    # The unused ``caps`` and ``oob`` locals that used to be defined here
    # were never passed anywhere and have been removed.
    uuid = bytearray.fromhex("0a0102030405060708090A0B0C0D0E0F")
    print(uuid)
    mesh_net.Join(app.get_path(), uuid,
                  reply_handler=join_cb,
                  error_handler=join_error_cb)

    mainloop.run()
# Run main() only when this file is executed as a script.
if __name__ == '__main__':
    main()