mirror of
https://git.kernel.org/pub/scm/bluetooth/bluez.git
synced 2024-11-15 00:04:29 +08:00
d31f04aa92
Currently we have a mix of /usr/bin/python, /usr/bin/python3 and /usr/bin/env python3. Use the latter since it is the more common way of handling this, plus it allows people to override the system python (for whatever reason). Inspired by a Debian patch, doing a mass /usr/bin/python{,3} conversion. Cc: Nobuhiro Iwamatsu <iwamatsu@debian.org>
244 lines
5.2 KiB
Python
Executable File
244 lines
5.2 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
# SPDX-License-Identifier: LGPL-2.1-or-later
|
|
|
|
from __future__ import absolute_import, print_function, unicode_literals
|
|
# -*- coding: utf-8 -*-
|
|
|
|
import sys
|
|
import dbus
|
|
import dbus.service
|
|
from dbus.mainloop.glib import DBusGMainLoop
|
|
try:
|
|
from gi.repository import GObject
|
|
except ImportError:
|
|
import gobject as GObject
|
|
|
|
# Bus name and object path passed to bus.get_object() to reach the
# health manager.
BUS_NAME = "org.bluez"
PATH = "/org/bluez"

# D-Bus interface names used throughout this script.
ADAPTER_INTERFACE = "org.bluez.Adapter1"
HEALTH_MANAGER_INTERFACE = "org.bluez.HealthManager1"
HEALTH_DEVICE_INTERFACE = "org.bluez.HealthDevice1"

# Make the GLib main loop the default for dbus-python; this is done before
# the system bus connection below is opened.
DBusGMainLoop(set_as_default=True)
loop = GObject.MainLoop()

bus = dbus.SystemBus()
|
|
|
|
def sig_received(*args, **kwargs):
    """Print a received D-Bus signal: its name, object path and payload.

    The callback expects the signal name in ``kwargs["member"]`` and the
    emitting object path in ``kwargs["path"]``; if either is missing the
    signal is ignored silently.

    For a two-argument ``PropertyChanged`` signal both arguments are
    printed on separate lines. For every other signal only the first
    argument is printed, and nothing is printed for the payload when the
    signal carries no arguments, so an unexpected argument count never
    raises from inside the callback.
    """
    if "member" not in kwargs or "path" not in kwargs:
        return

    sig_name = kwargs["member"]
    path = kwargs["path"]
    print(sig_name)
    print(path)

    if sig_name == "PropertyChanged" and len(args) == 2:
        k, v = args
        print(k)
        print(v)
    elif args:
        # Guarded: indexing an empty tuple would raise IndexError.
        print(args[0])
|
|
|
|
|
|
def enter_mainloop():
    """Subscribe to HealthDevice1 signals and run a GLib main loop.

    Registers ``sig_received`` as the receiver for signals on
    ``HEALTH_DEVICE_INTERFACE`` sent by ``BUS_NAME``, asking for the
    object path, member name and interface to be passed as the ``path``,
    ``member`` and ``interface`` keyword arguments. It then runs a new
    ``GObject.MainLoop`` until a KeyboardInterrupt (Ctrl+C) is raised.
    "Exiting, bye" is always printed on the way out.
    """
    bus.add_signal_receiver(sig_received, bus_name=BUS_NAME,
                            dbus_interface=HEALTH_DEVICE_INTERFACE,
                            path_keyword="path",
                            member_keyword="member",
                            interface_keyword="interface")

    try:
        print("Entering main loop, push Ctrl+C for finish")

        mainloop = GObject.MainLoop()
        mainloop.run()
    except KeyboardInterrupt:
        # Ctrl+C is the advertised way to leave the loop, not an error.
        pass
    finally:
        print("Exiting, bye")
|
|
|
|
# Proxy for the health manager object, used to create and destroy the
# application.
hdp_manager = dbus.Interface(bus.get_object(BUS_NAME, PATH),
                             HEALTH_MANAGER_INTERFACE)

# Ask for the application role until a valid choice (1 or 2) is entered.
role = None
while role is None:
    # end="" keeps the cursor on the prompt line; a trailing comma inside
    # print() does not suppress the newline when print is a function.
    print("Select 1. source or 2. sink: ", end="")
    sys.stdout.flush()
    try:
        sel = int(sys.stdin.readline())
        if sel == 1:
            role = "source"
        elif sel == 2:
            role = "sink"
        else:
            raise ValueError
    except (TypeError, ValueError):
        print("Wrong selection, try again: ", end="")
        sys.stdout.flush()
    except KeyboardInterrupt:
        sys.exit()

# Ask for the data type; it must fit in the UInt16 sent to CreateApplication.
dtype = None
while dtype is None:
    print("Select a data type: ", end="")
    sys.stdout.flush()
    try:
        sel = int(sys.stdin.readline())
        if (sel < 0) or (sel > 65535):
            raise ValueError
        dtype = sel
    except (TypeError, ValueError):
        print("Wrong selection, try again: ", end="")
        sys.stdout.flush()
    except KeyboardInterrupt:
        sys.exit()

# Only a source is asked for a preferred channel type; a sink is registered
# without a "ChannelType" entry.
pref = None
if role == "source":
    while pref is None:
        try:
            print("Select a preferred data channel type 1. "
                  "reliable 2. streaming: ", end="")
            sys.stdout.flush()
            sel = int(sys.stdin.readline())
            if sel == 1:
                pref = "reliable"
            elif sel == 2:
                pref = "streaming"
            else:
                raise ValueError

        except (TypeError, ValueError):
            print("Wrong selection, try again")
        except KeyboardInterrupt:
            sys.exit()

    app_path = hdp_manager.CreateApplication({
        "DataType": dbus.types.UInt16(dtype),
        "Role": role,
        "Description": "Test Source",
        "ChannelType": pref})
else:
    app_path = hdp_manager.CreateApplication({
        "DataType": dbus.types.UInt16(dtype),
        "Description": "Test sink",
        "Role": role})

print("New application created:", app_path)
|
|
|
|
# Ask whether to connect to a remote device; repeat until answered.
con = None
while con is None:
    try:
        # end="" keeps the cursor on the prompt line; a trailing comma
        # inside print() does not suppress the newline.
        print("Connect to a remote device (y/n)? ", end="")
        sys.stdout.flush()
        # Normalise the answer so surrounding whitespace and letter case
        # do not matter.
        sel = sys.stdin.readline().strip().lower()
        if sel in ("y", "yes"):
            con = True
        elif sel in ("n", "no"):
            con = False
        else:
            print("Wrong selection, try again.")
    except KeyboardInterrupt:
        sys.exit()

# Without a connection, just run the signal loop and then exit.
if not con:
    enter_mainloop()
    sys.exit()
|
|
|
|
manager = dbus.Interface(bus.get_object(BUS_NAME, "/"),
                         "org.freedesktop.DBus.ObjectManager")

objects = manager.GetManagedObjects()

# Collect every object path that exposes the adapter interface.
# The "in" operator is used because dict.has_key() does not exist on
# Python 3 and would raise AttributeError here.
adapters = [path for path, ifaces in objects.items()
            if ADAPTER_INTERFACE in ifaces]

# With nothing to choose from, the selection loop below could never end.
if not adapters:
    print("No adapters available")
    sys.exit()

for i, ad in enumerate(adapters, 1):
    print("%d. %s" % (i, ad))

# end="" keeps the cursor on the prompt line.
print("Select an adapter: ", end="")
sys.stdout.flush()
select = None
while select is None:
    try:
        # The menu is 1-based, the list is 0-based.
        pos = int(sys.stdin.readline()) - 1
        if pos < 0:
            # A negative index would silently wrap around to the end of
            # the list instead of being rejected.
            raise ValueError
        select = adapters[pos]
    except (TypeError, IndexError, ValueError):
        print("Wrong selection, try again: ", end="")
        sys.stdout.flush()
    except KeyboardInterrupt:
        sys.exit()

adapter = dbus.Interface(bus.get_object(BUS_NAME, select), ADAPTER_INTERFACE)
|
|
|
|
# Collect the devices that belong to the selected adapter and expose the
# health device interface.
devices = []
for path, interfaces in objects.items():
    if "org.bluez.Device1" not in interfaces:
        continue
    properties = interfaces["org.bluez.Device1"]
    # At this point "select" still holds the chosen adapter path.
    if properties["Adapter"] != select:
        continue

    if HEALTH_DEVICE_INTERFACE not in interfaces:
        continue
    devices.append(path)

if not devices:
    print("No devices available")
    sys.exit()

for i, dev in enumerate(devices, 1):
    print("%d. %s" % (i, dev))

# end="" keeps the cursor on the prompt line.
print("Select a device: ", end="")
sys.stdout.flush()
# "select" is reused: from here on it holds the chosen device path.
select = None
while select is None:
    try:
        # The menu is 1-based, the list is 0-based.
        pos = int(sys.stdin.readline()) - 1
        if pos < 0:
            # A negative index would silently wrap around to the end of
            # the list instead of being rejected.
            raise ValueError
        select = devices[pos]
    except (TypeError, IndexError, ValueError):
        print("Wrong selection, try again: ", end="")
        sys.stdout.flush()
    except KeyboardInterrupt:
        sys.exit()

device = dbus.Interface(bus.get_object(BUS_NAME, select),
                        HEALTH_DEVICE_INTERFACE)
|
|
|
|
# Ask whether to run an echo against the device before connecting.
echo = None
while echo is None:
    try:
        # end="" keeps the cursor on the prompt line.
        print("Perform an echo (y/n)? ", end="")
        sys.stdout.flush()
        # Normalise the answer so surrounding whitespace and letter case
        # do not matter.
        sel = sys.stdin.readline().strip().lower()
        if sel in ("y", "yes"):
            echo = True
        elif sel in ("n", "no"):
            echo = False
        else:
            print("Wrong selection, try again.")
    except KeyboardInterrupt:
        sys.exit()

if echo:
    if device.Echo():
        print("Echo was ok")
    else:
        print("Echo was wrong, exiting")
        sys.exit()

print("Connecting to device %s" % (select))

# A source asks for a "reliable" channel; a sink accepts "any".
if role == "source":
    chan = device.CreateChannel(app_path, "reliable")
else:
    chan = device.CreateChannel(app_path, "any")

print(chan)

enter_mainloop()

# Unregister the application once the main loop has been left.
hdp_manager.DestroyApplication(app_path)
|