连接到系统总线
busbus = dbus.SystemBus()
获得目前机器上有的适配器,改了一个函数,把busbus传进去,会返回一个列表,里面每个都是一个对象,他的顺序是倒着的
dbus 可以导出对象供其他应用程序使用,要与远程对象进行交互需要使用代理对象get_object获取代理对象
dbus 的接口是一组相关的方法和信号,使用 dbus.Interface(远程对象, 接口名称) 获得
GetManagedObjects() 可以获得所有对象和属性

1
2
3
4
5
6
7
8
9
10
11
12
def find_adapter(bus):
remote_om = dbus.Interface(bus.get_object("org.bluez", "/"), "org.freedesktop.DBus.ObjectManager")
objects = remote_om.GetManagedObjects()
adapters = []
for o, props in objects.items():
if "org.bluez.GattManager1" in props.keys():
adapters.append(o)
print(adapters)
if len(adapters) > 0:
return adapters

return None
image.png
image.png

这样就把适配器选出来了

1
2
3
adapter_obj = bus.get_object("org.bluez", adapter)
adapter_props = dbus.Interface(adapter_obj, "org.freedesktop.DBus.Properties")
adapter_props.Set("org.bluez.Adapter1", "Powered", dbus.Boolean(1))

这里就涉及到 bluez 的 api 了,在 adapter-api 处定义了
#3 这个是打开适配器,效果应该是hciconfig hci0 up

image.png
image.png

Advertising

开头附 bluez 定义好的广播类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
class Advertisement(dbus.service.Object):
PATH_BASE = "/org/bluez/example/advertisement"

def __init__(self, bus, index, advertising_type):
self.path = self.PATH_BASE + str(index)
self.bus = bus
self.ad_type = advertising_type
self.service_uuids = None
self.manufacturer_data = None
self.solicit_uuids = None
self.service_data = None
self.local_name = None
self.include_tx_power = None
self.data = None
dbus.service.Object.__init__(self, bus, self.path)

def get_properties(self):
properties = dict()
properties["Type"] = self.ad_type
if self.service_uuids is not None:
properties["ServiceUUIDs"] = dbus.Array(self.service_uuids, signature="s")
if self.solicit_uuids is not None:
properties["SolicitUUIDs"] = dbus.Array(self.solicit_uuids, signature="s")
if self.manufacturer_data is not None:
properties["ManufacturerData"] = dbus.Dictionary(
self.manufacturer_data, signature="qv"
)
if self.service_data is not None:
properties["ServiceData"] = dbus.Dictionary(
self.service_data, signature="sv"
)
if self.local_name is not None:
properties["LocalName"] = dbus.String(self.local_name)
if self.include_tx_power is not None:
properties["IncludeTxPower"] = dbus.Boolean(self.include_tx_power)

if self.data is not None:
properties["Data"] = dbus.Dictionary(self.data, signature="yv")
return {LE_ADVERTISEMENT_IFACE: properties}

def get_path(self):
return dbus.ObjectPath(self.path)

def add_service_uuid(self, uuid):
if not self.service_uuids:
self.service_uuids = []
self.service_uuids.append(uuid)

def add_solicit_uuid(self, uuid):
if not self.solicit_uuids:
self.solicit_uuids = []
self.solicit_uuids.append(uuid)

def add_manufacturer_data(self, manuf_code, data):
if not self.manufacturer_data:
self.manufacturer_data = dbus.Dictionary({}, signature="qv")
self.manufacturer_data[manuf_code] = dbus.Array(data, signature="y")

def add_service_data(self, uuid, data):
if not self.service_data:
self.service_data = dbus.Dictionary({}, signature="sv")
self.service_data[uuid] = dbus.Array(data, signature="y")

def add_local_name(self, name):
if not self.local_name:
self.local_name = ""
self.local_name = dbus.String(name)

def add_data(self, ad_type, data):
if not self.data:
self.data = dbus.Dictionary({}, signature="yv")
self.data[ad_type] = dbus.Array(data, signature="y")

@dbus.service.method(DBUS_PROP_IFACE, in_signature="s", out_signature="a{sv}")
def GetAll(self, interface):
logger.info("GetAll")
if interface != LE_ADVERTISEMENT_IFACE:
raise InvalidArgsException()
logger.info("returning props")
return self.get_properties()[LE_ADVERTISEMENT_IFACE]

@dbus.service.method(LE_ADVERTISEMENT_IFACE, in_signature="", out_signature="")
def Release(self):
logger.info("%s: Released!" % self.path)

获取到 Advertising 的管理的接口

1
ad_manager = dbus.Interface(adapter_obj, "org.bluez.LEAdvertisingManager1")

然后先使用已经定义好的广播类,去创建一个广播对象

1
2
3
4
5
advertisement = Advertisement(bus, 0, "peripheral")
advertisement.add_manufacturer_data(0x038f, [0x11, 0x22],) #这里下面就是设置广播数据了
advertisement.add_service_uuid("9fdc9c81-fffe-51a1-e511-5a38c414c2f9")
advertisement.add_local_name("BLUE-F045DAF3F9CC")
advertisement.include_tx_power = True

然后 RegisterAdvertisement 注册和使能广播接口,需要告诉 bluez 我们的 advertisement 在哪里,并且设置应答或错误处理函数

1
2
3
mainloop = MainLoop()
ad_manager.RegisterAdvertisement(advertisement.get_path(),{},reply_handler=register_ad_cb,error_handler=register_ad_error_cb,)
mainloop.run()

成功克隆出一个可以欺骗 APP 的广播包(他检测的是 service_uuid)

image.png
image.png
image.png
image.png

GATT

首先注册 GATT 的管理接口

1
service_manager = dbus.Interface(adapter_obj, "org.bluez.GattManager1")

定义一个 application,定义一个 service,给 service 添加 characteristic 然后把 service 给 application

1
2
3
4
app = Application(bus)
service = Service(bus, 2, "12634d89-d598-4874-8e86-7d042ee07ba7", True)
service.add_characteristic(TestCharacteristic(bus, 0, "12634d89-d598-4874-8e86-7d042ee07ba7", ["encrypt-read", "encrypt-write"], service))
app.add_service(service)

为了方便,直接定义了一个 TestCharacteristic 类,继承自官方示例中的 Characteristic 类

1
2
3
4
5
6
7
8
9
10
11
12
class TestCharacteristic(Characteristic):
def __init__(self, bus, index, uuid, flags, service):
Characteristic.__init__(
self, bus, index, uuid, flags, service,
)
self.value = [0xFF]

def ReadValue(self, options):
return self.value

def WriteValue(self, value, options):
self.value = value

最后注册 GATT,这样就可以成功识别到特性了,读写这类的属性也可以显示出来

1
service_manager.RegisterApplication(app.get_path(),{},reply_handler=register_app_cb,error_handler=[register_app_error_cb],)
image.png
image.png

接下来解决数据收发的问题

问题备忘

如何检测外设是否可连接(connectable)
广播报文如何定义可连接与不可连接
gatt 如何设置不配对仅连接,以及如何设置各种配对的 IO 能力

参考:https://github.com/PunchThrough/espresso-ble