|
|
|
|
import requests
|
|
|
|
|
import pyaudio
|
|
|
|
|
import wave
|
|
|
|
|
import json
|
|
|
|
|
import os
|
|
|
|
|
import serial
|
|
|
|
|
from serial.tools import list_ports
|
|
|
|
|
def send_data_to_com(com, data_to_send):
|
|
|
|
|
# 创建Serial对象,设置串口参数
|
|
|
|
|
ser = serial.Serial(
|
|
|
|
|
port=com, # 串口名称
|
|
|
|
|
baudrate=115200, # 波特率,根据实际情况设置
|
|
|
|
|
parity=serial.PARITY_NONE, # 奇偶校验,默认无
|
|
|
|
|
stopbits=serial.STOPBITS_ONE, # 停止位,默认1
|
|
|
|
|
bytesize=serial.EIGHTBITS, # 数据位,默认8
|
|
|
|
|
timeout=None # 没有超时,阻塞读取
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
# 检查串口是否成功打开
|
|
|
|
|
if not ser.isOpen():
|
|
|
|
|
print("无法打开串口!")
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
print("串口 COM3 已打开...")
|
|
|
|
|
|
|
|
|
|
# 写入数据
|
|
|
|
|
ser.write(data_to_send.encode()) # 必须将数据转换为字节,例如字符串需用encode()方法
|
|
|
|
|
|
|
|
|
|
# 为了让示例更完整,添加一个关闭串口的步骤
|
|
|
|
|
ser.close()
|
|
|
|
|
print("串口已关闭")
|
|
|
|
|
def find_serial_ports():
|
|
|
|
|
"""Find and return a list of available serial ports."""
|
|
|
|
|
available_ports = list_ports.comports()
|
|
|
|
|
for port in available_ports:
|
|
|
|
|
print(f"Found: {port.device} - {port.description}")
|
|
|
|
|
return available_ports
|
|
|
|
|
def get_audio(filepath, aa):
|
|
|
|
|
if aa == str("是") :
|
|
|
|
|
CHUNK = 256
|
|
|
|
|
FORMAT = pyaudio.paInt16
|
|
|
|
|
CHANNELS = 1 # 声道数
|
|
|
|
|
RATE = 11025 # 采样率
|
|
|
|
|
RECORD_SECONDS = 5
|
|
|
|
|
WAVE_OUTPUT_FILENAME = filepath
|
|
|
|
|
p = pyaudio.PyAudio()
|
|
|
|
|
|
|
|
|
|
stream = p.open(format=FORMAT,
|
|
|
|
|
channels=CHANNELS,
|
|
|
|
|
rate=RATE,
|
|
|
|
|
input=True,
|
|
|
|
|
frames_per_buffer=CHUNK)
|
|
|
|
|
|
|
|
|
|
print("*"*10, "开始录音:请在5秒内输入语音")
|
|
|
|
|
frames = []
|
|
|
|
|
for i in range(0, int(RATE / CHUNK * RECORD_SECONDS)):
|
|
|
|
|
data = stream.read(CHUNK)
|
|
|
|
|
frames.append(data)
|
|
|
|
|
print("*"*10, "录音结束\n")
|
|
|
|
|
|
|
|
|
|
stream.stop_stream()
|
|
|
|
|
stream.close()
|
|
|
|
|
p.terminate()
|
|
|
|
|
|
|
|
|
|
wf = wave.open(WAVE_OUTPUT_FILENAME, 'wb')
|
|
|
|
|
wf.setnchannels(CHANNELS)
|
|
|
|
|
wf.setsampwidth(p.get_sample_size(FORMAT))
|
|
|
|
|
wf.setframerate(RATE)
|
|
|
|
|
wf.writeframes(b''.join(frames))
|
|
|
|
|
wf.close()
|
|
|
|
|
else:
|
|
|
|
|
print("无效输入,请重新选择")
|
|
|
|
|
exit()
|
|
|
|
|
if __name__ == "__main__":
|
|
|
|
|
com_string = find_serial_ports()
|
|
|
|
|
com = str(com_string[0]).split()[0]
|
|
|
|
|
# 请求地址
|
|
|
|
|
url = "http://127.0.0.1:9977/api"
|
|
|
|
|
input_filename = "input.wav" # 麦克风采集的语音输入
|
|
|
|
|
input_filepath = "audio" # 输入文件的path
|
|
|
|
|
in_path = os.path.join(input_filepath, input_filename)
|
|
|
|
|
while True:
|
|
|
|
|
aa = str(input("是否开始录音?(是/否)"))
|
|
|
|
|
if aa == "否":
|
|
|
|
|
break
|
|
|
|
|
get_audio(in_path, aa)
|
|
|
|
|
# 请求参数 file:音视频文件,language:语言代码,model:模型,response_format:text|json|srt
|
|
|
|
|
# 返回 code==0 成功,其他失败,msg==成功为ok,其他失败原因,data=识别后返回文字
|
|
|
|
|
files = {"file": open(in_path, "rb")}
|
|
|
|
|
data={"language":"zh","model":"small","response_format":"json"}
|
|
|
|
|
response = requests.request("POST", url, timeout=600, data=data,files=files)
|
|
|
|
|
try:
|
|
|
|
|
data = json.loads(response.json()['data']['answer'])['instruction']
|
|
|
|
|
send_data_to_com(com, data)
|
|
|
|
|
print(data)
|
|
|
|
|
except:
|
|
|
|
|
print("描述信息不充分,请修改明确描述问题。")
|