break analysis off into class, and do threaded analysis
This commit is contained in:
parent
7d7b0ebaf6
commit
6066b0f47f
186
main.py
186
main.py
@ -1,9 +1,12 @@
|
|||||||
import logging
|
import logging
|
||||||
|
from typing import List, Dict
|
||||||
|
import queue
|
||||||
|
|
||||||
import pyaudio
|
import pyaudio
|
||||||
import numpy
|
import numpy
|
||||||
from numpy import pi
|
from numpy import pi
|
||||||
|
|
||||||
|
|
||||||
logging.basicConfig(level=logging.INFO)
|
logging.basicConfig(level=logging.INFO)
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
@ -12,20 +15,27 @@ standard_sample_rates = 1000 * numpy.array([
|
|||||||
8, 9.6, 11.025, 12, 16, 22.05, 24, 32,
|
8, 9.6, 11.025, 12, 16, 22.05, 24, 32,
|
||||||
44.1, 48, 88.2, 96, 192])
|
44.1, 48, 88.2, 96, 192])
|
||||||
|
|
||||||
def monitor_pitch(device: int = 5,
|
note_names = ['C', 'C#', 'D', 'D#', 'E', 'F', 'F#', 'G', 'G#', 'A', 'A#', 'B']
|
||||||
max_freq: float = 6000,
|
|
||||||
min_freq: float = 10,
|
|
||||||
samples_per_buffer: int = 1024,
|
def freq2note(freq: float):
|
||||||
audio: pyaudio.PyAudio = None,
|
log_freq = numpy.log2(freq / 440)
|
||||||
):
|
note = 12 * log_freq + 69
|
||||||
if audio is None:
|
base_note = numpy.round(note).astype(int)
|
||||||
audio = pyaudio.PyAudio()
|
return log_freq, note, base_note
|
||||||
|
|
||||||
|
|
||||||
|
def get_supported_sample_rates(pyaudio_device: int,
|
||||||
|
pyaudio_object: pyaudio.PyAudio = None,
|
||||||
|
) -> List[int]:
|
||||||
|
if pyaudio_object is None:
|
||||||
|
pyaudio_object = pyaudio.PyAudio()
|
||||||
|
|
||||||
supported_sample_rates = []
|
supported_sample_rates = []
|
||||||
devinfo = audio.get_device_info_by_index(device)
|
devinfo = pyaudio_object.get_device_info_by_index(device)
|
||||||
for rate in standard_sample_rates:
|
for rate in standard_sample_rates:
|
||||||
try:
|
try:
|
||||||
if audio.is_format_supported(rate,
|
if pyaudio_object.is_format_supported(rate,
|
||||||
input_device=device,
|
input_device=device,
|
||||||
input_channels=devinfo['maxInputChannels'],
|
input_channels=devinfo['maxInputChannels'],
|
||||||
input_format=pyaudio.paInt16):
|
input_format=pyaudio.paInt16):
|
||||||
@ -33,69 +43,151 @@ def monitor_pitch(device: int = 5,
|
|||||||
except ValueError:
|
except ValueError:
|
||||||
pass
|
pass
|
||||||
supported_sample_rates = numpy.array(supported_sample_rates)
|
supported_sample_rates = numpy.array(supported_sample_rates)
|
||||||
logger.info('Supported rates: {}'.format(supported_sample_rates))
|
logger.info('Supported sample rates for device {}: {}'.format(device, supported_sample_rates))
|
||||||
|
|
||||||
|
|
||||||
|
class AudioAnalyzer:
|
||||||
|
frame_queue = None # type: queue.Queue
|
||||||
|
|
||||||
|
_hanning_window = None
|
||||||
|
_fft_buffer = None
|
||||||
|
_fft_lock = None
|
||||||
|
_stream = None
|
||||||
|
_pyaudio_object = None
|
||||||
|
|
||||||
|
_fft_freqs = None
|
||||||
|
_sample_rate = None
|
||||||
|
_samples_per_buffer = None
|
||||||
|
|
||||||
|
stop = None
|
||||||
|
|
||||||
|
def __init__(pyaudio_device: int,
|
||||||
|
min_freq: float = 20,
|
||||||
|
max_freq: float = 20e3,
|
||||||
|
samples_per_buffer: int = 1024,
|
||||||
|
freq_resolution: float = None,
|
||||||
|
):
|
||||||
|
|
||||||
|
self._pyaudio_object = pyaudio.PyAudio()
|
||||||
|
|
||||||
'''
|
'''
|
||||||
sample_rate >= 2 * max_freq  (Nyquist criterion)
|
sample_rate >= 2 * max_freq  (Nyquist criterion)
|
||||||
min_freq * 2**(1/12) > freq_resolution (for discrimination), more for accuracy...
|
min_freq * 2**(1/12) > freq_resolution (for discrimination), more for accuracy...
|
||||||
freq_resolution <= sample_rate / (samples_per_buffer * num_buffers)
|
freq_resolution <= sample_rate / (samples_per_buffer * num_buffers)
|
||||||
'''
|
'''
|
||||||
|
if freq_resolution is None:
|
||||||
freq_resolution = min_freq * 2**(1/12) / 10
|
freq_resolution = min_freq * 2**(1/12) / 10
|
||||||
|
|
||||||
|
supported_sample_rates = get_supported_sample_rates(pyaudio_device, self._pyaudio_object)
|
||||||
rate_is_acceptable = supported_sample_rates >= 2 * max_freq
|
rate_is_acceptable = supported_sample_rates >= 2 * max_freq
|
||||||
sample_rate = int(numpy.min(supported_sample_rates[rate_is_acceptable]))
|
sample_rate = numpy.min(supported_sample_rates[rate_is_acceptable]).astype(int)
|
||||||
num_buffers = int(numpy.ceil(sample_rate / (samples_per_buffer * freq_resolution)))
|
|
||||||
|
num_buffers = numpy.ceil(sample_rate / (samples_per_buffer * freq_resolution)).astype(int)
|
||||||
samples_per_fft = samples_per_buffer * num_buffers
|
samples_per_fft = samples_per_buffer * num_buffers
|
||||||
|
|
||||||
logger.info('Running on device {} with {} buffers,'.format(device, num_buffers) +
|
self._sample_rate = sample_rate
|
||||||
|
self._samples_per_buffer = samples_per_buffer
|
||||||
|
self._hanning_window = (1 - numpy.cos(numpy.linspace(0, 2 * pi, samples_per_fft, False))) / 2
|
||||||
|
self._fft_freqs = numpy.fft.fftfreq(samples_per_fft, 1 / sample_rate)
|
||||||
|
self._fft_buffer = numpy.zeros(num_buffers * samples_per_buffer, dtype=numpy.float32)
|
||||||
|
self.stop = False
|
||||||
|
self._fft_lock = threading.Lock()
|
||||||
|
self.frame_queue = queue.Queue()
|
||||||
|
|
||||||
|
self._stream = audio.open(format=pyaudio.paInt16,
|
||||||
|
channels=1,
|
||||||
|
rate=sample_rate,
|
||||||
|
input=True,
|
||||||
|
frames_per_buffer=samples_per_buffer,
|
||||||
|
stream_callback=self.update)
|
||||||
|
|
||||||
|
logger.info('Opened device {} with {} buffers,'.format(device, num_buffers) +
|
||||||
' {} sample rate, {} samples per buffer'.format(
|
' {} sample rate, {} samples per buffer'.format(
|
||||||
device, num_buffers, sample_rate, samples_per_buffer))
|
device, num_buffers, sample_rate, samples_per_buffer))
|
||||||
logger.info('Buffers take {:.3g} sec to fully clear'.format(samples_per_fft / sample_rate))
|
logger.info('Buffers take {:.3g} sec to fully clear'.format(samples_per_fft / sample_rate))
|
||||||
|
|
||||||
stream = audio.open(format=pyaudio.paInt16,
|
@property
|
||||||
channels=1,
|
def fft_freqs(self) -> float:
|
||||||
rate=sample_rate,
|
return self._fft_freqs
|
||||||
input=True,
|
|
||||||
frames_per_buffer=samples_per_buffer)
|
def start(self):
|
||||||
stream.start_stream()
|
self._stream.start_stream()
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
self.stop = True
|
||||||
|
self._stream.close()
|
||||||
|
self._pyaudio_object.terminate()
|
||||||
|
|
||||||
|
def update(self,
|
||||||
|
in_data: bytes,
|
||||||
|
frame_count: int,
|
||||||
|
time_info: Dict,
|
||||||
|
status_flags,
|
||||||
|
):
|
||||||
|
#TODO deal with exceptions happening in the callback!
|
||||||
|
|
||||||
|
in_buffer = numpy.fromstring(in_data, numpy.int16)
|
||||||
|
samples_per_buffer = in_buffer.size
|
||||||
|
|
||||||
|
with self._fft_lock:
|
||||||
|
self._fft_buffer[:-samples_per_buffer] = self._fft_buffer[samples_per_buffer:]
|
||||||
|
self._fft_buffer[-samples_per_buffer:] = in_buffer
|
||||||
|
fft = numpy.fft.rfft(self._fft_buffer * self._hanning_window)
|
||||||
|
|
||||||
|
fft_argmax = numpy.abs(fft[1:]).argmax() + 1 # excluding 0-frequency
|
||||||
|
frame_data = {
|
||||||
|
'fft': fft,
|
||||||
|
'fft_argmax': fft_argmax,
|
||||||
|
'frequency': self.fft_freqs[fft_argmax],
|
||||||
|
'magnitude': numpy.abs(fft[fft_argmax]),
|
||||||
|
}
|
||||||
|
|
||||||
|
time_per_buffer = self._samples_per_buffer / self._sample_rate
|
||||||
|
try:
|
||||||
|
self.frame_queue.put(frame_data, timeout=time_per_buffer * 10)
|
||||||
|
except queue.Full:
|
||||||
|
logger.warning('Frame queue was full for more than 10 buffer periods!')
|
||||||
|
|
||||||
|
if self.stop:
|
||||||
|
return None, pyaudio.paComplete
|
||||||
|
else:
|
||||||
|
return None, pyaudio.paContinue
|
||||||
|
|
||||||
|
|
||||||
# Hanning window
|
|
||||||
window = (1 - numpy.cos(numpy.linspace(0, 2 * pi, samples_per_fft, False))) / 2
|
|
||||||
|
|
||||||
freqs = numpy.fft.fftfreq(samples_per_fft, 1 / sample_rate)
|
def monitor_pitch(device: int = 5,
|
||||||
|
min_freq: float = 10,
|
||||||
|
max_freq: float = 6000,
|
||||||
|
):
|
||||||
|
|
||||||
buf = numpy.zeros(num_buffers * samples_per_buffer, dtype=numpy.float32)
|
analyzer = AudioAnalyzer(device=device,
|
||||||
note_names = ['C', 'C#', 'D', 'D#', 'E', 'F', 'F#', 'G', 'G#', 'A', 'A#', 'B']
|
min_freq=min_freq,
|
||||||
|
max_freq=max_freq)
|
||||||
|
|
||||||
while stream.is_active():
|
prev_magnitude = 0
|
||||||
# Shift the buffer down and new data in
|
|
||||||
buf[:-samples_per_buffer] = buf[samples_per_buffer:]
|
|
||||||
buf[-samples_per_buffer:] = numpy.fromstring(stream.read(samples_per_buffer), numpy.int16)
|
|
||||||
|
|
||||||
fft = numpy.fft.rfft(buf * window)
|
analyzer.start()
|
||||||
|
while True:
|
||||||
|
frame_data = analyzer.frame_queue.get()
|
||||||
|
|
||||||
# Get frequency of maximum response in range
|
if frame_data['magnitude'] <= prev_magnitude / 2:
|
||||||
ind = numpy.abs(fft[1:]).argmax() + 1
|
continue
|
||||||
freq = freqs[ind]
|
|
||||||
mag = numpy.abs(fft[ind])
|
|
||||||
|
|
||||||
# Get note number and nearest note
|
prev_magnitude = frame_data['magnitude']
|
||||||
q = numpy.log2(freq/440)
|
_, mnote, mnote_base = freq2note(frame_data['frequency'])
|
||||||
n = 12 * q + 69
|
|
||||||
n0 = int(round(n))
|
|
||||||
|
|
||||||
delta = n - n0
|
mnote_error = mnote - mnote_base
|
||||||
logger.info('freq: {:7.2f} Hz mag:{:7.2f} note: {:>3s} {:+.2f}'.format(
|
logger.info('freq: {:7.2f} Hz mag:{:7.2f} note: {:>3s} {:+.2f}'.format(
|
||||||
freq, numpy.log10(mag), note_names[n0 % 12] + str(n0//12 - 1), delta))
|
freq, numpy.log10(mag), note_names[base_mnote % 12] + str(base_mnote//12 - 1), mnote_error))
|
||||||
|
|
||||||
delta_part = int(delta // 0.1)
|
max_num_symbols = 5
|
||||||
if delta_part > 0:
|
num_symbols = int(mnote_error // (0.5 / max_num_symbols))
|
||||||
signal = ' ' * 6 + '+' * delta_part
|
if num_symbols > 0:
|
||||||
elif delta_part == 0:
|
signal = ' ' * max_num_symbols + ' ' + '+' * num_symbols
|
||||||
signal = ' ' * 5 + '|'
|
elif num_symbols == 0:
|
||||||
elif delta_part < 0:
|
signal = ' ' * max_num_symbols + '|'
|
||||||
signal = ' ' * (5 + delta_part) + '-' * delta_part
|
elif num_symbols < 0:
|
||||||
|
signal = ' ' * (max_num_symbols - num_symbols) + '-' * num_symbols
|
||||||
logger.info(' {}'.format(signal))
|
logger.info(' {}'.format(signal))
|
||||||
|
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user