|
|
| """
|
| Vocoder伪影修复 - 针对呼吸音电音和长音撕裂
|
| 基于RVC社区反馈和研究文献
|
| """
|
| import numpy as np
|
| from scipy import signal
|
| from typing import Optional
|
|
|
|
|
def fix_phase_discontinuity(audio: np.ndarray, sr: int, chunk_boundaries: Optional[list] = None) -> np.ndarray:
    """Repair tearing caused by phase discontinuities.

    Reference: "Prosody-Guided Harmonic Attention for Phase-Coherent Neural
    Vocoding" (arXiv:2601.14472). Vocoders can produce phase discontinuities
    on sustained notes, which are audible as tearing.

    Args:
        audio: Audio samples (1-D array).
        sr: Sample rate in Hz.
        chunk_boundaries: Chunk boundary positions (sample indices).
            Currently unused; reserved for boundary-aware correction.

    Returns:
        Repaired audio. The input array is returned unchanged when no
        discontinuity is found; otherwise a float32 re-synthesis is returned.
    """
    # Guard: the Hilbert transform / percentile below need >= 2 samples.
    if audio.size < 2:
        return audio

    # Instantaneous phase and amplitude via the analytic signal.
    analytic_signal = signal.hilbert(audio)
    instantaneous_phase = np.unwrap(np.angle(analytic_signal))
    amplitude = np.abs(analytic_signal)

    # A phase increment well above the 99th percentile marks a discontinuity.
    phase_diff = np.diff(instantaneous_phase)
    phase_diff_threshold = np.percentile(np.abs(phase_diff), 99) * 2.5

    discontinuities = np.where(np.abs(phase_diff) > phase_diff_threshold)[0]

    if len(discontinuities) == 0:
        return audio

    phase_corrected = instantaneous_phase.copy()

    for disc_idx in discontinuities:
        phase_jump = phase_diff[disc_idx]

        # Spread the jump over up to 20 ms so the correction is inaudible.
        correction_length = min(int(0.02 * sr), len(phase_corrected) - disc_idx - 1)
        if correction_length > 0:
            correction_curve = np.linspace(phase_jump, 0, correction_length)
            phase_corrected[disc_idx + 1:disc_idx + 1 + correction_length] -= correction_curve

    # Re-synthesize from the original envelope and the smoothed phase.
    corrected_signal = amplitude * np.exp(1j * phase_corrected)
    return np.real(corrected_signal).astype(np.float32)
|
|
|
|
|
def reduce_breath_electric_noise(audio: np.ndarray, sr: int, f0: Optional[np.ndarray] = None) -> np.ndarray:
    """Reduce electronic ("electric") noise inside breath segments.

    Reference: GitHub issue #65 "Artefacting when speech has breath".
    Problem: vocoders tend to emit electronic noise in unvoiced (F0 == 0)
    regions such as breaths.

    Args:
        audio: Audio samples (1-D array).
        sr: Sample rate in Hz.
        f0: Optional F0 sequence; voiced frames (F0 > 0) are excluded from
            the breath/noise mask.

    Returns:
        Processed audio: high-pass filtered, with noisy breath frames
        spectrally attenuated and blended back with the original.
    """
    # High-pass at 80 Hz to remove rumble / DC before analysis.
    nyquist = sr / 2
    cutoff = 80 / nyquist
    sos = signal.butter(4, cutoff, btype='highpass', output='sos')
    audio = signal.sosfilt(sos, audio)

    frame_length = int(0.02 * sr)   # 20 ms analysis frames
    hop_length = int(0.01 * sr)     # 10 ms hop (50% overlap)

    # Guard: shorter than one analysis frame -> nothing to classify.
    if len(audio) < frame_length:
        return audio

    n_frames = 1 + (len(audio) - frame_length) // hop_length

    # Per-frame features: energy, spectral flatness, high-frequency ratio.
    energy = np.zeros(n_frames)
    spectral_flatness = np.zeros(n_frames)
    high_freq_ratio = np.zeros(n_frames)

    for i in range(n_frames):
        start = i * hop_length
        end = start + frame_length
        if end > len(audio):
            break

        frame = audio[start:end]

        energy[i] = np.sum(frame ** 2)

        fft = np.abs(np.fft.rfft(frame))
        if np.sum(fft) > 1e-10:
            # Spectral flatness = geometric / arithmetic mean of magnitudes:
            # near 1 for wideband noise, near 0 for tonal content.
            geometric_mean = np.exp(np.mean(np.log(fft + 1e-10)))
            arithmetic_mean = np.mean(fft)
            spectral_flatness[i] = geometric_mean / (arithmetic_mean + 1e-10)

        # Share of energy above 4 kHz (electric artifacts sit high).
        freqs = np.fft.rfftfreq(len(frame), 1 / sr)
        high_freq_mask = freqs >= 4000
        high_freq_energy = np.sum(fft[high_freq_mask] ** 2)
        total_freq_energy = np.sum(fft ** 2)
        high_freq_ratio[i] = high_freq_energy / (total_freq_energy + 1e-10)

    energy_db = 10 * np.log10(energy + 1e-10)

    # The quietest 5% of frames are breath/noise candidates.
    candidate_threshold = np.percentile(energy_db, 5)

    is_candidate = energy_db < candidate_threshold
    is_wideband_noise = is_candidate & (spectral_flatness > 0.35)
    is_highfreq_noise = is_candidate & (high_freq_ratio > 0.15)

    is_noise = is_wideband_noise | is_highfreq_noise

    # Nothing worth fixing.
    noise_ratio = is_noise.sum() / len(is_noise)
    if noise_ratio < 0.01:
        return audio

    # Never treat voiced frames (F0 > 0) as breath noise.
    if f0 is not None and len(f0) > 0:
        f0_per_audio_frame = len(f0) / n_frames
        for i in range(n_frames):
            if not is_noise[i]:
                continue

            f0_idx = int(i * f0_per_audio_frame)
            if f0_idx < len(f0):
                if f0[f0_idx] > 0:
                    is_noise[i] = False

    is_breath = is_noise

    # More noise -> keep more spectral peaks per frame, but mix in more of
    # the cleaned signal overall.
    if noise_ratio < 0.05:
        spectral_threshold_percentile = 85
        magnitude_attenuation = 0.2
        mix_ratio = 0.5
    elif noise_ratio < 0.15:
        spectral_threshold_percentile = 90
        magnitude_attenuation = 0.1
        mix_ratio = 0.7
    else:
        spectral_threshold_percentile = 95
        magnitude_attenuation = 0.05
        mix_ratio = 0.85

    result = audio.copy()

    for i in range(n_frames):
        if is_breath[i]:
            start = i * hop_length
            end = start + frame_length
            if end > len(audio):
                break

            frame = audio[start:end]

            fft = np.fft.rfft(frame)
            magnitude = np.abs(fft)
            phase = np.angle(fft)
            freqs = np.fft.rfftfreq(len(frame), 1 / sr)

            high_freq_mask = freqs >= 4000
            high_freq_energy = np.sum(magnitude[high_freq_mask] ** 2)
            total_freq_energy = np.sum(magnitude ** 2)
            frame_high_ratio = high_freq_energy / (total_freq_energy + 1e-10)

            if frame_high_ratio > 0.15:
                # Strong high-frequency artifact: crush highs, soften mids.
                magnitude[high_freq_mask] *= 0.05

                mid_freq_mask = (freqs >= 1000) & (freqs < 4000)
                magnitude[mid_freq_mask] *= 0.3
            else:
                # Spectral gating: keep only the strongest bins.
                threshold = np.percentile(magnitude, spectral_threshold_percentile)
                magnitude = np.where(magnitude > threshold, magnitude, magnitude * magnitude_attenuation)

            fft_cleaned = magnitude * np.exp(1j * phase)
            frame_cleaned = np.fft.irfft(fft_cleaned, n=len(frame))

            # Short fades at frame edges avoid clicks.
            fade_length = min(hop_length // 2, len(frame) // 4)
            if fade_length > 0:
                fade_in = np.linspace(0, 1, fade_length)
                fade_out = np.linspace(1, 0, fade_length)

                frame_cleaned[:fade_length] *= fade_in
                frame_cleaned[-fade_length:] *= fade_out

            # Blend cleaned frame back with the filtered original.
            result[start:end] = frame * (1 - mix_ratio) + frame_cleaned * mix_ratio

    return result
|
|
|
|
|
def stabilize_sustained_notes(audio: np.ndarray, sr: int, f0: Optional[np.ndarray] = None) -> np.ndarray:
    """Stabilize sustained notes to prevent tearing.

    Reference: "Mel Spectrogram Inversion with Stable Pitch" (Apple
    Research). Vocoders tend to drift during long sustained notes; here the
    amplitude envelope of each sustained region is smoothed.

    Args:
        audio: Audio samples (1-D array).
        sr: Sample rate in Hz.
        f0: F0 sequence used to detect sustained notes. When absent or
            empty the audio is returned unchanged.

    Returns:
        Stabilized audio.
    """
    if f0 is None or len(f0) == 0:
        return audio

    frame_length = int(0.02 * sr)   # 20 ms frames
    hop_length = int(0.01 * sr)     # 10 ms hop

    # Guard: shorter than one analysis frame -> nothing to stabilize.
    if len(audio) < frame_length:
        return audio

    n_audio_frames = 1 + (len(audio) - frame_length) // hop_length
    f0_per_audio_frame = len(f0) / n_audio_frames

    is_sustained = np.zeros(n_audio_frames, dtype=bool)

    # A frame is "sustained" when the surrounding +-window_size frames are
    # mostly voiced and their F0 is nearly flat.
    window_size = 20
    for i in range(window_size, n_audio_frames - window_size):
        f0_idx = int(i * f0_per_audio_frame)
        if f0_idx >= len(f0):
            break

        f0_window_start = max(0, f0_idx - window_size)
        f0_window_end = min(len(f0), f0_idx + window_size)
        f0_window = f0[f0_window_start:f0_window_end]

        f0_voiced = f0_window[f0_window > 0]

        if len(f0_voiced) > window_size * 0.8:
            f0_std = np.std(f0_voiced)
            f0_mean = np.mean(f0_voiced)

            # Coefficient of variation below 5% -> flat pitch -> sustained.
            if f0_std / (f0_mean + 1e-6) < 0.05:
                is_sustained[i] = True

    result = audio.copy()

    i = 0
    while i < n_audio_frames:
        if is_sustained[i]:
            # Collect the whole contiguous sustained run.
            start_frame = i
            while i < n_audio_frames and is_sustained[i]:
                i += 1
            end_frame = i

            start_sample = start_frame * hop_length
            end_sample = min(end_frame * hop_length + frame_length, len(audio))

            if end_sample - start_sample < frame_length:
                continue  # run too short; i already advanced past it

            sustained_segment = audio[start_sample:end_sample]

            # Amplitude envelope via the analytic signal.
            envelope = np.abs(signal.hilbert(sustained_segment))

            # Low-pass (50 Hz) the envelope to extract its slow trend.
            b, a = signal.butter(2, 50 / (sr / 2), btype='low')
            smoothed_envelope = signal.filtfilt(b, a, envelope)

            # Only suppress fast envelope wobble; keep slow dynamics.
            envelope_variation = np.abs(envelope - smoothed_envelope)
            variation_threshold = np.percentile(envelope_variation, 75)

            blend_mask = np.clip(envelope_variation / (variation_threshold + 1e-6), 0, 1)

            target_envelope = smoothed_envelope * blend_mask + envelope * (1 - blend_mask)

            if np.max(envelope) > 1e-6:
                gain = target_envelope / (envelope + 1e-6)
                # Bound the gain to avoid pumping artifacts.
                gain = np.clip(gain, 0.5, 2.0)
                result[start_sample:end_sample] = sustained_segment * gain

        i += 1

    return result
|
|
|
|
|
def apply_vocoder_artifact_fix(
    audio: np.ndarray,
    sr: int,
    f0: Optional[np.ndarray] = None,
    chunk_boundaries: Optional[list] = None,
    fix_phase: bool = True,
    fix_breath: bool = True,
    fix_sustained: bool = True
) -> np.ndarray:
    """Apply the full vocoder-artifact repair pipeline.

    Args:
        audio: Audio samples.
        sr: Sample rate in Hz.
        f0: F0 sequence.
        chunk_boundaries: Chunk boundary positions.
        fix_phase: Whether to repair phase discontinuities.
        fix_breath: Whether to reduce electric noise in breaths.
        fix_sustained: Whether to stabilize sustained notes.

    Returns:
        Repaired audio.
    """
    processed = audio.copy()

    # Run each enabled stage in order; each stage consumes the output of
    # the previous one.
    stages = (
        (fix_phase, lambda x: fix_phase_discontinuity(x, sr, chunk_boundaries)),
        (fix_breath, lambda x: reduce_breath_electric_noise(x, sr, f0)),
        (fix_sustained, lambda x: stabilize_sustained_notes(x, sr, f0)),
    )
    for enabled, stage in stages:
        if enabled:
            processed = stage(processed)

    return processed
|
|
|