stark 4 months ago
commit e725ebe87e

@ -0,0 +1,48 @@
import requests
import json
import ast
def get_ai_code(url, headers, data):
# 将字典转换为JSON字符串
payload = json.dumps(data)
# 发送POST请求
response = requests.post(url, headers=headers, data=payload)
# 检查响应状态码是否为200
if response.status_code != 200:
# 如果状态码不是200则抛出异常或记录错误信息
print(f"请求失败,状态码:{response.status_code}")
else:
# 打印响应的状态码和内容
message = json.loads(response.text)
code = message["data"]["code"]
print(code)
# 提取列表部分
list_part = code[code.find('['):code.find(']')+1]
# 使用 ast.literal_eval 安全地解析字符串
data = ast.literal_eval(list_part)
scent_channel = [item.get("channelId") for item in data]
play_time = [item.get("time") for item in data]
print(message["data"]["description"])
print(message["data"]["remark"])
return scent_channel, play_time
if __name__ == "__main__":
# 定义请求的URL
url = "https://ai.qiweiwangguo.com/api/conversation"
# 定义请求头包括Content-Type
headers = {
"Content-Type": "application/json"
}
# 定义要发送的数据
data = {
"names": "桂花",
"scents": "我想静静",
"message": "请使用两种气味,调出一个可以缓解疲劳的气味。"
}
get_ai_code(url, headers, data)

@ -0,0 +1,197 @@
import asyncio
from bleak import BleakClient, BleakScanner
from bleak.backends.characteristic import BleakGATTCharacteristic
from binascii import unhexlify
from crcmod import crcmod
import threading
from mqtt_client import sub_run
from queue import LifoQueue
import ast
def crc16Add(str_data):
crc16 = crcmod.mkCrcFun(0x18005, rev=True, initCrc=0xFFFF, xorOut=0x0000)
data = str_data.replace(" ", "")
readcrcout = hex(crc16(unhexlify(data))).upper()
str_list = list(readcrcout)
if len(str_list) < 6:
str_list.insert(2, '0'*(6-len(str_list))) # 位数不足补0
crc_data = "".join(str_list)
return crc_data[2:4]+' '+crc_data[4:]
def ten2sixteen(num, length):
"""
十进制转十六进制
:param num: 十进制数字
:param length: 字节长度
:return:
"""
data = str(hex(eval(str(num))))[2:]
data_len = len(data)
if data_len % 2 == 1:
data = '0' + data
data_len += 1
sixteen_str = "00 " * (length - data_len//2) + data[0:2] + ' ' + data[2:]
return sixteen_str.strip()
def cmd2bytearray(cmd_str: str):
verify = crc16Add(cmd_str)
cmd = FRAME_HEAD + ' ' + cmd_str + ' ' + verify + ' ' + FRAME_TAIL
print(cmd)
return bytearray.fromhex(cmd)
def device_capluse():
"""
获取设备气路胶囊信息
:return:
"""
cmd_data = '00 00 00 01 0E 01 06 00 00'
return cmd2bytearray(cmd_data)
def start_play(scent: int, playtime: int):
play_cmd = '00 00 00 01 02 05'
scent_channel = ten2sixteen(scent, 1)
if playtime == 0: # 一直播放
playtime16 = 'FF FF FF FF'
else:
playtime16 = ten2sixteen(playtime, 4)
cmd_data = play_cmd + ' ' + scent_channel + ' ' + playtime16
return cmd2bytearray(cmd_data)
def multiple_play(scent_channels, play_times):
data = ["00"] * 6
for i in range(len(scent_channels)):
data[scent_channels[i] - 1] = "FF" if int(play_times[i]/100) > 255 else ten2sixteen(int(play_times[i]/100), 1)
play_cmd = "00 00 00 01 13 06 " + " ".join(data)
return cmd2bytearray(play_cmd)
def stop_play():
"""
停止播放
:return:
"""
stop_cmd = '00 00 00 01 00 01 00'
return cmd2bytearray(stop_cmd)
def status_check():
"""
检查工作状态
:return:
"""
status_cmd = '00 00 00 01 11 01 00 00 00'
return cmd2bytearray(status_cmd)
def msg_decoder(msg):
list_part = msg[msg.find('['):msg.find(']')+1]
# 使用 ast.literal_eval 安全地解析字符串
data = ast.literal_eval(list_part)
scent_channels = [item.get("channelId") for item in data]
play_times = [item.get("time") for item in data]
return scent_channels, play_times
async def get_services(mac_address: str):
async with BleakClient(mac_address) as client:
svcs = client.services
return svcs
async def find_and_get_services(device_name: str):
# 发现所有设备
devices = await BleakScanner.discover()
# 遍历设备列表,查找目标设备
for device in devices:
if device.name == device_name:
print(f"Found device: {device.name}, MAC Address: {device.address}")
# 获取服务
services = await get_services(device.address)
if services:
last_service = services.services[40]
return {
"device_name": device.name,
"mac_address": device.address,
"last_service_uuid": last_service.uuid,
}
else:
print("No services found for the device.")
return None
# 如果没有找到设备
print(f"Device with name '{device_name}' not found.")
return None
# 监听回调函数,此处为打印消息
def notification_handler(characteristic: BleakGATTCharacteristic, data: bytearray):
# print("rev data:", data)
print("rev data bytes2hex:", ' '.join(['%02x' % b for b in data]))
async def main():
print("starting scan...")
# 基于MAC地址查找设备
information = await find_and_get_services(device_name)
par_device_addr, svc_uuid = information["mac_address"], information["last_service_uuid"]
# 设备的Characteristic UUID
uuid_head = ["6e400003"]
# 设备的Characteristic UUID具备写属性Write
uuid_head_write = ["6e400002"]
uuid = svc_uuid.split('-')[1:]
par_notification_characteristic = "-".join(uuid_head+uuid)
par_write_characteristic = "-".join(uuid_head_write+uuid)
device = await BleakScanner.find_device_by_address(
par_device_addr, cb=dict(use_bdaddr=False)
)
if device is None:
print("could not find device with address '%s'" % par_device_addr)
return
# 事件定义
disconnected_event = asyncio.Event()
# 断开连接事件回调
def disconnected_callback(client):
print("Disconnected callback called!")
disconnected_event.set()
print("connecting to device...")
async with BleakClient(device, disconnected_callback=disconnected_callback) as client:
print("Connected")
await client.start_notify(par_notification_characteristic, notification_handler)
await client.write_gatt_char(par_write_characteristic, device_capluse()) # 获取设备气路胶囊信息
await asyncio.sleep(2.0)
while True:
msg = queue.get()
if msg == "0":
break
scent_channels, play_times = msg_decoder(msg)
print(scent_channels, play_times)
if len(scent_channels) == 1:
await client.write_gatt_char(par_write_characteristic, start_play(scent_channels[0], play_times[0])) # 发送开始播放指令
await asyncio.sleep(play_times[0] / 1000)
else:
await client.write_gatt_char(par_write_characteristic, multiple_play(scent_channels, play_times))
await asyncio.sleep(max(play_times) / 1000)
await client.write_gatt_char(par_write_characteristic, stop_play()) # 发送停止播放指令
await client.write_gatt_char(par_write_characteristic, status_check()) # 检查设备工作状态
await client.stop_notify(par_notification_characteristic)
await client.disconnect()
if __name__ == "__main__":
username = "stark"
password = "12345678"
broker = "47.120.70.16"
port = 1883
sub_topic = "/smell/cmd"
FRAME_HEAD = 'F5'
FRAME_TAIL = '55'
device_name = "scent08d1f90efa9a"
queue = LifoQueue()
t = threading.Thread(target=sub_run, args=(queue, username, password, broker, port, sub_topic),daemon=True)
t.start()
asyncio.run(main())

@ -0,0 +1,40 @@
from bleak import BleakScanner, BleakClient
import asyncio
async def get_services(mac_address: str):
async with BleakClient(mac_address) as client:
svcs = client.services
return svcs
async def find_and_get_services(device_name: str):
# 发现所有设备
devices = await BleakScanner.discover()
# 遍历设备列表,查找目标设备
for device in devices:
if device.name == device_name:
print(f"Found device: {device.name}, MAC Address: {device.address}")
# 获取服务
services = await get_services(device.address)
if services:
last_service = services.services[40]
return {
"device_name": device.name,
"mac_address": device.address,
"last_service_uuid": last_service.uuid,
}
else:
print("No services found for the device.")
return None
# 如果没有找到设备
print(f"Device with name '{device_name}' not found.")
return None
# 设备名称
device_name = "scent08d1f90efa9a"
# 调用函数并获取返回值
result = asyncio.run(find_and_get_services(device_name))
print(result)

@ -0,0 +1,55 @@
'''
# Filename: mqtt_sub.py
# Author: Cai Gui Lin
# Created on: 2024-09-2
# Modified on: 2024-09-2
# Version: 2.0
# Copyright (c) 2024 Cai Gui Lin
# License: MIT
# Description: mqtt subscriber
# Contact: 18166451309@163.com
'''
import random
from paho.mqtt import client as mqtt_client
import re
client_id = f'python-mqtt-{random.randint(0, 100)}'
def connect_mqtt(username, password, broker, port) -> mqtt_client:
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Mqtt ok, Ready to subscribe.")
else:
print("Failed to connect mqtt, return code %d\n", rc)
client = mqtt_client.Client(client_id)
client.username_pw_set(username=username,password=password)
client.on_connect = on_connect
client.connect(broker, port)
return client
def subscribe(queue, client, sub_topic):
queue = queue
def on_message(client, userdata, msg):
# 解码消息负载为字符串
payload_str = msg.payload.decode()
# pattern = r'"channelId": (\d+)|"time": (\d+)'
# matches = re.findall(pattern, payload_str)
# print(payload_str)
queue.put(payload_str)
client.subscribe(sub_topic)
client.on_message = on_message
def sub_run(queue, username, password, broker, port, sub_topic):
client = connect_mqtt(username, password, broker, port)
subscribe(queue, client, sub_topic)
client.loop_forever()
if __name__ == '__main__':
username = "stark"
password = "12345678"
broker = "47.120.70.16"
port = 1883
sub_topic = "/smell/cmd"
sub_run(username, password, broker, port, sub_topic)
Loading…
Cancel
Save