import logging
from typing import List, Dict
import queue
import threading

import pyaudio
import numpy
from numpy import pi


logging.basicConfig(level=logging.INFO)

logger = logging.getLogger(__name__)

# Common audio sample rates, in Hz.
standard_sample_rates = 1000 * numpy.array([
    8, 9.6, 11.025, 12, 16,
    22.05, 24, 32, 44.1, 48,
    88.2, 96, 192])

# Pitch-class names, indexed by (note number % 12).
note_names = ['C', 'C#', 'D', 'D#', 'E', 'F', 'F#', 'G', 'G#', 'A', 'A#', 'B']


def freq2note(freq: float):
    """Convert a frequency in Hz to note numbers on a scale where 440 Hz is 69.

    One unit on this scale is one equal-tempered semitone (the MIDI
    note-number convention).

    :param freq: Frequency in Hz; must be positive.
    :return: Tuple ``(log_freq, note, base_note)`` where ``log_freq`` is the
        number of octaves above 440 Hz, ``note`` is the fractional note
        number and ``base_note`` is ``note`` rounded to the nearest integer.
    """
    log_freq = numpy.log2(freq / 440)
    note = 12 * log_freq + 69
    base_note = numpy.round(note).astype(int)
    return log_freq, note, base_note


def get_supported_sample_rates(pyaudio_device: int,
                               pyaudio_object: pyaudio.PyAudio = None,
                               ) -> numpy.ndarray:
    """Return the entries of ``standard_sample_rates`` the device accepts.

    Each rate is probed for 16-bit integer input using the device's maximum
    input channel count.

    :param pyaudio_device: PyAudio device index to probe.
    :param pyaudio_object: PyAudio instance to use. When omitted, a temporary
        instance is created and terminated before returning.
    :return: Array of supported sample rates in Hz (possibly empty).
    """
    owns_pyaudio_object = pyaudio_object is None
    if owns_pyaudio_object:
        pyaudio_object = pyaudio.PyAudio()

    supported_sample_rates = []
    try:
        devinfo = pyaudio_object.get_device_info_by_index(pyaudio_device)
        for rate in standard_sample_rates:
            try:
                if pyaudio_object.is_format_supported(
                        rate,
                        input_device=pyaudio_device,
                        input_channels=devinfo['maxInputChannels'],
                        input_format=pyaudio.paInt16):
                    supported_sample_rates.append(rate)
            except ValueError:
                # PyAudio signals an unsupported configuration by raising
                # ValueError; that simply means "skip this rate".
                pass
    finally:
        # Only release the PyAudio instance if it was created here.
        if owns_pyaudio_object:
            pyaudio_object.terminate()

    supported_sample_rates = numpy.array(supported_sample_rates)
    logger.info('Supported sample rates for device {}: {}'.format(
        pyaudio_device, supported_sample_rates))
    return supported_sample_rates


class AudioAnalyzer:
    """Capture audio from a PyAudio input device and FFT a sliding window.

    Every buffer delivered by the stream callback is shifted into a buffer
    spanning several callback periods, which is windowed and transformed.
    One result dict per callback is pushed onto ``frame_queue``.
    """

    # Queue of per-callback result dicts with keys
    # 'fft', 'fft_argmax', 'frequency' and 'magnitude'.
    frame_queue = None      # type: queue.Queue

    _hanning_window = None
    _fft_buffer = None
    _fft_lock = None
    _stream = None
    _pyaudio_object = None

    _fft_freqs = None
    _sample_rate = None
    _samples_per_buffer = None

    # Set to True to make the stream callback report completion.
    stop = None

    def __init__(self,
                 pyaudio_device: int,
                 min_freq: float = 20,
                 max_freq: float = 20e3,
                 samples_per_buffer: int = 1024,
                 freq_resolution: float = None,
                 ):
        """Pick a sample rate and FFT length, then open the input stream.

        :param pyaudio_device: PyAudio input device index.
        :param min_freq: Lowest frequency of interest in Hz; only used to
            derive the default ``freq_resolution``.
        :param max_freq: Highest frequency of interest in Hz. The lowest
            supported sample rate of at least ``2 * max_freq`` is used.
        :param samples_per_buffer: Frames requested per stream callback.
        :param freq_resolution: Desired FFT bin spacing in Hz. Defaults to
            ``min_freq * 2**(1/12) / 10``.
        :raises ValueError: If no supported sample rate is high enough.
        """
        self._pyaudio_object = pyaudio.PyAudio()

        try:
            # Constraints used to choose the parameters below:
            #   max_freq <= sample_rate / 2                (Nyquist)
            #   freq_resolution >= sample_rate / (samples_per_buffer * num_buffers)
            # NOTE(review): the gap between min_freq and the next semitone is
            # min_freq * (2**(1/12) - 1); the default resolution below is
            # coarser than that gap -- confirm this is intended.
            if freq_resolution is None:
                freq_resolution = min_freq * 2**(1/12) / 10

            supported_sample_rates = get_supported_sample_rates(
                pyaudio_device, self._pyaudio_object)
            acceptable_rates = supported_sample_rates[
                supported_sample_rates >= 2 * max_freq]
            if acceptable_rates.size == 0:
                raise ValueError(
                    'Device {} supports no sample rate >= {} Hz (supported: {})'.format(
                        pyaudio_device, 2 * max_freq, supported_sample_rates))
            # Round rather than truncate: the rates are floats built as 1000 * kHz.
            sample_rate = int(round(float(numpy.min(acceptable_rates))))

            # Enough callback buffers for the FFT to reach the requested resolution.
            num_buffers = int(numpy.ceil(
                sample_rate / (samples_per_buffer * freq_resolution)))
            samples_per_fft = samples_per_buffer * num_buffers

            self._sample_rate = sample_rate
            self._samples_per_buffer = samples_per_buffer
            self._hanning_window = (
                1 - numpy.cos(numpy.linspace(0, 2 * pi, samples_per_fft, False))) / 2
            # rfftfreq matches the bins produced by numpy.fft.rfft in update().
            self._fft_freqs = numpy.fft.rfftfreq(samples_per_fft, 1 / sample_rate)
            self._fft_buffer = numpy.zeros(samples_per_fft, dtype=numpy.float32)
            self.stop = False
            self._fft_lock = threading.Lock()
            self.frame_queue = queue.Queue()

            # All state used by update() must be ready before the stream is
            # opened, since the callback may fire as soon as it exists.
            self._stream = self._pyaudio_object.open(
                format=pyaudio.paInt16,
                channels=1,
                rate=sample_rate,
                input=True,
                input_device_index=pyaudio_device,
                frames_per_buffer=samples_per_buffer,
                stream_callback=self.update)
        except Exception:
            # Do not leak the PortAudio handle when construction fails.
            self._pyaudio_object.terminate()
            raise

        logger.info('Opened device {} with {} buffers,'.format(pyaudio_device, num_buffers) +
                    ' {} sample rate, {} samples per buffer'.format(
                        sample_rate, samples_per_buffer))
        logger.info('Buffers take {:.3g} sec to fully clear'.format(samples_per_fft / sample_rate))

    @property
    def fft_freqs(self) -> numpy.ndarray:
        """Frequency in Hz of each bin of the 'fft' array in a frame dict."""
        return self._fft_freqs

    def start(self):
        """Start the audio stream."""
        self._stream.start_stream()

    def close(self):
        """Flag the callback to stop, close the stream and terminate PyAudio."""
        self.stop = True
        self._stream.close()
        self._pyaudio_object.terminate()

    def update(self,
               in_data: bytes,
               frame_count: int,
               time_info: Dict,
               status_flags,
               ):
        """PyAudio stream callback: absorb one buffer and queue its analysis.

        :param in_data: Raw 16-bit integer samples.
        :param frame_count: Unused; the sample count is taken from ``in_data``.
        :param time_info: Unused.
        :param status_flags: Unused.
        :return: ``(None, flag)`` where ``flag`` is ``pyaudio.paComplete``
            once ``stop`` is set and ``pyaudio.paContinue`` otherwise.
        """
        #TODO deal with exceptions happening in the callback!

        in_buffer = numpy.frombuffer(in_data, dtype=numpy.int16)
        samples_per_buffer = in_buffer.size

        # An empty buffer would make the "-0" slices below cover the whole
        # array, so there is nothing to analyze in that case.
        if samples_per_buffer > 0:
            with self._fft_lock:
                # Shift the old samples down and append the new ones.
                self._fft_buffer[:-samples_per_buffer] = self._fft_buffer[samples_per_buffer:]
                self._fft_buffer[-samples_per_buffer:] = in_buffer
                fft = numpy.fft.rfft(self._fft_buffer * self._hanning_window)

            fft_argmax = numpy.abs(fft[1:]).argmax() + 1      # excluding 0-frequency
            frame_data = {
                'fft': fft,
                'fft_argmax': fft_argmax,
                'frequency': self.fft_freqs[fft_argmax],
                'magnitude': numpy.abs(fft[fft_argmax]),
            }

            time_per_buffer = self._samples_per_buffer / self._sample_rate
            try:
                self.frame_queue.put(frame_data, timeout=time_per_buffer * 10)
            except queue.Full:
                logger.warning('Frame queue was full for more than 10 buffer periods!')

        if self.stop:
            return None, pyaudio.paComplete
        return None, pyaudio.paContinue


def _tuning_indicator(mnote_error: float, max_num_symbols: int = 5) -> str:
    """Render a note error (in semitones) as a one-line text gauge.

    The centre column sits at index ``max_num_symbols``. Sharp errors draw
    '+' symbols to its right, flat errors draw '-' symbols ending just left
    of it, and an error smaller than one step draws '|' on it.
    """
    # Truncate toward zero so sharp and flat errors are treated symmetrically.
    num_symbols = int(mnote_error / (0.5 / max_num_symbols))
    num_symbols = max(-max_num_symbols, min(max_num_symbols, num_symbols))

    if num_symbols > 0:
        return ' ' * max_num_symbols + ' ' + '+' * num_symbols
    if num_symbols < 0:
        return ' ' * (max_num_symbols + num_symbols) + '-' * (-num_symbols)
    return ' ' * max_num_symbols + '|'


def monitor_pitch(device: int = 5,
                  min_freq: float = 10,
                  max_freq: float = 6000,
                  ):
    """Log the strongest frequency and its nearest note until interrupted.

    Frames whose peak magnitude is at most half that of the last reported
    frame are skipped.

    :param device: PyAudio input device index.
    :param min_freq: Lowest frequency of interest in Hz.
    :param max_freq: Highest frequency of interest in Hz.
    """
    analyzer = AudioAnalyzer(pyaudio_device=device,
                             min_freq=min_freq,
                             max_freq=max_freq)

    prev_magnitude = 0

    analyzer.start()
    try:
        while True:
            frame_data = analyzer.frame_queue.get()
            magnitude = frame_data['magnitude']

            if magnitude <= prev_magnitude / 2:
                continue

            prev_magnitude = magnitude
            frequency = frame_data['frequency']
            _, mnote, mnote_base = freq2note(frequency)

            mnote_error = mnote - mnote_base

            logger.info('freq: {:7.2f} Hz mag:{:7.2f} note: {:>3s} {:+.2f}'.format(
                frequency, numpy.log10(magnitude),
                note_names[mnote_base % 12] + str(mnote_base // 12 - 1), mnote_error))

            logger.info(' {}'.format(_tuning_indicator(mnote_error)))
    finally:
        # Release the stream and PortAudio even when the loop is interrupted.
        analyzer.close()