Source code for musisep.dictsep.pursuit

#!python3

"""
Module for the sparse pursuit algorithm and its helper functions.
"""

from __future__ import absolute_import, division, print_function

import numpy as np
import scipy.optimize
import scipy.linalg
import scipy.fftpack

import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt

from . import exptool

[docs]def calc_harscale(minfreq, maxfreq, numfreqs): """ Calculate the scaling factor of the frequency axis for the log-frequency spectrogram. Parameters ---------- minfreq : float Minimum frequency to be represented (included) maxfreq : float Maximum frequency to be represented (excluded) numfreqs : int Intended height of the spectrogram Returns ------- harscale : float Scaling factor """ return numfreqs / np.log(maxfreq / minfreq)
[docs]class Peaks: """ Object to represent the parameters for the peaks in the spectrogram. Parameters ---------- amps : array_like Amplitudes shifts : array_like Fundamental frequencies sigmas : array_like Standard deviations (frequency) spreads : array_like Inharmonicities insts : array_like Instrument numbers """ def __init__(self, amps, shifts, sigmas, spreads, insts): self.amps = np.asarray(amps, dtype='double') self.shifts = np.asarray(shifts, dtype='double') self.sigmas = np.asarray(sigmas, dtype='double') self.spreads = np.asarray(spreads, dtype='double') self.insts = np.asarray(insts, dtype='int') @classmethod
[docs] def empty(cls): """ Construct an empty `Peaks` object. Returns ------- Peaks A `Peaks` object with zero peaks """ return cls(np.zeros(0), np.zeros(0), np.zeros(0), np.zeros(0), np.zeros(0))
@classmethod
[docs] def from_array(cls, array, insts): """ Construct a `Peaks` object from an array. Parameters ---------- array : array_like Array that contains, in consecutive order, the amplitudes, the fundamental frequencies, the standard deviations, and the inharmoniticies insts : array_like Instrument numbers """ array = np.asarray(array) insts = np.asarray(insts) return cls(*array.reshape((4, insts.size)), insts)
def __len__(self): return self.insts.size def __getitem__(self, key): new = Peaks(self.amps[key], self.shifts[key], self.sigmas[key], self.spreads[key], self.insts[key]) return new
[docs] def get_params(self): """ Returns ------- amps : array_like Amplitudes shifts : array_like Fundamental frequencies sigmas : array_like Standard deviations (frequency) spreads : array_like Inharmonicities insts : array_like Instrument numbers """ return self.amps, self.shifts, self.sigmas, self.spreads, self.insts
[docs] def get_array(self): """ Returns ------- array_like Array that contains, in consecutive order, the amplitudes, the fundamental frequencies, the standard deviations, and the inharmoniticies """ return np.concatenate((self.amps, self.shifts, self.sigmas, self.spreads))
[docs] def copy(self): """ Returns ------- Peaks Copy of the contained peak parameters """ new = Peaks(self.amps.copy(), self.shifts.copy(), self.sigmas.copy(), self.spreads.copy(), self.insts.copy()) return new
[docs] def merge(self, new): """ Merge the `Peaks` object with another `Peaks` object contained in `new` by concatenating the parameters. Parameters ---------- new : Peaks Object to merge with """ self.amps = np.concatenate((self.amps, new.amps)) self.shifts = np.concatenate((self.shifts, new.shifts)) self.sigmas = np.concatenate((self.sigmas, new.sigmas)) self.spreads = np.concatenate((self.spreads, new.spreads)) self.insts = np.concatenate((self.insts, new.insts))
[docs]def inst_shift(peaks, inst_dict, harscale, pexp, m, n): """ Synthesize the log-frequency spectrum. Parameters ---------- peaks : Peaks Peak parameters inst_dict : array_like Dictionary containing the relative amplitudes of the harmonics harscale : float Scaling factor pexp : float Exponent for the addition of sinusoids m : int Height of the spectrogram n : int Number of instruments Returns ------- ndarray Log-frequency spectrum """ inst_dict = np.asarray(inst_dict) reconstruction = exptool.inst_shift(*peaks.get_params(), inst_dict, harscale, pexp, m, n) return np.asarray(reconstruction) ** (1/pexp)
[docs]def inst_scale(peaks, inst_dict, pexp, m, n): """ Synthesize the linear-frequency spectrum. Parameters ---------- peaks : Peaks Peak parameters inst_dict : array_like Dictionary containing the relative amplitudes of the harmonics pexp : float Exponent for the addition of sinusoids m : int Height of the spectrogram n : int Number of instruments Returns ------- ndarray Linear-frequency spectrum """ reconstruction = exptool.inst_scale(*peaks.get_params(), inst_dict, pexp, m, n) return np.asarray(reconstruction) ** (1/pexp)
[docs]def inst_shift_obj(peak_array, insts, inst_dict, harscale, pexp, qexp, m, n, y): """ Least-squares objective function for the log-frequency spectrum. Parameters ---------- peak_array : array_like Peak parameters in array form insts : array_like Instrument numbers inst_dict : array_like Dictionary containing the relative amplitudes of the harmonics harscale : float Scaling factor pexp : float Exponent for the addition of sinusoids qexp : float Exponent to be applied on the spectrum m : int Height of the spectrogram n : int Number of instruments y : array_like Spectrum to compare with Returns ------- obj : float Least-squares error """ y = np.asarray(y) reconstruction = inst_shift(Peaks.from_array(peak_array, insts), inst_dict, harscale, pexp, m, n) return np.sum(np.square(reconstruction**qexp - y**qexp)) / 2
[docs]def inst_shift_grad(peak_array, insts, inst_dict, harscale, pexp, qexp, m, n, y): """ Least-squares gradient function for the log-frequency spectrum w.r.t. the parameters. Parameters ---------- peak_array : array_like Peak parameters in array form insts : array_like Instrument numbers inst_dict : array_like Dictionary containing the relative amplitudes of the harmonics harscale : float Scaling factor pexp : float Exponent for the addition of sinusoids qexp : float Exponent to be applied on the spectrum m : int Height of the spectrogram n : int Number of instruments y : array_like Spectrum to compare with Returns ------- grad : ndarray Least-squares gradient """ peaks = Peaks.from_array(peak_array, insts) reconstruction = inst_shift(peaks, inst_dict, harscale, pexp, m, n) reconstruction = np.asarray(reconstruction) expvec = ((reconstruction**qexp - y**qexp) * (reconstruction+1e-40) ** (qexp - pexp) * qexp) grad = exptool.inst_shift_grad(expvec, *peaks.get_params(), inst_dict, harscale, pexp-1, m, n) grad = np.asarray(grad) return grad
[docs]def inst_shift_dict_grad(peak_array, insts, inst_dict, harscale, pexp, qexp, m, n, y): """ Least-squares gradient function for the log-frequency spectrum w.r.t. the dictionary. Parameters ---------- peak_array : array_like Peak parameters in array form insts : array_like Instrument numbers inst_dict : array_like Dictionary containing the relative amplitudes of the harmonics harscale : float Scaling factor pexp : float Exponent for the addition of sinusoids qexp : float Exponent to be applied on the spectrum m : int Height of the spectrogram n : int Number of instruments y : array_like Spectrum to compare with Returns ------- grad : ndarray Least-squares gradient w.r.t. the dictionary """ y = np.asarray(y) peaks = Peaks.from_array(peak_array, insts) reconstruction = inst_shift(peaks, inst_dict, harscale, pexp, m, n) expvec = ((reconstruction**qexp - y**qexp) * (reconstruction+1e-40) ** (qexp - pexp) * qexp) grad = exptool.inst_shift_dict_grad(expvec, *peaks.get_params(), inst_dict, harscale, pexp-1, m, n) grad = np.asarray(grad) return grad
[docs]def make_bounds(fsigma, length): """ Compute sensible bounds for the peak parameters. Parameters ---------- fsigma : float Standard deviation (frequency) length : int Number of instruments Returns ------- bounds : list of tuple Bounds for the optimizer """ return ([(0, None)] * length + [(None, None)] * length + [(fsigma*0.8, fsigma*1.5)] * length + [(0, 2e-3)] * length)
[docs]def max_selector(y, prenum, n): """ Callback selector to find peaks based on the local maxima which are dominant in a discrete interval, viewed from its midpoint. Parameters ---------- y : array_like Spectrum prenum : int Number of peaks to consider n : int Length of the interval Returns ------- amps : array_like Amplitudes shifts : array_like Frequencies insts : array_like Instrument numbers (always 0) """ y = np.asarray(y) A = scipy.linalg.toeplitz(y[:n], y) A = np.roll(A, -(n-1)//2, axis=1) keys, = np.where(np.logical_and(np.amax(A, axis=0) == y, y > 0)) idcs = np.argsort(-y[keys])[:prenum] return y[keys[idcs]], keys[idcs], np.zeros(idcs.size)
[docs]def fft_selector(y, prenum, baseshift, inst_spect, qexp): """ Callback selector to find fundamental frequencies based on the correlation of the spectrum with the instrument spectra. Parameters ---------- y : array_like Spectrum prenum : int Number of peaks to consider baseshift : int Length to add to the spectrum in order to avoid circular convolution inst_spect : array_like Spectra of the instruments, in the columns qexp : float Exponent to be applied on the spectrum Returns ------- amps : array_like Amplitudes shifts : array_like Fundamental frequencies insts : array_like Instrument numbers """ inst_spect = np.asarray(inst_spect) inst_spect_norms = np.linalg.norm(inst_spect, 2, axis=0) inst_spect = inst_spect / inst_spect_norms inst_spect_freq = scipy.fftpack.fft(inst_spect, axis=0) yaug = np.concatenate((np.zeros(baseshift), y)) yfreq = scipy.fftpack.fft(yaug) corrs = scipy.fftpack.ifft(np.conj(inst_spect_freq) * yfreq.reshape([-1, 1]), axis=0) corrs = np.real(corrs[:y.size, :]) keys = np.argsort(-corrs, axis=None)[:prenum] keys, insts = np.unravel_index(keys, corrs.shape) amps = corrs[keys, insts] / inst_spect_norms[insts] idcs, = np.where(amps > 0) return amps[idcs], keys[idcs], insts[idcs]
[docs]def peak_pursuit(y, num, prenum, runs, inst_dict, fsigma, harscale, selector, selector_args, pexp, qexp, beta=1, init=None): """ Sparse pursuit algorithm for the identification of peaks in a spectrum. Parameters ---------- y : array_like Spectrum num : int Maximum number of peaks prenum : int Number of new peaks to consider per step inst_dict : ndarray Dictionary containing the relative amplitudes of the harmonics fsigma : float Standard deviation (frequency) harscale : float Scaling factor selector : function Callback selector accepting `y` and `prenum` as arguments selector_args : sequence Extra arguments to pass to the selector pexp : float Exponent for the addition of sinusoids qexp : float Exponent to be applied on the spectrum beta : float Residual reduction factor init : Peaks Initial value for the peaks Returns ------- peaks : Peaks Identified peaks reconstruction : ndarray Synthesized spectrum """ y = np.asarray(y) inst_dict = np.asarray(inst_dict) m = y.size n = inst_dict.shape[1] if init is None: reconstruction = None peaks = Peaks.empty() else: peaks = init reconstruction = inst_shift(peaks, inst_dict, harscale, pexp, m, len(peaks)) r = y**qexp - reconstruction**qexp for i in range(runs): amps, keys, new_insts = selector(r, prenum, *selector_args) if keys.size == 0: print("break in iteration {} [empty selection]".format(i)) break old_peaks = peaks.copy() if (amps < 0).any(): print("amps: {}".format(amps)) new_peaks = Peaks(amps ** (1/qexp), keys, np.repeat(fsigma, keys.size), np.zeros(keys.size), new_insts) peaks.merge(new_peaks) bounds = make_bounds(fsigma, len(peaks)) res = scipy.optimize.fmin_l_bfgs_b( inst_shift_obj, peaks.get_array(), args=(peaks.insts, inst_dict, harscale, pexp, qexp, m, len(peaks), y), bounds=bounds, fprime=inst_shift_grad, #factr=1e3, disp=0) peaks = Peaks.from_array(res[0], peaks.insts) reconstruction_new = inst_shift(peaks, inst_dict, harscale, pexp, m, len(peaks)) if len(peaks) > num: idcs = np.argsort(-peaks.amps)[:num] peaks = peaks[idcs] bounds = make_bounds(fsigma, len(peaks)) res = scipy.optimize.fmin_l_bfgs_b( inst_shift_obj, peaks.get_array(), args=(peaks.insts, inst_dict, harscale, pexp, qexp, m, len(peaks), y), bounds=bounds, fprime=inst_shift_grad, #factr=1e1, disp=0) peaks = Peaks.from_array(res[0], peaks.insts) reconstruction_new = inst_shift(peaks, inst_dict, harscale, pexp, m, len(peaks)) if (np.linalg.norm(reconstruction_new**qexp - y**qexp) < np.linalg.norm(reconstruction**qexp - y**qexp) * beta): reconstruction = reconstruction_new r = y**qexp - reconstruction_new**qexp else: peaks = old_peaks print("break in iteration {}".format(i)) break return peaks, reconstruction
[docs]def gen_inst_spect(baseshift, fsigma, inst_dict, harscale, pexp, qexp, m): """ Generate an instrument log-frequency spectrum. Parameters ---------- baseshift : int Length to add to the spectrum in order to avoid circular convolution fsigma : float Standard deviation (frequency) inst_dict : ndarray Dictionary containing the relative amplitudes of the harmonics harscale : float Scaling factor pexp : float Exponent for the addition of sinusoids qexp : float Exponent to be applied on the spectrum m : int Height of the spectrogram Returns ------- inst_spect : ndarray Spectra of the instruments, in the columns """ n = inst_dict.shape[1] inst_spect = np.zeros((m, n), order='F') for i in range(n): peaks = Peaks([1], [baseshift], [fsigma], [0], [i]) inst_spect[:, i] = inst_shift(peaks, inst_dict, harscale, pexp, m, 1) ** qexp return inst_spect