mirror of
https://git.kernel.org/pub/scm/bluetooth/bluez.git
synced 2025-01-02 17:43:23 +08:00
7adf8d09b4
Some test scripts use "from gi.repository import GObject" whereas others use "import gobject". gi.repository is not always available on embedded systems, so convert all instances to this format: try: from gi.repository import GObject except ImportError: import gobject as GObject Also, sort the imports in this order: system, dbus, gobject, bluezutils
249 lines
5.6 KiB
Python
Executable File
249 lines
5.6 KiB
Python
Executable File
#!/usr/bin/python
|
|
|
|
from __future__ import absolute_import, print_function, unicode_literals
|
|
|
|
from optparse import OptionParser, make_option
|
|
import os
|
|
from socket import SOCK_SEQPACKET, socket
|
|
import sys
|
|
import dbus
|
|
import dbus.service
|
|
import dbus.mainloop.glib
|
|
import glib
|
|
try:
|
|
from gi.repository import GObject
|
|
except ImportError:
|
|
import gobject as GObject
|
|
|
|
mainloop = None
|
|
audio_supported = True
|
|
|
|
try:
|
|
from socket import AF_BLUETOOTH, BTPROTO_SCO
|
|
except:
|
|
print("WARNING: python compiled without Bluetooth support"
|
|
" - audio will not be available")
|
|
audio_supported = False
|
|
|
|
BUF_SIZE = 1024
|
|
|
|
BDADDR_ANY = '00:00:00:00:00:00'
|
|
|
|
HF_NREC = 0x0001
|
|
HF_3WAY = 0x0002
|
|
HF_CLI = 0x0004
|
|
HF_VOICE_RECOGNITION = 0x0008
|
|
HF_REMOTE_VOL = 0x0010
|
|
HF_ENHANCED_STATUS = 0x0020
|
|
HF_ENHANCED_CONTROL = 0x0040
|
|
HF_CODEC_NEGOTIATION = 0x0080
|
|
|
|
AG_3WAY = 0x0001
|
|
AG_NREC = 0x0002
|
|
AG_VOICE_RECOGNITION = 0x0004
|
|
AG_INBAND_RING = 0x0008
|
|
AG_VOICE_TAG = 0x0010
|
|
AG_REJECT_CALL = 0x0020
|
|
AG_ENHANCED_STATUS = 0x0040
|
|
AG_ENHANCED_CONTROL = 0x0080
|
|
AG_EXTENDED_RESULT = 0x0100
|
|
AG_CODEC_NEGOTIATION = 0x0200
|
|
|
|
HF_FEATURES = (HF_3WAY | HF_CLI | HF_VOICE_RECOGNITION |
|
|
HF_REMOTE_VOL | HF_ENHANCED_STATUS |
|
|
HF_ENHANCED_CONTROL | HF_CODEC_NEGOTIATION)
|
|
|
|
AVAIL_CODECS = "1,2"
|
|
|
|
class HfpConnection:
|
|
slc_complete = False
|
|
fd = None
|
|
io_id = 0
|
|
version = 0
|
|
features = 0
|
|
pending = None
|
|
|
|
def disconnect(self):
|
|
if (self.fd >= 0):
|
|
os.close(self.fd)
|
|
self.fd = -1
|
|
glib.source_remove(self.io_id)
|
|
self.io_id = 0
|
|
|
|
def slc_completed(self):
|
|
print("SLC establisment complete")
|
|
self.slc_complete = True
|
|
|
|
def slc_next_cmd(self, cmd):
|
|
if not cmd:
|
|
self.send_cmd("AT+BRSF=%u" % (HF_FEATURES))
|
|
elif (cmd.startswith("AT+BRSF")):
|
|
if (self.features & AG_CODEC_NEGOTIATION and
|
|
HF_FEATURES & HF_CODEC_NEGOTIATION):
|
|
self.send_cmd("AT+BAC=%s" % (AVAIL_CODECS))
|
|
else:
|
|
self.send_cmd("AT+CIND=?")
|
|
elif (cmd.startswith("AT+BAC")):
|
|
self.send_cmd("AT+CIND=?")
|
|
elif (cmd.startswith("AT+CIND=?")):
|
|
self.send_cmd("AT+CIND?")
|
|
elif (cmd.startswith("AT+CIND?")):
|
|
self.send_cmd("AT+CMER=3,0,0,1")
|
|
elif (cmd.startswith("AT+CMER=")):
|
|
if (HF_FEATURES & HF_3WAY and self.features & AG_3WAY):
|
|
self.send_cmd("AT+CHLD=?")
|
|
else:
|
|
self.slc_completed()
|
|
elif (cmd.startswith("AT+CHLD=?")):
|
|
self.slc_completed()
|
|
else:
|
|
print("Unknown SLC command completed: %s" % (cmd))
|
|
|
|
def io_cb(self, fd, cond):
|
|
buf = os.read(fd, BUF_SIZE)
|
|
buf = buf.strip()
|
|
|
|
print("Received: %s" % (buf))
|
|
|
|
if (buf == "OK" or buf == "ERROR"):
|
|
cmd = self.pending
|
|
self.pending = None
|
|
|
|
if (not self.slc_complete):
|
|
self.slc_next_cmd(cmd)
|
|
|
|
return True
|
|
|
|
parts = buf.split(':')
|
|
|
|
if (parts[0] == "+BRSF"):
|
|
self.features = int(parts[1])
|
|
|
|
return True
|
|
|
|
def send_cmd(self, cmd):
|
|
if (self.pending):
|
|
print("ERROR: Another command is pending")
|
|
return
|
|
|
|
print("Sending: %s" % (cmd))
|
|
|
|
os.write(self.fd, cmd + "\r\n")
|
|
self.pending = cmd
|
|
|
|
def __init__(self, fd, version, features):
|
|
self.fd = fd
|
|
self.version = version
|
|
self.features = features
|
|
|
|
print("Version 0x%04x Features 0x%04x" % (version, features))
|
|
|
|
self.io_id = glib.io_add_watch(fd, glib.IO_IN, self.io_cb)
|
|
|
|
self.slc_next_cmd(None)
|
|
|
|
class HfpProfile(dbus.service.Object):
|
|
sco_socket = None
|
|
io_id = 0
|
|
conns = {}
|
|
|
|
def sco_cb(self, sock, cond):
|
|
(sco, peer) = sock.accept()
|
|
print("New SCO connection from %s" % (peer))
|
|
|
|
def init_sco(self, sock):
|
|
self.sco_socket = sock
|
|
self.io_id = glib.io_add_watch(sock, glib.IO_IN, self.sco_cb)
|
|
|
|
def __init__(self, bus, path, sco):
|
|
dbus.service.Object.__init__(self, bus, path)
|
|
|
|
if sco:
|
|
self.init_sco(sco)
|
|
|
|
@dbus.service.method("org.bluez.Profile1",
|
|
in_signature="", out_signature="")
|
|
def Release(self):
|
|
print("Release")
|
|
mainloop.quit()
|
|
|
|
@dbus.service.method("org.bluez.Profile1",
|
|
in_signature="", out_signature="")
|
|
def Cancel(self):
|
|
print("Cancel")
|
|
|
|
@dbus.service.method("org.bluez.Profile1",
|
|
in_signature="o", out_signature="")
|
|
def RequestDisconnection(self, path):
|
|
conn = self.conns.pop(path)
|
|
conn.disconnect()
|
|
|
|
@dbus.service.method("org.bluez.Profile1",
|
|
in_signature="oha{sv}", out_signature="")
|
|
def NewConnection(self, path, fd, properties):
|
|
fd = fd.take()
|
|
version = 0x0105
|
|
features = 0
|
|
print("NewConnection(%s, %d)" % (path, fd))
|
|
for key in properties.keys():
|
|
if key == "Version":
|
|
version = properties[key]
|
|
elif key == "Features":
|
|
features = properties[key]
|
|
|
|
conn = HfpConnection(fd, version, features)
|
|
|
|
self.conns[path] = conn
|
|
|
|
if __name__ == '__main__':
|
|
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
|
|
|
|
bus = dbus.SystemBus()
|
|
|
|
manager = dbus.Interface(bus.get_object("org.bluez",
|
|
"/org/bluez"), "org.bluez.ProfileManager1")
|
|
|
|
option_list = [
|
|
make_option("-p", "--path", action="store",
|
|
type="string", dest="path",
|
|
default="/bluez/test/hfp"),
|
|
make_option("-n", "--name", action="store",
|
|
type="string", dest="name",
|
|
default=None),
|
|
make_option("-C", "--channel", action="store",
|
|
type="int", dest="channel",
|
|
default=None),
|
|
]
|
|
|
|
parser = OptionParser(option_list=option_list)
|
|
|
|
(options, args) = parser.parse_args()
|
|
|
|
mainloop = GObject.MainLoop()
|
|
|
|
opts = {
|
|
"Version" : dbus.UInt16(0x0106),
|
|
"Features" : dbus.UInt16(HF_FEATURES),
|
|
}
|
|
|
|
if (options.name):
|
|
opts["Name"] = options.name
|
|
|
|
if (options.channel is not None):
|
|
opts["Channel"] = dbus.UInt16(options.channel)
|
|
|
|
if audio_supported:
|
|
sco = socket(AF_BLUETOOTH, SOCK_SEQPACKET, BTPROTO_SCO)
|
|
sco.bind(BDADDR_ANY)
|
|
sco.listen(1)
|
|
else:
|
|
sco = None
|
|
|
|
profile = HfpProfile(bus, options.path, sco)
|
|
|
|
manager.RegisterProfile(options.path, "hfp-hf", opts)
|
|
|
|
print("Profile registered - waiting for connections")
|
|
|
|
mainloop.run()
|