Cloning the repository that contains the .csv files with processed labels and file paths, courtesy of my coursemates.

!git clone

Getting the data and the imports ready

from kaggle import api
import zipfile
# NOTE(review): the archive path below is an empty string, so ZipFile('') cannot
# open anything as written; the real path appears to have been stripped.
# The archive is also never extracted or closed in the visible code.
archive = zipfile.ZipFile('')

This dataset is going to be used as noise

import zipfile
# NOTE(review): same as the previous archive cell - the path is an empty string
# and the opened archive is never extracted or closed in the visible code.
archive = zipfile.ZipFile('')
import thunder
from thunder.callbacks import FinetuneEncoderDecoder
from thunder.finetune import FinetuneCTCModule
# The module paths were missing from these two imports ("from import ..."),
# which is a syntax error. AudioFileLoader is used further down in this file
# but was never imported, so it is brought in here as well.
from thunder.data.dataset import AudioFileLoader, BaseSpeechDataset
from thunder.data.datamodule import BaseDataModule
from thunder.blocks import conv1d_decoder
from thunder.quartznet.compatibility import load_quartznet_checkpoint
from typing import Any, List, Sequence, Tuple, Union
import torch
from torch import Tensor, nn
from thunder.registry import load_pretrained
from thunder.quartznet.compatibility import QuartznetCheckpoint
from pathlib import Path
import pandas as pd
import librosa
import numpy as np
import torchaudio
import pytorch_lightning as pl
from math import ceil
from IPython.display import Audio
# Table of utterances used to build the speech datasets.
labels = pd.read_csv('pronunciation-trainer/dataDS.csv')

# UrbanSound8K metadata; only the clips listed under fold 1 are used as noise.
noise_meta = pd.read_csv('data/UrbanSound8K.csv')
fold1_names = noise_meta.loc[noise_meta.fold == 1, 'slice_file_name']
noise_files = ('data/fold1/' + fold1_names).tolist()

Setting up Dataset and DataModule for training

The commented-out code shows the transforms I tried.

class TimitDataset(BaseSpeechDataset):
    """Speech dataset backed by a table of audio paths and transcriptions.

    Every item is a row of ``items`` (fetched with ``.iloc``) exposing a
    ``Path`` attribute (audio file) and a ``Transcription`` attribute (target
    text). Optional random augmentations are applied when the audio is opened.

    Args:
        items: Table of samples; must support ``.iloc`` indexing.
        force_mono: Forwarded to the base dataset.
        sample_rate: Forwarded to the base dataset.
        time_stretch: Scale of the random deviation of the stretch rate
            around 1 (0.2 was tried). Falsy disables the transform.
        volume: Scale of the random deviation of the gain around 1
            (0.2 was tried). Falsy disables the transform.
        pitch: Scale of the random pitch shift in steps (2 was tried).
            Falsy disables the transform.
        noise_files: Paths of audio files mixed in as background noise.
            Falsy disables noise mixing.
    """

    def __init__(
        self, items: Sequence, force_mono: bool = True, sample_rate: int = 16000,
        time_stretch=None, volume=None, pitch=None, noise_files=None,
    ):
        super().__init__(items, force_mono, sample_rate)
        self.librosa_transforms = bool(time_stretch)
        self.time_stretch = time_stretch
        self.volume = volume
        self.pitch = pitch
        self.noise_files = noise_files

    def _add_noise(self, audio: Tensor) -> Tensor:
        """Mix a randomly chosen noise file into ``audio``."""
        idx = int(torch.randint(0, len(self.noise_files), (1,)))
        noise = self.loader(self.noise_files[idx])

        noise_energy = torch.norm(noise)
        if noise_energy == 0:
            # A silent noise clip would make alpha infinite and turn the
            # mixed audio into NaNs; there is nothing to add anyway.
            return audio

        # Loudness ratio taken from a course: the noise is scaled so that it
        # sits a random 0 to 40 dB below the energy of the speech.
        noise_level = torch.rand(1) * 40
        alpha = (torch.norm(audio) / noise_energy) * torch.pow(10, -noise_level / 20)

        # Repeat the noise as many times as needed to cover the audio, then
        # trim it to exactly the same length.
        if noise.shape[1] < audio.shape[1]:
            noise = torch.cat([noise] * ceil(audio.shape[1] / noise.shape[1]), 1)
        noise = noise[:, :audio.shape[1]]

        return torch.clamp(audio + alpha * noise, -1, 1)

    def open_audio(self, item) -> Tuple[Tensor, int]:
        """Load the audio of ``item`` and apply the enabled augmentations.

        Returns:
            The (possibly augmented) audio tensor and the sample rate
            reported by the loader.
        """
        audio, sr = self.loader.open_audio(item.Path)

        if self.noise_files:
            audio = self._add_noise(audio)

        if self.pitch:
            # Author's note: this transform probably doesn't work.
            audio = torchaudio.functional.pitch_shift(audio, sr, self.pitch * torch.randn(1))
        if self.volume:
            # Author's note: this transform led to CUDA out of memory.
            audio = torchaudio.transforms.Vol(torch.abs(1 + self.volume * torch.randn(1)))(audio)
        if self.time_stretch:
            # Author's note: this works but did not improve results; it might
            # need tuning. librosa operates on numpy arrays, so convert there
            # and back. `rate` is passed by keyword because current librosa
            # releases no longer accept it positionally.
            stretch_rate = np.abs(1 + self.time_stretch * np.random.randn())
            stretched = librosa.effects.time_stretch(audio.numpy().squeeze(), rate=stretch_rate)
            audio = torch.Tensor(stretched).unsqueeze(0)

        return audio, sr

    def open_text(self, item) -> str:
        """Return the transcription stored on ``item``."""
        return item.Transcription

    def get_item(self, index: int) -> Any:
        """Return the row at position ``index``."""
        return self.items.iloc[index]
# Listen to one noise-augmented sample as a quick sanity check.
noisy_preview_dataset = TimitDataset(labels, noise_files=noise_files)
preview_audio = noisy_preview_dataset[159][0]
Audio(preview_audio, rate=16000)
class TimtiDataModule(BaseDataModule):
    """Data module that builds ``TimitDataset`` splits from the ``labels`` table.

    Rows with a true ``is_valid`` value form every non-training split and are
    never augmented; the remaining rows form the training split and receive
    the configured augmentations.

    Args:
        batch_size: Forwarded to the base data module.
        num_workers: Forwarded to the base data module.
        time_stretch: Training-time ``time_stretch`` setting of ``TimitDataset``.
        volume: Training-time ``volume`` setting of ``TimitDataset``.
        pitch: Training-time ``pitch`` setting of ``TimitDataset``.
        noise_files: Noise file paths mixed into the training audio.
    """

    def __init__(
        self,
        batch_size: int = 32,
        num_workers: int = 2,
        time_stretch=0.2, volume=0.2, pitch=2, noise_files=None,
    ):
        super().__init__(batch_size, num_workers)
        self.time_stretch = time_stretch
        self.volume = volume
        self.pitch = pitch
        self.noise_files = noise_files

    def get_dataset(self, split):
        """Return the dataset for ``split`` ("train" or anything else)."""
        if split != "train":
            # Evaluation data is left clean: every augmentation is disabled.
            return TimitDataset(labels[labels["is_valid"]], time_stretch=False, volume=False, pitch=False)
        # This return used to sit inside the branch above, after its return,
        # so it was unreachable and the training split came back as None.
        return TimitDataset(
            labels[~labels["is_valid"]],
            time_stretch=self.time_stretch, volume=self.volume, pitch=self.pitch,
            noise_files=self.noise_files,
        )
# Data module used for the experiments below; only noise mixing is configured
# explicitly, the other augmentations keep the data module's defaults.
dm = TimtiDataModule(
    batch_size=32,
    noise_files=noise_files,
)

Getting the tokens from the data

# Transcriptions are dot-separated token strings: join them all and split
# again to collect every distinct token that occurs in the data.
whole = '.'.join(labels.Transcription)
# sorted() gives a deterministic token order. Plain list(set(...)) can change
# order between interpreter runs, which would silently change token indices.
tokens = sorted(set(whole.split('.')))
def dot_tokenizer(s:str):
    """Split a dot-separated transcription string into its list of tokens."""
    pieces = s.split(sep='.')
    return pieces

Adapting pretrained weights

# Fine-tuning module: pretrained QuartzNet encoder with a fresh 1x1-conv
# decoder sized for the new token set.
# NOTE(review): the original call was cut off after `tokens = tokens,`.
# The text_kwargs argument below is a reconstruction: it wires in
# dot_tokenizer, which is defined above and otherwise unused - confirm
# against the original notebook.
model = FinetuneCTCModule(
    QuartznetCheckpoint.QuartzNet15x5Base_En,
    decoder_class=conv1d_decoder,
    tokens=tokens,
    text_kwargs={'custom_tokenizer_function': dot_tokenizer},
)

These next five cells import the decoder weights from a trained model and adapt them into the new decoder.

`correspnodences` (spelled this way in the code) is a dictionary that maps every token in the new decoder to the token in the trained decoder whose parameters it should be initialised from.

# Hand-written map: key = token of the new vocabulary, value = token of the
# pretrained decoder whose weight and bias rows are copied for it.
# NOTE(review): the variable name is misspelled, but the same spelling is
# used where the dict is passed on, so it is kept as is.
correspnodences  = {'s': 's', 'n': 'n', 'dʒ': 'j', 'd̚': 'd', 'w': 'w', 'b': 'b', 'g': 'g', 'm': 'm',
                    'l̩': 'l', 'f': 'f', 'l': 'l', 'j': 'y', 'k': 'k', 'eɪ': 'a', 'p̚': 'p', 'm̩': 'm',
                    'r': 'r', 't': 't', 'h': 'h', 'aʊ': 'o', 'n̩': 'n', 'i': 'e', 'b̚': 'b', 'p': 'p',
                    'k̚': 'k', 'd': 'd', 'u': 'o', 't̚': 't', 'z': 'z', 'aɪ': 'i', 'v': 'v', 'tʃ': 'c',
                    'oʊ': 'o', '<blank>' : '<blank>', 'ɝ' : 'e', 'ʉ' : 'o', 'ð' : 't', 'θ' : 't', 'ɚ' : 'e',
                    'ɦ' : 'h', 'ŋ' : 'n', 'ʔ' : 't', 'ʒ' : 's', 'ʊ' : 'o', 'ɾ' : 't', 'ɪ' : 'i', 'ə̥' : 'u',
                    'ɑ' : 'a', 'ə' : 'e', 'ɛ' : 'e', 'ɔɪ' : 'o', 'ɡ̚' : 'g', 'ɔ' : 'o', 'ɨ̞' : 'i', 'ŋ̩' : 'n',
                    'ʌ' : 'u', 'ɾ̃' : 'n', 'ʃ' : 's', 'æ' : 'a'}
def adapt_into_new_decoder(decoder, old_vocab, new_vocab, correspnodences = None):
    """Build a decoder for ``new_vocab`` seeded from a trained decoder.

    A fresh ``conv1d_decoder`` with one output per entry of ``new_vocab`` is
    created. For every ``new_token -> old_token`` pair, the weight and bias
    row of ``old_token`` in ``decoder`` is copied into the row of
    ``new_token``. Rows of tokens without a mapping keep the values
    ``conv1d_decoder`` gave them.

    Args:
        decoder: Trained decoder exposing ``weight`` and ``bias`` indexed by
            token id along the first dimension.
        old_vocab: Mapping from token to row index in ``decoder``.
        new_vocab: Mapping from token to row index in the new decoder.
        correspnodences: Mapping from new token to old token. When omitted,
            each non-empty new token is mapped to its first character if that
            character exists in ``old_vocab``.

    Returns:
        The newly created decoder.

    Raises:
        KeyError: If a mapped token is missing from the corresponding vocab.
    """
    if correspnodences is None:
        correspnodences = {k: k[0] for k in new_vocab if k and k[0] in old_vocab}
    with torch.no_grad():
        # Take the input width from the source decoder rather than hard-coding
        # 1024, so the copied rows always fit the new layer.
        new_decoder = conv1d_decoder(decoder.weight.shape[1], len(new_vocab))
        weight = decoder.weight
        bias = decoder.bias
        for new_token, old_token in correspnodences.items():
            new_index = new_vocab[new_token]
            old_index = old_vocab[old_token]
            new_decoder.weight[new_index] = weight[old_index]
            new_decoder.bias[new_index] = bias[old_index]
    return new_decoder
# Load the pretrained English QuartzNet checkpoint; its decoder and
# vocabulary are the source for the adaptation below.
checkpoint_model = load_quartznet_checkpoint(
    QuartznetCheckpoint.QuartzNet15x5Base_En
)

These vocab dictionaries give the function adapt_into_new_decoder the indices in the weight matrix of the decoder for the corresponding tokens.

# token -> row index, for the pretrained decoder and for the new one.
pretrained_itos = checkpoint_model.text_transform.vocab.itos
old_vocab = {token: index for index, token in enumerate(pretrained_itos)}
new_vocab = {
    token: index
    for index, token in enumerate(model.text_transform.vocab.itos)
}

# Swap in the decoder seeded from the pretrained weights, then drop the
# checkpoint model, which is not needed any more.
model.decoder = adapt_into_new_decoder(
    checkpoint_model.decoder, old_vocab, new_vocab, correspnodences
)
del checkpoint_model


import wandb
from pytorch_lightning.loggers import WandbLogger
# Weights & Biases logger that is handed to the Lightning trainer.
wandb_logger = WandbLogger(
    project='pronunciation-trainer',
    name='transform-thunder',
)
Setting a higher encoder_initial_lr_div led to less overfitting.

# The Trainer(...) call was never closed, which made this cell a syntax error.
trainer = pl.Trainer(
    gpus=-1,  # Use all gpus
    # The encoder is unfrozen at epoch 15; encoder_initial_lr_div=100 is the
    # setting that led to less overfitting.
    callbacks=[FinetuneEncoderDecoder(unfreeze_encoder_at_epoch=15, encoder_initial_lr_div=100)],
    logger=wandb_logger,
)
# NOTE(review): only validation is run here; no trainer.fit(...) call is
# visible in this file - confirm whether a training cell was dropped.
trainer.validate(model=model, datamodule=dm)

Let's save our model for inference.

# NOTE(review): this line was two statements fused together with their path
# arguments blanked out, which is invalid syntax. They are split here into
# the TorchScript export and (presumably) the wandb upload of the exported
# file; fill in the real paths and confirm the second call.
model.to_torchscript("")
wandb.save('', policy='now')
Getting predictions for the app

# Table describing the native-speaker recordings.
natives = pd.read_csv('pronunciation-trainer/natives.csv')
# Audio loader used to read those recordings at 16 kHz.
loader = AudioFileLoader(sample_rate=16000)

I came up with a small list of words that learners might struggle to tell apart.

# Easily confused words to pull from the native-speaker table.
subset = [
    "thin", "thing", "think",
    "fit", "feet",
    "bald", "bold",
    "food", "foot",
    "death", "deaf",
    "worm", "warm",
]
# Keep only the recordings whose `replica` is one of those words.
is_subset_word = natives["replica"].isin(subset)
subset_df = natives[is_subset_word]

This dataset contains audio for single words.

I get the raw prediction tensors and then convert them into the format I need.

# Run the model on every selected recording. `predicts` gets exactly one
# entry per row of subset_df: the raw model output, or None when the file
# could not be loaded or processed. The original cell had lost its `try:`
# line and the body of its `except`; None is used as the failure marker
# because the decoding step skips None entries and the results are later
# assigned as a column of subset_df, which needs matching lengths.
predicts = []
with torch.no_grad():  # inference only; avoids keeping autograd graphs alive
    for relative_path in subset_df.path:
        # Drop the first two path components and re-root under the local
        # 'Q22-eng-English' folder.
        path = str(Path('Q22-eng-English') / '/'.join(relative_path.split('/')[2:]))
        try:
            audio = loader(path)
            # One length entry per item in the batch, each the full length.
            lengths = torch.tensor(audio.shape[0] * [audio.shape[-1]], device=audio.device)
            predicts.append(model(audio, lengths))
        except Exception as err:
            # Best effort: report the failure and keep going.
            print(f"Skipping {path}: {err}")
            predicts.append(None)
from itertools import groupby

# Work on a copy so the model's own vocabulary list is not modified.
vocab = list(model.text_transform.vocab.itos)
# The last entry is mapped to the empty string so that it is dropped from
# the decoded output (presumably it is the CTC blank - confirm).
vocab[-1] = ''


def _greedy_decode(prediction):
    """Turn one raw model output into a dot-joined token string.

    Takes the arg-max token id per frame of the first batch item, collapses
    runs of identical consecutive ids, and drops tokens that map to ''.
    """
    frame_ids = prediction[0].argmax(1)[0].tolist()
    collapsed = (vocab[token_id] for token_id, _ in groupby(frame_ids))
    return '.'.join(token for token in collapsed if token)


# Failed recordings stay None; everything else becomes its transcription.
predicts = [None if p is None else _greedy_decode(p) for p in predicts]

# assign() returns a new frame, so no column is written into a filtered
# slice of another DataFrame (which triggers SettingWithCopyWarning).
subset_df = subset_df.assign(transcription=predicts)
subset_df.to_csv("native_words_subset.csv", index=False)