diff --git a/main.py b/main.py index 6b1acf5..df6fffd 100644 --- a/main.py +++ b/main.py @@ -1,6 +1,7 @@ -import logging from typing import List, Dict +import logging import queue +import threading import pyaudio import numpy @@ -32,18 +33,18 @@ def get_supported_sample_rates(pyaudio_device: int, pyaudio_object = pyaudio.PyAudio() supported_sample_rates = [] - devinfo = pyaudio_object.get_device_info_by_index(device) + devinfo = pyaudio_object.get_device_info_by_index(pyaudio_device) for rate in standard_sample_rates: try: if pyaudio_object.is_format_supported(rate, - input_device=device, + input_device=pyaudio_device, input_channels=devinfo['maxInputChannels'], input_format=pyaudio.paInt16): supported_sample_rates.append(rate) except ValueError: pass supported_sample_rates = numpy.array(supported_sample_rates) - logger.info('Supported sample rates for device {}: {}'.format(device, supported_sample_rates)) + return supported_sample_rates class AudioAnalyzer: @@ -61,7 +62,8 @@ class AudioAnalyzer: stop = None - def __init__(pyaudio_device: int, + def __init__(self, + pyaudio_device: int, min_freq: float = 20, max_freq: float = 20e3, samples_per_buffer: int = 1024, @@ -81,6 +83,7 @@ class AudioAnalyzer: supported_sample_rates = get_supported_sample_rates(pyaudio_device, self._pyaudio_object) rate_is_acceptable = supported_sample_rates >= 2 * max_freq sample_rate = numpy.min(supported_sample_rates[rate_is_acceptable]).astype(int) + logger.info('Supported sample rates for device {}: {}'.format(pyaudio_device, supported_sample_rates)) num_buffers = numpy.ceil(sample_rate / (samples_per_buffer * freq_resolution)).astype(int) samples_per_fft = samples_per_buffer * num_buffers @@ -94,73 +97,74 @@ class AudioAnalyzer: self._fft_lock = threading.Lock() self.frame_queue = queue.Queue() - self._stream = audio.open(format=pyaudio.paInt16, - channels=1, - rate=sample_rate, - input=True, - frames_per_buffer=samples_per_buffer, - stream_callback=self.update) + print(list(self.__dict__.keys())) + self._stream = self._pyaudio_object.open(format=pyaudio.paInt16, + channels=1, + rate=sample_rate, + input=True, + frames_per_buffer=samples_per_buffer, + stream_callback=self.update) - logger.info('Opened device {} with {} buffers,'.format(device, num_buffers) + + logger.info('Opened device {} with {} buffers,'.format(pyaudio_device, num_buffers) + ' {} sample rate, {} samples per buffer'.format( - device, num_buffers, sample_rate, samples_per_buffer)) + pyaudio_device, num_buffers, sample_rate, samples_per_buffer)) logger.info('Buffers take {:.3g} sec to fully clear'.format(samples_per_fft / sample_rate)) - @property - def fft_freqs(self) -> float: - return self._fft_freqs + @property + def fft_freqs(self) -> float: + return self._fft_freqs - def start(self): - self._stream.start_stream() + def start(self): + self._stream.start_stream() - def close(self): - self.stop = True - self._stream.close() - self._pyaudio_object.terminate() + def close(self): + self.stop = True + self._stream.close() + self._pyaudio_object.terminate() - def update(self, - in_data: bytes, - frame_count: int, - time_info: Dict, - status_flags, - ): - #TODO deal with exceptions happening in the callback! + def update(self, + in_data: bytes, + frame_count: int, + time_info: Dict, + status_flags, + ): + #TODO deal with exceptions happening in the callback! - in_buffer = numpy.fromstring(in_data, numpy.int16) - samples_per_buffer = in_buffer.size + in_buffer = numpy.fromstring(in_data, numpy.int16) + samples_per_buffer = in_buffer.size - with self._fft_lock: - self._fft_buffer[:-samples_per_buffer] = self._fft_buffer[samples_per_buffer:] - self._fft_buffer[-samples_per_buffer:] = in_buffer - fft = numpy.fft.rfft(self._fft_buffer * self._hanning_window) + with self._fft_lock: + self._fft_buffer[:-samples_per_buffer] = self._fft_buffer[samples_per_buffer:] + self._fft_buffer[-samples_per_buffer:] = in_buffer + fft = numpy.fft.rfft(self._fft_buffer * self._hanning_window) - fft_argmax = numpy.abs(fft[1:]).argmax() + 1 # excluding 0-frequency - frame_data = { - 'fft': fft, - 'fft_argmax': fft_argmax, - 'frequency': self.fft_freqs[fft_argmax], - 'magnitude': numpy.abs(fft[fft_argmax]), - } + fft_argmax = numpy.abs(fft[1:]).argmax() + 1 # excluding 0-frequency + frame_data = { + 'fft': fft, + 'fft_argmax': fft_argmax, + 'frequency': self.fft_freqs[fft_argmax], + 'magnitude': numpy.abs(fft[fft_argmax]), + } - time_per_buffer = self._samples_per_buffer / self._sample_rate - try: - self.frame_queue.put(frame_data, timeout=time_per_buffer * 10) - except queue.Full: - logger.warning('Frame queue was full for more than 10 buffer periods!') + time_per_buffer = self._samples_per_buffer / self._sample_rate + try: + self.frame_queue.put(frame_data, timeout=time_per_buffer * 10) + except queue.Full: + logger.warning('Frame queue was full for more than 10 buffer periods!') - if self.stop: - return None, pyaudio.paComplete - else: - return None, pyaudio.paContinue + if self.stop: + return None, pyaudio.paComplete + else: + return None, pyaudio.paContinue -def monitor_pitch(device: int = 5, +def monitor_pitch(pyaudio_device: int, min_freq: float = 10, max_freq: float = 6000, ): - analyzer = AudioAnalyzer(device=device, + analyzer = AudioAnalyzer(pyaudio_device=pyaudio_device, min_freq=min_freq, max_freq=max_freq) @@ -170,7 +174,7 @@ def monitor_pitch(device: int = 5, while True: frame_data = analyzer.frame_queue.get() - if frame_data['magnitude'] <= prev_magnitude / 2: + if frame_data['magnitude'] <= 10**6: continue prev_magnitude = frame_data['magnitude'] @@ -178,26 +182,29 @@ def monitor_pitch(device: int = 5, mnote_error = mnote - mnote_base logger.info('freq: {:7.2f} Hz mag:{:7.2f} note: {:>3s} {:+.2f}'.format( - freq, numpy.log10(mag), note_names[base_mnote % 12] + str(base_mnote//12 - 1), mnote_error)) + frame_data['frequency'], + numpy.log10(frame_data['magnitude']), + note_names[mnote_base % 12] + str(mnote_base // 12 - 1), + mnote_error)) - max_num_symbols = 5 + max_num_symbols = 10 num_symbols = int(mnote_error // (0.5 / max_num_symbols)) if num_symbols > 0: - signal = ' ' * max_num_symbols + ' ' + '+' * num_symbols + signal = ' ' * max_num_symbols + '|' + '+' * num_symbols elif num_symbols == 0: - signal = ' ' * max_num_symbols + '|' + signal = ' ' * max_num_symbols + '#' elif num_symbols < 0: - signal = ' ' * (max_num_symbols - num_symbols) + '-' * num_symbols + signal = ' ' * (max_num_symbols + num_symbols) + '-' * -num_symbols + '|' logger.info(' {}'.format(signal)) if __name__ == '__main__': audio = pyaudio.PyAudio() - logger.info("Available devices:") + logger.info(" Available devices:") for device in range(audio.get_device_count()): devinfo = audio.get_device_info_by_index(device) if devinfo['maxInputChannels'] > 0: - logger.info('{}: {}'.format(device, devinfo['name'])) + logger.info(' {}: {}'.format(device, devinfo['name'])) - monitor_pitch(device=5, min_freq=20) + monitor_pitch(pyaudio_device=7, min_freq=20)