From e725ebe87e407682868897e834fde6da01ec0797 Mon Sep 17 00:00:00 2001 From: stark <18166451309@163.com> Date: Tue, 1 Oct 2024 07:41:15 +0800 Subject: [PATCH] v1 --- ai_smell.py | 48 ++++++++++++ bluetooth.py | 197 +++++++++++++++++++++++++++++++++++++++++++++++++ find.py | 40 ++++++++++ mqtt_client.py | 55 ++++++++++++++ 4 files changed, 340 insertions(+) create mode 100644 ai_smell.py create mode 100644 bluetooth.py create mode 100644 find.py create mode 100644 mqtt_client.py diff --git a/ai_smell.py b/ai_smell.py new file mode 100644 index 0000000..ec34c83 --- /dev/null +++ b/ai_smell.py @@ -0,0 +1,48 @@ +import requests +import json +import ast + +def get_ai_code(url, headers, data): + # 将字典转换为JSON字符串 + payload = json.dumps(data) + + # 发送POST请求 + response = requests.post(url, headers=headers, data=payload) + + # 检查响应状态码是否为200 + if response.status_code != 200: + # 如果状态码不是200,则抛出异常或记录错误信息 + print(f"请求失败,状态码:{response.status_code}") + else: + # 打印响应的状态码和内容 + message = json.loads(response.text) + code = message["data"]["code"] + print(code) + # 提取列表部分 + list_part = code[code.find('['):code.find(']')+1] + # 使用 ast.literal_eval 安全地解析字符串 + data = ast.literal_eval(list_part) + scent_channel = [item.get("channelId") for item in data] + play_time = [item.get("time") for item in data] + print(message["data"]["description"]) + print(message["data"]["remark"]) + return scent_channel, play_time + + +if __name__ == "__main__": + # 定义请求的URL + url = "https://ai.qiweiwangguo.com/api/conversation" + + # 定义请求头,包括Content-Type + headers = { + "Content-Type": "application/json" + } + + # 定义要发送的数据 + data = { + "names": "桂花", + "scents": "我想静静", + "message": "请使用两种气味,调出一个可以缓解疲劳的气味。" + } + get_ai_code(url, headers, data) + diff --git a/bluetooth.py b/bluetooth.py new file mode 100644 index 0000000..8cae8ce --- /dev/null +++ b/bluetooth.py @@ -0,0 +1,197 @@ +import asyncio +from bleak import BleakClient, BleakScanner +from bleak.backends.characteristic import BleakGATTCharacteristic +from binascii import unhexlify +from crcmod import crcmod +import threading +from mqtt_client import sub_run +from queue import LifoQueue +import ast + +def crc16Add(str_data): + crc16 = crcmod.mkCrcFun(0x18005, rev=True, initCrc=0xFFFF, xorOut=0x0000) + data = str_data.replace(" ", "") + readcrcout = hex(crc16(unhexlify(data))).upper() + str_list = list(readcrcout) + if len(str_list) < 6: + str_list.insert(2, '0'*(6-len(str_list))) # 位数不足补0 + crc_data = "".join(str_list) + return crc_data[2:4]+' '+crc_data[4:] + + +def ten2sixteen(num, length): + """ + 十进制转十六进制 + :param num: 十进制数字 + :param length: 字节长度 + :return: + """ + data = str(hex(eval(str(num))))[2:] + data_len = len(data) + if data_len % 2 == 1: + data = '0' + data + data_len += 1 + + sixteen_str = "00 " * (length - data_len//2) + data[0:2] + ' ' + data[2:] + return sixteen_str.strip() + + +def cmd2bytearray(cmd_str: str): + verify = crc16Add(cmd_str) + cmd = FRAME_HEAD + ' ' + cmd_str + ' ' + verify + ' ' + FRAME_TAIL + print(cmd) + return bytearray.fromhex(cmd) + + +def device_capluse(): + """ + 获取设备气路胶囊信息 + :return: + """ + cmd_data = '00 00 00 01 0E 01 06 00 00' + return cmd2bytearray(cmd_data) + + +def start_play(scent: int, playtime: int): + play_cmd = '00 00 00 01 02 05' + scent_channel = ten2sixteen(scent, 1) + if playtime == 0: # 一直播放 + playtime16 = 'FF FF FF FF' + else: + playtime16 = ten2sixteen(playtime, 4) + cmd_data = play_cmd + ' ' + scent_channel + ' ' + playtime16 + return cmd2bytearray(cmd_data) + +def multiple_play(scent_channels, play_times): + data = ["00"] * 6 + for i in range(len(scent_channels)): + data[scent_channels[i] - 1] = "FF" if int(play_times[i]/100) > 255 else ten2sixteen(int(play_times[i]/100), 1) + play_cmd = "00 00 00 01 13 06 " + " ".join(data) + return cmd2bytearray(play_cmd) +def stop_play(): + """ + 停止播放 + :return: + """ + stop_cmd = '00 00 00 01 00 01 00' + return cmd2bytearray(stop_cmd) + + +def status_check(): + """ + 检查工作状态 + :return: + """ + status_cmd = '00 00 00 01 11 01 00 00 00' + return cmd2bytearray(status_cmd) +def msg_decoder(msg): + list_part = msg[msg.find('['):msg.find(']')+1] + # 使用 ast.literal_eval 安全地解析字符串 + data = ast.literal_eval(list_part) + scent_channels = [item.get("channelId") for item in data] + play_times = [item.get("time") for item in data] + return scent_channels, play_times + +async def get_services(mac_address: str): + async with BleakClient(mac_address) as client: + svcs = client.services + return svcs + +async def find_and_get_services(device_name: str): + # 发现所有设备 + devices = await BleakScanner.discover() + + # 遍历设备列表,查找目标设备 + for device in devices: + if device.name == device_name: + print(f"Found device: {device.name}, MAC Address: {device.address}") + + # 获取服务 + services = await get_services(device.address) + if services: + last_service = services.services[40] + return { + "device_name": device.name, + "mac_address": device.address, + "last_service_uuid": last_service.uuid, + } + else: + print("No services found for the device.") + return None + + # 如果没有找到设备 + print(f"Device with name '{device_name}' not found.") + return None + +# 监听回调函数,此处为打印消息 +def notification_handler(characteristic: BleakGATTCharacteristic, data: bytearray): + # print("rev data:", data) + print("rev data bytes2hex:", ' '.join(['%02x' % b for b in data])) + + +async def main(): + print("starting scan...") + # 基于MAC地址查找设备 + information = await find_and_get_services(device_name) + par_device_addr, svc_uuid = information["mac_address"], information["last_service_uuid"] + # 设备的Characteristic UUID + uuid_head = ["6e400003"] + # 设备的Characteristic UUID(具备写属性Write) + uuid_head_write = ["6e400002"] + uuid = svc_uuid.split('-')[1:] + par_notification_characteristic = "-".join(uuid_head+uuid) + par_write_characteristic = "-".join(uuid_head_write+uuid) + device = await BleakScanner.find_device_by_address( + par_device_addr, cb=dict(use_bdaddr=False) + ) + if device is None: + print("could not find device with address '%s'" % par_device_addr) + return + + # 事件定义 + disconnected_event = asyncio.Event() + + # 断开连接事件回调 + def disconnected_callback(client): + print("Disconnected callback called!") + disconnected_event.set() + + print("connecting to device...") + async with BleakClient(device, disconnected_callback=disconnected_callback) as client: + print("Connected") + await client.start_notify(par_notification_characteristic, notification_handler) + + await client.write_gatt_char(par_write_characteristic, device_capluse()) # 获取设备气路胶囊信息 + await asyncio.sleep(2.0) + + while True: + msg = queue.get() + if msg == "0": + break + scent_channels, play_times = msg_decoder(msg) + print(scent_channels, play_times) + if len(scent_channels) == 1: + await client.write_gatt_char(par_write_characteristic, start_play(scent_channels[0], play_times[0])) # 发送开始播放指令 + await asyncio.sleep(play_times[0] / 1000) + else: + await client.write_gatt_char(par_write_characteristic, multiple_play(scent_channels, play_times)) + await asyncio.sleep(max(play_times) / 1000) + await client.write_gatt_char(par_write_characteristic, stop_play()) # 发送停止播放指令 + await client.write_gatt_char(par_write_characteristic, status_check()) # 检查设备工作状态 + await client.stop_notify(par_notification_characteristic) + await client.disconnect() + +if __name__ == "__main__": + username = "stark" + password = "12345678" + broker = "47.120.70.16" + port = 1883 + sub_topic = "/smell/cmd" + FRAME_HEAD = 'F5' + FRAME_TAIL = '55' + device_name = "scent08d1f90efa9a" + queue = LifoQueue() + t = threading.Thread(target=sub_run, args=(queue, username, password, broker, port, sub_topic),daemon=True) + t.start() + asyncio.run(main()) + diff --git a/find.py b/find.py new file mode 100644 index 0000000..7efa925 --- /dev/null +++ b/find.py @@ -0,0 +1,40 @@ +from bleak import BleakScanner, BleakClient +import asyncio + +async def get_services(mac_address: str): + async with BleakClient(mac_address) as client: + svcs = client.services + return svcs + +async def find_and_get_services(device_name: str): + # 发现所有设备 + devices = await BleakScanner.discover() + + # 遍历设备列表,查找目标设备 + for device in devices: + if device.name == device_name: + print(f"Found device: {device.name}, MAC Address: {device.address}") + + # 获取服务 + services = await get_services(device.address) + if services: + last_service = services.services[40] + return { + "device_name": device.name, + "mac_address": device.address, + "last_service_uuid": last_service.uuid, + } + else: + print("No services found for the device.") + return None + + # 如果没有找到设备 + print(f"Device with name '{device_name}' not found.") + return None + +# 设备名称 +device_name = "scent08d1f90efa9a" + +# 调用函数并获取返回值 +result = asyncio.run(find_and_get_services(device_name)) +print(result) \ No newline at end of file diff --git a/mqtt_client.py b/mqtt_client.py new file mode 100644 index 0000000..ff3fe12 --- /dev/null +++ b/mqtt_client.py @@ -0,0 +1,55 @@ +''' +# Filename: mqtt_sub.py +# Author: Cai Gui Lin +# Created on: 2024-09-2 +# Modified on: 2024-09-2 +# Version: 2.0 +# Copyright (c) 2024 Cai Gui Lin +# License: MIT +# Description: mqtt subscriber +# Contact: 18166451309@163.com + +''' +import random +from paho.mqtt import client as mqtt_client +import re +client_id = f'python-mqtt-{random.randint(0, 100)}' +def connect_mqtt(username, password, broker, port) -> mqtt_client: + def on_connect(client, userdata, flags, rc): + if rc == 0: + print("Mqtt ok, Ready to subscribe.") + else: + print("Failed to connect mqtt, return code %d\n", rc) + + client = mqtt_client.Client(client_id) + client.username_pw_set(username=username,password=password) + client.on_connect = on_connect + client.connect(broker, port) + return client + + +def subscribe(queue, client, sub_topic): + queue = queue + def on_message(client, userdata, msg): + # 解码消息负载为字符串 + payload_str = msg.payload.decode() + # pattern = r'"channelId": (\d+)|"time": (\d+)' + # matches = re.findall(pattern, payload_str) + # print(payload_str) + queue.put(payload_str) + client.subscribe(sub_topic) + client.on_message = on_message + +def sub_run(queue, username, password, broker, port, sub_topic): + client = connect_mqtt(username, password, broker, port) + subscribe(queue, client, sub_topic) + client.loop_forever() + +if __name__ == '__main__': + username = "stark" + password = "12345678" + broker = "47.120.70.16" + port = 1883 + sub_topic = "/smell/cmd" + sub_run(username, password, broker, port, sub_topic) +