You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

210 lines
7.5 KiB
Python

from typing import List, Dict
import logging
import queue
import threading
import pyaudio
import numpy
from numpy import pi
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
standard_sample_rates = 1000 * numpy.array([
8, 9.6, 11.025, 12, 16, 22.05, 24, 32,
44.1, 48, 88.2, 96, 192])
note_names = ['C', 'C#', 'D', 'D#', 'E', 'F', 'F#', 'G', 'G#', 'A', 'A#', 'B']
def freq2note(freq: float):
log_freq = numpy.log2(freq / 440)
note = 12 * log_freq + 69
base_note = numpy.round(note).astype(int)
return log_freq, note, base_note
def get_supported_sample_rates(pyaudio_device: int,
pyaudio_object: pyaudio.PyAudio = None,
) -> List[int]:
if pyaudio_object is None:
pyaudio_object = pyaudio.PyAudio()
supported_sample_rates = []
devinfo = pyaudio_object.get_device_info_by_index(pyaudio_device)
for rate in standard_sample_rates:
try:
if pyaudio_object.is_format_supported(rate,
input_device=pyaudio_device,
input_channels=devinfo['maxInputChannels'],
input_format=pyaudio.paInt16):
supported_sample_rates.append(rate)
except ValueError:
pass
supported_sample_rates = numpy.array(supported_sample_rates)
return supported_sample_rates
class AudioAnalyzer:
frame_queue = None # type: queue.Queue
_hann_window = None
_fft_buffer = None
_fft_lock = None
_stream = None
_pyaudio_object = None
_fft_freqs = None
_sample_rate = None
_samples_per_buffer = None
stop = None
def __init__(self,
pyaudio_device: int,
min_freq: float = 20,
max_freq: float = 20e3,
samples_per_buffer: int = 1024,
freq_resolution: float = None,
):
self._pyaudio_object = pyaudio.PyAudio()
'''
max_freq < 2 * sample_rate
min_freq * 2**(1/12) > freq_resolution (for discrimination), more for accuracy...
freq_resolution <= sample_rate / (samples_per_buffer * num_buffers)
'''
if freq_resolution is None:
freq_resolution = min_freq * 2**(1/12) / 10
supported_sample_rates = get_supported_sample_rates(pyaudio_device, self._pyaudio_object)
rate_is_acceptable = supported_sample_rates >= 2 * max_freq
sample_rate = numpy.min(supported_sample_rates[rate_is_acceptable]).astype(int)
logger.info('Supported sample rates for device {}: {}'.format(pyaudio_device, supported_sample_rates))
num_buffers = numpy.ceil(sample_rate / (samples_per_buffer * freq_resolution)).astype(int)
samples_per_fft = samples_per_buffer * num_buffers
self._sample_rate = sample_rate
self._samples_per_buffer = samples_per_buffer
self._hann_window = (1 - numpy.cos(numpy.linspace(0, 2 * pi, samples_per_fft, False))) / 2
self._fft_freqs = numpy.fft.fftfreq(samples_per_fft, 1 / sample_rate)
self._fft_buffer = numpy.zeros(num_buffers * samples_per_buffer, dtype=numpy.float32)
self.stop = False
self._fft_lock = threading.Lock()
self.frame_queue = queue.Queue()
print(list(self.__dict__.keys()))
self._stream = self._pyaudio_object.open(format=pyaudio.paInt16,
channels=1,
rate=sample_rate,
input=True,
frames_per_buffer=samples_per_buffer,
stream_callback=self.update)
logger.info('Opened device {} with {} buffers,'.format(pyaudio_device, num_buffers) +
' {} sample rate, {} samples per buffer'.format(
pyaudio_device, num_buffers, sample_rate, samples_per_buffer))
logger.info('Buffers take {:.3g} sec to fully clear'.format(samples_per_fft / sample_rate))
@property
def fft_freqs(self) -> float:
return self._fft_freqs
def start(self):
self._stream.start_stream()
def close(self):
self.stop = True
self._stream.close()
self._pyaudio_object.terminate()
def update(self,
in_data: bytes,
frame_count: int,
time_info: Dict,
status_flags,
):
try:
in_buffer = numpy.fromstring(in_data, numpy.int16)
samples_per_buffer = in_buffer.size
with self._fft_lock:
self._fft_buffer[:-samples_per_buffer] = self._fft_buffer[samples_per_buffer:]
self._fft_buffer[-samples_per_buffer:] = in_buffer
fft = numpy.fft.rfft(self._fft_buffer * self._hann_window)
fft_argmax = numpy.abs(fft[1:]).argmax() + 1 # excluding 0-frequency
frame_data = {
'fft': fft,
'fft_argmax': fft_argmax,
'frequency': self.fft_freqs[fft_argmax],
'magnitude': numpy.abs(fft[fft_argmax]),
}
time_per_buffer = self._samples_per_buffer / self._sample_rate
try:
self.frame_queue.put(frame_data, timeout=time_per_buffer * 10)
except queue.Full:
logger.warning('Frame queue was full for more than 10 buffer periods!')
if self.stop:
return None, pyaudio.paComplete
return None, pyaudio.paContinue
except:
self.close()
def monitor_pitch(pyaudio_device: int,
min_freq: float = 10,
max_freq: float = 6000,
threshold: float = 1e6,
):
analyzer = AudioAnalyzer(pyaudio_device=pyaudio_device,
min_freq=min_freq,
max_freq=max_freq)
prev_magnitude = 0
analyzer.start()
while True:
frame_data = analyzer.frame_queue.get()
if frame_data['magnitude'] <= threshold:
continue
prev_magnitude = frame_data['magnitude']
_, mnote, mnote_base = freq2note(frame_data['frequency'])
mnote_error = mnote - mnote_base
logger.info('freq: {:7.2f} Hz mag:{:7.2f} note: {:>3s} {:+.2f}'.format(
frame_data['frequency'],
numpy.log10(frame_data['magnitude']),
note_names[mnote_base % 12] + str(mnote_base // 12 - 1),
mnote_error))
max_num_symbols = 10
num_symbols = int(mnote_error // (0.5 / max_num_symbols))
if num_symbols > 0:
signal = ' ' * max_num_symbols + '|' + '+' * num_symbols
elif num_symbols == 0:
signal = ' ' * max_num_symbols + '#'
elif num_symbols < 0:
signal = ' ' * (max_num_symbols + num_symbols) + '-' * -num_symbols + '|'
logger.info(' {}'.format(signal))
if __name__ == '__main__':
audio = pyaudio.PyAudio()
logger.info(" Available devices:")
for device in range(audio.get_device_count()):
devinfo = audio.get_device_info_by_index(device)
if devinfo['maxInputChannels'] > 0:
logger.info(' {}: {}'.format(device, devinfo['name']))
monitor_pitch(pyaudio_device=7, min_freq=20)