"""
Real-time pitch monitor: captures audio with PyAudio, runs a windowed FFT over
a sliding buffer, and logs the strongest frequency as a musical note.
"""
from typing import List, Dict, Optional
import logging
import queue
import threading

import pyaudio
import numpy
from numpy import pi


logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


# Candidate sample rates (Hz) probed against the audio device.
standard_sample_rates = 1000 * numpy.array([
    8, 9.6, 11.025, 12, 16, 22.05, 24, 32, 44.1, 48, 88.2, 96, 192])

# Pitch-class names, indexed by (note number % 12).
note_names = ['C', 'C#', 'D', 'D#', 'E', 'F', 'F#', 'G', 'G#', 'A', 'A#', 'B']


def freq2note(freq: float):
    """
    Convert a frequency to a note number on the scale where 440 Hz is note 69.

    :param freq: Frequency in Hz (scalar or numpy array). Must be positive for
        the result to be finite.
    :return: Tuple ``(log_freq, note, base_note)``: the number of octaves
        relative to 440 Hz, the fractional note number, and the nearest
        integer note number.
    """
    log_freq = numpy.log2(freq / 440)
    note = 12 * log_freq + 69
    base_note = numpy.round(note).astype(int)
    return log_freq, note, base_note


def get_supported_sample_rates(pyaudio_device: int,
                               pyaudio_object: Optional[pyaudio.PyAudio] = None,
                               ) -> numpy.ndarray:
    """
    Probe which of `standard_sample_rates` the given input device accepts.

    :param pyaudio_device: PyAudio device index to probe.
    :param pyaudio_object: Existing PyAudio instance to use. If None, a
        temporary instance is created and terminated before returning.
    :return: Numpy array with the supported rates (Hz), in ascending order
        (the order of `standard_sample_rates`). May be empty.
    """
    owns_pyaudio = pyaudio_object is None
    if owns_pyaudio:
        pyaudio_object = pyaudio.PyAudio()

    try:
        devinfo = pyaudio_object.get_device_info_by_index(pyaudio_device)
        supported_sample_rates = []
        for rate in standard_sample_rates:
            try:
                if pyaudio_object.is_format_supported(
                        rate,
                        input_device=pyaudio_device,
                        input_channels=devinfo['maxInputChannels'],
                        input_format=pyaudio.paInt16):
                    supported_sample_rates.append(rate)
            except ValueError:
                # Raised by PyAudio when the format is not supported.
                pass
        return numpy.array(supported_sample_rates)
    finally:
        # Only release an instance we created ourselves.
        if owns_pyaudio:
            pyaudio_object.terminate()


class AudioAnalyzer:
    """
    Capture audio from an input device and find the dominant frequency.

    Incoming buffers are appended to a sliding window; after every buffer the
    window is Hann-weighted and transformed with a real FFT. One result dict
    per buffer is pushed onto `frame_queue` with the keys ``'fft'``,
    ``'fft_argmax'``, ``'frequency'`` and ``'magnitude'``.
    """
    frame_queue = None      # type: queue.Queue

    _hann_window = None         # window applied to the buffer before the FFT
    _fft_buffer = None          # sliding window of the most recent samples
    _fft_lock = None            # guards _fft_buffer
    _stream = None
    _pyaudio_object = None
    _fft_freqs = None           # numpy.fft.fftfreq table for the FFT size

    _sample_rate = None
    _samples_per_buffer = None

    stop = None                 # set to True to end the stream callback

    def __init__(self,
                 pyaudio_device: int,
                 min_freq: float = 20,
                 max_freq: float = 20e3,
                 samples_per_buffer: int = 1024,
                 freq_resolution: Optional[float] = None,
                 ):
        """
        Open an input stream on `pyaudio_device` and prepare the FFT buffers.

        :param pyaudio_device: PyAudio index of the input device.
        :param min_freq: Lowest frequency of interest (Hz); only used to pick
            the default `freq_resolution`.
        :param max_freq: Highest frequency of interest (Hz); the lowest
            supported sample rate >= 2 * max_freq is selected.
        :param samples_per_buffer: Frames delivered per stream callback.
        :param freq_resolution: Maximum FFT bin spacing (Hz). Defaults to
            ``min_freq * 2**(1/12) / 10``.
        :raises ValueError: If `freq_resolution` is not positive or the device
            supports no sample rate >= 2 * max_freq.
        """
        # Constraints relating the parameters:
        #   sample_rate >= 2 * max_freq
        #   min_freq * 2**(1/12) > freq_resolution (for discrimination),
        #       more for accuracy...
        #   sample_rate / (samples_per_buffer * num_buffers) <= freq_resolution
        if freq_resolution is None:
            freq_resolution = min_freq * 2**(1/12) / 10
        if freq_resolution <= 0:
            raise ValueError(
                'freq_resolution must be positive, got {}'.format(freq_resolution))

        self._pyaudio_object = pyaudio.PyAudio()
        try:
            supported_sample_rates = get_supported_sample_rates(
                pyaudio_device, self._pyaudio_object)
            logger.info('Supported sample rates for device {}: {}'.format(
                pyaudio_device, supported_sample_rates))

            acceptable_rates = supported_sample_rates[
                supported_sample_rates >= 2 * max_freq]
            if acceptable_rates.size == 0:
                raise ValueError(
                    'Device {} supports no sample rate >= {} Hz'
                    ' (needed for max_freq={})'.format(
                        pyaudio_device, 2 * max_freq, max_freq))
            # round() guards against float error in the rate table.
            sample_rate = int(round(acceptable_rates.min()))

            # Enough whole buffers that the FFT bin spacing
            # (sample_rate / samples_per_fft) is at most freq_resolution.
            num_buffers = int(numpy.ceil(
                sample_rate / (samples_per_buffer * freq_resolution)))
            samples_per_fft = samples_per_buffer * num_buffers

            self._sample_rate = sample_rate
            self._samples_per_buffer = samples_per_buffer

            # Periodic Hann window (endpoint excluded).
            self._hann_window = (
                1 - numpy.cos(numpy.linspace(0, 2 * pi, samples_per_fft, False))) / 2
            self._fft_freqs = numpy.fft.fftfreq(samples_per_fft, 1 / sample_rate)
            self._fft_buffer = numpy.zeros(samples_per_fft, dtype=numpy.float32)

            self.stop = False
            self._fft_lock = threading.Lock()
            self.frame_queue = queue.Queue()

            logger.debug('AudioAnalyzer attributes: %s', list(self.__dict__.keys()))

            # All state used by the callback must be set before this call.
            self._stream = self._pyaudio_object.open(
                format=pyaudio.paInt16,
                channels=1,
                rate=sample_rate,
                input=True,
                input_device_index=pyaudio_device,
                frames_per_buffer=samples_per_buffer,
                stream_callback=self.update)
        except Exception:
            # Do not leak the PortAudio session if setup fails.
            self._pyaudio_object.terminate()
            raise

        logger.info('Opened device {} with {} buffers,'
                    ' {} sample rate, {} samples per buffer'.format(
                        pyaudio_device, num_buffers, sample_rate, samples_per_buffer))
        logger.info('Buffers take {:.3g} sec to fully clear'.format(
            samples_per_fft / sample_rate))

    @property
    def fft_freqs(self) -> numpy.ndarray:
        """Frequency table from ``numpy.fft.fftfreq`` for the FFT size (Hz)."""
        return self._fft_freqs

    def start(self):
        """Start the audio stream."""
        self._stream.start_stream()

    def close(self):
        """Flag the callback to stop, close the stream and release PyAudio."""
        self.stop = True
        self._stream.close()
        self._pyaudio_object.terminate()

    def update(self,
               in_data: bytes,
               frame_count: int,
               time_info: Dict,
               status_flags,
               ):
        """
        PyAudio stream callback: ingest one buffer and queue an FFT result.

        :param in_data: Raw int16 samples for this buffer.
        :param frame_count: Number of frames in `in_data` (unused).
        :param time_info: Timing info supplied by PyAudio (unused).
        :param status_flags: PortAudio status flags (unused).
        :return: ``(None, flag)`` where flag is ``paContinue``, ``paComplete``
            once `stop` is set, or ``paAbort`` if processing failed.
        """
        try:
            in_buffer = numpy.frombuffer(in_data, dtype=numpy.int16)
            samples_per_buffer = in_buffer.size

            with self._fft_lock:
                # Shift the window left and append the new samples at the end.
                self._fft_buffer[:-samples_per_buffer] = \
                    self._fft_buffer[samples_per_buffer:]
                self._fft_buffer[-samples_per_buffer:] = in_buffer
                fft = numpy.fft.rfft(self._fft_buffer * self._hann_window)

            fft_argmax = numpy.abs(fft[1:]).argmax() + 1    # excluding 0-frequency
            frame_data = {
                'fft': fft,
                'fft_argmax': fft_argmax,
                # fftfreq labels the Nyquist bin (even FFT sizes) as negative;
                # the rfft bin it corresponds to is the positive frequency.
                'frequency': numpy.abs(self._fft_freqs[fft_argmax]),
                'magnitude': numpy.abs(fft[fft_argmax]),
            }

            time_per_buffer = self._samples_per_buffer / self._sample_rate
            try:
                self.frame_queue.put(frame_data, timeout=time_per_buffer * 10)
            except queue.Full:
                logger.warning('Frame queue was full for more than 10 buffer periods!')

            if self.stop:
                return None, pyaudio.paComplete
            return None, pyaudio.paContinue
        except Exception:
            # The callback must always return a (data, flag) tuple, and the
            # stream must not be closed from inside its own callback.
            logger.exception('Audio callback failed; aborting stream')
            self.stop = True
            return None, pyaudio.paAbort


def monitor_pitch(pyaudio_device: int,
                  min_freq: float = 10,
                  max_freq: float = 6000,
                  threshold: float = 1e6,
                  ):
    """
    Continuously log the dominant pitch heard on an input device.

    For every analyzed frame whose peak FFT magnitude exceeds `threshold`,
    logs the frequency, log10 of the magnitude, the nearest note name and the
    deviation from it in semitones, followed by a small text gauge.

    :param pyaudio_device: PyAudio index of the input device.
    :param min_freq: Passed to `AudioAnalyzer`.
    :param max_freq: Passed to `AudioAnalyzer`.
    :param threshold: Minimum peak FFT magnitude for a frame to be reported.
    """
    analyzer = AudioAnalyzer(pyaudio_device=pyaudio_device,
                             min_freq=min_freq,
                             max_freq=max_freq)
    max_num_symbols = 10

    analyzer.start()
    try:
        while True:
            try:
                frame_data = analyzer.frame_queue.get(timeout=1)
            except queue.Empty:
                # No frames arriving: leave if the callback has shut down.
                if analyzer.stop:
                    logger.error('Audio stream stopped; ending pitch monitor')
                    break
                continue

            if frame_data['magnitude'] <= threshold:
                continue

            _, mnote, mnote_base = freq2note(frame_data['frequency'])
            mnote_error = mnote - mnote_base

            logger.info('freq: {:7.2f} Hz   mag:{:7.2f}    note: {:>3s} {:+.2f}'.format(
                frame_data['frequency'],
                numpy.log10(frame_data['magnitude']),
                note_names[mnote_base % 12] + str(mnote_base // 12 - 1),
                mnote_error))

            # Gauge: each symbol is 1/20 of a semitone; '-' flat, '+' sharp.
            num_symbols = int(mnote_error // (0.5 / max_num_symbols))
            if num_symbols > 0:
                signal = ' ' * max_num_symbols + '|' + '+' * num_symbols
            elif num_symbols == 0:
                signal = ' ' * max_num_symbols + '#'
            else:
                signal = ' ' * (max_num_symbols + num_symbols) + '-' * -num_symbols + '|'
            logger.info('  {}'.format(signal))
    finally:
        # Runs on KeyboardInterrupt as well, so the device is always released.
        analyzer.close()


if __name__ == '__main__':
    audio = pyaudio.PyAudio()
    try:
        logger.info(" Available devices:")
        for device in range(audio.get_device_count()):
            devinfo = audio.get_device_info_by_index(device)
            if devinfo['maxInputChannels'] > 0:
                logger.info('  {}: {}'.format(device, devinfo['name']))
    finally:
        audio.terminate()

    monitor_pitch(pyaudio_device=7, min_freq=20)