Merge pull request #229 from svc-develop-team/4.1-Latest

Update: new features
YuriHead 2023-06-09 19:12:26 +08:00 committed by GitHub
commit 6b5fe6547d
21 changed files with 1833 additions and 187 deletions

View File

@ -106,7 +106,11 @@ wget -P pretrain/ http://obs.cstcloud.cn/share/obs/sankagenkeshi/checkpoint_best
- download model at [DPHuBERT-sp0.75.pth](https://huggingface.co/pyf98/DPHuBERT/resolve/main/DPHuBERT-sp0.75.pth)
- Place it under the `pretrain` directory
##### **6. If OnnxHubert/ContentVec as the encoder**
##### **6. If WavLM is used as the encoder**
- download model at [WavLM-Base+.pt](https://valle.blob.core.windows.net/share/wavlm/WavLM-Base+.pt?sv=2020-08-04&st=2023-03-01T07%3A51%3A05Z&se=2033-03-02T07%3A51%3A00Z&sr=c&sp=rl&sig=QJXmSJG9DbMKf48UDIU1MfzIro8HQOf3sqlNXiflY1I%3D), the model fits `wavlmbase+`
- Place it under the `pretrain` directory
##### **7. If OnnxHubert/ContentVec is used as the encoder**
- download model at [MoeSS-SUBModel](https://huggingface.co/NaruseMioShirakana/MoeSS-SUBModel/tree/main)
- Place it under the `pretrain` directory
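
A quick way to verify the WavLM checkpoint landed in the right place is to load it and inspect its config. The snippet below is only a hedged sanity-check sketch (not part of the repo); it mirrors how `vencoder/WavLMBasePlus.py` in this PR reads the file and assumes the default path `pretrain/WavLM-Base+.pt`:

```python
import torch

# WavLM checkpoints store their configuration under the "cfg" key.
ckpt = torch.load("pretrain/WavLM-Base+.pt", map_location="cpu")
print(ckpt["cfg"]["encoder_embed_dim"])  # expected: 768 for WavLM-Base+
```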
@ -123,6 +127,7 @@ wget -P pretrain/ http://obs.cstcloud.cn/share/obs/sankagenkeshi/checkpoint_best
- "cnhubertlarge"
- "dphubert"
- "whisper-ppg-large"
- "wavlmbase+"
#### **Optional (strongly recommended)**
@ -213,7 +218,7 @@ python resample.py --skip_loudnorm
python preprocess_flist_config.py --speech_encoder vec768l12
```
speech_encoder has 7 choices
speech_encoder has the following options
```
vec768l12
@ -223,6 +228,7 @@ whisper-ppg
cnhubertlarge
dphubert
whisper-ppg-large
wavlmbase+
```
If the speech_encoder argument is omitted, the default value is vec768l12
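
At preprocessing and inference time the encoder name is dispatched to a model class. A minimal, hedged sketch of how the new `wavlmbase+` option resolves (it mirrors `utils.get_speech_encoder` as changed later in this PR and assumes the WavLM checkpoint is already in `pretrain/`):

```python
import utils

# "wavlmbase+" maps to vencoder.WavLMBasePlus in utils.get_speech_encoder (see the utils.py diff below)
encoder = utils.get_speech_encoder("wavlmbase+", device="cpu")
print(encoder.hidden_dim)  # 768 for WavLM-Base+
```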

View File

@ -108,7 +108,11 @@ wget -P pretrain/ http://obs.cstcloud.cn/share/obs/sankagenkeshi/checkpoint_best
+ Download the model [DPHuBERT-sp0.75.pth](https://huggingface.co/pyf98/DPHuBERT/resolve/main/DPHuBERT-sp0.75.pth)
+ Place it under the `pretrain` directory
##### **6. If OnnxHubert/ContentVec is used as the speech encoder**
##### **6. If WavLM is used as the speech encoder**
+ Download the model [WavLM-Base+.pt](https://valle.blob.core.windows.net/share/wavlm/WavLM-Base+.pt?sv=2020-08-04&st=2023-03-01T07%3A51%3A05Z&se=2033-03-02T07%3A51%3A00Z&sr=c&sp=rl&sig=QJXmSJG9DbMKf48UDIU1MfzIro8HQOf3sqlNXiflY1I%3D), the model fits `wavlmbase+`
+ Place it under the `pretrain` directory
##### **7. If OnnxHubert/ContentVec is used as the speech encoder**
+ Download the model [MoeSS-SUBModel](https://huggingface.co/NaruseMioShirakana/MoeSS-SUBModel/tree/main)
+ Place it under the `pretrain` directory
@ -125,6 +129,7 @@ wget -P pretrain/ http://obs.cstcloud.cn/share/obs/sankagenkeshi/checkpoint_best
- "cnhubertlarge"
- "dphubert"
- "whisper-ppg-large"
- "wavlmbase+"
#### **Optional (strongly recommended)**
@ -215,7 +220,7 @@ python resample.py --skip_loudnorm
python preprocess_flist_config.py --speech_encoder vec768l12
```
speech_encoder has seven options
speech_encoder has the following options
```
vec768l12
@ -225,6 +230,7 @@ whisper-ppg
whisper-ppg-large
cnhubertlarge
dphubert
wavlmbase+
```
If the speech_encoder argument is omitted, the default value is vec768l12

View File

@ -35,7 +35,8 @@
"win_length": 2048,
"n_mel_channels": 80,
"mel_fmin": 0.0,
"mel_fmax": 22050
"mel_fmax": 22050,
"unit_interpolate_mode":"nearest"
},
"model": {
"inter_channels": 192,

View File

@ -11,6 +11,7 @@ data:
validation_files: "filelists/val.txt"
extensions: # List of extension included in the data collection
- wav
unit_interpolate_mode: "nearest"
model:
type: 'Diffusion'
n_layers: 20
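
Both configs gain `unit_interpolate_mode`, which controls how content-encoder units are stretched to the F0 frame count before training and inference. The sketch below is a rough, hedged approximation of the two code paths (the real implementation is `utils.repeat_expand_2d` further down in this PR), assuming a `[h, t]` unit tensor:

```python
import torch
import torch.nn.functional as F

def repeat_expand_2d_sketch(content: torch.Tensor, target_len: int, mode: str = "left"):
    # content: [h, t]. "left" approximates the legacy repeat/hold behaviour;
    # any other value ("nearest", "linear", ...) is delegated to F.interpolate.
    if mode == "left":
        src_len = content.shape[-1]
        idx = (torch.arange(target_len) * src_len // target_len).clamp(max=src_len - 1)
        return content[:, idx]
    return F.interpolate(content[None, :, :], size=target_len, mode=mode)[0]
```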

View File

@ -31,6 +31,7 @@ class TextAudioSpeakerLoader(torch.utils.data.Dataset):
self.filter_length = hparams.data.filter_length
self.hop_length = hparams.data.hop_length
self.win_length = hparams.data.win_length
self.unit_interpolate_mode = hparams.data.unit_interpolate_mode
self.sampling_rate = hparams.data.sampling_rate
self.use_sr = hparams.train.use_sr
self.spec_len = hparams.train.max_speclen
@ -73,7 +74,7 @@ class TextAudioSpeakerLoader(torch.utils.data.Dataset):
uv = torch.FloatTensor(np.array(uv,dtype=float))
c = torch.load(filename+ ".soft.pt")
c = utils.repeat_expand_2d(c.squeeze(0), f0.shape[0])
c = utils.repeat_expand_2d(c.squeeze(0), f0.shape[0], mode=self.unit_interpolate_mode)
if self.vol_emb:
volume_path = filename + ".vol.npy"
volume = np.load(volume_path)

View File

@ -63,6 +63,7 @@ def get_data_loaders(args, whole_audio=False):
spk=args.spk,
device=args.train.cache_device,
fp16=args.train.cache_fp16,
unit_interpolate_mode = args.data.unit_interpolate_mode,
use_aug=True)
loader_train = torch.utils.data.DataLoader(
data_train ,
@ -81,6 +82,7 @@ def get_data_loaders(args, whole_audio=False):
whole_audio=True,
spk=args.spk,
extensions=args.data.extensions,
unit_interpolate_mode = args.data.unit_interpolate_mode,
n_spk=args.model.n_spk)
loader_valid = torch.utils.data.DataLoader(
data_valid,
@ -107,6 +109,7 @@ class AudioDataset(Dataset):
device='cpu',
fp16=False,
use_aug=False,
unit_interpolate_mode = 'left'
):
super().__init__()
@ -118,6 +121,7 @@ class AudioDataset(Dataset):
self.use_aug = use_aug
self.data_buffer={}
self.pitch_aug_dict = {}
self.unit_interpolate_mode = unit_interpolate_mode
# np.load(os.path.join(self.path_root, 'pitch_aug_dict.npy'), allow_pickle=True).item()
if load_all_data:
print('Load all the data filelists:', filelists)
@ -171,7 +175,7 @@ class AudioDataset(Dataset):
path_units = name_ext + ".soft.pt"
units = torch.load(path_units).to(device)
units = units[0]
units = repeat_expand_2d(units,f0.size(0)).transpose(0,1)
units = repeat_expand_2d(units,f0.size(0),unit_interpolate_mode).transpose(0,1)
if fp16:
mel = mel.half()
@ -263,7 +267,7 @@ class AudioDataset(Dataset):
path_units = name_ext + ".soft.pt"
units = torch.load(path_units)
units = units[0]
units = repeat_expand_2d(units,f0.size(0)).transpose(0,1)
units = repeat_expand_2d(units,f0.size(0),self.unit_interpolate_mode).transpose(0,1)
units = units[start_frame : start_frame + units_frame_len]

View File

@ -136,19 +136,14 @@ class Svc(object):
self.dev = torch.device(device)
self.net_g_ms = None
if not self.only_diffusion:
self.hps_ms = utils.get_hparams_from_file(config_path)
self.hps_ms = utils.get_hparams_from_file(config_path,True)
self.target_sample = self.hps_ms.data.sampling_rate
self.hop_size = self.hps_ms.data.hop_length
self.spk2id = self.hps_ms.spk
try:
self.vol_embedding = self.hps_ms.model.vol_embedding
except Exception as e:
self.vol_embedding = False
try:
self.speech_encoder = self.hps_ms.model.speech_encoder
except Exception as e:
self.speech_encoder = 'vec768l12'
self.unit_interpolate_mode = self.hps_ms.data.unit_interpolate_mode if self.hps_ms.data.unit_interpolate_mode is not None else 'left'
self.vol_embedding = self.hps_ms.model.vol_embedding if self.hps_ms.model.vol_embedding is not None else False
self.speech_encoder = self.hps_ms.model.speech_encoder if self.hps_ms.model.speech_encoder is not None else 'vec768l12'
self.nsf_hifigan_enhance = nsf_hifigan_enhance
if self.shallow_diffusion or self.only_diffusion:
if os.path.exists(diffusion_model_path) and os.path.exists(diffusion_model_path):
@ -158,6 +153,7 @@ class Svc(object):
self.hop_size = self.diffusion_args.data.block_size
self.spk2id = self.diffusion_args.spk
self.speech_encoder = self.diffusion_args.data.encoder
self.unit_interpolate_mode = self.diffusion_args.data.unit_interpolate_mode if self.diffusion_args.data.unit_interpolate_mode!=None else 'left'
if spk_mix_enable:
self.diffusion_model.init_spkmix(len(self.spk2id))
else:
@ -220,7 +216,7 @@ class Svc(object):
wav16k = librosa.resample(wav, orig_sr=self.target_sample, target_sr=16000)
wav16k = torch.from_numpy(wav16k).to(self.dev)
c = self.hubert_model.encoder(wav16k)
c = utils.repeat_expand_2d(c.squeeze(0), f0.shape[1])
c = utils.repeat_expand_2d(c.squeeze(0), f0.shape[1],self.unit_interpolate_mode)
if cluster_infer_ratio !=0:
if self.feature_retrieval:
@ -299,7 +295,7 @@ class Svc(object):
audio16k = librosa.resample(audio.detach().cpu().numpy(), orig_sr=self.target_sample, target_sr=16000)
audio16k = torch.from_numpy(audio16k).to(self.dev)
c = self.hubert_model.encoder(audio16k)
c = utils.repeat_expand_2d(c.squeeze(0), f0.shape[1])
c = utils.repeat_expand_2d(c.squeeze(0), f0.shape[1],self.unit_interpolate_mode)
f0 = f0[:,:,None]
c = c.transpose(-1,-2)
audio_mel = self.diffusion_model(

View File

@ -25,7 +25,7 @@ def main():
parser.add_argument('-m', '--model_path', type=str, default="logs/44k/G_37600.pth", help='path to the model')
parser.add_argument('-c', '--config_path', type=str, default="logs/44k/config.json", help='path to the config file')
parser.add_argument('-cl', '--clip', type=float, default=0, help='forced audio slicing length in seconds; 0 (default) means automatic slicing')
parser.add_argument('-n', '--clean_names', type=str, nargs='+', default=["test.wav"], help='list of wav file names, placed under the raw folder')
parser.add_argument('-n', '--clean_names', type=str, nargs='+', default=["君の知らない物語-src.wav"], help='list of wav file names, placed under the raw folder')
parser.add_argument('-t', '--trans', type=int, nargs='+', default=[0], help='pitch shift, positive or negative (semitones)')
parser.add_argument('-s', '--spk_list', type=str, nargs='+', default=['buyizi'], help='target speaker name(s) for synthesis')

View File

@ -13,39 +13,25 @@ class DioF0Predictor(F0Predictor):
'''
Interpolate the F0 (fill unvoiced frames)
'''
vuv_vector = np.zeros_like(f0, dtype=np.float32)
vuv_vector[f0 > 0.0] = 1.0
vuv_vector[f0 <= 0.0] = 0.0
data = np.reshape(f0, (f0.size, 1))
vuv_vector = np.zeros((data.size, 1), dtype=np.float32)
vuv_vector[data > 0.0] = 1.0
vuv_vector[data <= 0.0] = 0.0
ip_data = data
frame_number = data.size
last_value = 0.0
for i in range(frame_number):
if data[i] <= 0.0:
j = i + 1
for j in range(i + 1, frame_number):
if data[j] > 0.0:
break
if j < frame_number - 1:
if last_value > 0.0:
step = (data[j] - data[i - 1]) / float(j - i)
for k in range(i, j):
ip_data[k] = data[i - 1] + step * (k - i + 1)
else:
for k in range(i, j):
ip_data[k] = data[j]
else:
for k in range(i, frame_number):
ip_data[k] = last_value
else:
ip_data[i] = data[i] # there may be an unnecessary copy here
last_value = data[i]
return ip_data[:,0], vuv_vector[:,0]
nzindex = np.nonzero(f0)[0]
data = f0[nzindex]
nzindex = nzindex.astype(np.float32)
time_org = self.hop_length / self.sampling_rate * nzindex
time_frame = np.arange(f0.shape[0]) * self.hop_length / self.sampling_rate
if data.shape[0] <= 0:
return np.zeros(f0.shape[0], dtype=np.float32),vuv_vector
if data.shape[0] == 1:
return np.ones(f0.shape[0], dtype=np.float32) * f0[0],vuv_vector
f0 = np.interp(time_frame, time_org, data, left=data[0], right=data[-1])
return f0,vuv_vector
def resize_f0(self,x, target_len):
source = np.array(x)

View File

@ -13,40 +13,25 @@ class HarvestF0Predictor(F0Predictor):
'''
Interpolate the F0 (fill unvoiced frames)
'''
vuv_vector = np.zeros_like(f0, dtype=np.float32)
vuv_vector[f0 > 0.0] = 1.0
vuv_vector[f0 <= 0.0] = 0.0
data = np.reshape(f0, (f0.size, 1))
vuv_vector = np.zeros((data.size, 1), dtype=np.float32)
vuv_vector[data > 0.0] = 1.0
vuv_vector[data <= 0.0] = 0.0
ip_data = data
frame_number = data.size
last_value = 0.0
for i in range(frame_number):
if data[i] <= 0.0:
j = i + 1
for j in range(i + 1, frame_number):
if data[j] > 0.0:
break
if j < frame_number - 1:
if last_value > 0.0:
step = (data[j] - data[i - 1]) / float(j - i)
for k in range(i, j):
ip_data[k] = data[i - 1] + step * (k - i + 1)
else:
for k in range(i, j):
ip_data[k] = data[j]
else:
for k in range(i, frame_number):
ip_data[k] = last_value
else:
ip_data[i] = data[i] # there may be an unnecessary copy here
last_value = data[i]
return ip_data[:,0], vuv_vector[:,0]
nzindex = np.nonzero(f0)[0]
data = f0[nzindex]
nzindex = nzindex.astype(np.float32)
time_org = self.hop_length / self.sampling_rate * nzindex
time_frame = np.arange(f0.shape[0]) * self.hop_length / self.sampling_rate
if data.shape[0] <= 0:
return np.zeros(f0.shape[0], dtype=np.float32),vuv_vector
if data.shape[0] == 1:
return np.ones(f0.shape[0], dtype=np.float32) * f0[0],vuv_vector
f0 = np.interp(time_frame, time_org, data, left=data[0], right=data[-1])
return f0,vuv_vector
def resize_f0(self,x, target_len):
source = np.array(x)
source[source<0.001] = np.nan

View File

@ -14,39 +14,26 @@ class PMF0Predictor(F0Predictor):
'''
Interpolate the F0 (fill unvoiced frames)
'''
vuv_vector = np.zeros_like(f0, dtype=np.float32)
vuv_vector[f0 > 0.0] = 1.0
vuv_vector[f0 <= 0.0] = 0.0
data = np.reshape(f0, (f0.size, 1))
nzindex = np.nonzero(f0)[0]
data = f0[nzindex]
nzindex = nzindex.astype(np.float32)
time_org = self.hop_length / self.sampling_rate * nzindex
time_frame = np.arange(f0.shape[0]) * self.hop_length / self.sampling_rate
if data.shape[0] <= 0:
return np.zeros(f0.shape[0], dtype=np.float32),vuv_vector
if data.shape[0] == 1:
return np.ones(f0.shape[0], dtype=np.float32) * f0[0],vuv_vector
f0 = np.interp(time_frame, time_org, data, left=data[0], right=data[-1])
return f0,vuv_vector
vuv_vector = np.zeros((data.size, 1), dtype=np.float32)
vuv_vector[data > 0.0] = 1.0
vuv_vector[data <= 0.0] = 0.0
ip_data = data
frame_number = data.size
last_value = 0.0
for i in range(frame_number):
if data[i] <= 0.0:
j = i + 1
for j in range(i + 1, frame_number):
if data[j] > 0.0:
break
if j < frame_number - 1:
if last_value > 0.0:
step = (data[j] - data[i - 1]) / float(j - i)
for k in range(i, j):
ip_data[k] = data[i - 1] + step * (k - i + 1)
else:
for k in range(i, j):
ip_data[k] = data[j]
else:
for k in range(i, frame_number):
ip_data[k] = last_value
else:
ip_data[i] = data[i] # there may be an unnecessary copy here
last_value = data[i]
return ip_data[:,0], vuv_vector[:,0]
def compute_f0(self,wav,p_len=None):
x = wav

View File

@ -97,19 +97,19 @@ class BasePitchExtractor:
f0 = torch.index_select(f0, dim=0, index=nzindex).cpu().numpy()
time_org = self.hop_length / sampling_rate * nzindex.cpu().numpy()
time_frame = np.arange(pad_to) * self.hop_length / sampling_rate
vuv_vector = F.interpolate(vuv_vector[None,None,:],size=pad_to)[0][0]
if f0.shape[0] <= 0:
return torch.zeros(pad_to, dtype=torch.float, device=x.device),torch.zeros(pad_to, dtype=torch.float, device=x.device)
return torch.zeros(pad_to, dtype=torch.float, device=x.device),vuv_vector.cpu().numpy()
if f0.shape[0] == 1:
return torch.ones(pad_to, dtype=torch.float, device=x.device) * f0[0],torch.ones(pad_to, dtype=torch.float, device=x.device)
return torch.ones(pad_to, dtype=torch.float, device=x.device) * f0[0],vuv_vector.cpu().numpy()
# could probably be rewritten with torch?
f0 = np.interp(time_frame, time_org, f0, left=f0[0], right=f0[-1])
vuv_vector = vuv_vector.cpu().numpy()
vuv_vector = np.ceil(scipy.ndimage.zoom(vuv_vector,pad_to/len(vuv_vector),order = 0))
#vuv_vector = np.ceil(scipy.ndimage.zoom(vuv_vector,pad_to/len(vuv_vector),order = 0))
return f0,vuv_vector
return f0,vuv_vector.cpu().numpy()
class MaskedAvgPool1d(nn.Module):
@ -323,7 +323,7 @@ class CrepePitchExtractor(BasePitchExtractor):
else:
pd = torchcrepe.filter.median(pd, 3)
pd = torchcrepe.threshold.Silence(-60.0)(pd, x, sampling_rate, 512)
pd = torchcrepe.threshold.Silence(-60.0)(pd, x, sampling_rate, self.hop_length)
f0 = torchcrepe.threshold.At(self.threshold)(f0, pd)
if self.use_fast_filters:
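
In `modules.py` the voiced/unvoiced mask is now resized with `F.interpolate` before the early-return branches, so even degenerate F0 tracks (all-unvoiced or a single voiced frame) come back with a mask of length `pad_to`. A tiny hedged sketch of that resize:

```python
import torch
import torch.nn.functional as F

vuv = torch.tensor([1.0, 0.0, 1.0, 1.0])
pad_to = 9
resized = F.interpolate(vuv[None, None, :], size=pad_to)[0][0]  # nearest-neighbour by default
print(resized.shape)  # torch.Size([9]), aligned to the padded frame count
```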

View File

@ -83,30 +83,7 @@ def spec_to_mel_torch(spec, n_fft, num_mels, sampling_rate, fmin, fmax):
def mel_spectrogram_torch(y, n_fft, num_mels, sampling_rate, hop_size, win_size, fmin, fmax, center=False):
if torch.min(y) < -1.:
print('min value is ', torch.min(y))
if torch.max(y) > 1.:
print('max value is ', torch.max(y))
global mel_basis, hann_window
dtype_device = str(y.dtype) + '_' + str(y.device)
fmax_dtype_device = str(fmax) + '_' + dtype_device
wnsize_dtype_device = str(win_size) + '_' + dtype_device
if fmax_dtype_device not in mel_basis:
mel = librosa_mel_fn(sr=sampling_rate, n_fft=n_fft, n_mels=num_mels, fmin=fmin, fmax=fmax)
mel_basis[fmax_dtype_device] = torch.from_numpy(mel).to(dtype=y.dtype, device=y.device)
if wnsize_dtype_device not in hann_window:
hann_window[wnsize_dtype_device] = torch.hann_window(win_size).to(dtype=y.dtype, device=y.device)
y = torch.nn.functional.pad(y.unsqueeze(1), (int((n_fft-hop_size)/2), int((n_fft-hop_size)/2)), mode='reflect')
y = y.squeeze(1)
spec = torch.stft(y, n_fft, hop_length=hop_size, win_length=win_size, window=hann_window[wnsize_dtype_device],
center=center, pad_mode='reflect', normalized=False, onesided=True, return_complex=False)
spec = torch.sqrt(spec.pow(2).sum(-1) + 1e-6)
spec = torch.matmul(mel_basis[fmax_dtype_device], spec)
spec = spectral_normalize_torch(spec)
spec = spectrogram_torch(y, n_fft, sampling_rate, hop_size, win_size, center)
spec = spec_to_mel_torch(spec, n_fft, num_mels, sampling_rate, fmin, fmax)
return spec

View File

@ -28,7 +28,7 @@ if __name__ == "__main__":
parser.add_argument("--train_list", type=str, default="./filelists/train.txt", help="path to train list")
parser.add_argument("--val_list", type=str, default="./filelists/val.txt", help="path to val list")
parser.add_argument("--source_dir", type=str, default="./dataset/44k", help="path to source dir")
parser.add_argument("--speech_encoder", type=str, default="vec768l12", help="choice a speech encoder|'vec768l12','vec256l9','hubertsoft','whisper-ppg','cnhubertlarge','dphubert','whisper-ppg-large'")
parser.add_argument("--speech_encoder", type=str, default="vec768l12", help="choice a speech encoder|'vec768l12','vec256l9','hubertsoft','whisper-ppg','cnhubertlarge','dphubert','whisper-ppg-large','wavlmbase+'")
parser.add_argument("--vol_aug", action="store_true", help="Whether to use volume embedding and volume augmentation")
args = parser.parse_args()
@ -81,7 +81,7 @@ if __name__ == "__main__":
config_template["model"]["n_speakers"] = spk_id
config_template["model"]["speech_encoder"] = args.speech_encoder
if args.speech_encoder == "vec768l12" or args.speech_encoder == "dphubert":
if args.speech_encoder == "vec768l12" or args.speech_encoder == "dphubert" or args.speech_encoder == "wavlmbase+":
config_template["model"]["ssl_dim"] = config_template["model"]["filter_channels"] = config_template["model"]["gin_channels"] = 768
d_config_template["data"]["encoder_out_channels"] = 768
elif args.speech_encoder == "vec256l9" or args.speech_encoder == 'hubertsoft':

View File

@ -139,6 +139,9 @@ def get_speech_encoder(speech_encoder,device=None,**kargs):
elif speech_encoder == "whisper-ppg-large":
from vencoder.WhisperPPGLarge import WhisperPPGLarge
speech_encoder_object = WhisperPPGLarge(device = device)
elif speech_encoder == "wavlmbase+":
from vencoder.WavLMBasePlus import WavLMBasePlus
speech_encoder_object = WavLMBasePlus(device = device)
else:
raise Exception("Unknown speech encoder")
return speech_encoder_object
@ -334,11 +337,11 @@ def get_hparams_from_dir(model_dir):
return hparams
def get_hparams_from_file(config_path):
def get_hparams_from_file(config_path, infer_mode = False):
with open(config_path, "r") as f:
data = f.read()
config = json.loads(data)
hparams =HParams(**config)
hparams =HParams(**config) if not infer_mode else InferHParams(**config)
return hparams
@ -377,7 +380,13 @@ def get_logger(model_dir, filename="train.log"):
return logger
def repeat_expand_2d(content, target_len):
def repeat_expand_2d(content, target_len, mode = 'left'):
# content : [h, t]
return repeat_expand_2d_left(content, target_len) if mode == 'left' else repeat_expand_2d_other(content, target_len, mode)
def repeat_expand_2d_left(content, target_len):
# content : [h, t]
src_len = content.shape[-1]
@ -394,6 +403,14 @@ def repeat_expand_2d(content, target_len):
return target
# mode : 'nearest'| 'linear'| 'bilinear'| 'bicubic'| 'trilinear'| 'area'
def repeat_expand_2d_other(content, target_len, mode = 'nearest'):
# content : [h, t]
content = content[None,:,:]
target = F.interpolate(content,size=target_len,mode=mode)[0]
return target
def mix_model(model_paths,mix_rate,mode):
mix_rate = torch.FloatTensor(mix_rate)/100
model_tem = torch.load(model_paths[0])
@ -495,6 +512,18 @@ class HParams():
def get(self,index):
return self.__dict__.get(index)
class InferHParams(HParams):
def __init__(self, **kwargs):
for k, v in kwargs.items():
if type(v) == dict:
v = InferHParams(**v)
self[k] = v
def __getattr__(self,index):
return self.get(index)
class Volume_Extractor:
def __init__(self, hop_size = 512):
self.hop_size = hop_size
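
The new `InferHParams` class is what lets the `Svc` diff above replace its try/except blocks with inline fallbacks: missing config keys resolve to `None` instead of raising. A hedged illustration (the nested dict here is a placeholder, not a real config):

```python
from utils import InferHParams

hps = InferHParams(**{"model": {"speech_encoder": "wavlmbase+"}})
print(hps.model.speech_encoder)  # "wavlmbase+"
print(hps.model.vol_embedding)   # None -> the caller substitutes False
```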

View File

@ -292,11 +292,11 @@ class Generator(torch.nn.Module):
c_cur = h["upsample_initial_channel"] // (2 ** (i + 1))
self.ups.append(weight_norm(
ConvTranspose1d(h["upsample_initial_channel"] // (2 ** i), h["upsample_initial_channel"] // (2 ** (i + 1)),
k, u, padding=(k - u) // 2)))
k, u, padding=(k - u +1 ) // 2)))
if i + 1 < len(h["upsample_rates"]): #
stride_f0 = np.prod(h["upsample_rates"][i + 1:])
self.noise_convs.append(Conv1d(
1, c_cur, kernel_size=stride_f0 * 2, stride=stride_f0, padding=stride_f0 // 2))
1, c_cur, kernel_size=stride_f0 * 2, stride=stride_f0, padding=(stride_f0+1) // 2))
else:
self.noise_convs.append(Conv1d(1, c_cur, kernel_size=1))
self.resblocks = nn.ModuleList()

View File

@ -304,11 +304,11 @@ class Generator(torch.nn.Module):
c_cur = h["upsample_initial_channel"] // (2 ** (i + 1))
self.ups.append(weight_norm(
ConvTranspose1d(h["upsample_initial_channel"] // (2 ** i), h["upsample_initial_channel"] // (2 ** (i + 1)),
k, u, padding=(k - u) // 2)))
k, u, padding=(k - u + 1) // 2)))
if i + 1 < len(h["upsample_rates"]): #
stride_f0 = np.prod(h["upsample_rates"][i + 1:])
self.noise_convs.append(Conv1d(
1, c_cur, kernel_size=stride_f0 * 2, stride=stride_f0, padding=stride_f0 // 2))
1, c_cur, kernel_size=stride_f0 * 2, stride=stride_f0, padding=(stride_f0+ 1) // 2))
else:
self.noise_convs.append(Conv1d(1, c_cur, kernel_size=1))
self.resblocks = nn.ModuleList()

vencoder/WavLMBasePlus.py (new file, 29 lines)
View File

@ -0,0 +1,29 @@
from vencoder.encoder import SpeechEncoder
import torch
from vencoder.wavlm.WavLM import WavLM, WavLMConfig
class WavLMBasePlus(SpeechEncoder):
def __init__(self,vec_path = "pretrain/WavLM-Base+.pt",device=None):
print("load model(s) from {}".format(vec_path))
checkpoint = torch.load(vec_path)
self.cfg = WavLMConfig(checkpoint['cfg'])
if device is None:
self.dev = torch.device("cuda" if torch.cuda.is_available() else "cpu")
else:
self.dev = torch.device(device)
self.hidden_dim = self.cfg.encoder_embed_dim
self.model = WavLM(self.cfg)
self.model.load_state_dict(checkpoint['model'])
self.model.to(self.dev).eval()
def encoder(self, wav):
feats = wav
if feats.dim() == 2: # double channels
feats = feats.mean(-1)
assert feats.dim() == 1, feats.dim()
if self.cfg.normalize:
feats = torch.nn.functional.layer_norm(feats , feats.shape)
with torch.no_grad():
with torch.inference_mode():
units = self.model.extract_features(feats[None,:])[0]
return units.transpose(1,2)
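
A hypothetical usage sketch for the new encoder class; the waveform below is a random placeholder, whereas real callers pass 16 kHz mono audio resampled from the dataset:

```python
import torch
from vencoder.WavLMBasePlus import WavLMBasePlus

enc = WavLMBasePlus(device="cpu")   # loads pretrain/WavLM-Base+.pt by default
wav16k = torch.randn(16000)         # placeholder: one second of 16 kHz mono audio
units = enc.encoder(wav16k)         # -> [1, hidden_dim, n_frames], hidden_dim = 768
```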

vencoder/wavlm/WavLM.py (new file, 743 lines)
View File

@ -0,0 +1,743 @@
# --------------------------------------------------------
# WavLM: Large-Scale Self-Supervised Pre-training for Full Stack Speech Processing (https://arxiv.org/abs/2110.13900.pdf)
# Github source: https://github.com/microsoft/unilm/tree/master/wavlm
# Copyright (c) 2021 Microsoft
# Licensed under The MIT License [see LICENSE for details]
# Based on fairseq code bases
# https://github.com/pytorch/fairseq
# --------------------------------------------------------
import math
import logging
from typing import List, Optional, Tuple
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn import LayerNorm
from vencoder.wavlm.modules import (
Fp32GroupNorm,
Fp32LayerNorm,
GradMultiply,
MultiheadAttention,
SamePad,
init_bert_params,
get_activation_fn,
TransposeLast,
GLU_Linear,
)
logger = logging.getLogger(__name__)
def compute_mask_indices(
shape: Tuple[int, int],
padding_mask: Optional[torch.Tensor],
mask_prob: float,
mask_length: int,
mask_type: str = "static",
mask_other: float = 0.0,
min_masks: int = 0,
no_overlap: bool = False,
min_space: int = 0,
) -> np.ndarray:
"""
Computes random mask spans for a given shape
Args:
shape: the shape for which to compute masks.
should be of size 2 where first element is batch size and 2nd is timesteps
padding_mask: optional padding mask of the same size as shape, which will prevent masking padded elements
mask_prob: probability for each token to be chosen as start of the span to be masked. this will be multiplied by
number of timesteps divided by length of mask span to mask approximately this percentage of all elements.
however due to overlaps, the actual number will be smaller (unless no_overlap is True)
mask_type: how to compute mask lengths
static = fixed size
uniform = sample from uniform distribution [mask_other, mask_length*2]
normal = sample from normal distribution with mean mask_length and stdev mask_other. mask is min 1 element
poisson = sample from poisson distribution with lambda = mask length
min_masks: minimum number of masked spans
no_overlap: if false, will switch to an alternative recursive algorithm that prevents spans from overlapping
min_space: only used if no_overlap is True, this is how many elements to keep unmasked between spans
"""
bsz, all_sz = shape
mask = np.full((bsz, all_sz), False)
all_num_mask = int(
# add a random number for probabilistic rounding
mask_prob * all_sz / float(mask_length)
+ np.random.rand()
)
all_num_mask = max(min_masks, all_num_mask)
mask_idcs = []
for i in range(bsz):
if padding_mask is not None:
sz = all_sz - padding_mask[i].long().sum().item()
num_mask = int(
# add a random number for probabilistic rounding
mask_prob * sz / float(mask_length)
+ np.random.rand()
)
num_mask = max(min_masks, num_mask)
else:
sz = all_sz
num_mask = all_num_mask
if mask_type == "static":
lengths = np.full(num_mask, mask_length)
elif mask_type == "uniform":
lengths = np.random.randint(mask_other, mask_length * 2 + 1, size=num_mask)
elif mask_type == "normal":
lengths = np.random.normal(mask_length, mask_other, size=num_mask)
lengths = [max(1, int(round(x))) for x in lengths]
elif mask_type == "poisson":
lengths = np.random.poisson(mask_length, size=num_mask)
lengths = [int(round(x)) for x in lengths]
else:
raise Exception("unknown mask selection " + mask_type)
if sum(lengths) == 0:
lengths[0] = min(mask_length, sz - 1)
if no_overlap:
mask_idc = []
def arrange(s, e, length, keep_length):
span_start = np.random.randint(s, e - length)
mask_idc.extend(span_start + i for i in range(length))
new_parts = []
if span_start - s - min_space >= keep_length:
new_parts.append((s, span_start - min_space + 1))
if e - span_start - keep_length - min_space > keep_length:
new_parts.append((span_start + length + min_space, e))
return new_parts
parts = [(0, sz)]
min_length = min(lengths)
for length in sorted(lengths, reverse=True):
lens = np.fromiter(
(e - s if e - s >= length + min_space else 0 for s, e in parts),
np.int,
)
l_sum = np.sum(lens)
if l_sum == 0:
break
probs = lens / np.sum(lens)
c = np.random.choice(len(parts), p=probs)
s, e = parts.pop(c)
parts.extend(arrange(s, e, length, min_length))
mask_idc = np.asarray(mask_idc)
else:
min_len = min(lengths)
if sz - min_len <= num_mask:
min_len = sz - num_mask - 1
mask_idc = np.random.choice(sz - min_len, num_mask, replace=False)
mask_idc = np.asarray(
[
mask_idc[j] + offset
for j in range(len(mask_idc))
for offset in range(lengths[j])
]
)
mask_idcs.append(np.unique(mask_idc[mask_idc < sz]))
min_len = min([len(m) for m in mask_idcs])
for i, mask_idc in enumerate(mask_idcs):
if len(mask_idc) > min_len:
mask_idc = np.random.choice(mask_idc, min_len, replace=False)
mask[i, mask_idc] = True
return mask
class WavLMConfig:
def __init__(self, cfg=None):
self.extractor_mode: str = "default" # mode for feature extractor. default has a single group norm with d groups in the first conv block, whereas layer_norm has layer norms in every block (meant to use with normalize=True)
self.encoder_layers: int = 12 # num encoder layers in the transformer
self.encoder_embed_dim: int = 768 # encoder embedding dimension
self.encoder_ffn_embed_dim: int = 3072 # encoder embedding dimension for FFN
self.encoder_attention_heads: int = 12 # num encoder attention heads
self.activation_fn: str = "gelu" # activation function to use
self.layer_norm_first: bool = False # apply layernorm first in the transformer
self.conv_feature_layers: str = "[(512,10,5)] + [(512,3,2)] * 4 + [(512,2,2)] * 2" # string describing convolutional feature extraction layers in form of a python list that contains [(dim, kernel_size, stride), ...]
self.conv_bias: bool = False # include bias in conv encoder
self.feature_grad_mult: float = 1.0 # multiply feature extractor var grads by this
self.normalize: bool = False # normalize input to have 0 mean and unit variance during training
# dropouts
self.dropout: float = 0.1 # dropout probability for the transformer
self.attention_dropout: float = 0.1 # dropout probability for attention weights
self.activation_dropout: float = 0.0 # dropout probability after activation in FFN
self.encoder_layerdrop: float = 0.0 # probability of dropping a transformer layer
self.dropout_input: float = 0.0 # dropout to apply to the input (after feat extr)
self.dropout_features: float = 0.0 # dropout to apply to the features (after feat extr)
# masking
self.mask_length: int = 10 # mask length
self.mask_prob: float = 0.65 # probability of replacing a token with mask
self.mask_selection: str = "static" # how to choose mask length
self.mask_other: float = 0 # secondary mask argument (used for more complex distributions), see help in compute_mask_indices
self.no_mask_overlap: bool = False # whether to allow masks to overlap
self.mask_min_space: int = 1 # min space between spans (if no overlap is enabled)
# channel masking
self.mask_channel_length: int = 10 # length of the mask for features (channels)
self.mask_channel_prob: float = 0.0 # probability of replacing a feature with 0
self.mask_channel_selection: str = "static" # how to choose mask length for channel masking
self.mask_channel_other: float = 0 # secondary mask argument (used for more complex distributions), see help in compute_mask_indices
self.no_mask_channel_overlap: bool = False # whether to allow channel masks to overlap
self.mask_channel_min_space: int = 1 # min space between spans (if no overlap is enabled)
# positional embeddings
self.conv_pos: int = 128 # number of filters for convolutional positional embeddings
self.conv_pos_groups: int = 16 # number of groups for convolutional positional embedding
# relative position embedding
self.relative_position_embedding: bool = False # apply relative position embedding
self.num_buckets: int = 320 # number of buckets for relative position embedding
self.max_distance: int = 1280 # maximum distance for relative position embedding
self.gru_rel_pos: bool = False # apply gated relative position embedding
if cfg is not None:
self.update(cfg)
def update(self, cfg: dict):
self.__dict__.update(cfg)
class WavLM(nn.Module):
def __init__(
self,
cfg: WavLMConfig,
) -> None:
super().__init__()
logger.info(f"WavLM Config: {cfg.__dict__}")
self.cfg = cfg
feature_enc_layers = eval(cfg.conv_feature_layers)
self.embed = feature_enc_layers[-1][0]
self.feature_extractor = ConvFeatureExtractionModel(
conv_layers=feature_enc_layers,
dropout=0.0,
mode=cfg.extractor_mode,
conv_bias=cfg.conv_bias,
)
self.post_extract_proj = (
nn.Linear(self.embed, cfg.encoder_embed_dim)
if self.embed != cfg.encoder_embed_dim
else None
)
self.mask_prob = cfg.mask_prob
self.mask_selection = cfg.mask_selection
self.mask_other = cfg.mask_other
self.mask_length = cfg.mask_length
self.no_mask_overlap = cfg.no_mask_overlap
self.mask_min_space = cfg.mask_min_space
self.mask_channel_prob = cfg.mask_channel_prob
self.mask_channel_selection = cfg.mask_channel_selection
self.mask_channel_other = cfg.mask_channel_other
self.mask_channel_length = cfg.mask_channel_length
self.no_mask_channel_overlap = cfg.no_mask_channel_overlap
self.mask_channel_min_space = cfg.mask_channel_min_space
self.dropout_input = nn.Dropout(cfg.dropout_input)
self.dropout_features = nn.Dropout(cfg.dropout_features)
self.feature_grad_mult = cfg.feature_grad_mult
self.mask_emb = nn.Parameter(
torch.FloatTensor(cfg.encoder_embed_dim).uniform_()
)
self.encoder = TransformerEncoder(cfg)
self.layer_norm = LayerNorm(self.embed)
def apply_mask(self, x, padding_mask):
B, T, C = x.shape
if self.mask_prob > 0:
mask_indices = compute_mask_indices(
(B, T),
padding_mask,
self.mask_prob,
self.mask_length,
self.mask_selection,
self.mask_other,
min_masks=2,
no_overlap=self.no_mask_overlap,
min_space=self.mask_min_space,
)
mask_indices = torch.from_numpy(mask_indices).to(x.device)
x[mask_indices] = self.mask_emb
else:
mask_indices = None
if self.mask_channel_prob > 0:
mask_channel_indices = compute_mask_indices(
(B, C),
None,
self.mask_channel_prob,
self.mask_channel_length,
self.mask_channel_selection,
self.mask_channel_other,
no_overlap=self.no_mask_channel_overlap,
min_space=self.mask_channel_min_space,
)
mask_channel_indices = (
torch.from_numpy(mask_channel_indices)
.to(x.device)
.unsqueeze(1)
.expand(-1, T, -1)
)
x[mask_channel_indices] = 0
return x, mask_indices
def forward_padding_mask(
self, features: torch.Tensor, padding_mask: torch.Tensor,
) -> torch.Tensor:
extra = padding_mask.size(1) % features.size(1)
if extra > 0:
padding_mask = padding_mask[:, :-extra]
padding_mask = padding_mask.view(
padding_mask.size(0), features.size(1), -1
)
padding_mask = padding_mask.all(-1)
return padding_mask
def extract_features(
self,
source: torch.Tensor,
padding_mask: Optional[torch.Tensor] = None,
mask: bool = False,
ret_conv: bool = False,
output_layer: Optional[int] = None,
ret_layer_results: bool = False,
):
if self.feature_grad_mult > 0:
features = self.feature_extractor(source)
if self.feature_grad_mult != 1.0:
features = GradMultiply.apply(features, self.feature_grad_mult)
else:
with torch.no_grad():
features = self.feature_extractor(source)
features = features.transpose(1, 2)
features = self.layer_norm(features)
if padding_mask is not None:
padding_mask = self.forward_padding_mask(features, padding_mask)
if self.post_extract_proj is not None:
features = self.post_extract_proj(features)
features = self.dropout_input(features)
if mask:
x, mask_indices = self.apply_mask(
features, padding_mask
)
else:
x = features
# feature: (B, T, D), float
# target: (B, T), long
# x: (B, T, D), float
# padding_mask: (B, T), bool
# mask_indices: (B, T), bool
x, layer_results = self.encoder(
x,
padding_mask=padding_mask,
layer=None if output_layer is None else output_layer - 1
)
res = {"x": x, "padding_mask": padding_mask, "features": features, "layer_results": layer_results}
feature = res["features"] if ret_conv else res["x"]
if ret_layer_results:
feature = (feature, res["layer_results"])
return feature, res["padding_mask"]
class ConvFeatureExtractionModel(nn.Module):
def __init__(
self,
conv_layers: List[Tuple[int, int, int]],
dropout: float = 0.0,
mode: str = "default",
conv_bias: bool = False,
conv_type: str = "default"
):
super().__init__()
assert mode in {"default", "layer_norm"}
def block(
n_in,
n_out,
k,
stride,
is_layer_norm=False,
is_group_norm=False,
conv_bias=False,
):
def make_conv():
conv = nn.Conv1d(n_in, n_out, k, stride=stride, bias=conv_bias)
nn.init.kaiming_normal_(conv.weight)
return conv
assert (
is_layer_norm and is_group_norm
) == False, "layer norm and group norm are exclusive"
if is_layer_norm:
return nn.Sequential(
make_conv(),
nn.Dropout(p=dropout),
nn.Sequential(
TransposeLast(),
Fp32LayerNorm(dim, elementwise_affine=True),
TransposeLast(),
),
nn.GELU(),
)
elif is_group_norm:
return nn.Sequential(
make_conv(),
nn.Dropout(p=dropout),
Fp32GroupNorm(dim, dim, affine=True),
nn.GELU(),
)
else:
return nn.Sequential(make_conv(), nn.Dropout(p=dropout), nn.GELU())
self.conv_type = conv_type
if self.conv_type == "default":
in_d = 1
self.conv_layers = nn.ModuleList()
for i, cl in enumerate(conv_layers):
assert len(cl) == 3, "invalid conv definition: " + str(cl)
(dim, k, stride) = cl
self.conv_layers.append(
block(
in_d,
dim,
k,
stride,
is_layer_norm=mode == "layer_norm",
is_group_norm=mode == "default" and i == 0,
conv_bias=conv_bias,
)
)
in_d = dim
elif self.conv_type == "conv2d":
in_d = 1
self.conv_layers = nn.ModuleList()
for i, cl in enumerate(conv_layers):
assert len(cl) == 3
(dim, k, stride) = cl
self.conv_layers.append(
torch.nn.Conv2d(in_d, dim, k, stride)
)
self.conv_layers.append(torch.nn.ReLU())
in_d = dim
elif self.conv_type == "custom":
in_d = 1
idim = 80
self.conv_layers = nn.ModuleList()
for i, cl in enumerate(conv_layers):
assert len(cl) == 3
(dim, k, stride) = cl
self.conv_layers.append(
torch.nn.Conv2d(in_d, dim, k, stride, padding=1)
)
self.conv_layers.append(
torch.nn.LayerNorm([dim, idim])
)
self.conv_layers.append(torch.nn.ReLU())
in_d = dim
if (i + 1) % 2 == 0:
self.conv_layers.append(
torch.nn.MaxPool2d(2, stride=2, ceil_mode=True)
)
idim = int(math.ceil(idim / 2))
else:
pass
def forward(self, x, mask=None):
# BxT -> BxCxT
x = x.unsqueeze(1)
if self.conv_type == "custom":
for conv in self.conv_layers:
if isinstance(conv, nn.LayerNorm):
x = x.transpose(1, 2)
x = conv(x).transpose(1, 2)
else:
x = conv(x)
x = x.transpose(2, 3).contiguous()
x = x.view(x.size(0), -1, x.size(-1))
else:
for conv in self.conv_layers:
x = conv(x)
if self.conv_type == "conv2d":
b, c, t, f = x.size()
x = x.transpose(2, 3).contiguous().view(b, c * f, t)
return x
class TransformerEncoder(nn.Module):
def __init__(self, args):
super().__init__()
self.dropout = args.dropout
self.embedding_dim = args.encoder_embed_dim
self.pos_conv = nn.Conv1d(
self.embedding_dim,
self.embedding_dim,
kernel_size=args.conv_pos,
padding=args.conv_pos // 2,
groups=args.conv_pos_groups,
)
dropout = 0
std = math.sqrt((4 * (1.0 - dropout)) / (args.conv_pos * self.embedding_dim))
nn.init.normal_(self.pos_conv.weight, mean=0, std=std)
nn.init.constant_(self.pos_conv.bias, 0)
self.pos_conv = nn.utils.weight_norm(self.pos_conv, name="weight", dim=2)
self.pos_conv = nn.Sequential(self.pos_conv, SamePad(args.conv_pos), nn.GELU())
if hasattr(args, "relative_position_embedding"):
self.relative_position_embedding = args.relative_position_embedding
self.num_buckets = args.num_buckets
self.max_distance = args.max_distance
else:
self.relative_position_embedding = False
self.num_buckets = 0
self.max_distance = 0
self.layers = nn.ModuleList(
[
TransformerSentenceEncoderLayer(
embedding_dim=self.embedding_dim,
ffn_embedding_dim=args.encoder_ffn_embed_dim,
num_attention_heads=args.encoder_attention_heads,
dropout=self.dropout,
attention_dropout=args.attention_dropout,
activation_dropout=args.activation_dropout,
activation_fn=args.activation_fn,
layer_norm_first=args.layer_norm_first,
has_relative_attention_bias=(self.relative_position_embedding and i == 0),
num_buckets=self.num_buckets,
max_distance=self.max_distance,
gru_rel_pos=args.gru_rel_pos,
)
for i in range(args.encoder_layers)
]
)
self.layer_norm_first = args.layer_norm_first
self.layer_norm = LayerNorm(self.embedding_dim)
self.layerdrop = args.encoder_layerdrop
self.apply(init_bert_params)
def forward(self, x, padding_mask=None, streaming_mask=None, layer=None):
x, layer_results = self.extract_features(x, padding_mask, streaming_mask, layer)
if self.layer_norm_first and layer is None:
x = self.layer_norm(x)
return x, layer_results
def extract_features(self, x, padding_mask=None, streaming_mask=None, tgt_layer=None):
if padding_mask is not None:
x[padding_mask] = 0
x_conv = self.pos_conv(x.transpose(1, 2))
x_conv = x_conv.transpose(1, 2)
x = x + x_conv
if not self.layer_norm_first:
x = self.layer_norm(x)
x = F.dropout(x, p=self.dropout, training=self.training)
# B x T x C -> T x B x C
x = x.transpose(0, 1)
layer_results = []
z = None
if tgt_layer is not None:
layer_results.append((x, z))
r = None
pos_bias = None
for i, layer in enumerate(self.layers):
dropout_probability = np.random.random()
if not self.training or (dropout_probability > self.layerdrop):
x, z, pos_bias = layer(x, self_attn_padding_mask=padding_mask, need_weights=False,
self_attn_mask=streaming_mask, pos_bias=pos_bias)
if tgt_layer is not None:
layer_results.append((x, z))
if i == tgt_layer:
r = x
break
if r is not None:
x = r
# T x B x C -> B x T x C
x = x.transpose(0, 1)
return x, layer_results
class TransformerSentenceEncoderLayer(nn.Module):
"""
Implements a Transformer Encoder Layer used in BERT/XLM style pre-trained
models.
"""
def __init__(
self,
embedding_dim: float = 768,
ffn_embedding_dim: float = 3072,
num_attention_heads: float = 8,
dropout: float = 0.1,
attention_dropout: float = 0.1,
activation_dropout: float = 0.1,
activation_fn: str = "relu",
layer_norm_first: bool = False,
has_relative_attention_bias: bool = False,
num_buckets: int = 0,
max_distance: int = 0,
rescale_init: bool = False,
gru_rel_pos: bool = False,
) -> None:
super().__init__()
# Initialize parameters
self.embedding_dim = embedding_dim
self.dropout = dropout
self.activation_dropout = activation_dropout
# Initialize blocks
self.activation_name = activation_fn
self.activation_fn = get_activation_fn(activation_fn)
self.self_attn = MultiheadAttention(
self.embedding_dim,
num_attention_heads,
dropout=attention_dropout,
self_attention=True,
has_relative_attention_bias=has_relative_attention_bias,
num_buckets=num_buckets,
max_distance=max_distance,
rescale_init=rescale_init,
gru_rel_pos=gru_rel_pos,
)
self.dropout1 = nn.Dropout(dropout)
self.dropout2 = nn.Dropout(self.activation_dropout)
self.dropout3 = nn.Dropout(dropout)
self.layer_norm_first = layer_norm_first
# layer norm associated with the self attention layer
self.self_attn_layer_norm = LayerNorm(self.embedding_dim)
if self.activation_name == "glu":
self.fc1 = GLU_Linear(self.embedding_dim, ffn_embedding_dim, "swish")
else:
self.fc1 = nn.Linear(self.embedding_dim, ffn_embedding_dim)
self.fc2 = nn.Linear(ffn_embedding_dim, self.embedding_dim)
# layer norm associated with the position wise feed-forward NN
self.final_layer_norm = LayerNorm(self.embedding_dim)
def forward(
self,
x: torch.Tensor,
self_attn_mask: torch.Tensor = None,
self_attn_padding_mask: torch.Tensor = None,
need_weights: bool = False,
pos_bias=None
):
"""
LayerNorm is applied either before or after the self-attention/ffn
modules similar to the original Transformer implementation.
"""
residual = x
if self.layer_norm_first:
x = self.self_attn_layer_norm(x)
x, attn, pos_bias = self.self_attn(
query=x,
key=x,
value=x,
key_padding_mask=self_attn_padding_mask,
need_weights=False,
attn_mask=self_attn_mask,
position_bias=pos_bias
)
x = self.dropout1(x)
x = residual + x
residual = x
x = self.final_layer_norm(x)
if self.activation_name == "glu":
x = self.fc1(x)
else:
x = self.activation_fn(self.fc1(x))
x = self.dropout2(x)
x = self.fc2(x)
x = self.dropout3(x)
x = residual + x
else:
x, attn, pos_bias = self.self_attn(
query=x,
key=x,
value=x,
key_padding_mask=self_attn_padding_mask,
need_weights=need_weights,
attn_mask=self_attn_mask,
position_bias=pos_bias
)
x = self.dropout1(x)
x = residual + x
x = self.self_attn_layer_norm(x)
residual = x
if self.activation_name == "glu":
x = self.fc1(x)
else:
x = self.activation_fn(self.fc1(x))
x = self.dropout2(x)
x = self.fc2(x)
x = self.dropout3(x)
x = residual + x
x = self.final_layer_norm(x)
return x, attn, pos_bias

vencoder/wavlm/modules.py (new file, 827 lines)
View File

@ -0,0 +1,827 @@
# --------------------------------------------------------
# WavLM: Large-Scale Self-Supervised Pre-training for Full Stack Speech Processing (https://arxiv.org/abs/2110.13900.pdf)
# Github source: https://github.com/microsoft/unilm/tree/master/wavlm
# Copyright (c) 2021 Microsoft
# Licensed under The MIT License [see LICENSE for details]
# Based on fairseq code bases
# https://github.com/pytorch/fairseq
# --------------------------------------------------------
import math
import warnings
from typing import Dict, Optional, Tuple
import torch
from torch import Tensor, nn
from torch.nn import Parameter
import torch.nn.functional as F
class TransposeLast(nn.Module):
def __init__(self, deconstruct_idx=None):
super().__init__()
self.deconstruct_idx = deconstruct_idx
def forward(self, x):
if self.deconstruct_idx is not None:
x = x[self.deconstruct_idx]
return x.transpose(-2, -1)
class Fp32LayerNorm(nn.LayerNorm):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
def forward(self, input):
output = F.layer_norm(
input.float(),
self.normalized_shape,
self.weight.float() if self.weight is not None else None,
self.bias.float() if self.bias is not None else None,
self.eps,
)
return output.type_as(input)
class Fp32GroupNorm(nn.GroupNorm):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
def forward(self, input):
output = F.group_norm(
input.float(),
self.num_groups,
self.weight.float() if self.weight is not None else None,
self.bias.float() if self.bias is not None else None,
self.eps,
)
return output.type_as(input)
class GradMultiply(torch.autograd.Function):
@staticmethod
def forward(ctx, x, scale):
ctx.scale = scale
res = x.new(x)
return res
@staticmethod
def backward(ctx, grad):
return grad * ctx.scale, None
class SamePad(nn.Module):
def __init__(self, kernel_size, causal=False):
super().__init__()
if causal:
self.remove = kernel_size - 1
else:
self.remove = 1 if kernel_size % 2 == 0 else 0
def forward(self, x):
if self.remove > 0:
x = x[:, :, : -self.remove]
return x
class Swish(nn.Module):
"""Swish function
"""
def __init__(self):
"""Construct an MultiHeadedAttention object."""
super(Swish, self).__init__()
self.act = torch.nn.Sigmoid()
def forward(self, x):
return x * self.act(x)
class GLU_Linear(nn.Module):
def __init__(self, input_dim, output_dim, glu_type="sigmoid", bias_in_glu=True):
super(GLU_Linear, self).__init__()
self.glu_type = glu_type
self.output_dim = output_dim
if glu_type == "sigmoid":
self.glu_act = torch.nn.Sigmoid()
elif glu_type == "swish":
self.glu_act = Swish()
elif glu_type == "relu":
self.glu_act = torch.nn.ReLU()
elif glu_type == "gelu":
self.glu_act = torch.nn.GELU()
if bias_in_glu:
self.linear = nn.Linear(input_dim, output_dim * 2, True)
else:
self.linear = nn.Linear(input_dim, output_dim * 2, False)
def forward(self, x):
# to be consistent with GLU_Linear, we assume the input always has the #channel (#dim) in the last dimension of the tensor, so need to switch the dimension first for 1D-Conv case
x = self.linear(x)
if self.glu_type == "bilinear":
x = (x[:, :, 0:self.output_dim] * x[:, :, self.output_dim:self.output_dim * 2])
else:
x = (x[:, :, 0:self.output_dim] * self.glu_act(x[:, :, self.output_dim:self.output_dim * 2]))
return x
def gelu_accurate(x):
if not hasattr(gelu_accurate, "_a"):
gelu_accurate._a = math.sqrt(2 / math.pi)
return (
0.5 * x * (1 + torch.tanh(gelu_accurate._a * (x + 0.044715 * torch.pow(x, 3))))
)
def gelu(x: torch.Tensor) -> torch.Tensor:
return torch.nn.functional.gelu(x.float()).type_as(x)
def get_activation_fn(activation: str):
"""Returns the activation function corresponding to `activation`"""
if activation == "relu":
return F.relu
elif activation == "gelu":
return gelu
elif activation == "gelu_fast":
warnings.warn(
"--activation-fn=gelu_fast has been renamed to gelu_accurate"
)
return gelu_accurate
elif activation == "gelu_accurate":
return gelu_accurate
elif activation == "tanh":
return torch.tanh
elif activation == "linear":
return lambda x: x
elif activation == "glu":
return lambda x: x
else:
raise RuntimeError("--activation-fn {} not supported".format(activation))
def init_bert_params(module):
"""
Initialize the weights specific to the BERT Model.
This overrides the default initializations depending on the specified arguments.
1. If normal_init_linear_weights is set then weights of linear
layer will be initialized using the normal distribution and
bias will be set to the specified value.
2. If normal_init_embed_weights is set then weights of embedding
layer will be initialized using the normal distribution.
3. If normal_init_proj_weights is set then weights of
in_project_weight for MultiHeadAttention initialized using
the normal distribution (to be validated).
"""
def normal_(data):
# with FSDP, module params will be on CUDA, so we cast them back to CPU
# so that the RNG is consistent with and without FSDP
data.copy_(
data.cpu().normal_(mean=0.0, std=0.02).to(data.device)
)
if isinstance(module, nn.Linear):
normal_(module.weight.data)
if module.bias is not None:
module.bias.data.zero_()
if isinstance(module, nn.Embedding):
normal_(module.weight.data)
if module.padding_idx is not None:
module.weight.data[module.padding_idx].zero_()
if isinstance(module, MultiheadAttention):
normal_(module.q_proj.weight.data)
normal_(module.k_proj.weight.data)
normal_(module.v_proj.weight.data)
def quant_noise(module, p, block_size):
"""
Wraps modules and applies quantization noise to the weights for
subsequent quantization with Iterative Product Quantization as
described in "Training with Quantization Noise for Extreme Model Compression"
Args:
- module: nn.Module
- p: amount of Quantization Noise
- block_size: size of the blocks for subsequent quantization with iPQ
Remarks:
- Module weights must have the right sizes wrt the block size
- Only Linear, Embedding and Conv2d modules are supported for the moment
- For more detail on how to quantize by blocks with convolutional weights,
see "And the Bit Goes Down: Revisiting the Quantization of Neural Networks"
- We implement the simplest form of noise here as stated in the paper
which consists in randomly dropping blocks
"""
# if no quantization noise, don't register hook
if p <= 0:
return module
# supported modules
assert isinstance(module, (nn.Linear, nn.Embedding, nn.Conv2d))
# test whether module.weight has the right sizes wrt block_size
is_conv = module.weight.ndim == 4
# 2D matrix
if not is_conv:
assert (
module.weight.size(1) % block_size == 0
), "Input features must be a multiple of block sizes"
# 4D matrix
else:
# 1x1 convolutions
if module.kernel_size == (1, 1):
assert (
module.in_channels % block_size == 0
), "Input channels must be a multiple of block sizes"
# regular convolutions
else:
k = module.kernel_size[0] * module.kernel_size[1]
assert k % block_size == 0, "Kernel size must be a multiple of block size"
def _forward_pre_hook(mod, input):
# no noise for evaluation
if mod.training:
if not is_conv:
# gather weight and sizes
weight = mod.weight
in_features = weight.size(1)
out_features = weight.size(0)
# split weight matrix into blocks and randomly drop selected blocks
mask = torch.zeros(
in_features // block_size * out_features, device=weight.device
)
mask.bernoulli_(p)
mask = mask.repeat_interleave(block_size, -1).view(-1, in_features)
else:
# gather weight and sizes
weight = mod.weight
in_channels = mod.in_channels
out_channels = mod.out_channels
# split weight matrix into blocks and randomly drop selected blocks
if mod.kernel_size == (1, 1):
mask = torch.zeros(
int(in_channels // block_size * out_channels),
device=weight.device,
)
mask.bernoulli_(p)
mask = mask.repeat_interleave(block_size, -1).view(-1, in_channels)
else:
mask = torch.zeros(
weight.size(0), weight.size(1), device=weight.device
)
mask.bernoulli_(p)
mask = (
mask.unsqueeze(2)
.unsqueeze(3)
.repeat(1, 1, mod.kernel_size[0], mod.kernel_size[1])
)
# scale weights and apply mask
mask = mask.to(
torch.bool
) # x.bool() is not currently supported in TorchScript
s = 1 / (1 - p)
mod.weight.data = s * weight.masked_fill(mask, 0)
module.register_forward_pre_hook(_forward_pre_hook)
return module
class MultiheadAttention(nn.Module):
"""Multi-headed attention.
See "Attention Is All You Need" for more details.
"""
def __init__(
self,
embed_dim,
num_heads,
kdim=None,
vdim=None,
dropout=0.0,
bias=True,
add_bias_kv=False,
add_zero_attn=False,
self_attention=False,
encoder_decoder_attention=False,
q_noise=0.0,
qn_block_size=8,
has_relative_attention_bias=False,
num_buckets=32,
max_distance=128,
gru_rel_pos=False,
rescale_init=False,
):
super().__init__()
self.embed_dim = embed_dim
self.kdim = kdim if kdim is not None else embed_dim
self.vdim = vdim if vdim is not None else embed_dim
self.qkv_same_dim = self.kdim == embed_dim and self.vdim == embed_dim
self.num_heads = num_heads
self.dropout_module = nn.Dropout(dropout)
self.has_relative_attention_bias = has_relative_attention_bias
self.num_buckets = num_buckets
self.max_distance = max_distance
if self.has_relative_attention_bias:
self.relative_attention_bias = nn.Embedding(num_buckets, num_heads)
self.head_dim = embed_dim // num_heads
self.q_head_dim = self.head_dim
self.k_head_dim = self.head_dim
assert (
self.head_dim * num_heads == self.embed_dim
), "embed_dim must be divisible by num_heads"
self.scaling = self.head_dim ** -0.5
self.self_attention = self_attention
self.encoder_decoder_attention = encoder_decoder_attention
assert not self.self_attention or self.qkv_same_dim, (
"Self-attention requires query, key and " "value to be of the same size"
)
k_bias = True
if rescale_init:
k_bias = False
k_embed_dim = embed_dim
q_embed_dim = embed_dim
self.k_proj = quant_noise(
nn.Linear(self.kdim, k_embed_dim, bias=k_bias), q_noise, qn_block_size
)
self.v_proj = quant_noise(
nn.Linear(self.vdim, embed_dim, bias=bias), q_noise, qn_block_size
)
self.q_proj = quant_noise(
nn.Linear(embed_dim, q_embed_dim, bias=bias), q_noise, qn_block_size
)
self.out_proj = quant_noise(
nn.Linear(embed_dim, embed_dim, bias=bias), q_noise, qn_block_size
)
if add_bias_kv:
self.bias_k = Parameter(torch.Tensor(1, 1, embed_dim))
self.bias_v = Parameter(torch.Tensor(1, 1, embed_dim))
else:
self.bias_k = self.bias_v = None
self.add_zero_attn = add_zero_attn
self.gru_rel_pos = gru_rel_pos
if self.gru_rel_pos:
self.grep_linear = nn.Linear(self.q_head_dim, 8)
self.grep_a = nn.Parameter(torch.ones(1, num_heads, 1, 1))
self.reset_parameters()
def reset_parameters(self):
if self.qkv_same_dim:
# Empirically observed the convergence to be much better with
# the scaled initialization
nn.init.xavier_uniform_(self.k_proj.weight, gain=1 / math.sqrt(2))
nn.init.xavier_uniform_(self.v_proj.weight, gain=1 / math.sqrt(2))
nn.init.xavier_uniform_(self.q_proj.weight, gain=1 / math.sqrt(2))
else:
nn.init.xavier_uniform_(self.k_proj.weight)
nn.init.xavier_uniform_(self.v_proj.weight)
nn.init.xavier_uniform_(self.q_proj.weight)
nn.init.xavier_uniform_(self.out_proj.weight)
if self.out_proj.bias is not None:
nn.init.constant_(self.out_proj.bias, 0.0)
if self.bias_k is not None:
nn.init.xavier_normal_(self.bias_k)
if self.bias_v is not None:
nn.init.xavier_normal_(self.bias_v)
if self.has_relative_attention_bias:
nn.init.xavier_normal_(self.relative_attention_bias.weight)
def _relative_positions_bucket(self, relative_positions, bidirectional=True):
num_buckets = self.num_buckets
max_distance = self.max_distance
relative_buckets = 0
if bidirectional:
num_buckets = num_buckets // 2
relative_buckets += (relative_positions > 0).to(torch.long) * num_buckets
relative_positions = torch.abs(relative_positions)
else:
relative_positions = -torch.min(relative_positions, torch.zeros_like(relative_positions))
max_exact = num_buckets // 2
is_small = relative_positions < max_exact
relative_postion_if_large = max_exact + (
torch.log(relative_positions.float() / max_exact)
/ math.log(max_distance / max_exact)
* (num_buckets - max_exact)
).to(torch.long)
relative_postion_if_large = torch.min(
relative_postion_if_large, torch.full_like(relative_postion_if_large, num_buckets - 1)
)
relative_buckets += torch.where(is_small, relative_positions, relative_postion_if_large)
return relative_buckets
def compute_bias(self, query_length, key_length):
context_position = torch.arange(query_length, dtype=torch.long)[:, None]
memory_position = torch.arange(key_length, dtype=torch.long)[None, :]
relative_position = memory_position - context_position
relative_position_bucket = self._relative_positions_bucket(
relative_position,
bidirectional=True
)
relative_position_bucket = relative_position_bucket.to(self.relative_attention_bias.weight.device)
values = self.relative_attention_bias(relative_position_bucket)
values = values.permute([2, 0, 1])
return values
def forward(
self,
query,
key: Optional[Tensor],
value: Optional[Tensor],
key_padding_mask: Optional[Tensor] = None,
incremental_state: Optional[Dict[str, Dict[str, Optional[Tensor]]]] = None,
need_weights: bool = True,
static_kv: bool = False,
attn_mask: Optional[Tensor] = None,
before_softmax: bool = False,
need_head_weights: bool = False,
position_bias: Optional[Tensor] = None
) -> Tuple[Tensor, Optional[Tensor], Optional[Tensor]]:
"""Input shape: Time x Batch x Channel
Args:
key_padding_mask (ByteTensor, optional): mask to exclude
keys that are pads, of shape `(batch, src_len)`, where
padding elements are indicated by 1s.
need_weights (bool, optional): return the attention weights,
averaged over heads (default: False).
attn_mask (ByteTensor, optional): typically used to
implement causal attention, where the mask prevents the
attention from looking forward in time (default: None).
before_softmax (bool, optional): return the raw attention
weights and values before the attention softmax.
need_head_weights (bool, optional): return the attention
weights for each head. Implies *need_weights*. Default:
return the average attention weights over all heads.
"""
if need_head_weights:
need_weights = True
is_tpu = query.device.type == "xla"
tgt_len, bsz, embed_dim = query.size()
src_len = tgt_len
assert embed_dim == self.embed_dim
assert list(query.size()) == [tgt_len, bsz, embed_dim]
if key is not None:
src_len, key_bsz, _ = key.size()
if not torch.jit.is_scripting():
assert key_bsz == bsz
assert value is not None
                assert (src_len, bsz) == value.shape[:2]
if self.has_relative_attention_bias and position_bias is None:
position_bias = self.compute_bias(tgt_len, src_len)
position_bias = position_bias.unsqueeze(0).repeat(bsz, 1, 1, 1).view(bsz * self.num_heads, tgt_len, src_len)
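        # Fast path: with no incremental decoding state, delegate to PyTorch's fused
        # F.multi_head_attention_forward, passing the (optionally gated) relative
        # position bias in as an additive attention mask.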
if (
not is_tpu # don't use PyTorch version on TPUs
and incremental_state is None
and not static_kv
# A workaround for quantization to work. Otherwise JIT compilation
# treats bias in linear module as method.
and not torch.jit.is_scripting()
and self.q_head_dim == self.head_dim
):
assert key is not None and value is not None
assert attn_mask is None
attn_mask_rel_pos = None
if position_bias is not None:
attn_mask_rel_pos = position_bias
if self.gru_rel_pos:
query_layer = query.transpose(0, 1)
new_x_shape = query_layer.size()[:-1] + (self.num_heads, -1)
query_layer = query_layer.view(*new_x_shape)
query_layer = query_layer.permute(0, 2, 1, 3)
_B, _H, _L, __ = query_layer.size()
gate_a, gate_b = torch.sigmoid(self.grep_linear(query_layer).view(
_B, _H, _L, 2, 4).sum(-1, keepdim=False)).chunk(2, dim=-1)
gate_a_1 = gate_a * (gate_b * self.grep_a - 1.0) + 2.0
attn_mask_rel_pos = gate_a_1.view(bsz * self.num_heads, -1, 1) * position_bias
attn_mask_rel_pos = attn_mask_rel_pos.view((-1, tgt_len, tgt_len))
k_proj_bias = self.k_proj.bias
if k_proj_bias is None:
k_proj_bias = torch.zeros_like(self.q_proj.bias)
x, attn = F.multi_head_attention_forward(
query,
key,
value,
self.embed_dim,
self.num_heads,
torch.empty([0]),
                    torch.cat((self.q_proj.bias, k_proj_bias, self.v_proj.bias)),
self.bias_k,
self.bias_v,
self.add_zero_attn,
self.dropout_module.p,
self.out_proj.weight,
self.out_proj.bias,
self.training,
# self.training or self.dropout_module.apply_during_inference,
key_padding_mask,
need_weights,
attn_mask_rel_pos,
use_separate_proj_weight=True,
q_proj_weight=self.q_proj.weight,
k_proj_weight=self.k_proj.weight,
v_proj_weight=self.v_proj.weight,
)
return x, attn, position_bias
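        # Slow path: explicit Q/K/V projections, with cached key/value states when
        # incremental decoding is used.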
if incremental_state is not None:
saved_state = self._get_input_buffer(incremental_state)
if saved_state is not None and "prev_key" in saved_state:
# previous time steps are cached - no need to recompute
# key and value if they are static
if static_kv:
assert self.encoder_decoder_attention and not self.self_attention
key = value = None
else:
saved_state = None
if self.self_attention:
q = self.q_proj(query)
k = self.k_proj(query)
v = self.v_proj(query)
elif self.encoder_decoder_attention:
# encoder-decoder attention
q = self.q_proj(query)
if key is None:
assert value is None
k = v = None
else:
k = self.k_proj(key)
v = self.v_proj(key)
else:
assert key is not None and value is not None
q = self.q_proj(query)
k = self.k_proj(key)
v = self.v_proj(value)
q *= self.scaling
if self.bias_k is not None:
assert self.bias_v is not None
k = torch.cat([k, self.bias_k.repeat(1, bsz, 1)])
v = torch.cat([v, self.bias_v.repeat(1, bsz, 1)])
if attn_mask is not None:
attn_mask = torch.cat(
[attn_mask, attn_mask.new_zeros(attn_mask.size(0), 1)], dim=1
)
if key_padding_mask is not None:
key_padding_mask = torch.cat(
[
key_padding_mask,
key_padding_mask.new_zeros(key_padding_mask.size(0), 1),
],
dim=1,
)
q = (
q.contiguous()
.view(tgt_len, bsz * self.num_heads, self.q_head_dim)
.transpose(0, 1)
)
if k is not None:
k = (
k.contiguous()
.view(-1, bsz * self.num_heads, self.k_head_dim)
.transpose(0, 1)
)
if v is not None:
v = (
v.contiguous()
.view(-1, bsz * self.num_heads, self.head_dim)
.transpose(0, 1)
)
if saved_state is not None:
# saved states are stored with shape (bsz, num_heads, seq_len, head_dim)
if "prev_key" in saved_state:
_prev_key = saved_state["prev_key"]
assert _prev_key is not None
prev_key = _prev_key.view(bsz * self.num_heads, -1, self.head_dim)
if static_kv:
k = prev_key
else:
assert k is not None
k = torch.cat([prev_key, k], dim=1)
src_len = k.size(1)
if "prev_value" in saved_state:
_prev_value = saved_state["prev_value"]
assert _prev_value is not None
prev_value = _prev_value.view(bsz * self.num_heads, -1, self.head_dim)
if static_kv:
v = prev_value
else:
assert v is not None
v = torch.cat([prev_value, v], dim=1)
prev_key_padding_mask: Optional[Tensor] = None
if "prev_key_padding_mask" in saved_state:
prev_key_padding_mask = saved_state["prev_key_padding_mask"]
assert k is not None and v is not None
key_padding_mask = MultiheadAttention._append_prev_key_padding_mask(
key_padding_mask=key_padding_mask,
prev_key_padding_mask=prev_key_padding_mask,
batch_size=bsz,
src_len=k.size(1),
static_kv=static_kv,
)
saved_state["prev_key"] = k.view(bsz, self.num_heads, -1, self.head_dim)
saved_state["prev_value"] = v.view(bsz, self.num_heads, -1, self.head_dim)
saved_state["prev_key_padding_mask"] = key_padding_mask
# In this branch incremental_state is never None
assert incremental_state is not None
incremental_state = self._set_input_buffer(incremental_state, saved_state)
assert k is not None
assert k.size(1) == src_len
# This is part of a workaround to get around fork/join parallelism
# not supporting Optional types.
if key_padding_mask is not None and key_padding_mask.dim() == 0:
key_padding_mask = None
if key_padding_mask is not None:
assert key_padding_mask.size(0) == bsz
assert key_padding_mask.size(1) == src_len
if self.add_zero_attn:
assert v is not None
src_len += 1
k = torch.cat([k, k.new_zeros((k.size(0), 1) + k.size()[2:])], dim=1)
v = torch.cat([v, v.new_zeros((v.size(0), 1) + v.size()[2:])], dim=1)
if attn_mask is not None:
attn_mask = torch.cat(
[attn_mask, attn_mask.new_zeros(attn_mask.size(0), 1)], dim=1
)
if key_padding_mask is not None:
key_padding_mask = torch.cat(
[
key_padding_mask,
torch.zeros(key_padding_mask.size(0), 1).type_as(
key_padding_mask
),
],
dim=1,
)
attn_weights = torch.bmm(q, k.transpose(1, 2))
attn_weights = self.apply_sparse_mask(attn_weights, tgt_len, src_len, bsz)
assert list(attn_weights.size()) == [bsz * self.num_heads, tgt_len, src_len]
if attn_mask is not None:
attn_mask = attn_mask.unsqueeze(0)
attn_weights += attn_mask
if key_padding_mask is not None:
# don't attend to padding symbols
attn_weights = attn_weights.view(bsz, self.num_heads, tgt_len, src_len)
if not is_tpu:
attn_weights = attn_weights.masked_fill(
key_padding_mask.unsqueeze(1).unsqueeze(2).to(torch.bool),
float("-inf"),
)
else:
attn_weights = attn_weights.transpose(0, 2)
attn_weights = attn_weights.masked_fill(key_padding_mask, float("-inf"))
attn_weights = attn_weights.transpose(0, 2)
attn_weights = attn_weights.view(bsz * self.num_heads, tgt_len, src_len)
if before_softmax:
return attn_weights, v, position_bias
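        # Gated relative position bias (WavLM): a per-head gate computed from the query
        # rescales the shared position bias before it is added to the attention logits.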
if position_bias is not None:
if self.gru_rel_pos == 1:
query_layer = q.view(bsz, self.num_heads, tgt_len, self.q_head_dim)
_B, _H, _L, __ = query_layer.size()
gate_a, gate_b = torch.sigmoid(self.grep_linear(query_layer).view(
_B, _H, _L, 2, 4).sum(-1, keepdim=False)).chunk(2, dim=-1)
gate_a_1 = gate_a * (gate_b * self.grep_a - 1.0) + 2.0
position_bias = gate_a_1.view(bsz * self.num_heads, -1, 1) * position_bias
position_bias = position_bias.view(attn_weights.size())
attn_weights = attn_weights + position_bias
attn_weights_float = F.softmax(
attn_weights, dim=-1
)
attn_weights = attn_weights_float.type_as(attn_weights)
attn_probs = self.dropout_module(attn_weights)
assert v is not None
attn = torch.bmm(attn_probs, v)
assert list(attn.size()) == [bsz * self.num_heads, tgt_len, self.head_dim]
attn = attn.transpose(0, 1).contiguous().view(tgt_len, bsz, embed_dim)
attn = self.out_proj(attn)
attn_weights: Optional[Tensor] = None
if need_weights:
attn_weights = attn_weights_float.view(
bsz, self.num_heads, tgt_len, src_len
).transpose(1, 0)
if not need_head_weights:
# average attention weights over heads
attn_weights = attn_weights.mean(dim=0)
return attn, attn_weights, position_bias
@staticmethod
def _append_prev_key_padding_mask(
key_padding_mask: Optional[Tensor],
prev_key_padding_mask: Optional[Tensor],
batch_size: int,
src_len: int,
static_kv: bool,
) -> Optional[Tensor]:
# saved key padding masks have shape (bsz, seq_len)
if prev_key_padding_mask is not None and static_kv:
new_key_padding_mask = prev_key_padding_mask
elif prev_key_padding_mask is not None and key_padding_mask is not None:
new_key_padding_mask = torch.cat(
[prev_key_padding_mask.float(), key_padding_mask.float()], dim=1
)
# During incremental decoding, as the padding token enters and
# leaves the frame, there will be a time when prev or current
# is None
elif prev_key_padding_mask is not None:
if src_len > prev_key_padding_mask.size(1):
filler = torch.zeros(
(batch_size, src_len - prev_key_padding_mask.size(1)),
device=prev_key_padding_mask.device,
)
new_key_padding_mask = torch.cat(
[prev_key_padding_mask.float(), filler.float()], dim=1
)
else:
new_key_padding_mask = prev_key_padding_mask.float()
elif key_padding_mask is not None:
if src_len > key_padding_mask.size(1):
filler = torch.zeros(
(batch_size, src_len - key_padding_mask.size(1)),
device=key_padding_mask.device,
)
new_key_padding_mask = torch.cat(
[filler.float(), key_padding_mask.float()], dim=1
)
else:
new_key_padding_mask = key_padding_mask.float()
else:
new_key_padding_mask = prev_key_padding_mask
return new_key_padding_mask
def _get_input_buffer(
self, incremental_state: Optional[Dict[str, Dict[str, Optional[Tensor]]]]
) -> Dict[str, Optional[Tensor]]:
result = self.get_incremental_state(incremental_state, "attn_state")
if result is not None:
return result
else:
empty_result: Dict[str, Optional[Tensor]] = {}
return empty_result
def _set_input_buffer(
self,
incremental_state: Dict[str, Dict[str, Optional[Tensor]]],
buffer: Dict[str, Optional[Tensor]],
):
return self.set_incremental_state(incremental_state, "attn_state", buffer)
def apply_sparse_mask(self, attn_weights, tgt_len: int, src_len: int, bsz: int):
return attn_weights
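# A minimal smoke-test sketch for the MultiheadAttention module above; it runs only when
# this file is executed directly. The constructor keyword names used here (self_attention,
# has_relative_attention_bias, num_buckets, max_distance, gru_rel_pos) are assumed to match
# the __init__ defined earlier in this file and may need adjusting if they differ.
if __name__ == "__main__":
    _attn = MultiheadAttention(
        embed_dim=768,
        num_heads=12,
        self_attention=True,
        has_relative_attention_bias=True,
        num_buckets=320,
        max_distance=800,
        gru_rel_pos=True,
    )
    _x = torch.randn(200, 2, 768)  # (time, batch, channels), as expected by forward()
    _out, _, _pos_bias = _attn(_x, _x, _x, need_weights=False)
    # The returned position_bias can be fed back in so later layers reuse the same bias.
    _out2, _, _ = _attn(_x, _x, _x, position_bias=_pos_bias)
    print(_out.shape, _out2.shape, _pos_bias.shape)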

126
webUI.py
View File

@ -22,6 +22,7 @@ import time
import traceback
from itertools import chain
from utils import mix_model
from compress_model import removeOptimizer
logging.getLogger('numba').setLevel(logging.WARNING)
logging.getLogger('markdown_it').setLevel(logging.WARNING)
@ -74,18 +75,38 @@ def updata_mix_info(files):
if debug: traceback.print_exc()
raise gr.Error(e)
def modelAnalysis(model_path,config_path,cluster_model_path,device,enhance):
def modelAnalysis(model_path,config_path,cluster_model_path,device,enhance,diff_model_path,diff_config_path,only_diffusion,use_spk_mix):
global model
try:
device = cuda[device] if "CUDA" in device else device
model = Svc(model_path.name, config_path.name, device=device if device!="Auto" else None, cluster_model_path = cluster_model_path.name if cluster_model_path != None else "",nsf_hifigan_enhance=enhance)
cluster_filepath = os.path.split(cluster_model_path.name) if cluster_model_path is not None else "no_cluster"
fr = ".pkl" in cluster_filepath[1]
#model = Svc(model_path.name, config_path.name, device=device if device!="Auto" else None, cluster_model_path = cluster_model_path.name if cluster_model_path != None else "",nsf_hifigan_enhance=enhance)
model = Svc(model_path.name,
config_path.name,
device=device if device != "Auto" else None,
cluster_model_path = cluster_model_path.name if cluster_model_path is not None else "",
nsf_hifigan_enhance=enhance,
diffusion_model_path = diff_model_path.name if diff_model_path is not None else "",
diffusion_config_path = diff_config_path.name if diff_config_path is not None else "",
shallow_diffusion = True if diff_model_path is not None else False,
only_diffusion = only_diffusion,
spk_mix_enable = use_spk_mix,
feature_retrieval = fr
)
spks = list(model.spk2id.keys())
device_name = torch.cuda.get_device_properties(model.dev).name if "cuda" in str(model.dev) else str(model.dev)
msg = f"成功加载模型到设备{device_name}\n"
if cluster_model_path is None:
msg += "未加载聚类模型\n"
msg += "未加载聚类模型或特征检索模型\n"
elif fr:
msg += f"特征检索模型{cluster_filepath[1]}加载成功\n"
else:
msg += f"聚类模型{cluster_model_path.name}加载成功\n"
msg += f"聚类模型{cluster_filepath[1]}加载成功\n"
if diff_model_path is None:
msg += "未加载扩散模型\n"
else:
msg += f"扩散模型{diff_model_path.name}加载成功\n"
msg += "当前模型的可用音色:\n"
for i in spks:
msg += i + " "
@ -105,39 +126,55 @@ def modelUnload():
torch.cuda.empty_cache()
return sid.update(choices = [],value=""),"模型卸载完毕!"
def vc_fn(sid, input_audio, vc_transform, auto_f0,cluster_ratio, slice_db, noise_scale,pad_seconds,cl_num,lg_num,lgr_num,f0_predictor,enhancer_adaptive_key,cr_threshold):
def vc_fn(sid, input_audio, vc_transform, auto_f0,cluster_ratio, slice_db, noise_scale,pad_seconds,cl_num,lg_num,lgr_num,f0_predictor,enhancer_adaptive_key,cr_threshold,k_step,use_spk_mix,second_encoding,loudness_envelope_adjustment):
global model
try:
if input_audio is None:
raise gr.Error("你需要上传音频")
return "You need to upload an audio", None
if model is None:
raise gr.Error("你需要指定模型")
return "You need to upload an model", None
print(input_audio)
sampling_rate, audio = input_audio
# print(audio.shape,sampling_rate)
print(audio.shape,sampling_rate)
audio = (audio / np.iinfo(audio.dtype).max).astype(np.float32)
print(audio.dtype)
if len(audio.shape) > 1:
audio = librosa.to_mono(audio.transpose(1, 0))
temp_path = "temp.wav"
soundfile.write(temp_path, audio, sampling_rate, format="wav")
_audio = model.slice_inference(temp_path, sid, vc_transform, slice_db, cluster_ratio, auto_f0, noise_scale,pad_seconds,cl_num,lg_num,lgr_num,f0_predictor,enhancer_adaptive_key,cr_threshold)
_audio = model.slice_inference(
temp_path,
sid,
vc_transform,
slice_db,
cluster_ratio,
auto_f0,
noise_scale,
pad_seconds,
cl_num,
lg_num,
lgr_num,
f0_predictor,
enhancer_adaptive_key,
cr_threshold,
k_step,
use_spk_mix,
second_encoding,
loudness_envelope_adjustment
)
model.clear_empty()
os.remove(temp_path)
        # Build the output file path and save the audio into the results folder
try:
timestamp = str(int(time.time()))
filename = sid + "_" + timestamp + ".wav"
output_file = os.path.join("./results", filename)
soundfile.write(output_file, _audio, model.target_sample, format="wav")
return f"推理成功音频文件保存为results/{filename}", (model.target_sample, _audio)
except Exception as e:
if debug: traceback.print_exc()
return f"文件保存失败,请手动保存", (model.target_sample, _audio)
timestamp = str(int(time.time()))
if not os.path.exists("results"):
os.makedirs("results")
output_file = os.path.join("results", sid + "_" + timestamp + ".wav")
soundfile.write(output_file, _audio, model.target_sample, format="wav")
return "Success", output_file
except Exception as e:
if debug: traceback.print_exc()
raise gr.Error(e)
def tts_func(_text,_rate,_voice):
    # Convert the text to audio with edge-tts
    # voice = "zh-CN-XiaoyiNeural"  # female, higher pitch
@ -189,6 +226,17 @@ def vc_fn2(sid, input_audio, vc_transform, auto_f0,cluster_ratio, slice_db, nois
os.remove(save_path2)
return a,b
def model_compression(_model):
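    # Strip the optimizer state from the uploaded checkpoint via compress_model.removeOptimizer
    # and write a "*_compressed" copy into the current working directory.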
if _model == "":
return "请先选择要压缩的模型"
else:
model_path = os.path.split(_model.name)
filename, extension = os.path.splitext(model_path[1])
output_model_name = f"{filename}_compressed{extension}"
output_path = os.path.join(os.getcwd(), output_model_name)
removeOptimizer(_model.name, output_path)
return f"模型已成功被保存在了{output_path}"
def debug_change():
global debug
debug = debug_button.value
@ -210,11 +258,16 @@ with gr.Blocks(
gr.Markdown(value="""
<font size=2> 模型设置</font>
""")
model_path = gr.File(label="选择模型文件")
config_path = gr.File(label="选择配置文件")
cluster_model_path = gr.File(label="选择聚类模型文件(没有可以不选)")
device = gr.Dropdown(label="推理设备默认为自动选择CPU和GPU", choices=["Auto",*cuda.keys(),"CPU"], value="Auto")
with gr.Row():
model_path = gr.File(label="选择模型文件")
config_path = gr.File(label="选择配置文件")
with gr.Row():
diff_model_path = gr.File(label="选择扩散模型文件")
diff_config_path = gr.File(label="选择扩散模型配置文件")
cluster_model_path = gr.File(label="选择聚类模型或特征检索文件(没有可以不选)")
device = gr.Dropdown(label="推理设备默认为自动选择CPU和GPU", choices=["Auto",*cuda.keys(),"cpu"], value="Auto")
enhance = gr.Checkbox(label="是否使用NSF_HIFIGAN增强,该选项对部分训练集少的模型有一定的音质增强效果,但是对训练好的模型有反面效果,默认关闭", value=False)
only_diffusion = gr.Checkbox(label="是否使用全扩散推理开启后将不使用So-VITS模型仅使用扩散模型进行完整扩散推理默认关闭", value=False)
with gr.Column():
gr.Markdown(value="""
<font size=3>左侧文件全部选择完毕后(全部文件模块显示download)点击加载模型进行解析</font>
@ -233,9 +286,10 @@ with gr.Blocks(
auto_f0 = gr.Checkbox(label="自动f0预测配合聚类模型f0预测效果更好,会导致变调功能失效(仅限转换语音,歌声勾选此项会究极跑调)", value=False)
f0_predictor = gr.Dropdown(label="选择F0预测器,可选择crepe,pm,dio,harvest,默认为pm(注意crepe为原F0使用均值滤波器)", choices=["pm","dio","harvest","crepe"], value="pm")
vc_transform = gr.Number(label="变调整数可以正负半音数量升高八度就是12", value=0)
cluster_ratio = gr.Number(label="聚类模型混合比例0-1之间0即不启用聚类。使用聚类模型能提升音色相似度但会导致咬字下降如果使用建议0.5左右)", value=0)
cluster_ratio = gr.Number(label="聚类模型/特征检索混合比例0-1之间0即不启用聚类/特征检索。使用聚类/特征检索能提升音色相似度但会导致咬字下降如果使用建议0.5左右)", value=0)
slice_db = gr.Number(label="切片阈值", value=-40)
noise_scale = gr.Number(label="noise_scale 建议不要动,会影响音质,玄学参数", value=0.4)
k_step = gr.Slider(label="浅扩散步数,只有使用了扩散模型才有效,步数越大越接近扩散模型的结果", value=100, minimum = 1, maximum = 1000)
with gr.Column():
pad_seconds = gr.Number(label="推理音频pad秒数由于未知原因开头结尾会有异响pad一小段静音段后就不会出现", value=0.5)
cl_num = gr.Number(label="音频自动切片0为不切片单位为秒(s)", value=0)
@ -243,6 +297,9 @@ with gr.Blocks(
lgr_num = gr.Number(label="自动音频切片后需要舍弃每段切片的头尾。该参数设置交叉长度保留的比例范围0-1,左开右闭", value=0.75)
enhancer_adaptive_key = gr.Number(label="使增强器适应更高的音域(单位为半音数)|默认为0", value=0)
cr_threshold = gr.Number(label="F0过滤阈值只有启动crepe时有效. 数值范围从0-1. 降低该值可减少跑调概率,但会增加哑音", value=0.05)
loudness_envelope_adjustment = gr.Number(label="输入源响度包络替换输出响度包络融合比例越靠近1越使用输出响度包络", value = 0)
second_encoding = gr.Checkbox(label = "二次编码,浅扩散前会对原始音频进行二次编码,玄学选项,效果时好时差,默认关闭", value=False)
use_spk_mix = gr.Checkbox(label = "动态声线融合", value = False, interactive = False)
with gr.Tabs():
with gr.TabItem("音频转音频"):
vc_input3 = gr.Audio(label="选择音频")
@ -278,7 +335,7 @@ with gr.Blocks(
</font>
""")
mix_model_path = gr.Files(label="选择需要混合模型文件")
mix_model_upload_button = gr.UploadButton("选择/追加需要混合模型文件", file_count="multiple", variant="primary")
mix_model_upload_button = gr.UploadButton("选择/追加需要混合模型文件", file_count="multiple")
mix_model_output1 = gr.Textbox(
label="混合比例调整,单位/%",
interactive = True
@ -291,6 +348,17 @@ with gr.Blocks(
mix_model_path.change(updata_mix_info,[mix_model_path],[mix_model_output1])
mix_model_upload_button.upload(upload_mix_append_file, [mix_model_upload_button,mix_model_path], [mix_model_path,mix_model_output1])
mix_submit.click(mix_submit_click, [mix_model_output1,mix_mode], [mix_model_output2])
with gr.TabItem("模型压缩工具"):
gr.Markdown(value="""
该工具可以实现对模型的体积压缩**不影响模型推理功能**的情况下将原本约600M的So-VITS模型压缩至约200M, 大大减少了硬盘的压力
**注意压缩后的模型将无法继续训练请在确认封炉后再压缩**
""")
model_to_compress = gr.File(label="模型上传")
compress_model_btn = gr.Button("压缩模型", variant="primary")
compress_model_output = gr.Textbox(label="输出信息", value="")
compress_model_btn.click(model_compression, [model_to_compress], [compress_model_output])
with gr.Tabs():
@ -300,12 +368,12 @@ with gr.Blocks(
<font size=2> WebUI设置</font>
""")
debug_button = gr.Checkbox(label="Debug模式如果向社区反馈BUG需要打开打开后控制台可以显示具体错误提示", value=debug)
vc_submit.click(vc_fn, [sid, vc_input3, vc_transform,auto_f0,cluster_ratio, slice_db, noise_scale,pad_seconds,cl_num,lg_num,lgr_num,f0_predictor,enhancer_adaptive_key,cr_threshold], [vc_output1, vc_output2])
vc_submit.click(vc_fn, [sid, vc_input3, vc_transform,auto_f0,cluster_ratio, slice_db, noise_scale,pad_seconds,cl_num,lg_num,lgr_num,f0_predictor,enhancer_adaptive_key,cr_threshold,k_step,use_spk_mix,second_encoding,loudness_envelope_adjustment], [vc_output1, vc_output2])
vc_submit2.click(vc_fn2, [sid, vc_input3, vc_transform,auto_f0,cluster_ratio, slice_db, noise_scale,pad_seconds,cl_num,lg_num,lgr_num,text2tts,tts_rate,tts_voice,f0_predictor,enhancer_adaptive_key,cr_threshold], [vc_output1, vc_output2])
debug_button.change(debug_change,[],[])
model_load_button.click(modelAnalysis,[model_path,config_path,cluster_model_path,device,enhance],[sid,sid_output])
model_load_button.click(modelAnalysis,[model_path,config_path,cluster_model_path,device,enhance,diff_model_path,diff_config_path,only_diffusion,use_spk_mix],[sid,sid_output])
model_unload_button.click(modelUnload,[],[sid,sid_output])
app.launch()