The VAE implemented here uses the setup found in most VAE papers: a multivariate Normal distribution for the conditional distribution of the latent vectors given an input sequence ($q_{\phi}(z | x_i)$ in the slides) and, because the data are token sequences rather than images, a categorical distribution over the vocabulary for the conditional distribution of tokens given the latent vector ($p_{\theta}(x | z)$ in the slides). With a categorical output distribution, the reconstruction loss (the negative log likelihood of a data point under the output distribution) reduces to token-wise cross-entropy, the sequence analogue of the pixel-wise binary cross-entropy obtained with a Bernoulli decoder in the original VAE paper (see Appendix C.1 for details).
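Concretely, the objective minimized below (see `vae_loss`) is the negative $\beta$-weighted ELBO, averaged over the batch: $\mathcal{L}(\theta, \phi; x) = -\mathbb{E}_{q_{\phi}(z|x)}\big[\log p_{\theta}(x|z)\big] + \beta \, D_{KL}\big(q_{\phi}(z|x)\,\|\,\mathcal{N}(0, I)\big)$, where the reconstruction term is implemented as summed token-wise cross-entropy and the KL term has the closed form $-\tfrac{1}{2}\sum_j \big(1 + \log\sigma_j^2 - \mu_j^2 - \sigma_j^2\big)$ for the diagonal-Gaussian posterior.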
%%bash
pip install --upgrade pytorch-lightning
pip install tokenizers
from google.colab import drive
drive.mount('/content/drive')
%cd /content/drive/My\ Drive/6.864-final-project
import os
import sys
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, random_split
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence
from nltk.tokenize import TweetTokenizer
import pytorch_lightning as pl
from pytorch_lightning import Trainer, seed_everything
from tokenizers import ByteLevelBPETokenizer, normalizers
from tokenizers.normalizers import NFKC, StripAccents, Lowercase, NFKD
import pandas as pd
import re
%matplotlib inline
from ast import literal_eval
import re
def remove_emoji(string):
emoji_pattern = re.compile("["
u"\U0001F600-\U0001F64F" # emoticons
u"\U0001F300-\U0001F5FF" # symbols & pictographs
u"\U0001F680-\U0001F6FF" # transport & map symbols
u"\U0001F1E0-\U0001F1FF" # flags (iOS)
u"\U00002702-\U000027B0"
u"\U000024C2-\U0001F251"
"]+", flags=re.UNICODE)
return emoji_pattern.sub(r'', string)
df = pd.read_csv("./100ktweets.csv", encoding='utf-8',header=None)
df.rename(columns = {0:'text',},
inplace = True)
df["text"] = df["text"].astype(str)
#df["text"] = df["text"].apply(remove_emoji)
df["text"] = df["text"].apply(lambda x : re.sub(r'http\S+', '', x)) #remove links
df["text"] = df["text"].apply(lambda x : re.sub(r'><', '> <', x)) #separate emojis
df = df.reset_index(drop=True)
df.head()
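As a quick sanity check of the cleaning steps above, here is a minimal sketch on a made-up tweet (the string and the `<3>` placeholders are purely illustrative):
_example = "good vibes 😀<3><3> check this out https://t.co/abc123"
_example = re.sub(r'http\S+', '', _example)  # drop the link
_example = re.sub(r'><', '> <', _example)    # put a space between adjacent <...> tokens
print(remove_emoji(_example))                # remove_emoji strips the raw emoji (left unused in the pipeline above)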
class TweetsDataset(Dataset):
def __init__(self, df):
df["text"] = df["text"].apply(lambda x : re.sub(r'http\S+', '', x))
df["text"] = df["text"].astype(str)
self.frame = df
self.tokenizer = ByteLevelBPETokenizer(end_of_word_suffix="</w>")
self.tokenizer.normalizer = normalizers.Sequence([NFKD(), StripAccents(), Lowercase()])
self.tokenizer.train_from_iterator(iter(self.frame["text"]), min_frequency=2, special_tokens=[
"<s>",
"<pad>",
"</s>",
"<unk>",
"</w>",
])
self.maxlen = 35
for ix, row in df.iterrows():
enc = self.tokenizer.encode(row["text"])
curlen = len(enc.ids)
if curlen > self.maxlen:
self.frame.drop(ix, inplace=True)
self.tokenizer.enable_padding(pad_id=1, pad_token="<pad>", length=self.maxlen+1)
self.tokenizer.save_model(".", "twittertok")
        self.frame = self.frame.reset_index(drop=True)
def __len__(self):
return len(self.frame)
    def __getitem__(self, idx):
        raw_tweet = self.frame.at[idx, "text"]
        encoded_obj = self.tokenizer.encode(raw_tweet, is_pretokenized=False)
        ids = encoded_obj.ids
        # wrap the content in <s> ... </s> (ids 0 and 2), keeping the <pad> tokens (id 1) at the end
        first_pad_ix = ids.index(1)
        ids = [0] + ids[:first_pad_ix] + [2] + ids[first_pad_ix:]
        unpadded_toks = [x for x in ids if x != 1]
        sample = {'ids': torch.tensor(ids).long(),
                  'len': len(unpadded_toks)}
        return sample
td = TweetsDataset(df)
n_train = int(round(len(td)*.75))
n_val = int(round(len(td)*.15))
n_test = len(td) - n_train - n_val  # take the remainder so the three splits always sum to len(td)
train_dataset, val_dataset, test_dataset = random_split(td, [n_train, n_val, n_test])
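To verify what the dataset actually returns, a quick look at one training sample (index 0 is chosen arbitrarily):
_sample = train_dataset[0]
print(_sample['ids'].shape, _sample['len'])          # ids are padded to td.maxlen + 3; len counts non-<pad> tokens
print(td.tokenizer.decode(_sample['ids'].tolist()))  # round-trip back to text (special tokens are skipped by default)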
class Encoder(nn.Module):
def __init__(self, vocab_size=256,
embed_size=256,
hidden_size=256,
nhead=8,
transformer_layers=8,
rnn_layers = 3,
dropout=0.15,
latent_dims=16):
super(Encoder, self).__init__()
        encoder_layer = nn.TransformerEncoderLayer(d_model=embed_size, nhead=nhead, batch_first=True)
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=transformer_layers)
self.rnn_layers = rnn_layers
self.hidden_size = hidden_size
self.rnn = nn.GRU(input_size=embed_size,
hidden_size=hidden_size,
num_layers=rnn_layers,
dropout=dropout,
bidirectional=True,
batch_first=True)
self.mu = nn.Linear(in_features=2*hidden_size,
out_features=latent_dims)
self.logvar = nn.Linear(in_features=2*hidden_size,
out_features=latent_dims)
def forward(self, x, lengths):
batch_size = x.shape[0]
x = self.transformer_encoder(x) #keeps dimensionality
packed = pack_padded_sequence(x, lengths.cpu(), batch_first=True,
enforce_sorted=False)
outputs, hidden = self.rnn(packed) # output is (batch_size x sent_len x 2*hidden_size), hidden is (2*n_layers x batch_size x hidden_size)
#no need to unpack, only using hiddens
last_hidden = hidden.view(self.rnn_layers, 2, batch_size, self.hidden_size) # 2 for bidirectional... now (n_layers x 2 x batch_size x hidden_size)
        # indexing already drops the layer/direction dims; avoiding squeeze keeps the batch dim when batch_size == 1
        last_hidden_f = last_hidden[-1, 0, :, :] # (batch_size x hidden_size)
        last_hidden_b = last_hidden[-1, 1, :, :] # (batch_size x hidden_size)
        hid = torch.cat([last_hidden_f, last_hidden_b], dim=-1) # (batch_size x 2*hidden_size)
x_mu = self.mu(hid) # (batch_size x latent_dims)
x_logvar = self.logvar(hid) # (batch_size x latent_dims)
return x_mu, x_logvar
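A minimal shape check of the encoder on random embeddings; the batch size, sequence length, and hyperparameters below are illustrative, not the ones used for training:
_enc = Encoder(embed_size=256, hidden_size=256, nhead=8, transformer_layers=2, rnn_layers=3, latent_dims=16)
_dummy = torch.randn(4, 12, 256)      # (batch_size, sent_len, embed_size)
_lens = torch.tensor([12, 10, 7, 5])  # true (unpadded) lengths per example
_mu, _logvar = _enc(_dummy, _lens)
print(_mu.shape, _logvar.shape)       # expected: torch.Size([4, 16]) for both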
class Decoder(nn.Module):
def __init__(self, embedder,
embed_size=128,
hidden_size=256,
rnn_layers=3,
dropout=0.15,
vocab_size=1000
):
super(Decoder, self).__init__()
self.rnn = nn.GRU(input_size=embed_size,
hidden_size=hidden_size,
num_layers=rnn_layers,
batch_first=True,
dropout=dropout,
bidirectional=False)
self.embedding = embedder
#linear going from hidden size to the vocab size
self.hidden_to_vocab = nn.Linear(hidden_size, vocab_size)
    def nucleus_sampling(self, outputs, top_p=0.5):
        # outputs: (batch_size, 1, vocab_size) raw logits from hidden_to_vocab
        logits = torch.squeeze(outputs, dim=1)  # (batch_size, vocab_size)
        sorted_logits, sorted_indices = torch.sort(logits, descending=True, dim=-1)
        sorted_probs = F.softmax(sorted_logits, dim=-1)
        cumulative_probs = torch.cumsum(sorted_probs, dim=-1)
        # mask out every token whose preceding cumulative mass already reaches top_p, so the
        # smallest set of tokens with total probability >= top_p is kept (the top token always survives)
        outside_nucleus = (cumulative_probs - sorted_probs) >= top_p
        sorted_logits = sorted_logits.masked_fill(outside_nucleus, float('-inf'))
        # undo the sort so the logits line up with vocabulary indices again
        unsorted_logits = sorted_logits.gather(1, sorted_indices.argsort(dim=1))
        probs = F.softmax(unsorted_logits, dim=-1)  # renormalized over the nucleus
        selected_tokens = torch.multinomial(probs, 1).view(outputs.shape[0], 1)  # (batch_size, 1)
        return selected_tokens
def forward(self, init_hiddens, ground_truth=None):
"""Unroll the decoder one step at a time.
Inputs:
- `init_hiddens`: a 3d-tensor of shape
(n_layers, batch_size, hidden_size) representing the final
encoder hidden states used to initialize the decoder hidden
states.
- `ground_truth`: a 3d-tensor of shape (batch_size, max_seq_length, embed_size)
representing a batch of padded word vectors of target sentences [ONLY IF TEACHER FORCING]
Returns:
- `pre_output_vectors`: a 3d-tensor of shape
(batch_size, max_len, hidden_size) representing the raw decoder
outputs (before mapping to a `trg_vocab_size`-dim vector).
"""
num_enc_layers, batch_size, hidden_size = init_hiddens.shape
output_vectors = []
sequence = []
hidden = init_hiddens
        # start every sequence with the <s> token (id 0), on the same device as the hidden state
        cur_input = torch.zeros((batch_size, 1), dtype=torch.long, device=init_hiddens.device)
        for i in range(td.maxlen + 3):  # padded length is maxlen+1, plus the <s> and </s> tokens
if ground_truth is not None: #teacher forcing
cur_input = torch.unsqueeze(ground_truth[:,i,:], 1) #(batch_size, 1, embed_size)
else: #this is at test time... we embed the input we have
cur_input = self.embedding(cur_input) # (batch_size, 1, embed_size)
pre_output, hidden = self.rnn(cur_input, hidden) #pre-output is (batch_size, 1, hidden_size)
output = self.hidden_to_vocab(pre_output) #output is (batch_size, 1, vocab_size)
if ground_truth is None: #if we dont have ground truth (testing/validation), then select word w/ nucleus sampling
cur_input = self.nucleus_sampling(output) # (batch_size,1)
output_vectors.append(output)
sequence.append(cur_input)
output_vectors = torch.cat(output_vectors, dim=1)
token_sequence = torch.cat(sequence, dim=-1)
return output_vectors, token_sequence
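To build intuition for the top-p filtering in `nucleus_sampling`, here is a toy example with a made-up 5-token distribution (with top_p = 0.5 only the two most likely tokens should survive):
_toy_logits = torch.log(torch.tensor([[0.45, 0.30, 0.15, 0.07, 0.03]]))  # toy next-token distribution
_sorted, _idx = torch.sort(_toy_logits, descending=True, dim=-1)
_probs = F.softmax(_sorted, dim=-1)
_keep = (torch.cumsum(_probs, dim=-1) - _probs) < 0.5  # same rule as nucleus_sampling
print(_idx[0][_keep[0]])                               # expected: tensor([0, 1])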
def vae_loss(recon_x, x, mu, logvar, variational_beta, vocab_size):
batch_size = x.shape[0]
    # recon_x holds the decoder's per-token logits; the reconstruction loss is the
    # summed token-wise cross-entropy, i.e. the negative log likelihood of the target
    # sequence under the decoder's categorical output distribution. <pad> positions
    # (id 1) are ignored. Summing rather than averaging keeps the exact NLL, but it
    # also means the weight chosen for the KL term depends on the sequence length.
loss = nn.CrossEntropyLoss(reduction='sum', ignore_index=1)
    logit = recon_x.reshape(-1, vocab_size)  # reshape handles non-contiguous (e.g. sliced) logits
    view_x = x.reshape(-1)
recon_loss = loss(logit, view_x)
    # KL-divergence between the approximate posterior q_phi(z|x) estimated by the
    # encoder and the standard Normal prior over latent vectors (the distribution
    # we sample from when generating new tweets), computed in closed form.
KLD_element = mu.pow(2).add_(logvar.exp()).mul_(-1).add_(1).add_(logvar)
KLD = torch.sum(KLD_element).mul_(-0.5)
return (recon_loss + variational_beta*KLD)/batch_size
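A minimal shape check of `vae_loss` on random tensors; all the sizes below are arbitrary:
_B, _T, _V, _D = 4, 10, 300, 16             # batch, sequence length, vocab size, latent dims
_recon = torch.randn(_B, _T, _V)            # fake decoder logits
_targets = torch.randint(0, _V, (_B, _T))   # fake target token ids
_mu0, _logvar0 = torch.zeros(_B, _D), torch.zeros(_B, _D)  # posterior equal to the prior, so the KL term is 0
print(vae_loss(_recon, _targets, _mu0, _logvar0, variational_beta=10, vocab_size=_V))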
class VariationalAutoEncoder(pl.LightningModule):
def __init__(self, vocab_size=256,
embed_size=256,
hidden_size=256,
nhead=8,
transformer_layers=8,
rnn_layers = 3,
dropout=0.15,
latent_dims=16,
variational_beta=10,
batch_size=16,
lr=2e-4):
super().__init__()
self.save_hyperparameters()
        #combination of a cross-entropy reconstruction loss and a closed-form KL term (see vae_loss)
        self.loss = vae_loss
        self.variational_beta = variational_beta
        #embedding layer, plus a linear layer to map the latent representation into the decoder's hidden size
self.embedding = nn.Embedding(vocab_size, embed_size, padding_idx=1)
self.latent_to_hidden = nn.Linear(latent_dims, hidden_size)
#encoder: for turning the input into a mu and a sigma to get a latent representation
self.encoder = Encoder(vocab_size=vocab_size,
embed_size=embed_size,
hidden_size=hidden_size,
nhead=nhead,
transformer_layers=transformer_layers,
rnn_layers = rnn_layers,
dropout=dropout,
latent_dims=latent_dims)
        #decoder: decodes the latent representation back into a token sequence
self.decoder = Decoder(embed_size=embed_size,
hidden_size=hidden_size,
rnn_layers=rnn_layers,
dropout=dropout,
embedder=self.embedding,
vocab_size=vocab_size)
    def forward(self, n=1, z=None):
        if z is None:
            # sample n latent vectors from the standard Normal prior, on the model's device
            z = torch.randn(n, self.hparams.latent_dims, device=self.device)
        #decoder
        z = self.latent_to_hidden(z) #n_batch x hidden_size
        z = torch.unsqueeze(z, dim=0).repeat(self.hparams.rnn_layers, 1, 1) #num_layers x n_batch x hidden_size
        x_hat, sequence = self.decoder(z) #sequence is (batch_size, max_len) sampled token ids
        return sequence
def training_step(self, batch, batch_idx):
        # training_step defines the training loop; it is independent of forward
x, lens = batch["ids"], batch["len"] # n_batch x sent_len
x_embedded = self.embedding(x) # n_batch x sent_len x embed_dims
#run through encoder
mu, logvar = self.encoder(x_embedded, lens)
#reparametrization trick
z = self.reparametrize(mu, logvar)
#decoder
z = self.latent_to_hidden(z) #n_batch x hidden_size
z = torch.unsqueeze(z, dim=0).repeat(self.hparams.rnn_layers, 1, 1) #num_layers x n_batch x hidden_size
        x_hat, _ = self.decoder(z, ground_truth=x_embedded) #(batch_size, max_len, vocab_size)
        #eval: the decoder's output at step i predicts token i+1, so shift the targets by one
        loss = self.loss(x_hat[:, :-1, :], x[:, 1:], mu, logvar, self.variational_beta, self.hparams.vocab_size)
# Logging to TensorBoard by default
self.log("train_loss", loss, on_step=True, on_epoch=True, prog_bar=True, logger=True)
return loss
def validation_step(self, batch, batch_idx):
        # validation_step mirrors training_step, but logs val_loss
x, lens = batch["ids"], batch["len"] # n_batch x sent_len
x_embedded = self.embedding(x) # n_batch x sent_len x embed_dims
#run through encoder
mu, logvar = self.encoder(x_embedded, lens)
#reparametrization trick
z = self.reparametrize(mu, logvar)
#decoder
z = self.latent_to_hidden(z) #n_batch x hidden_size
z = torch.unsqueeze(z, dim=0).repeat(self.hparams.rnn_layers, 1, 1) #num_layers x n_batch x hidden_size
        x_hat, _ = self.decoder(z, ground_truth=x_embedded) #(batch_size, max_len, vocab_size)
        #eval: targets shifted by one, exactly as in training_step
        loss = self.loss(x_hat[:, :-1, :], x[:, 1:], mu, logvar, self.variational_beta, self.hparams.vocab_size)
# Logging to TensorBoard by default
self.log("val_loss", loss, on_step=True, on_epoch=True, logger=True)
return loss
    def reparametrize(self, mu, log_var):
        """Sample z ~ N(mu, sigma^2) with the reparameterization trick.
        Sampling directly from N(mu, sigma^2) is not differentiable, so we instead draw
        eps ~ N(0, I) and compute z = mu + sigma * eps. The randomness is confined to eps,
        and gradients can flow back through mu and log_var to the encoder.
        """
        eps = torch.randn_like(mu)      # eps ~ N(0, I), same shape/device/dtype as mu
        std = log_var.mul(0.5).exp()    # sigma = exp(log_var / 2)
        return eps.mul(std).add(mu)     # z = mu + sigma * eps
def configure_optimizers(self):
optimizer = torch.optim.Adam(self.parameters(), lr=self.hparams.lr)
return optimizer
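As a standalone illustration of the reparameterization trick used in `reparametrize` (the values are toy numbers, two latent dimensions):
_mu_t = torch.tensor([[0.5, -1.0]], requires_grad=True)     # toy posterior mean
_logvar_t = torch.tensor([[0.0, 0.2]], requires_grad=True)  # toy posterior log-variance
_eps = torch.randn_like(_mu_t)                              # the only source of randomness
_z = _mu_t + _eps * torch.exp(0.5 * _logvar_t)              # z = mu + sigma * eps
_z.sum().backward()                                         # gradients reach mu and log-variance
print(_mu_t.grad, _logvar_t.grad)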
seed_everything(42, workers=True)
trainer = Trainer(check_val_every_n_epoch=1,
fast_dev_run=False, #true to "unit test"
max_epochs=50,
precision=16,
profiler="simple",
gpus=1,
log_every_n_steps=20,
limit_train_batches=1.0,
limit_test_batches=1.0,
limit_val_batches=1.0,
max_time="00:02:30:00")
model = VariationalAutoEncoder(vocab_size=td.tokenizer.get_vocab_size(with_added_tokens=True),
embed_size=128,
hidden_size=256,
nhead=8,
transformer_layers=6,
rnn_layers = 3,
dropout=0.2,
latent_dims=16,
variational_beta=10)
train_loader = DataLoader(train_dataset, batch_size=model.hparams.batch_size, shuffle=True, num_workers=8, pin_memory=True)
val_loader = DataLoader(val_dataset, batch_size=model.hparams.batch_size, num_workers=8, pin_memory=True)
test_loader = DataLoader(test_dataset, batch_size=model.hparams.batch_size, num_workers=8, pin_memory=True)
trainer.tune(model)
trainer.fit(model, train_dataloaders=train_loader, val_dataloaders=val_loader)
model.eval()
with torch.no_grad():
    print(td.tokenizer.decode(model(n=1)[0].tolist()))
%load_ext tensorboard
%tensorboard --logdir lightning_logs/