import copy from pyaudio import PyAudio, paInt16 from threading import Thread import numpy as np import sys class AudioAnalyzer(Thread): """ This AudioAnalyzer reads the microphone and finds the frequency of the loudest tone. To use it, you also need the ProtectedList class from the file You need to created an instance of the ProtectedList, which acts as a queue, and you have to pass this queue to the AudioAnalyzer. Then you can read the values from the queue: queue = ProtectedList() analyzer = AudioAnalyzer(queue) analyzer.start() while True: freq = queue.get() print("loudest frequency:", q_data, "nearest note:", a.frequency_to_note_name(q_data, 440)) time.sleep(0.02) """ # settings: (are tuned for best detecting string instruments like guitar) SAMPLING_RATE = 48000 # mac hardware: 44100, 48000, 96000 CHUNK_SIZE = 1024 # number of samples BUFFER_TIMES = 50 # buffer length = CHUNK_SIZE * BUFFER_TIMES ZERO_PADDING = 3 # times the buffer length NUM_HPS = 3 # Harmonic Product Spectrum # overall frequency accuracy (step-size): SAMPLING_RATE / (CHUNK_SIZE * BUFFER_TIMES * (1 + ZERO_PADDING)) Hz # buffer length in seconds: (CHUNK_SIZE * BUFFER_TIMES) / SAMPLING_RATE sec NOTE_NAMES = ['C', 'C#', 'D', 'D#', 'E', 'F', 'F#', 'G', 'G#', 'A', 'A#', 'B'] def __init__(self, queue, *args, **kwargs): Thread.__init__(self, *args, **kwargs) self.queue = queue # queue should be instance of ProtectedList (threading_helper.ProtectedList) self.buffer = np.zeros(self.CHUNK_SIZE * self.BUFFER_TIMES) self.hanning_window = np.hanning(len(self.buffer)) self.running = False try: self.audio_object = PyAudio() =, channels=1, rate=self.SAMPLING_RATE, input=True, output=False, frames_per_buffer=self.CHUNK_SIZE) except Exception as e: sys.stderr.write('Error: Line {} {} {}\n'.format(sys.exc_info()[-1].tb_lineno, type(e).__name__, e)) return @staticmethod def frequency_to_number(freq, a4_freq): """ converts a frequency to a note number (for example: A4 is 69)""" if freq == 0: sys.stderr.write("Error: No frequency data. Program has potentially no access to microphone\n") return 0 return 12 * np.log2(freq / a4_freq) + 69 @staticmethod def number_to_frequency(number, a4_freq): """ converts a note number (A4 is 69) back to a frequency """ return a4_freq * 2.0**((number - 69) / 12.0) @staticmethod def number_to_note_name(number): """ converts a note number to a note name (for example: 69 returns 'A', 70 returns 'A#', ... ) """ return AudioAnalyzer.NOTE_NAMES[int(round(number) % 12)] @staticmethod def frequency_to_note_name(frequency, a4_freq): """ converts frequency to note name (for example: 440 returns 'A') """ number = AudioAnalyzer.frequency_to_number(frequency, a4_freq) note_name = AudioAnalyzer.number_to_note_name(number) return note_name def run(self): """ Main function where the microphone buffer gets read and the fourier transformation gets applied """ self.running = True while self.running: try: # read microphone data data =, exception_on_overflow=False) data = np.frombuffer(data, dtype=np.int16) # append data to audio buffer self.buffer[:-self.CHUNK_SIZE] = self.buffer[self.CHUNK_SIZE:] self.buffer[-self.CHUNK_SIZE:] = data # apply the fourier transformation on the whole buffer (with zero-padding + hanning window) magnitude_data = abs(np.fft.fft(np.pad(self.buffer * self.hanning_window, (0, len(self.buffer) * self.ZERO_PADDING), "constant"))) # only use the first half of the fft output data magnitude_data = magnitude_data[:int(len(magnitude_data) / 2)] # HPS: multiply data by itself with different scalings (Harmonic Product Spectrum) magnitude_data_orig = copy.deepcopy(magnitude_data) for i in range(2, self.NUM_HPS+1, 1): hps_len = int(np.ceil(len(magnitude_data) / i)) magnitude_data[:hps_len] *= magnitude_data_orig[::i] # multiply every i element # get the corresponding frequency array frequencies = np.fft.fftfreq(int((len(magnitude_data) * 2) / 1), 1. / self.SAMPLING_RATE) # set magnitude of all frequencies below 60Hz to zero for i, freq in enumerate(frequencies): if freq > 60: magnitude_data[:i - 1] = 0 break # put the frequency of the loudest tone into the queue self.queue.put(round(frequencies[np.argmax(magnitude_data)], 2)) except Exception as e: sys.stderr.write('Error: Line {} {} {}\n'.format(sys.exc_info()[-1].tb_lineno, type(e).__name__, e)) self.audio_object.terminate() if __name__ == "__main__": # Only for testing: from tuner_audio.threading_helper import ProtectedList import time q = ProtectedList() a = AudioAnalyzer(q) a.start() while True: q_data = q.get() if q_data is not None: print("loudest frequency:", q_data, "nearest note:", a.frequency_to_note_name(q_data, 440)) time.sleep(0.02)