Finetuning a pretrained QuartzNet on TIMIT
I talk more about this project here on Twitter
Cloning the repository that contains the .csv files with processed labels and filepaths, courtesy of my coursemates.
!git clone https://github.com/mizoru/pronunciation-trainer.git
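The Kaggle downloads below need API credentials before the client is imported; the kaggle package looks for ~/.kaggle/kaggle.json or the KAGGLE_USERNAME / KAGGLE_KEY environment variables. A minimal sketch with placeholder values (substitute your own before running):
import os
os.environ["KAGGLE_USERNAME"] = "..."  # placeholder: your Kaggle username
os.environ["KAGGLE_KEY"] = "..."       # placeholder: your Kaggle API key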
from kaggle import api
api.dataset_download_files('mfekadu/darpa-timit-acousticphonetic-continuous-speech')
import zipfile
archive = zipfile.ZipFile('darpa-timit-acousticphonetic-continuous-speech.zip')
archive.extractall()
This dataset will be used as a source of background noise for augmentation.
api.dataset_download_files('chrisfilo/urbansound8k')
import zipfile
archive = zipfile.ZipFile('urbansound8k.zip')
archive.extractall('data')
import thunder
from thunder.callbacks import FinetuneEncoderDecoder
from thunder.finetune import FinetuneCTCModule
from thunder.data.dataset import BaseSpeechDataset
from thunder.data.datamodule import BaseDataModule
from thunder.blocks import conv1d_decoder
from thunder.quartznet.compatibility import load_quartznet_checkpoint
from typing import Any, List, Sequence, Tuple, Union
import torch
from torch import Tensor, nn
from thunder.registry import load_pretrained
from thunder.quartznet.compatibility import QuartznetCheckpoint
from pathlib import Path
import pandas as pd
import librosa
import numpy as np
import torchaudio
import pytorch_lightning as pl
from math import ceil
from IPython.display import Audio
labels = pd.read_csv('pronunciation-trainer/dataDS.csv')
noise_files = pd.read_csv('data/UrbanSound8K.csv')
noise_files = list('data/fold1/' + noise_files[noise_files.fold==1].slice_file_name)
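A quick peek at the labels; the columns used below are Path (the audio filepath), Transcription (dot-separated phoneme labels) and is_valid (the validation-split flag):
labels.head()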
class TimitDataset(BaseSpeechDataset):
def __init__(
self, items: Sequence, force_mono: bool = True, sample_rate: int = 16000,
        time_stretch=None, volume=None, pitch=None, noise_files=None
        # typical values, set by the datamodule below: time_stretch=0.2, volume=0.2, pitch=2
):
super().__init__(items, force_mono, sample_rate)
self.librosa_transforms = bool(time_stretch)
self.time_stretch = time_stretch
self.volume = volume
self.pitch = pitch
self.noise_files = noise_files
def open_audio(self, item) -> Tuple[Tensor, int]:
        audio, sr = self.loader.open_audio(item.Path)
# adding noise
if self.noise_files:
idx = int(torch.randint(0, len(self.noise_files), (1,)))
noise = self.loader(self.noise_files[idx])
            # this snippet comes from a course I took; it scales the noise so that the
            # signal-to-noise ratio lands at noise_level dB:
            # SNR = 20 * log10(||audio|| / ||alpha * noise||) = noise_level
            noise_level = torch.rand(1) * 40  # SNR in dB, drawn uniformly from 0 to 40
            noise_energy = torch.norm(noise)
            audio_energy = torch.norm(audio)
            alpha = (audio_energy / noise_energy) * torch.pow(10, -noise_level / 20)
            # tile the noise as many times as we need to cover the whole utterance
if noise.shape[1] < audio.shape[1]:
noise = torch.cat([noise] * ceil(audio.shape[1] / noise.shape[1]), 1)
noise = noise[:,:audio.shape[1]]
audio = audio + alpha * noise
audio.clamp_(-1, 1)
        # WARNING: this transform takes forever and probably doesn't work correctly
        if self.pitch:
            audio = torchaudio.functional.pitch_shift(audio, sr, float(self.pitch * torch.randn(1)))
if self.volume: # this transform led to CUDA out of memory
audio = torchaudio.transforms.Vol(torch.abs(1+self.volume*torch.randn(1)))(audio)
        # the librosa transform works, but I didn't get better results with it; might need tuning
        if self.librosa_transforms:
            audio = audio.numpy().squeeze()
        if self.time_stretch:
            audio = librosa.effects.time_stretch(audio, rate=np.abs(1 + self.time_stretch * np.random.randn()))
        if self.librosa_transforms:
            audio = torch.Tensor(audio).unsqueeze(0)
return audio, sr
def open_text(self, item) -> str:
return item.Transcription
def get_item(self, index: int) -> Any:
return self.items.iloc[index]
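Listening to one augmented sample as a sanity check of the noise mixing: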
Audio(TimitDataset(labels, noise_files=noise_files)[159][0], rate=16000)
class TimitDataModule(BaseDataModule):
def __init__(
self,
batch_size: int = 32,
num_workers: int = 2,
time_stretch = 0.2, volume = 0.2, pitch = 2, noise_files=None
):
super().__init__(batch_size, num_workers)
self.time_stretch = time_stretch
self.volume = volume
self.pitch = pitch
self.noise_files = noise_files
    def get_dataset(self, split):
        if split != "train":
            return TimitDataset(labels[labels["is_valid"]],
                                time_stretch=False, volume=False, pitch=False)
        else:
            return TimitDataset(labels[~labels["is_valid"]],
                                time_stretch=self.time_stretch, volume=self.volume, pitch=self.pitch,
                                noise_files=self.noise_files)
dm = TimitDataModule(batch_size=32, noise_files=noise_files)
Getting the tokens from the data
whole = '.'.join([t for t in labels.Transcription])
tokens = list(set(whole.split('.')))
len(tokens)
def dot_tokenizer(s: str):
    return s.split('.')
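A quick illustration on a made-up dot-separated transcription (the string here is hypothetical, just to show the format):
dot_tokenizer('ð.ɪ.s')  # -> ['ð', 'ɪ', 's']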
model = FinetuneCTCModule(QuartznetCheckpoint.QuartzNet15x5Base_En,
decoder_class = conv1d_decoder, tokens = tokens,
text_kwargs={'custom_tokenizer_function':dot_tokenizer})
These next five cells import the decoder weights from the trained model and adapt them to the new decoder. correspondences is a dictionary that maps each token in the new decoder to the token in the trained decoder whose parameters it should start from; for example, the new phoneme token 'dʒ' takes its weights from the English letter 'j'.
correspondences = {'s': 's', 'n': 'n', 'dʒ': 'j', 'd̚': 'd', 'w': 'w', 'b': 'b', 'g': 'g', 'm': 'm',
'l̩': 'l', 'f': 'f', 'l': 'l', 'j': 'y', 'k': 'k', 'eɪ': 'a', 'p̚': 'p', 'm̩': 'm',
'r': 'r', 't': 't', 'h': 'h', 'aʊ': 'o', 'n̩': 'n', 'i': 'e', 'b̚': 'b', 'p': 'p',
'k̚': 'k', 'd': 'd', 'u': 'o', 't̚': 't', 'z': 'z', 'aɪ': 'i', 'v': 'v', 'tʃ': 'c',
'oʊ': 'o', '<blank>' : '<blank>', 'ɝ' : 'e', 'ʉ' : 'o', 'ð' : 't', 'θ' : 't', 'ɚ' : 'e',
'ɦ' : 'h', 'ŋ' : 'n', 'ʔ' : 't', 'ʒ' : 's', 'ʊ' : 'o', 'ɾ' : 't', 'ɪ' : 'i', 'ə̥' : 'u',
'ɑ' : 'a', 'ə' : 'e', 'ɛ' : 'e', 'ɔɪ' : 'o', 'ɡ̚' : 'g', 'ɔ' : 'o', 'ɨ̞' : 'i', 'ŋ̩' : 'n',
'ʌ' : 'u', 'ɾ̃' : 'n', 'ʃ' : 's', 'æ' : 'a'}
def adapt_into_new_decoder(decoder, old_vocab, new_vocab, correspondences=None):
    if correspondences is None:
        # default: match each new token to the old token that shares its first character
        correspondences = {k: k[0] for k in new_vocab.keys() if k and k[0] in old_vocab.keys()}
    with torch.no_grad():
        new_decoder = conv1d_decoder(1024, len(new_vocab))
        weight = decoder.weight
        bias = decoder.bias
        for new_token, old_token in correspondences.items():
            new_decoder.weight[new_vocab[new_token]] = weight[old_vocab[old_token]]
            new_decoder.bias[new_vocab[new_token]] = bias[old_vocab[old_token]]
    return new_decoder
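A quick sanity check of the weight transfer on toy vocabularies (a sketch; like the call above, it assumes conv1d_decoder returns a single convolutional layer whose output rows are indexed by token id):
old_decoder = conv1d_decoder(1024, 3)
toy_old_vocab = {'a': 0, 'b': 1, 'c': 2}
toy_new_vocab = {'x': 0, 'y': 1}
new_decoder = adapt_into_new_decoder(old_decoder, toy_old_vocab, toy_new_vocab, {'x': 'a', 'y': 'c'})
assert torch.equal(new_decoder.weight[0], old_decoder.weight[0])  # 'x' starts from 'a'
assert torch.equal(new_decoder.bias[1], old_decoder.bias[2])      # 'y' starts from 'c'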
checkpoint_model = load_quartznet_checkpoint(QuartznetCheckpoint.QuartzNet15x5Base_En)
These vocab dictionaries give adapt_into_new_decoder the row index of each token in the decoder weight matrix.
old_vocab = checkpoint_model.text_transform.vocab.itos
old_vocab = {k:v for (v, k) in enumerate(old_vocab)}
new_vocab = {k:v for (v, k) in enumerate(model.text_transform.vocab.itos)}
model.decoder = adapt_into_new_decoder(checkpoint_model.decoder, old_vocab, new_vocab, correspondences)
del checkpoint_model
import wandb
from pytorch_lightning.loggers import WandbLogger
wandb_logger = WandbLogger(project='pronunciation-trainer', name='transform-thunder')
Setting a higher encoder_initial_lr_div
led to less overfitting.
trainer = pl.Trainer(
gpus=-1, # Use all gpus
max_epochs=30,
callbacks=[FinetuneEncoderDecoder(unfreeze_encoder_at_epoch=15, encoder_initial_lr_div=100)],
logger = wandb_logger
)
trainer.fit(model = model, datamodule=dm)
trainer.validate(model = model, datamodule=dm)
Let's save our model for inference.
model.to_torchscript("QuartzNet_thunderspeech.pt")
wandb.save('QuartzNet_thunderspeech.pt', policy='now')
wandb.finish()
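Once exported, the scripted model can be reloaded with plain torch.jit, with no thunder code needed. A minimal smoke test (a sketch; the one-second random input only checks that the forward pass runs with the audio-plus-lengths signature used in the inference loop below):
scripted = torch.jit.load("QuartzNet_thunderspeech.pt")
dummy = torch.randn(1, 16000)  # one second of random "audio" at 16 kHz, just a shape check
out = scripted(dummy, torch.tensor([dummy.shape[-1]]))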
from thunder.data.dataset import AudioFileLoader
loader = AudioFileLoader(sample_rate=16000)
natives = pd.read_csv('pronunciation-trainer/natives.csv')
I came up with a small list of words that learners might struggle to tell apart.
subset = ["thin", "thing", "think", "fit", "feet", "bald", "bold", "food", "foot",
"death", "deaf", "worm", "warm"]
subset_df = natives[natives.replica.isin(subset)]
This dataset contains audio for single words.
!wget https://lingualibre.org/datasets/Q22-eng-English.zip
import zipfile
archive = zipfile.ZipFile('Q22-eng-English.zip')
archive.extractall()
I get the raw prediction tensors and then convert them into the format I need.
model.eval()
predicts = []
for i in range(len(subset_df)):
    path = str(Path('Q22-eng-English') / '/'.join(subset_df.path.iloc[i].split('/')[2:]))
    try:
        audio = loader(path)
        predicts.append(model(audio, torch.tensor(audio.shape[0] * [audio.shape[-1]], device=audio.device)))
    except Exception:
        # if a file fails to load, keep a placeholder so the indices still line up with subset_df
        predicts.append(None)
vocab = model.text_transform.vocab.itos
vocab[-1] = ''  # the blank token sits at the end of the vocab; mapping it to '' removes it from the output
for i in range(len(predicts)):
    if predicts[i] is not None:
        ids = predicts[i][0].argmax(1)[0]
        s = []
        # collapse the CTC output: keep a token only when it differs from the previous frame
        if vocab[ids[0]]: s.append(vocab[ids[0]])
        for l in range(1, len(ids)):
            if ids[l-1] != ids[l]:
                new = vocab[ids[l]]
                if new: s.append(new)
        predicts[i] = '.'.join(s)
predicts
subset_df["transcription"] = predicts
subset_df.to_csv("native_words_subset.csv", index=False)