In [None]:
#export
from fastai2.test import *
from fastai2.data.all import *
from fastai2.text.core import *
from fastai2.text.models.awdlstm import *

In [None]:
from nbdev.showdoc import *

In [None]:
#default_exp text.models.core
#default_cls_lvl 3

# Core text modules

> Contains the modules common to the different architectures and the generic functions to get models

In [None]:
#export 
# Per-architecture metadata, keyed by the architecture class itself.
#  - 'hid_name': name of the config entry that the model builders in this notebook read to size their heads
#  - 'config_lm' / 'config_clas': default configs used when the caller passes `config=None`
#  - 'url', 'url_bwd', 'split_lm', 'split_clas': not read anywhere in the visible part of this notebook
#    (presumably pretrained-weight URLs and parameter-group splitters used by callers - TODO confirm)
# NOTE(review): the AWD_QRNN entry has no 'url'/'url_bwd' keys and reuses the AWD_LSTM split functions.
_model_meta = {AWD_LSTM: {'hid_name':'emb_sz', 'url':URLs.WT103_FWD, 'url_bwd':URLs.WT103_BWD,
 'config_lm':awd_lstm_lm_config, 'split_lm': awd_lstm_lm_split,
 'config_clas':awd_lstm_clas_config, 'split_clas': awd_lstm_clas_split},
 AWD_QRNN: {'hid_name':'emb_sz',
 'config_lm':awd_qrnn_lm_config, 'split_lm': awd_lstm_lm_split,
 'config_clas':awd_qrnn_clas_config, 'split_clas': awd_lstm_clas_split},}
 # Transformer: {'hid_name':'d_model', 'url':URLs.OPENAI_TRANSFORMER,
 # 'config_lm':tfmer_lm_config, 'split_lm': tfmer_lm_split,
 # 'config_clas':tfmer_clas_config, 'split_clas': tfmer_clas_split},
 # TransformerXL: {'hid_name':'d_model',
 # 'config_lm':tfmerXL_lm_config, 'split_lm': tfmerXL_lm_split,
 # 'config_clas':tfmerXL_clas_config, 'split_clas': tfmerXL_clas_split}}

## Language models

In [None]:
#export
class LinearDecoder(Module):
    "Linear head to put on top of an RNN core module to create a language model."
    initrange=0.1

    def __init__(self, n_out, n_hid, output_p=0.1, tie_encoder=None, bias=True):
        # Build and initialise the linear layer first; tying (if any) happens last, so the
        # uniform init below only ever touches this layer's own freshly created weight.
        linear = nn.Linear(n_hid, n_out, bias=bias)
        linear.weight.data.uniform_(-self.initrange, self.initrange)
        self.decoder = linear
        self.output_dp = RNNDropout(output_p)
        if bias: linear.bias.data.zero_()
        # Weight tying: reuse `tie_encoder.weight` itself as the decoder weight.
        if tie_encoder: linear.weight = tie_encoder.weight

    def forward(self, input):
        raw_outputs, outputs = input
        # Only the last element of `outputs` is decoded; both lists are returned untouched.
        dropped = self.output_dp(outputs[-1])
        return self.decoder(dropped), raw_outputs, outputs

In [None]:
from fastai2.text.models.awdlstm import *
# Run a small encoder once to get the pair of output lists that the decoder consumes
enc = AWD_LSTM(100, 20, 10, 2)
x = torch.randint(0, 100, (10,5))
r = enc(x)

tst = LinearDecoder(100, 20, 0.1)
y = tst(r)
# The decoder must hand both encoder output lists back unchanged...
test_eq(y[1], r[0])
test_eq(y[2], r[1])
# ...and produce `n_out` (100 here) scores as the last dimension of its own output
test_eq(y[0].shape, [10, 5, 100])

# With `tie_encoder`, the decoder weight must equal the encoder embedding weight
tst = LinearDecoder(100, 20, 0.1, tie_encoder=enc.encoder)
test_eq(tst.decoder.weight, enc.encoder.weight)

In [None]:
#export
class SequentialRNN(nn.Sequential):
    "`nn.Sequential` whose `reset` call is forwarded to each direct child."
    def reset(self):
        # Children without a `reset` attribute fall back to `noop`, so they are simply skipped.
        for child in self.children():
            child_reset = getattr(child, 'reset', noop)
            child_reset()

In [None]:
# Minimal module exposing a `reset` method, used to check that `SequentialRNN.reset` reaches each child
class _TstMod(Module):
 def reset(self): print('reset')

tst = SequentialRNN(_TstMod(), _TstMod())
# Two children -> 'reset' is expected on stdout twice
test_stdout(tst.reset, 'reset\nreset')

In [None]:
#export
def get_language_model(arch, vocab_sz, config=None, drop_mult=1.):
    "Build a language model: an `arch` encoder followed by a `LinearDecoder`, both set up from `config`."
    meta = _model_meta[arch]
    # Work on a copy so neither the caller's dict nor the default config is modified.
    config = ifnone(config, meta['config_lm']).copy()
    # Every entry whose key ends in `_p` is scaled by `drop_mult`.
    for name in config.keys():
        if name.endswith('_p'): config[name] *= drop_mult
    # These settings are consumed here (decoder set-up), so they are removed before `config`
    # is forwarded to the encoder constructor.
    tie_weights = config.pop('tie_weights')
    output_p = config.pop('output_p')
    out_bias = config.pop('out_bias')
    init = config.pop('init', None)
    encoder = arch(vocab_sz, **config)
    tied = encoder.encoder if tie_weights else None
    decoder = LinearDecoder(vocab_sz, config[meta['hid_name']], output_p, tie_encoder=tied, bias=out_bias)
    model = SequentialRNN(encoder, decoder)
    if init is None: return model
    return model.apply(init)

The default `config` used can be found in `_model_meta[arch]['config_lm']`. `drop_mult` is applied to all the probabilities of dropout in that config.

In [None]:
# Shrink the default config so the test model stays tiny
config = awd_lstm_lm_config.copy()
config.update({'n_hid':10, 'emb_sz':20})

tst = get_language_model(AWD_LSTM, 100, config=config)
x = torch.randint(0, 100, (10,5))
y = tst(x)
test_eq(y[0].shape, [10, 5, 100])
# Decoder and encoder weights are expected to be tied with this config
test_eq(tst[1].decoder.weight, tst[0].encoder.weight)
# y[1] and y[2] are the two lists returned alongside the decoded output; index with `i`
# so that both are checked (the loop previously tested y[1] twice and never y[2]).
for i in range(1,3): test_eq([h_.shape for h_ in y[i]], [[10, 5, 10], [10, 5, 10], [10, 5, 20]])

In [None]:
#test drop_mult
# Relies on `config` defined in the previous cell.
tst = get_language_model(AWD_LSTM, 100, config=config, drop_mult=0.5)
# Each `_p` entry of the config should show up in the model multiplied by 0.5
test_eq(tst[1].output_dp.p, config['output_p']*0.5)
for rnn in tst[0].rnns: test_eq(rnn.weight_p, config['weight_p']*0.5)
for dp in tst[0].hidden_dps: test_eq(dp.p, config['hidden_p']*0.5)
test_eq(tst[0].encoder_dp.embed_p, config['embed_p']*0.5)
test_eq(tst[0].input_dp.p, config['input_p']*0.5)

## Classification models

In [None]:
#export
def _pad_tensor(t, bs, val=0.):
 if t.size(0) < bs: return torch.cat([t, val + t.new_zeros(bs-t.size(0), *t.shape[1:])])
 return t

In [None]:
#export
class SentenceEncoder(Module):
    "Wrap `module` so a full sentence is processed in chunks of at most `bptt` tokens and the results joined."
    def __init__(self, bptt, module, pad_idx=1): store_attr(self, 'bptt,module,pad_idx')

    def _concat(self, arrs, bs):
        # `arrs` holds one list of tensors per chunk. For each position in those lists, pad every
        # chunk's tensor to `bs` rows and join the chunks along dim 1.
        n_items = len(arrs[0])
        return [torch.cat([_pad_tensor(chunk[k], bs) for chunk in arrs], dim=1) for k in range(n_items)]

    def reset(self): getattr(self.module, 'reset', noop)()

    def forward(self, input):
        bs,sl = input.size()
        # The wrapped module is reset at the start of every forward pass.
        self.reset()
        raw_outputs,outputs,masks = [],[],[]
        for start in range(0, sl, self.bptt):
            stop = min(start+self.bptt, sl)
            piece = input[:,start:stop]
            raw,out = self.module(piece)
            # True wherever the token equals `pad_idx`
            masks.append(piece == self.pad_idx)
            raw_outputs.append(raw)
            outputs.append(out)
        return self._concat(raw_outputs, bs),self._concat(outputs, bs),torch.cat(masks,dim=1)

In [None]:
class DoubleEmbedding(nn.Embedding):
    "Test helper: an embedding whose forward returns `([emb], [emb+1])`, the pair of lists `SentenceEncoder` unpacks."
    def forward(self, x):
        emb = super().forward(x)
        raw_outputs, outputs = [emb], [emb+1]
        return (raw_outputs, outputs)
 
mod = DoubleEmbedding(5, 10,)
# bptt=5 with 15 tokens per row -> the input is processed in 3 chunks
tst = SentenceEncoder(5, mod, pad_idx=0)
x = torch.randint(1, 5, (3, 15))
x[2,10:]=0  # pad the end of the last row (pad_idx is 0 in this test)
raw,out,mask = tst(x) 
# Chunked processing must match a single pass of `mod` over the whole input
test_eq(raw[0], mod(x)[0][0])
test_eq(out[0], mod(x)[0][0]+1)
# The returned mask flags exactly the padding positions
test_eq(mask, x==0)

In [None]:
class PoolingLinearClassifier(nn.Module):
    "Create a linear classifier with pooling."
    # NOTE(review): a class with this same name is defined again later in this notebook (in an
    # `#export` cell); once that cell runs it shadows this definition.
    # NOTE(review): `bn_drop_lin` is not defined in the visible source - confirm it is importable.

    def __init__(self, layers, drops):
        super().__init__()
        # ReLU after every block except the last one, which gets no activation.
        n_hidden = len(layers) - 2
        activs = [nn.ReLU(inplace=True)] * n_hidden + [None]
        blocks = []
        for n_in, n_out, p, actn in zip(layers[:-1], layers[1:], drops, activs):
            blocks.extend(bn_drop_lin(n_in, n_out, p=p, actn=actn))
        self.layers = nn.Sequential(*blocks)

    def forward(self, input):
        raw_outputs,outputs,mask = input
        last = outputs[-1]
        pad = mask[:,:,None]
        # Number of unmasked steps per row
        lengths = last.size(1) - mask.long().sum(dim=1)
        # Mean over unmasked steps: masked steps contribute 0 to the sum
        avg_pool = last.masked_fill(pad, 0).sum(dim=1)
        avg_pool.div_(lengths.type(avg_pool.dtype)[:,None])
        # Max over unmasked steps: masked steps are set to -inf so they never win
        max_pool = last.masked_fill(pad, -float('inf')).max(dim=1)[0]
        rows = torch.arange(0, last.size(0))
        last_hidden = last[rows, lengths-1]
        pooled = torch.cat([last_hidden, max_pool, avg_pool], 1) #Concat pooling.
        return self.layers(pooled)

In [None]:
#export
def masked_concat_pool(outputs, mask):
    "Pool the last element of `outputs` (as returned by `SentenceEncoder`) over dim 1, ignoring `mask`ed steps, into [last_hidden, max_pool, avg_pool]"
    last_layer = outputs[-1]
    pad = mask[:, :, None]
    # Number of unmasked steps per row
    n_valid = last_layer.size(1) - mask.long().sum(dim=1)
    # Average: masked steps are zeroed before summing, then divide by the unmasked count
    avg_pool = last_layer.masked_fill(pad, 0).sum(dim=1)
    avg_pool.div_(n_valid.type(avg_pool.dtype)[:,None])
    # Max: masked steps are set to -inf so they can never be selected
    max_pool = last_layer.masked_fill(pad, -float('inf')).max(dim=1)[0]
    # "Last hidden" is read at index n_valid-1 - this assumes masked steps sit at the end of each row (TODO confirm)
    batch_idx = torch.arange(0, last_layer.size(0))
    last_hidden = last_layer[batch_idx, n_valid-1]
    return torch.cat([last_hidden, max_pool, avg_pool], 1) #Concat pooling.

In [None]:
out = torch.randn(2,3,5)
# Row 0 has its last step masked; row 1 has nothing masked
mask = tensor([[False,False,True], [False,False,False]])
x = masked_concat_pool([out], mask)
# Features 0-4: the last unmasked step of each row
test_close(x[0,:5], out[0,-2])
test_close(x[1,:5], out[1,-1])
# Features 5-9: max over the unmasked steps
test_close(x[0,5:10], out[0,:2].max(dim=0)[0])
test_close(x[1,5:10], out[1].max(dim=0)[0])
# Features 10-14: mean over the unmasked steps
test_close(x[0,10:], out[0,:2].mean(dim=0))
test_close(x[1,10:], out[1].mean(dim=0))

In [None]:
#Test the result is independent of padding
# Relies on `out` and `x` from the previous cell.
out1 = torch.randn(2,4,5)
# Same values as `out` for the first 3 steps; the extra 4th step keeps its random values
out1[:,:-1] = out.clone()
# The extra step is masked for both rows, so it must not influence the pooled result
mask1 = tensor([[False,False,True,True], [False,False,False,True]])
x1 = masked_concat_pool([out1], mask1)
test_eq(x, x1)

In [None]:
#export
class PoolingLinearClassifier(Module):
    "Create a linear classifier with pooling"
    def __init__(self, dims, ps):
        # `dims`: layer sizes, input first and output last; `ps`: one dropout probability per block.
        # Raises `ValueError` when the two do not line up. Checked first, before anything is built.
        if len(ps) != len(dims)-1: raise ValueError("Number of layers and dropout values do not match.")
        # ReLU after every block except the last one, which gets no activation.
        acts = [nn.ReLU(inplace=True)] * (len(dims) - 2) + [None]
        layers = [LinBnDrop(i, o, p=p, act=a) for i,o,p,a in zip(dims[:-1], dims[1:], ps, acts)]
        self.layers = nn.Sequential(*layers)

    def forward(self, input):
        # `input` is the (raw_outputs, outputs, mask) triple; the two output lists are returned
        # untouched next to the classifier result.
        raw,out,mask = input
        x = masked_concat_pool(out, mask)
        x = self.layers(x)
        return x, raw, out

In [None]:
mod = DoubleEmbedding(5, 10)
# The pooling concatenates 3 vectors of size 10, hence 10*3 input features for the classifier
tst = nn.Sequential(SentenceEncoder(5, mod, pad_idx=0), PoolingLinearClassifier([10*3,4], [0.]))

x = torch.randint(1, 5, (3, 14))
x[2,10:] = 0
res,raw,out = tst(x) 
test_eq(raw[0], mod(x)[0][0])
test_eq(out[0], mod(x)[0][0]+1)
test_eq(res.shape, [3,4])

# One extra column made only of padding (0) must leave the predictions unchanged
x1 = torch.cat([x, tensor([0,0,0])[:,None]], dim=1)
res1,raw1,out1 = tst(x1) 
test_eq(res, res1)

In [None]:
#export
def get_text_classifier(arch, vocab_sz, n_class, bptt=72, config=None, drop_mult=1., lin_ftrs=None,
                        ps=None, pad_idx=1):
    "Create a text classifier from `arch` and its `config`: a `SentenceEncoder` followed by a `PoolingLinearClassifier`."
    # `lin_ftrs`: sizes of the intermediate linear layers of the head (default: [50]).
    # `ps`: dropout probabilities for the layers after the first one (default: 0.1 each).
    #       These are NOT scaled by `drop_mult`; only the `_p` entries of `config` are.
    meta = _model_meta[arch]
    # Work on a copy so neither the caller's dict nor the default config is modified.
    config = ifnone(config, meta['config_clas']).copy()
    for k in config.keys():
        if k.endswith('_p'): config[k] *= drop_mult
    # Normalise to plain lists so tuples (or any other iterable) are accepted as well:
    # the list concatenations below would otherwise raise for non-list inputs.
    lin_ftrs = [50] if lin_ftrs is None else list(lin_ftrs)
    ps = [0.1]*len(lin_ftrs) if ps is None else list(ps)
    # The head input is 3x the encoder's hidden size: `masked_concat_pool` concatenates three pooled vectors.
    layers = [config[meta['hid_name']] * 3] + lin_ftrs + [n_class]
    # `output_p` is removed from `config` and becomes the dropout of the first head layer.
    ps = [config.pop('output_p')] + ps
    init = config.pop('init', None)
    encoder = SentenceEncoder(bptt, arch(vocab_sz, **config), pad_idx=pad_idx)
    model = SequentialRNN(encoder, PoolingLinearClassifier(layers, ps))
    return model if init is None else model.apply(init)

In [None]:
# Shrink the default config so the test model stays tiny
config = awd_lstm_clas_config.copy()
config.update({'n_hid':10, 'emb_sz':20})

tst = get_text_classifier(AWD_LSTM, 100, 3, config=config)
# Tokens start at 2 so that no input equals the default pad_idx (1)
x = torch.randint(2, 100, (10,5))
y = tst(x)
test_eq(y[0].shape, [10, 3])
# y[1] and y[2] are the two output lists returned next to the predictions; index with `i`
# so that both are checked (the loop previously tested y[1] twice and never y[2]).
for i in range(1,3): test_eq([h_.shape for h_ in y[i]], [[10, 5, 10], [10, 5, 10], [10, 5, 20]])

In [None]:
#test padding gives same results
# Relies on `tst` and `x` from the previous cell.
tst.eval()
y = tst(x)
# Append one column: a real token (2) for row 0 and the default pad_idx (1) for every other row
x1 = torch.cat([x, tensor([2,1,1,1,1,1,1,1,1,1])[:,None]], dim=1)
y1 = tst(x1)
# Row 0 received a real extra token, so it is excluded; rows 1+ only gained padding and must match
test_close(y[0][1:],y1[0][1:])

In [None]:
#test drop_mult
# Relies on `config` defined in an earlier cell.
tst = get_text_classifier(AWD_LSTM, 100, 3, config=config, drop_mult=0.5)
# The default head dropout (0.1) is not scaled by `drop_mult`...
test_eq(tst[1].layers[1][2].p, 0.1)
# ...whereas the config's `output_p`, used for the first head layer, is
test_eq(tst[1].layers[0][3].p, config['output_p']*0.5)
# NOTE(review): the inner indices ([2] and [3]) depend on the layer order inside `LinBnDrop` - confirm against its definition
for rnn in tst[0].module.rnns: test_eq(rnn.weight_p, config['weight_p']*0.5)
for dp in tst[0].module.hidden_dps: test_eq(dp.p, config['hidden_p']*0.5)
test_eq(tst[0].module.encoder_dp.embed_p, config['embed_p']*0.5)
test_eq(tst[0].module.input_dp.p, config['input_p']*0.5)

## Export -

In [None]:
#hide
from nbdev.export import notebook2script
# Runs nbdev's notebook-to-script conversion; the cell output lists one "Converted <notebook>" line per notebook
notebook2script()

Converted 00_test.ipynb.
Converted 01_core_foundation.ipynb.
Converted 01a_core_utils.ipynb.
Converted 01b_core_dispatch.ipynb.
Converted 01c_core_transform.ipynb.
Converted 02_core_script.ipynb.
Converted 03_torchcore.ipynb.
Converted 03a_layers.ipynb.
Converted 04_data_load.ipynb.
Converted 05_data_core.ipynb.
Converted 06_data_transforms.ipynb.
Converted 07_data_block.ipynb.
Converted 08_vision_core.ipynb.
Converted 09_vision_augment.ipynb.
Converted 09a_vision_data.ipynb.
Converted 10_pets_tutorial.ipynb.
Converted 11_vision_models_xresnet.ipynb.
Converted 12_optimizer.ipynb.
Converted 13_learner.ipynb.
Converted 13a_metrics.ipynb.
Converted 14_callback_schedule.ipynb.
Converted 14a_callback_data.ipynb.
Converted 15_callback_hook.ipynb.
Converted 15a_vision_models_unet.ipynb.
Converted 16_callback_progress.ipynb.
Converted 17_callback_tracker.ipynb.
Converted 18_callback_fp16.ipynb.
Converted 19_callback_mixup.ipynb.
Converted 20_interpret.ipynb.
Converted 20a_distributed.ipynb.
Co