import ble
DEV_NAME ::= "O2Ring"
SCAN_DURATION ::= Duration --s=3
find_by_name central/ble.Central name/string:
central.scan --duration=SCAN_DURATION: | device/ble.RemoteScannedDevice |
if device.data.name and device.data.name.contains name:
return device.address
throw "no ring device found"
main:
adapter := ble.Adapter
central := adapter.central
address := find_by_name central DEV_NAME
remote_device := central.connect address
print remote_device.address
services := remote_device.discover_services
//0x0d BLE_HS_ETIMEOUT Operation timed out.
// EXCEPTION error.
// NimBLE error, Type: host, error code: 0x0d. See https://gist.github.com/mikkeldamsgaard/0857ce6a8b073a52...
// 0: ble_get_error_ <sdk>/ble.toit:980:3
// 1: Resource_.throw_error_ <sdk>/ble.toit:811:5
discover_services
def on_detection(self, device, advertisement_data):
if device.address not in self.devices:
name = device.name or device.address
uuids = device.metadata["uuids"] if "uuids" in device.metadata else None
if self.verbose > 4 and device.address not in self.pipe_down:
print(f"Considering {device.address} {name} {uuids}")
self.pipe_down.append(device.address)
valid = False
if uuids is not None and BLE_MATCH_UUID in uuids and BLE_SERVICE_UUID in uuids:
valid = True
else:
# We might not have the list of UUIDs yet, so also check by name
names = ("Checkme_O2", "CheckO2", "SleepU", "SleepO2", "O2Ring", "WearO2", "KidsO2", "BabyO2", "Oxylink")
for n in names:
if n in name:
if self.verbose > 1:
print(f"Found device by name: {n}")
valid = True
break
if not valid:
return
print(f"Adding device {device.address}")
dev = O2BTDevice(address_or_ble_device=device, timeout=10.0, disconnected_callback=O2BTDevice.on_disconnect)
async def _go_get_services(self):
if self.disconnect_pending or not self.is_connected:
return
# services = await self.get_services()
if self.manager.verbose > 1:
print(f"[{self.name}] Resolved services")
for service in self.services:
print(f"[{self.name}]\tService [{service.uuid}]")
for characteristic in service.characteristics:
print(f"[{self.name}]\t\tCharacteristic [{characteristic.uuid}]")
for descriptor in characteristic.descriptors:
value = await self.read_gatt_descriptor(descriptor.handle)
print(f"[{self.name}]\t\t\tDescriptor [{descriptor.uuid}] ({value})")
for s in self.services:
if s.uuid == BLE_SERVICE_UUID:
for c in s.characteristics:
if c.uuid == BLE_READ_UUID:
asyncio.ensure_future(self._go_enable_notifications(c))
elif c.uuid == BLE_WRITE_UUID:
self.write = c
bleak
connect
self
BleakClient
class BleakClient:
"""The Client interface for connecting to a specific BLE GATT server and communicating with it.
A BleakClient can be used as an asynchronous context manager in which case it automatically
connects and disconnects.
bleak/backends/corebluetooth/PeripheralDelegate.py
peripheral.discoverServices_(None)
CoreBluetooth
discover_services
remote_device := central.connect address
print remote_device.address
services := remote_device.discover_services
//0x0d BLE_HS_ETIMEOUT Operation timed out.
// EXCEPTION error.
// NimBLE error, Type: host, error code: 0x0d. See https://gist.github.com/mikkeldamsgaard/0857ce6a8b073a52...
// 0: ble_get_error_ <sdk>/ble.toit:980:3
// 1: Resource_.throw_error_ <sdk>/ble.toit:811:5
toit.run
~/.cache/jaguar/sdk
toit.run
~/.cache/jaguar/sdk/bin/toit.run
-d host
jag run -d host foo.toit
jag run -d host blescan.toit
cm started
B6DF069F-1B4D-A7B1-984A-FAFC71D6760A
print services
jag run -d host blescan.toit
cm started
B6DF069F-1B4D-A7B1-984A-FAFC71D6760A
[an instance with class-id 47, an instance with class-id 47, an instance with class-id 47]
an instance with class-id 47
stringify
~/.cache/jaguar/sdk/bin/toit.compile -w /tmp/out.snapshot your_program.toit
~/.cache/jaguar/sdk/tools/toitp -c /tmp/out.snapshot
jag run
an instance with class-id XX
write
#ifdef ESP_PLATFORM
NimBLEDevice::setPower(ESP_PWR_LVL_P9); /** +9db */
#else
NimBLEDevice::setPower(9); /** +9db */
#endif
⸮⸮�⸮�⸮⸮⸮�⸮���⸮�⸮⸮⸮⸮⸮�⸮⸮�������������⸮���⸮��������⸮��������⸮�⸮⸮⸮�⸮⸮�⸮⸮��⸮�⸮⸮��⸮���⸮⸮�⸮�⸮⸮�⸮f%$6⸮0P\⸮'⸮⸮�E (243) esp_core_dump_flash: a⸮ֽ⸮ɕ⸮сsize of core dump image: 0
Starting NimBLE Client
Advertised Device found: Name: , Address: c5:d2:2d:0f:1f:0c, manufacturer data: 4c0012020002
Advertised Device found: Name: O2Ring 0543, Address: c6:d1:08:ef:8a:4d, manufacturer data: 4ef300
Found Our Service
Scan Ended
New client created
Connected
Connected to: c6:d1:08:ef:8a:4d
RSSI: -54
Found our write characteristic
8b00ace7-eb0b-49b0-bbe9-9aee0a26e1a3
Done with this device!
Success! we should now be getting notifications, scanning for more!
#define CONFIG_NIMBLE_CPP_LOG_LEVEL 4
Core
DEBUG
sleep --ms=500
discover_services
[jaguar] INFO: program 73549571-68fd-594e-80c7-a2904b16810e started
#[0x01, 0xc6, 0xd1, 0x08, 0xef, 0x8a, 0x4d]
[an instance with class-id 48]
[an instance with class-id 45]
[jaguar] INFO: program 73549571-68fd-594e-80c7-a2904b16810e stopped
wait_for_notification
subscribe
b'\xaa\x17\xe8\x00\x00\x00\x00\x1b'
b'aa17e8000000001b'
[O2Ring 0543] Sending aa17e8000000001b
Writing b'\xaa\x17\xe8\x00\x00\x00\x00\x1b' up to 20 b'\xaa\x17\xe8\x00\x00\x00\x00\x1b'
[O2Ring 0543] Characteristic 8b00ace7-eb0b-49b0-bbe9-9aee0a26e1a3 write value performed
[O2Ring 0543] Characteristic 0734594a-a8e7-4b1a-a6b1-cd5243059a57 updated: 5500ff00000d0061320000000000340000120100
o2pkt recv: 5500ff00000d0061320000000000340000120100
want 21 have 20
[O2Ring 0543] Need more data
[O2Ring 0543] Characteristic 0734594a-a8e7-4b1a-a6b1-cd5243059a57 updated: 07
o2pkt recv: 07
[O2Ring 0543] Final recv: 5500ff00000d006132000000000034000012010007
[O2Ring 0543] 61320000000000340000120100
[O2Ring 0543] SpO2 97%, HR 50 bpm, Perfusion Idx 18, motion 0, batt 52%
characteristics.subscribe // This will subscribe to notifications on the characteristic. The call returns immediately
while true:
notification := characteristics.wait_for_notification // This call blocks until a notification is received. The notification is a ByteArray
print "I received a notification: $notification"
characteristics.subscribe
task::
while true:
notification := characteristics.wait_for_notification
unsubscribe
wait_for_notification
while true:
write_characteristic.write #[0xaa, 0x1b, 0xe4, 0x00, 0x00, 0x01, 0x00, 0x00, 0x5e] // Realtime
first_block := subscribe_characteristic.wait_for_notification //Get first packet
msg_length := first_block[5]
total_length := msg_length + 8
print "$total_length @ $Time.now.utc"
print (hex.encode first_block)
//You'll get 20 bytes at a time
(total_length / 20 ).to_float.floor.to_int.repeat:
notificationline := subscribe_characteristic.wait_for_notification
print (hex.encode notificationline)
sleep --ms=800
/**
Waits until the remote device sends a notification or indication on the characteristics. Returns the
notified/indicated value.
See $subscribe.
*/
wait_for_notification -> ByteArray?:
if properties & (CHARACTERISTIC_PROPERTY_INDICATE | CHARACTERISTIC_PROPERTY_NOTIFY) == 0:
throw "Characteristic does not support notifications or indications"
while true:
resource_state_.clear_state VALUE_DATA_READY_EVENT_
buf := ble_get_value_ resource_
if buf: return buf
state := resource_state_.wait_for_state VALUE_DATA_READY_EVENT_ | VALUE_DATA_READ_FAILED_EVENT_ | DISCONNECTED_EVENT_
if state & VALUE_DATA_READ_FAILED_EVENT_ != 0: throw_error_
if state & DISCONNECTED_EVENT_ != 0: throw "Disconnected"
with_timeout --ms=4000:
data := char.wait_for_notification