mirror of
https://git.kernel.org/pub/scm/bluetooth/bluez.git
synced 2024-12-05 01:54:22 +08:00
220 lines
4.6 KiB
Python
Executable File
220 lines
4.6 KiB
Python
Executable File
#!/usr/bin/python
|
|
|
|
from __future__ import absolute_import, print_function, unicode_literals
|
|
# -*- coding: utf-8 -*-
|
|
|
|
import dbus
|
|
import dbus.service
|
|
import gobject
|
|
from dbus.mainloop.glib import DBusGMainLoop
|
|
import sys
|
|
|
|
DBusGMainLoop(set_as_default=True)
|
|
loop = gobject.MainLoop()
|
|
|
|
bus = dbus.SystemBus()
|
|
|
|
def sig_received(*args, **kwargs):
|
|
if "member" not in kwargs:
|
|
return
|
|
if "path" not in kwargs:
|
|
return;
|
|
sig_name = kwargs["member"]
|
|
path = kwargs["path"]
|
|
print(sig_name)
|
|
print(path)
|
|
if sig_name == "PropertyChanged":
|
|
k, v = args
|
|
print(k)
|
|
print(v)
|
|
else:
|
|
ob = args[0]
|
|
print(ob)
|
|
|
|
|
|
def enter_mainloop():
|
|
bus.add_signal_receiver(sig_received, bus_name="org.bluez",
|
|
dbus_interface = "org.bluez.HealthDevice",
|
|
path_keyword="path",
|
|
member_keyword="member",
|
|
interface_keyword="interface")
|
|
|
|
try:
|
|
print("Entering main lopp, push Ctrl+C for finish")
|
|
|
|
mainloop = gobject.MainLoop()
|
|
mainloop.run()
|
|
except KeyboardInterrupt:
|
|
pass
|
|
finally:
|
|
print("Exiting, bye")
|
|
|
|
hdp_manager = dbus.Interface(bus.get_object("org.bluez", "/org/bluez"),
|
|
"org.bluez.HealthManager")
|
|
|
|
role = None
|
|
while role == None:
|
|
print("Select 1. source or 2. sink: ",)
|
|
try:
|
|
sel = int(sys.stdin.readline())
|
|
if sel == 1:
|
|
role = "Source"
|
|
elif sel == 2:
|
|
role = "Sink"
|
|
else:
|
|
raise ValueError
|
|
except (TypeError, ValueError):
|
|
print("Wrong selection, try again: ",)
|
|
except KeyboardInterrupt:
|
|
sys.exit()
|
|
|
|
dtype = None
|
|
while dtype == None:
|
|
print("Select a data type: ",)
|
|
try:
|
|
sel = int(sys.stdin.readline())
|
|
if (sel < 0) or (sel > 65535):
|
|
raise ValueError
|
|
dtype = sel;
|
|
except (TypeError, ValueError):
|
|
print("Wrong selection, try again: ",)
|
|
except KeyboardInterrupt:
|
|
sys.exit()
|
|
|
|
pref = None
|
|
if role == "Source":
|
|
while pref == None:
|
|
try:
|
|
print("Select a preferred data channel type 1.",)
|
|
print("reliable 2. streaming: ",)
|
|
sel = int(sys.stdin.readline())
|
|
if sel == 1:
|
|
pref = "Reliable"
|
|
elif sel == 2:
|
|
pref = "Streaming"
|
|
else:
|
|
raise ValueError
|
|
|
|
except (TypeError, ValueError):
|
|
print("Wrong selection, try again")
|
|
except KeyboardInterrupt:
|
|
sys.exit()
|
|
|
|
app_path = hdp_manager.CreateApplication({
|
|
"DataType": dbus.types.UInt16(dtype),
|
|
"Role": role,
|
|
"Description": "Test Source",
|
|
"ChannelType": pref})
|
|
else:
|
|
app_path = hdp_manager.CreateApplication({
|
|
"DataType": dbus.types.UInt16(dtype),
|
|
"Description": "Test sink",
|
|
"Role": role})
|
|
|
|
print("New application created:", app_path)
|
|
|
|
con = None
|
|
while con == None:
|
|
try:
|
|
print("Connect to a remote device (y/n)? ",)
|
|
sel = sys.stdin.readline()
|
|
if sel in ("y\n", "yes\n", "Y\n", "YES\n"):
|
|
con = True
|
|
elif sel in ("n\n", "no\n", "N\n", "NO\n"):
|
|
con = False
|
|
else:
|
|
print("Wrong selection, try again.")
|
|
except KeyboardInterrupt:
|
|
sys.exit()
|
|
|
|
if not con:
|
|
enter_mainloop()
|
|
sys.exit()
|
|
|
|
manager = dbus.Interface(bus.get_object("org.bluez", "/"),
|
|
"org.bluez.Manager")
|
|
|
|
adapters = manager.GetProperties()["Adapters"]
|
|
|
|
i = 1
|
|
for ad in adapters:
|
|
print("%d. %s" % (i, ad))
|
|
i = i + 1
|
|
|
|
print("Select an adapter: ",)
|
|
select = None
|
|
while select == None:
|
|
try:
|
|
pos = int(sys.stdin.readline()) - 1
|
|
if pos < 0:
|
|
raise TypeError
|
|
select = adapters[pos]
|
|
except (TypeError, IndexError, ValueError):
|
|
print("Wrong selection, try again: ",)
|
|
except KeyboardInterrupt:
|
|
sys.exit()
|
|
|
|
adapter = dbus.Interface(bus.get_object("org.bluez", select),
|
|
"org.bluez.Adapter")
|
|
|
|
devices = adapter.GetProperties()["Devices"]
|
|
|
|
if len(devices) == 0:
|
|
print("No devices available")
|
|
sys.exit()
|
|
|
|
i = 1
|
|
for dev in devices:
|
|
print("%d. %s" % (i, dev))
|
|
i = i + 1
|
|
|
|
print("Select a device: ",)
|
|
select = None
|
|
while select == None:
|
|
try:
|
|
pos = int(sys.stdin.readline()) - 1
|
|
if pos < 0:
|
|
raise TypeError
|
|
select = devices[pos]
|
|
except (TypeError, IndexError, ValueError):
|
|
print("Wrong selection, try again: ",)
|
|
except KeyboardInterrupt:
|
|
sys.exit()
|
|
|
|
device = dbus.Interface(bus.get_object("org.bluez", select),
|
|
"org.bluez.HealthDevice")
|
|
|
|
echo = None
|
|
while echo == None:
|
|
try:
|
|
print("Perform an echo (y/n)? ",)
|
|
sel = sys.stdin.readline()
|
|
if sel in ("y\n", "yes\n", "Y\n", "YES\n"):
|
|
echo = True
|
|
elif sel in ("n\n", "no\n", "N\n", "NO\n"):
|
|
echo = False
|
|
else:
|
|
print("Wrong selection, try again.")
|
|
except KeyboardInterrupt:
|
|
sys.exit()
|
|
|
|
if echo:
|
|
if device.Echo():
|
|
print("Echo was ok")
|
|
else:
|
|
print("Echo war wrong, exiting")
|
|
sys.exit()
|
|
|
|
print("Connecting to device %s" % (select))
|
|
|
|
if role == "Source":
|
|
chan = device.CreateChannel(app_path, "Reliable")
|
|
else:
|
|
chan = device.CreateChannel(app_path, "Any")
|
|
|
|
print(chan)
|
|
|
|
enter_mainloop()
|
|
|
|
hdp_manager.DestroyApplication(app_path)
|