连接到系统总线
busbus = dbus.SystemBus()
获得目前机器上有的适配器,改了一个函数,把busbus
传进去,会返回一个列表,里面每个都是一个对象,他的顺序是倒着的
dbus 可以导出对象供其他应用程序使用,要与远程对象进行交互需要使用代理对象get_object
获取代理对象
dbus 的接口是一组相关的方法和信号,使用 dbus.Interface(远程对象, 接口名称) 获得
GetManagedObjects() 可以获得所有对象和属性
1 2 3 4 5 6 7 8 9 10 11 12
| def find_adapter(bus): remote_om = dbus.Interface(bus.get_object("org.bluez", "/"), "org.freedesktop.DBus.ObjectManager") objects = remote_om.GetManagedObjects() adapters = [] for o, props in objects.items(): if "org.bluez.GattManager1" in props.keys(): adapters.append(o) print(adapters) if len(adapters) > 0: return adapters
return None
|
这样就把适配器选出来了
1 2 3
| adapter_obj = bus.get_object("org.bluez", adapter) adapter_props = dbus.Interface(adapter_obj, "org.freedesktop.DBus.Properties") adapter_props.Set("org.bluez.Adapter1", "Powered", dbus.Boolean(1))
|
这里就涉及到 bluez 的 api 了,在 adapter-api 处定义了
#3 这个是打开适配器,效果应该是hciconfig hci0 up
Advertising
开头附 bluez 定义好的广播类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84
| class Advertisement(dbus.service.Object): PATH_BASE = "/org/bluez/example/advertisement"
def __init__(self, bus, index, advertising_type): self.path = self.PATH_BASE + str(index) self.bus = bus self.ad_type = advertising_type self.service_uuids = None self.manufacturer_data = None self.solicit_uuids = None self.service_data = None self.local_name = None self.include_tx_power = None self.data = None dbus.service.Object.__init__(self, bus, self.path)
def get_properties(self): properties = dict() properties["Type"] = self.ad_type if self.service_uuids is not None: properties["ServiceUUIDs"] = dbus.Array(self.service_uuids, signature="s") if self.solicit_uuids is not None: properties["SolicitUUIDs"] = dbus.Array(self.solicit_uuids, signature="s") if self.manufacturer_data is not None: properties["ManufacturerData"] = dbus.Dictionary( self.manufacturer_data, signature="qv" ) if self.service_data is not None: properties["ServiceData"] = dbus.Dictionary( self.service_data, signature="sv" ) if self.local_name is not None: properties["LocalName"] = dbus.String(self.local_name) if self.include_tx_power is not None: properties["IncludeTxPower"] = dbus.Boolean(self.include_tx_power)
if self.data is not None: properties["Data"] = dbus.Dictionary(self.data, signature="yv") return {LE_ADVERTISEMENT_IFACE: properties}
def get_path(self): return dbus.ObjectPath(self.path)
def add_service_uuid(self, uuid): if not self.service_uuids: self.service_uuids = [] self.service_uuids.append(uuid)
def add_solicit_uuid(self, uuid): if not self.solicit_uuids: self.solicit_uuids = [] self.solicit_uuids.append(uuid)
def add_manufacturer_data(self, manuf_code, data): if not self.manufacturer_data: self.manufacturer_data = dbus.Dictionary({}, signature="qv") self.manufacturer_data[manuf_code] = dbus.Array(data, signature="y")
def add_service_data(self, uuid, data): if not self.service_data: self.service_data = dbus.Dictionary({}, signature="sv") self.service_data[uuid] = dbus.Array(data, signature="y")
def add_local_name(self, name): if not self.local_name: self.local_name = "" self.local_name = dbus.String(name)
def add_data(self, ad_type, data): if not self.data: self.data = dbus.Dictionary({}, signature="yv") self.data[ad_type] = dbus.Array(data, signature="y")
@dbus.service.method(DBUS_PROP_IFACE, in_signature="s", out_signature="a{sv}") def GetAll(self, interface): logger.info("GetAll") if interface != LE_ADVERTISEMENT_IFACE: raise InvalidArgsException() logger.info("returning props") return self.get_properties()[LE_ADVERTISEMENT_IFACE]
@dbus.service.method(LE_ADVERTISEMENT_IFACE, in_signature="", out_signature="") def Release(self): logger.info("%s: Released!" % self.path)
|
获取到 Advertising 的管理的接口
1
| ad_manager = dbus.Interface(adapter_obj, "org.bluez.LEAdvertisingManager1")
|
然后先使用已经定义好的广播类,去创建一个广播对象
1 2 3 4 5
| advertisement = Advertisement(bus, 0, "peripheral") advertisement.add_manufacturer_data(0x038f, [0x11, 0x22],) advertisement.add_service_uuid("9fdc9c81-fffe-51a1-e511-5a38c414c2f9") advertisement.add_local_name("BLUE-F045DAF3F9CC") advertisement.include_tx_power = True
|
然后 RegisterAdvertisement 注册和使能广播接口,需要告诉 bluez 我们的 advertisement 在哪里,并且设置应答或错误处理函数
1 2 3
| mainloop = MainLoop() ad_manager.RegisterAdvertisement(advertisement.get_path(),{},reply_handler=register_ad_cb,error_handler=register_ad_error_cb,) mainloop.run()
|
成功克隆出一个可以欺骗 APP 的广播包(他检测的是 service_uuid)
GATT
首先注册 GATT 的管理接口
1
| service_manager = dbus.Interface(adapter_obj, "org.bluez.GattManager1")
|
定义一个 application,定义一个 service,给 service 添加 characteristic 然后把 service 给 application
1 2 3 4
| app = Application(bus) service = Service(bus, 2, "12634d89-d598-4874-8e86-7d042ee07ba7", True) service.add_characteristic(TestCharacteristic(bus, 0, "12634d89-d598-4874-8e86-7d042ee07ba7", ["encrypt-read", "encrypt-write"], service)) app.add_service(service)
|
为了方便,直接定义了一个 TestCharacteristic 类,继承自官方示例中的 Characteristic 类
1 2 3 4 5 6 7 8 9 10 11 12
| class TestCharacteristic(Characteristic): def __init__(self, bus, index, uuid, flags, service): Characteristic.__init__( self, bus, index, uuid, flags, service, ) self.value = [0xFF]
def ReadValue(self, options): return self.value
def WriteValue(self, value, options): self.value = value
|
最后注册 GATT,这样就可以成功识别到特性了,读写这类的属性也可以显示出来
1
| service_manager.RegisterApplication(app.get_path(),{},reply_handler=register_app_cb,error_handler=[register_app_error_cb],)
|
接下来解决数据收发的问题
问题备忘
如何检测外设是否可连接(connectable)
广播报文如何定义可连接与不可连接
gatt 如何设置不配对仅连接,以及如何设置各种配对的 IO 能力
参考:https://github.com/PunchThrough/espresso-ble
最后更新时间: