mirror of
https://git.kernel.org/pub/scm/bluetooth/bluez.git
synced 2025-01-24 21:33:27 +08:00
122 lines
3.2 KiB
Python
Executable File
122 lines
3.2 KiB
Python
Executable File
#!/usr/bin/python
|
|
|
|
import sys
|
|
import dbus
|
|
import gobject
|
|
import dbus.mainloop.glib
|
|
import os.path
|
|
from optparse import OptionParser
|
|
|
|
def parse_options():
    """Register the command-line options and parse the process arguments.

    The options are added to the module-level ``parser`` object, which must
    already exist when this function is called.

    Returns:
        The ``(options, args)`` pair produced by ``parser.parse_args()``.
    """
    # Each entry is (option flags, keyword settings for add_option); the
    # entries are registered in this order, which is also the --help order.
    option_table = (
        (("-d", "--device"),
         {"dest": "device",
          "help": "Device to connect", "metavar": "DEVICE"}),
        (("-p", "--pull"),
         {"dest": "pull_to_file",
          "help": "Pull vcard and store in FILE", "metavar": "FILE"}),
        (("-s", "--send"),
         {"dest": "send_file",
          "help": "Send FILE", "metavar": "FILE"}),
        (("-v", "--verbose"),
         {"action": "store_true", "dest": "verbose"}),
    )

    for flags, settings in option_table:
        parser.add_option(*flags, **settings)

    return parser.parse_args()
|
|
|
|
class OppClient:
    """Drive a single Object Push transfer over the obex-client D-Bus API.

    The instance wraps the session object found at ``session_path`` on the
    session bus, subscribes to ``org.bluez.obex.Transfer`` signals and quits
    the module-level ``mainloop`` when the transfer it started completes or
    fails.
    """

    def __init__(self, session_path, verbose=False):
        """Bind to the session object and subscribe to transfer signals.

        Args:
            session_path: D-Bus object path of the session to use.
            verbose: when true, also subscribe to ``PropertyChanged`` so
                progress updates are printed, and print extra status lines.
        """
        self.progress = 0
        self.transfer_path = None
        self.verbose = verbose

        session_bus = dbus.SessionBus()
        proxy = session_bus.get_object("org.bluez.obex.client", session_path)
        self.session = dbus.Interface(proxy, "org.bluez.obex.Session")
        self.opp = dbus.Interface(proxy, "org.bluez.obex.ObjectPush")

        # (signal name, handler) pairs, registered in this order.
        subscriptions = [
            ("Complete", self.transfer_complete),
            ("Error", self.transfer_error),
        ]
        if self.verbose:
            subscriptions.append(("PropertyChanged", self.transfer_progress))

        for signal, handler in subscriptions:
            # path_keyword makes dbus pass the emitting object's path to the
            # handler as the keyword argument "path".
            session_bus.add_signal_receiver(
                handler,
                dbus_interface="org.bluez.obex.Transfer",
                signal_name=signal,
                path_keyword="path")

    def create_transfer_reply(self, reply):
        """Record the transfer described by a successful method reply.

        Args:
            reply: a two-item sequence ``(path, properties)``; ``properties``
                must contain the key ``"Size"``.
        """
        transfer, details = reply
        self.transfer_path = transfer
        self.transfer_size = details["Size"]
        if not self.verbose:
            return
        print("Transfer created: %s" % transfer)

    def error(self, err):
        """Print a failed method call's error and stop the main loop."""
        print(err)
        mainloop.quit()

    def transfer_complete(self, path):
        """Stop the main loop when the tracked transfer signals completion.

        Signals whose ``path`` is not the tracked transfer are ignored.
        """
        if path == self.transfer_path:
            if self.verbose:
                print("Transfer finished")
            mainloop.quit()

    def transfer_error(self, code, message, path):
        """Report a failure of the tracked transfer and stop the main loop.

        Signals whose ``path`` is not the tracked transfer are ignored.
        """
        if path == self.transfer_path:
            print("Transfer finished with error %s: %s" % (code, message))
            mainloop.quit()

    def transfer_progress(self, prop, value, path):
        """Print a progress line for the tracked transfer.

        Only ``"Progress"`` changes on the tracked transfer are handled. The
        reported rate is the increase since the previous update divided by
        1000.
        NOTE(review): labelling that figure "kBps" presumably assumes one
        update per second -- confirm against the service.
        """
        if path != self.transfer_path or prop != "Progress":
            return

        delta = value - self.progress
        print("Transfer progress %d/%d at %d kBps" % (value,
                                                      self.transfer_size,
                                                      delta / 1000))
        self.progress = value

    def pull_business_card(self, filename):
        """Request the remote business card, to be stored in ``filename``.

        The call is asynchronous: the outcome is delivered to
        ``create_transfer_reply`` or ``error``.
        """
        destination = os.path.abspath(filename)
        self.opp.PullBusinessCard(destination,
                                  reply_handler=self.create_transfer_reply,
                                  error_handler=self.error)

    def send_file(self, filename):
        """Request that the file ``filename`` be sent to the remote device.

        The call is asynchronous: the outcome is delivered to
        ``create_transfer_reply`` or ``error``.
        """
        source = os.path.abspath(filename)
        self.opp.SendFile(source,
                          reply_handler=self.create_transfer_reply,
                          error_handler=self.error)
|
|
|
|
if __name__ == '__main__':
    # Make the GLib main loop the default for dbus so the asynchronous reply
    # handlers and signal receivers run while mainloop.run() is active.
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

    # parse_options() registers its options on this module-level parser.
    parser = OptionParser()

    (options, args) = parse_options()

    # A device and at least one action are required. Without an action no
    # transfer is ever created, nothing would call mainloop.quit(), and the
    # script would block forever after creating the session.
    if not options.device or not (options.pull_to_file or options.send_file):
        parser.print_help()
        sys.exit(0)

    bus = dbus.SessionBus()
    # OppClient's callbacks stop this module-level loop by name.
    mainloop = gobject.MainLoop()

    client = dbus.Interface(bus.get_object("org.bluez.obex.client", "/"),
                            "org.bluez.obex.Client")

    print("Creating Session")
    path = client.CreateSession(options.device, { "Target": "OPP" })

    opp_client = OppClient(path, options.verbose)

    # NOTE: if both actions are requested, both calls are issued; OppClient
    # keeps only the path of the most recently created transfer.
    if options.pull_to_file:
        opp_client.pull_business_card(options.pull_to_file)

    if options.send_file:
        opp_client.send_file(options.send_file)

    # Runs until a transfer callback or the error handler quits the loop.
    mainloop.run()
|