| 知乎专栏 |
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
##############################################
# Home : http://netkiller.github.io
# Author: Neo <netkiller@msn.com>
# Date: 2026-02-03
# Description: wav 文件录音
##############################################
import datetime
import wave
import pyaudio
# Capture settings: 16-bit signed PCM, mono, 16 kHz, read 2048 frames per call.
FORMAT = pyaudio.paInt16
CHANNELS = 1
RATE = 16000
FRAMES_PER_BUFFER = 2048

if __name__ == '__main__':
    # Record from the default input device until Ctrl+C, then save the
    # captured PCM data as a WAV file in the current directory.
    filename = f"录音:time-{RATE}-" + str(datetime.datetime.now().strftime("%H%M%S")) + ".wav"
    print(filename)

    p = pyaudio.PyAudio()
    # Query the sample width while the PyAudio instance is still alive, so
    # nothing has to touch it after terminate().
    sample_width = p.get_sample_size(FORMAT)
    stream = p.open(format=FORMAT, channels=CHANNELS, rate=RATE, input=True,
                    frames_per_buffer=FRAMES_PER_BUFFER)

    buffer = []
    try:
        while True:
            raw_data = stream.read(FRAMES_PER_BUFFER)
            if raw_data:
                buffer.append(raw_data)
    except KeyboardInterrupt:
        print("停止录音")
    except Exception as e:
        # Best effort: report the failure and still save what was captured.
        print(f"录音出错:{e}")
    finally:
        stream.stop_stream()
        stream.close()
        p.terminate()

    # The context manager closes the file even if writing the frames fails.
    with wave.open(filename, 'wb') as wf:
        wf.setnchannels(CHANNELS)
        wf.setsampwidth(sample_width)
        wf.setframerate(RATE)
        wf.writeframes(b''.join(buffer))
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
##############################################
# Home : http://netkiller.github.io
# Author: Neo <netkiller@msn.com>
# Date: 2026-02-03
# Description: mp3 文件录音
##############################################
import datetime
import io
import pyaudio
from pydub import AudioSegment
# Capture settings: 16-bit signed PCM, mono, 16 kHz, read 2048 frames per call.
FORMAT = pyaudio.paInt16
CHANNELS = 1
RATE = 16000
FRAMES_PER_BUFFER = 2048

if __name__ == '__main__':
    # Record from the default input device until Ctrl+C, then encode the
    # captured PCM data to MP3 in memory and write it to disk.
    filename = "test-" + datetime.datetime.now().strftime("%H%M%S") + ".mp3"
    print(f"录音:{filename}")

    p = pyaudio.PyAudio()
    # Derive the sample width from FORMAT so the two cannot drift apart.
    sample_width = p.get_sample_size(FORMAT)
    stream = p.open(format=FORMAT, channels=CHANNELS, rate=RATE, input=True,
                    frames_per_buffer=FRAMES_PER_BUFFER)

    pcm_buffer = []
    try:
        while True:
            raw_data = stream.read(FRAMES_PER_BUFFER)
            if raw_data:
                pcm_buffer.append(raw_data)
    except KeyboardInterrupt:
        print("停止录音")
    except Exception as e:
        # Best effort: report the failure and still encode what was captured.
        print(f"录音出错:{e}")
    finally:
        stream.stop_stream()
        stream.close()
        p.terminate()

    # Load the raw PCM stream into an AudioSegment entirely in memory.
    audio = AudioSegment.from_raw(
        io.BytesIO(b''.join(pcm_buffer)),
        sample_width=sample_width,
        channels=CHANNELS,
        frame_rate=RATE,
    )

    # Encode into an in-memory buffer, then persist the encoded bytes.
    mp3_buffer = io.BytesIO()
    audio.export(mp3_buffer, format='mp3')
    # The context manager closes the file even if the write fails.
    with open(filename, 'wb') as out_file:
        out_file.write(mp3_buffer.getvalue())
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
##############################################
# Home : http://netkiller.github.io
# Author: Neo <netkiller@msn.com>
# Date: 2026-02-03
# Description: opus 文件录音
##############################################
import datetime
import io
import pyaudio
from pydub import AudioSegment
# Capture settings: 16-bit signed PCM, mono, 16 kHz, read 2048 frames per call.
FORMAT = pyaudio.paInt16
CHANNELS = 1
RATE = 16000
FRAMES_PER_BUFFER = 2048

if __name__ == '__main__':
    # Record from the default input device until Ctrl+C, then encode the
    # captured PCM data to Opus in memory and write it to disk.
    filename = "test-" + datetime.datetime.now().strftime("%H%M%S") + ".opus"
    print(f"录音:{filename}")

    p = pyaudio.PyAudio()
    # Derive the sample width from FORMAT so the two cannot drift apart.
    sample_width = p.get_sample_size(FORMAT)
    stream = p.open(format=FORMAT, channels=CHANNELS, rate=RATE, input=True,
                    frames_per_buffer=FRAMES_PER_BUFFER)

    pcm_buffer = []
    try:
        while True:
            raw_data = stream.read(FRAMES_PER_BUFFER)
            if raw_data:
                pcm_buffer.append(raw_data)
    except KeyboardInterrupt:
        print("停止录音")
    except Exception as e:
        # Best effort: report the failure and still encode what was captured.
        print(f"录音出错:{e}")
    finally:
        stream.stop_stream()
        stream.close()
        p.terminate()

    # Load the raw PCM stream into an AudioSegment entirely in memory.
    audio = AudioSegment.from_raw(
        io.BytesIO(b''.join(pcm_buffer)),
        sample_width=sample_width,
        channels=CHANNELS,
        frame_rate=RATE,
    )

    # Encode into an in-memory buffer, then persist the encoded bytes.
    opus_buffer = io.BytesIO()
    audio.export(opus_buffer, format='opus')
    # The context manager closes the file even if the write fails.
    with open(filename, 'wb') as out_file:
        out_file.write(opus_buffer.getvalue())
#!/usr/bin/env python
import pyaudio
import socket
import select
# Capture format: 16-bit signed PCM, mono, 44.1 kHz, 4096 frames per buffer.
FORMAT = pyaudio.paInt16
CHANNELS = 1
RATE = 44100
CHUNK = 4096

audio = pyaudio.PyAudio()

# TCP listening socket on all interfaces, port 4444, backlog of 5.
serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# Allow the server to be restarted right away; without this, bind() can fail
# with "address already in use" while old connections linger in TIME_WAIT.
serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
serversocket.bind(('', 4444))
serversocket.listen(5)
def callback(in_data, frame_count, time_info, status):
    """Forward one captured audio buffer to every connected client.

    PyAudio calls this with each captured buffer. Only ``in_data`` is used;
    ``frame_count``, ``time_info`` and ``status`` are ignored.

    Returns:
        ``(None, pyaudio.paContinue)`` so the input stream keeps running.
    """
    # read_list[0] is the listening socket; the slice skips it and also takes
    # a snapshot, so the list can change while we iterate.
    for s in read_list[1:]:
        try:
            # sendall() keeps writing until the whole buffer is sent. send()
            # may transmit only part of it, dropping audio bytes mid-sample.
            s.sendall(in_data)
        except socket.error:
            # One dead client must not stop streaming to the others. The
            # accept loop drops a client once its socket reports end-of-stream.
            continue
    return (None, pyaudio.paContinue)
# Register the listening socket before the stream is opened: the callback
# reads read_list as soon as audio starts flowing.
read_list = [serversocket]

# Start recording. audio.open() starts the stream immediately (PyAudio's
# default), so no explicit start_stream() call is needed.
stream = audio.open(format=FORMAT, channels=CHANNELS, rate=RATE, input=True,
                    frames_per_buffer=CHUNK, stream_callback=callback)

# print() is used as a function with a single argument so the script runs on
# Python 3 and prints the same text on Python 2.
print("recording...")

try:
    while True:
        readable, _, _ = select.select(read_list, [], [])
        for s in readable:
            if s is serversocket:
                # New client: audio is sent to it from the stream callback.
                (clientsocket, address) = serversocket.accept()
                read_list.append(clientsocket)
                print("Connection from %s" % (address,))
            else:
                # Clients are not expected to send anything useful; reading
                # only detects disconnects (empty read or socket error).
                try:
                    data = s.recv(1024)
                except socket.error:
                    data = b''
                if not data:
                    read_list.remove(s)
                    s.close()
except KeyboardInterrupt:
    pass
finally:
    print("finished recording")
    # Stop recording first so the callback no longer touches the sockets.
    stream.stop_stream()
    stream.close()
    audio.terminate()
    # Close the listening socket and every client still connected.
    for s in read_list:
        s.close()
#!/usr/bin/env python
import pyaudio
import socket
import sys
# Playback format: 16-bit signed PCM, mono, 44.1 kHz, 4096 frames per buffer.
FORMAT = pyaudio.paInt16
CHANNELS = 1
RATE = 44100
CHUNK = 4096

# Usage: <script> HOST PORT
if len(sys.argv) < 3:
    sys.exit("usage: %s HOST PORT" % sys.argv[0])

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((sys.argv[1], int(sys.argv[2])))

audio = pyaudio.PyAudio()
stream = audio.open(format=FORMAT, channels=CHANNELS, rate=RATE, output=True,
                    frames_per_buffer=CHUNK)

# Bytes per audio frame (one sample for each channel).
frame_bytes = audio.get_sample_size(FORMAT) * CHANNELS
# TCP may deliver a chunk that ends in the middle of a frame. Keep the
# leftover bytes for the next round so playback stays sample-aligned.
pending = b''

try:
    while True:
        data = s.recv(CHUNK)
        if not data:
            # Empty read means the server closed the connection; without
            # this check the loop would spin forever on empty reads.
            break
        pending += data
        usable = len(pending) - len(pending) % frame_bytes
        if usable:
            stream.write(pending[:usable])
            pending = pending[usable:]
except KeyboardInterrupt:
    pass
finally:
    print('Shutting down')
    s.close()
    stream.stop_stream()
    stream.close()
    audio.terminate()
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
##############################################
# Home : http://netkiller.github.io
# Author: Neo <netkiller@msn.com>
# Date: 2026-02-05
# Description: Opus UDP 接收端
##############################################
import socket
import struct
import opuslib_next
import pyaudio
# Audio settings (must match the sender exactly).
SAMPLING_RATE = 16000  # sample rate in Hz
CHANNELS = 1  # passed as the channel count to the decoder and the output stream
SAMPLE_WIDTH = 2  # sample width handed to get_format_from_width()
FRAME_DURATION = 20  # frame length in milliseconds
FRAMES_PER_BUFFER = int(SAMPLING_RATE * FRAME_DURATION / 1000)  # samples per frame: 16000 * 20 / 1000 = 320
# UDP settings.
UDP_IP = "0.0.0.0"  # listen on all network interfaces
UDP_PORT = 5005
BUFFER_SIZE = 4096  # maximum number of bytes read per recvfrom() call
def main():
    """Receive length-prefixed Opus frames over UDP, decode and play them.

    Each datagram is expected to hold a 4-byte big-endian unsigned length
    followed by exactly that many bytes of Opus data. Datagrams that are too
    short or whose length field does not match the payload are skipped with
    a warning. Runs until interrupted with Ctrl+C, then releases the audio
    stream, PyAudio and the socket.
    """
    # 1. UDP socket bound to the configured address.
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.bind((UDP_IP, UDP_PORT))

    # 2. Opus decoder.
    decoder = opuslib_next.Decoder(
        fs=SAMPLING_RATE,
        channels=CHANNELS
    )

    # 3. PyAudio instance and 4. playback stream.
    p = pyaudio.PyAudio()
    stream = p.open(
        format=p.get_format_from_width(SAMPLE_WIDTH),
        channels=CHANNELS,
        rate=SAMPLING_RATE,
        output=True,
        frames_per_buffer=FRAMES_PER_BUFFER
    )

    print(f"开始监听UDP {UDP_IP}:{UDP_PORT}... (按Ctrl+C停止)")

    header_size = struct.calcsize('!I')
    try:
        while True:
            data, _ = sock.recvfrom(BUFFER_SIZE)

            # A datagram shorter than the length header cannot be parsed.
            # Skip it, otherwise struct.unpack would raise and stop the
            # receiver on any stray packet.
            if len(data) < header_size:
                print("警告:接收到的数据长度异常,跳过该帧")
                continue

            # The first 4 bytes carry the length of the Opus payload.
            opus_len = struct.unpack('!I', data[:header_size])[0]
            opus_data = data[header_size:]

            # Drop frames whose payload does not match the declared length.
            if len(opus_data) != opus_len:
                print("警告:接收到的数据长度异常,跳过该帧")
                continue

            # Decode Opus to PCM and play it.
            pcm_data = decoder.decode(opus_data, FRAMES_PER_BUFFER)
            stream.write(pcm_data)
    except KeyboardInterrupt:
        print("\n停止接收...")
    finally:
        # Release resources.
        stream.stop_stream()
        stream.close()
        p.terminate()
        sock.close()


if __name__ == "__main__":
    main()
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
##############################################
# Home : http://netkiller.github.io
# Author: Neo <netkiller@msn.com>
# Date: 2026-02-05
# Description: Opus UDP 发送端
##############################################
import socket
import struct
import opuslib_next
import pyaudio
# Audio settings.
SAMPLING_RATE = 16000  # 16 kHz sample rate
CHANNELS = 1  # mono
SAMPLE_WIDTH = 2  # 16 bit = 2 bytes
FRAME_DURATION = 20  # 20 ms frame length (recommended for Opus)
FRAMES_PER_BUFFER = int(SAMPLING_RATE * FRAME_DURATION / 1000)  # samples per frame: 16000 * 20 / 1000 = 320
# UDP settings.
UDP_IP = "127.0.0.1"  # receiver IP; change to the peer's address in real use
UDP_PORT = 5005
def main():
    """Capture microphone audio, encode it with Opus and send it over UDP.

    Reads FRAMES_PER_BUFFER samples at a time from the input device, encodes
    each chunk as one Opus frame and sends it to (UDP_IP, UDP_PORT) as a
    single datagram: a 4-byte big-endian length followed by the Opus bytes.
    Runs until interrupted with Ctrl+C, then releases the audio stream,
    PyAudio and the socket.
    """
    # 1. UDP socket and the fixed destination address.
    udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    destination = (UDP_IP, UDP_PORT)

    # 2. Opus encoder tuned for speech.
    opus_encoder = opuslib_next.Encoder(
        fs=SAMPLING_RATE,
        channels=CHANNELS,
        application=opuslib_next.APPLICATION_VOIP,
    )
    # Encoder bitrate (8000-48000 is the suggested range for speech).
    opus_encoder.bitrate = 16000

    # 3. PyAudio instance and 4. microphone input stream.
    pa = pyaudio.PyAudio()
    mic = pa.open(
        format=pa.get_format_from_width(SAMPLE_WIDTH),
        channels=CHANNELS,
        rate=SAMPLING_RATE,
        input=True,
        frames_per_buffer=FRAMES_PER_BUFFER,
    )

    print("开始录音并发送音频... (按Ctrl+C停止)")

    # 4-byte big-endian unsigned length that prefixes every datagram.
    length_prefix = struct.Struct('!I')
    try:
        while True:
            # Read one frame of PCM samples; input overflows are ignored
            # rather than raised.
            pcm_chunk = mic.read(FRAMES_PER_BUFFER, exception_on_overflow=False)
            # Encode the PCM chunk as one Opus frame.
            payload = opus_encoder.encode(pcm_chunk, FRAMES_PER_BUFFER)
            # Prefix the payload with its length and send it as one datagram.
            udp_socket.sendto(length_prefix.pack(len(payload)) + payload, destination)
    except KeyboardInterrupt:
        print("\n停止发送...")
    finally:
        # Release resources.
        mic.stop_stream()
        mic.close()
        pa.terminate()
        udp_socket.close()


if __name__ == "__main__":
    main()