Home | 简体中文 | 繁体中文 | 杂文 | Github | 知乎专栏 | Facebook | Linkedin | Youtube | 打赏(Donations) | About
知乎专栏

21.2. 音频处理

21.2.1. 录音 WAV 文件

		
#! /usr/scripts/env python3
# -*- coding: UTF-8 -*-
##############################################
# Home	: http://netkiller.github.io
# Author: Neo <netkiller@msn.com>
# Data: 2026-02-03
# Description: wav 文件录音
##############################################		
import datetime
import wave

import pyaudio

FORMAT = pyaudio.paInt16
CHANNELS = 1
RATE = 16000
FRAMES_PER_BUFFER = 2048

if __name__ == '__main__':
    filename = f"录音:time-{RATE}-" + str(datetime.datetime.now().strftime("%H%M%S")) + ".wav"
    print(filename)

    p = pyaudio.PyAudio()
    stream = p.open(format=FORMAT, channels=CHANNELS, rate=RATE, input=True, frames_per_buffer=FRAMES_PER_BUFFER)

    isLoop = True
    buffer = []

    try:
        while (isLoop):
            raw_data = stream.read(FRAMES_PER_BUFFER)
            if len(raw_data) > 0:
                buffer.append(raw_data)

    except KeyboardInterrupt:
        isLoop = False
        print("停止录音")
    except Exception as e:
        print(f"录音出错:{e}")
    finally:
        stream.stop_stream()
        stream.close()
        p.terminate()

        wf = wave.open(filename, 'wb')
        wf.setnchannels(CHANNELS)
        wf.setsampwidth(p.get_sample_size(FORMAT))
        wf.setframerate(RATE)
        wf.writeframes(b''.join(buffer))
        wf.close()
		
		
		

21.2.2. 录音 mp3 文件

		
#! /usr/scripts/env python3
# -*- coding: UTF-8 -*-
##############################################
# Home	: http://netkiller.github.io
# Author: Neo <netkiller@msn.com>
# Data: 2026-02-03
# Description: mp3 文件录音
##############################################
import datetime
import io

import pyaudio
from pydub import AudioSegment

FORMAT = pyaudio.paInt16
CHANNELS = 1
RATE = 16000
FRAMES_PER_BUFFER = 2048

if __name__ == '__main__':
    filename = f"test-" + str(datetime.datetime.now().strftime("%H%M%S")) + ".mp3"
    print(f"录音:{filename}")

    p = pyaudio.PyAudio()
    stream = p.open(format=FORMAT, channels=CHANNELS, rate=RATE, input=True, frames_per_buffer=FRAMES_PER_BUFFER)

    isLoop = True
    pcm_buffer = []

    try:
        while (isLoop):
            raw_data = stream.read(FRAMES_PER_BUFFER)
            if len(raw_data) > 0:
                pcm_buffer.append(raw_data)

    except KeyboardInterrupt:
        isLoop = False
        print("停止录音")
    except Exception as e:
        print(f"录音出错:{e}")
    finally:
        stream.stop_stream()
        stream.close()
        p.terminate()

        # 内存里加载PCM数据流到 AudioSegment
        audio = AudioSegment.from_raw(io.BytesIO(b''.join(pcm_buffer)),
                                      sample_width=2,
                                      channels=CHANNELS,
                                      frame_rate=RATE
                                      )

        mp3_buffer = io.BytesIO()
        mp3_buffer.seek(0)
        audio.export(mp3_buffer, 'mp3')

        mp3_data = mp3_buffer.getbuffer().tobytes()

        file = open(filename, 'wb')
        file.write(mp3_data)
        file.close()
		
		
		

21.2.3. Opus 文件录音

		
#! /usr/scripts/env python3
# -*- coding: UTF-8 -*-
##############################################
# Home	: http://netkiller.github.io
# Author: Neo <netkiller@msn.com>
# Data: 2026-02-03
# Description: opus 文件录音
##############################################
import datetime
import io

import pyaudio
from pydub import AudioSegment

FORMAT = pyaudio.paInt16
CHANNELS = 1
RATE = 16000
FRAMES_PER_BUFFER = 2048

if __name__ == '__main__':
    filename = f"test-" + str(datetime.datetime.now().strftime("%H%M%S")) + ".opus"
    print(f"录音:{filename}")

    p = pyaudio.PyAudio()
    stream = p.open(format=FORMAT, channels=CHANNELS, rate=RATE, input=True, frames_per_buffer=FRAMES_PER_BUFFER)

    isLoop = True
    pcm_buffer = []

    try:
        while (isLoop):
            raw_data = stream.read(FRAMES_PER_BUFFER)
            if len(raw_data) > 0:
                pcm_buffer.append(raw_data)

    except KeyboardInterrupt:
        isLoop = False
        print("停止录音")
    except Exception as e:
        print(f"录音出错:{e}")
    finally:
        stream.stop_stream()
        stream.close()
        p.terminate()

        # 内存里加载PCM数据流到 AudioSegment
        audio = AudioSegment.from_raw(io.BytesIO(b''.join(pcm_buffer)),
                                      sample_width=2,
                                      channels=CHANNELS,
                                      frame_rate=RATE
                                      )

        mp3_buffer = io.BytesIO()
        mp3_buffer.seek(0)
        audio.export(mp3_buffer, 'opus')

        mp3_data = mp3_buffer.getbuffer().tobytes()

        file = open(filename, 'wb')
        file.write(mp3_data)
        file.close()
		
		
		

21.2.4. 网络播放音频 PCM 流

21.2.4.1. 安装

			
# dnf install portaudio-devel
# pip install pyaudio		
			
			

21.2.4.2. 服务端

			
#!/usr/bin/env python

import pyaudio
import socket
import select

FORMAT = pyaudio.paInt16
CHANNELS = 1
RATE = 44100
CHUNK = 4096

audio = pyaudio.PyAudio()

serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serversocket.bind(('', 4444))
serversocket.listen(5)


def callback(in_data, frame_count, time_info, status):
    for s in read_list[1:]:
        s.send(in_data)
    return (None, pyaudio.paContinue)


# start Recording
stream = audio.open(format=FORMAT, channels=CHANNELS, rate=RATE, input=True, frames_per_buffer=CHUNK, stream_callback=callback)
# stream.start_stream()

read_list = [serversocket]
print "recording..."

try:
    while True:
        readable, writable, errored = select.select(read_list, [], [])
        for s in readable:
            if s is serversocket:
                (clientsocket, address) = serversocket.accept()
                read_list.append(clientsocket)
                print "Connection from", address
            else:
                data = s.recv(1024)
                if not data:
                    read_list.remove(s)
except KeyboardInterrupt:
    pass


print "finished recording"

serversocket.close()
# stop Recording
stream.stop_stream()
stream.close()
audio.terminate()			
			
			

21.2.4.3. 客户端

			
#!/usr/bin/env python

import pyaudio
import socket
import sys

FORMAT = pyaudio.paInt16
CHANNELS = 1
RATE = 44100
CHUNK = 4096

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((sys.argv[1], int(sys.argv[2])))
audio = pyaudio.PyAudio()
stream = audio.open(format=FORMAT, channels=CHANNELS, rate=RATE, output=True, frames_per_buffer=CHUNK)

try:
    while True:
        data = s.recv(CHUNK)
        stream.write(data)
except KeyboardInterrupt:
    pass

print('Shutting down')
s.close()
stream.close()
audio.terminate()		
			
			

21.2.5. 网络传输音频 Opus

21.2.5.1. 接收端

			
#! /usr/scripts/env python3
# -*- coding: UTF-8 -*-
##############################################
# Home	: http://netkiller.github.io
# Author: Neo <netkiller@msn.com>
# Data: 2026-02-05
# Description: Opus UDP 接收端
##############################################
import socket
import struct

import opuslib_next
import pyaudio

# 音频配置(必须和发送端完全一致)
SAMPLING_RATE = 16000
CHANNELS = 1
SAMPLE_WIDTH = 2
FRAME_DURATION = 20
FRAMES_PER_BUFFER = int(SAMPLING_RATE * FRAME_DURATION / 1000)

# UDP配置
UDP_IP = "0.0.0.0"  # 监听所有网卡
UDP_PORT = 5005
BUFFER_SIZE = 4096  # UDP接收缓冲区


def main():
    # 1. 初始化UDP socket
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.bind((UDP_IP, UDP_PORT))

    # 2. 初始化Opus解码器
    decoder = opuslib_next.Decoder(
        fs=SAMPLING_RATE,
        channels=CHANNELS
    )

    # 3. 初始化PyAudio
    p = pyaudio.PyAudio()

    # 4. 打开音频播放流
    stream = p.open(
        format=p.get_format_from_width(SAMPLE_WIDTH),
        channels=CHANNELS,
        rate=SAMPLING_RATE,
        output=True,
        frames_per_buffer=FRAMES_PER_BUFFER
    )

    print(f"开始监听UDP {UDP_IP}:{UDP_PORT}... (按Ctrl+C停止)")
    try:
        while True:
            # 接收UDP数据
            data, addr = sock.recvfrom(BUFFER_SIZE)

            # 解析数据:前4字节是opus数据长度
            opus_len = struct.unpack('!I', data[:4])[0]
            opus_data = data[4:]

            # 验证数据长度(防止传输异常)
            if len(opus_data) != opus_len:
                print(f"警告:接收到的数据长度异常,跳过该帧")
                continue

            # 将Opus解码为PCM
            pcm_data = decoder.decode(opus_data, FRAMES_PER_BUFFER)

            # 播放PCM音频
            stream.write(pcm_data)

    except KeyboardInterrupt:
        print("\n停止接收...")
    finally:
        # 清理资源
        stream.stop_stream()
        stream.close()
        p.terminate()
        sock.close()


if __name__ == "__main__":
    main()
			
			
			

21.2.5.2. 发送端

			
#! /usr/scripts/env python3
# -*- coding: UTF-8 -*-
##############################################
# Home	: http://netkiller.github.io
# Author: Neo <netkiller@msn.com>
# Data: 2026-02-05
# Description: Opus UDP 发送端
##############################################			
import socket
import struct

import opuslib_next
import pyaudio

# 音频配置参数
SAMPLING_RATE = 16000  # 16kHz 采样率
CHANNELS = 1  # 单声道
SAMPLE_WIDTH = 2  # 16bit = 2字节
FRAME_DURATION = 20  # 20ms 帧长 (Opus推荐)
FRAMES_PER_BUFFER = int(SAMPLING_RATE * FRAME_DURATION / 1000)  # 每帧采样数: 16000*20/1000=320

# UDP配置
UDP_IP = "127.0.0.1"  # 接收端IP,实际使用时改为对方IP
UDP_PORT = 5005


def main():
    # 1. 初始化UDP socket
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    # 2. 初始化Opus编码器
    encoder = opuslib_next.Encoder(
        fs=SAMPLING_RATE,
        channels=CHANNELS,
        application=opuslib_next.APPLICATION_VOIP  # 针对语音优化
    )
    # 设置编码比特率 (语音推荐8000-48000)
    encoder.bitrate = 16000

    # 3. 初始化PyAudio
    p = pyaudio.PyAudio()

    # 4. 打开麦克风音频流
    stream = p.open(
        format=p.get_format_from_width(SAMPLE_WIDTH),
        channels=CHANNELS,
        rate=SAMPLING_RATE,
        input=True,
        frames_per_buffer=FRAMES_PER_BUFFER
    )

    print("开始录音并发送音频... (按Ctrl+C停止)")
    try:
        while True:
            # 读取麦克风的PCM音频数据
            pcm_data = stream.read(FRAMES_PER_BUFFER, exception_on_overflow=False)

            # 将PCM编码为Opus
            opus_data = encoder.encode(pcm_data, FRAMES_PER_BUFFER)

            # 发送前先打包数据长度(解决UDP粘包/拆包问题)
            # 用4字节存储数据长度,再拼接opus数据
            data_to_send = struct.pack('!I', len(opus_data)) + opus_data

            # 通过UDP发送
            sock.sendto(data_to_send, (UDP_IP, UDP_PORT))

    except KeyboardInterrupt:
        print("\n停止发送...")
    finally:
        # 清理资源
        stream.stop_stream()
        stream.close()
        p.terminate()
        sock.close()


if __name__ == "__main__":
    main()