import requests import pyaudio import wave import json import os import serial from serial.tools import list_ports def send_data_to_com(com, data_to_send): # 创建Serial对象,设置串口参数 ser = serial.Serial( port=com, # 串口名称 baudrate=115200, # 波特率,根据实际情况设置 parity=serial.PARITY_NONE, # 奇偶校验,默认无 stopbits=serial.STOPBITS_ONE, # 停止位,默认1 bytesize=serial.EIGHTBITS, # 数据位,默认8 timeout=None # 没有超时,阻塞读取 ) # 检查串口是否成功打开 if not ser.isOpen(): print("无法打开串口!") return print("串口 COM3 已打开...") # 写入数据 ser.write(data_to_send.encode()) # 必须将数据转换为字节,例如字符串需用encode()方法 # 为了让示例更完整,添加一个关闭串口的步骤 ser.close() print("串口已关闭") def find_serial_ports(): """Find and return a list of available serial ports.""" available_ports = list_ports.comports() for port in available_ports: print(f"Found: {port.device} - {port.description}") return available_ports def get_audio(filepath, aa): if aa == str("是") : CHUNK = 256 FORMAT = pyaudio.paInt16 CHANNELS = 1 # 声道数 RATE = 11025 # 采样率 RECORD_SECONDS = 5 WAVE_OUTPUT_FILENAME = filepath p = pyaudio.PyAudio() stream = p.open(format=FORMAT, channels=CHANNELS, rate=RATE, input=True, frames_per_buffer=CHUNK) print("*"*10, "开始录音:请在5秒内输入语音") frames = [] for i in range(0, int(RATE / CHUNK * RECORD_SECONDS)): data = stream.read(CHUNK) frames.append(data) print("*"*10, "录音结束\n") stream.stop_stream() stream.close() p.terminate() wf = wave.open(WAVE_OUTPUT_FILENAME, 'wb') wf.setnchannels(CHANNELS) wf.setsampwidth(p.get_sample_size(FORMAT)) wf.setframerate(RATE) wf.writeframes(b''.join(frames)) wf.close() else: print("无效输入,请重新选择") exit() if __name__ == "__main__": com_string = find_serial_ports() com = str(com_string[0]).split()[0] # 请求地址 url = "http://127.0.0.1:9977/api" input_filename = "input.wav" # 麦克风采集的语音输入 input_filepath = "audio" # 输入文件的path in_path = os.path.join(input_filepath, input_filename) while True: aa = str(input("是否开始录音?(是/否)")) if aa == "否": break get_audio(in_path, aa) # 请求参数 file:音视频文件,language:语言代码,model:模型,response_format:text|json|srt # 返回 code==0 成功,其他失败,msg==成功为ok,其他失败原因,data=识别后返回文字 files = {"file": open(in_path, "rb")} data={"language":"zh","model":"small","response_format":"json"} response = requests.request("POST", url, timeout=600, data=data,files=files) try: data = json.loads(response.json()['data']['answer'])['instruction'] send_data_to_com(com, data) print(data) except: print("描述信息不充分,请修改明确描述问题。")