参考自Tensorflow的word2vec教程,本文主要包含了自己的实现理解。非常青涩时期写的

相关知识

传统方法

One-Hot Encoder 是一个词对应一个向量,向量中只有一个是1,其余是0,离散表达。$ $

Bag of Words 标识当前单词那一位不是1,而是变成了当前单词的出现次数。

存在的问题 需要大量的维数去表示,编码随机的,没有任何关联的信息。

向量空间模型

Vector Space Models可以把字词转化为连续值,并将意思相近的词被映射到向量空间相近的位置。

VSM在NLP中的一个重要假设是:在相同语境中出现的词,语义也相近。

有如下两种模型

  • 计数模型 统计语料库中相邻出现的词的频率,再把这些计数结果转为小而稠密的矩阵。
  • 预测模型 根据一个词周围相邻的词汇推测出这个词。

Word2Vec

Word2Vec是一种计算非常高效的、可以从原始语料中学习字词空间向量的预测模型。

有如下两种模型

  • CBOW Continuous Bag of Words 从语境推测目标词汇,适合小型数据。如“中国的首都是__”推测出“北京”。把一整段上下文信息当做一个观察对象
  • Skip-Gram 从目标词汇推测语境,适合大型语料。 把每一对上下文-目标词汇当做一个观察对象

Word2Vec的一些优点

  • 连续的词向量能够捕捉到更多的语义和关联信息

  • 意思相近的词语在向量空间中的位置也会比较近。如北京-成都、狗-猫等词汇会分别聚集在一起。
  • 能学会一些高阶语言的抽象概念。如"man-woman"和"king-queen"的向量很相似。

Word2Vec学习的抽象概念
Word2Vec学习的抽象概念

噪声对比训练

神经概率化语言模型通常使用极大似然法进行训练,再使用Softmax函数得到在给出上下文单词\(h\)的情况下,目标词\(w_t\)出现的最大概率,设为\(P(w_t|h)\)

\(score(w_t, h)\)为当前词\(w_t\)和上下文单词\(h\)的相容性,通常使用向量积获得。

\[ P(w_t|h) = Softmax(score(w_i, h))=\frac{e^{score(w_i, h)}} {\sum_{i=1}^v {e^{score(w_i, h)}}} \]

通过对数似然函数max likelihood来进行训练 \[ J_{ml}=\ln{P(w_t|h)}=score(w_t,h)-\ln{\sum_{i=1}^v e^{score(w_i, h)}} \] 这个方法看起来可行,但是消耗太大了,因为要对当前\(h\)与所有单词\(w\)的相容性\(score(w, h)\)

在使用word2vec模型中,我们并不需要对所有的特征进行学习。所以在CBOW模型和Skip-Gram模型中,会构造\(k\)个噪声单词,而我们只需要从这k个中找出真正目标单词\(w_t\)即可,使用了一个二分类器(lr)。下面是CBOW模型,对于Skip-Gram模型只需要相反就行了。

\(Q_\theta(D=1|w, h)\)是二元逻辑回归的概率,即在当前条件下出现词语\(w\)的概率。

  • \(\theta\) 输入的embedding vector
  • \(h\) 当前上下文
  • \(d\) 输入数据集
  • \(w\) 目标词汇(就是他出现的概率)

此时,最大化目标函数如下: \[ J_{NEG}=\ln{Q_\theta(D=1|w_t, h)} + \frac {\sum_{i=1}^{k}{\ln Q_\theta(D=0|w_I, h)}} {k} \] 前半部分为词\(w\)出现的概率,后面为\(k\)个噪声概率的期望值(如果写法有错误,希望提出,再改啦),有点像蒙特卡洛

负采样Negative Sampling

  • 当模型预测的真实目标词汇\(w_t\)的概率越高,其他噪声词汇概率越低,模型就得到优化了
  • 用编造的噪声词汇进行训练
  • 计算loss效率非常高,只需要随机选择\(k\)个,而不是全部词汇

实现Skip-Gram模型

数据说明

Skip-Gram模型是通过目标词汇预测语境词汇。如数据集如下

1
I hope you always find a reason to smile

从中我们可以得到很多目标单词和所对应的上下文信息(多个单词)。如假设设左右词的窗口距离为1,那么相应的信息如下

1
{'hope':['i', 'you'], 'you':['hope', 'alawys']...}

训练时,希望给出目标词汇就能够预测出语境词汇,所以需要这样的训练数据

1
2
3
4
5
# 前面是目标单词,后面是语境词汇,实际上相当于数据的label
('hope', 'i')
('hope', 'you')
('you', 'hope')
('you', 'always')

同时在训练时,制造一些随机单词作为负样本(噪声)。我们希望预测的概率分布在正样本上尽可能大,在负样本上尽可能小。

使用随机梯度下降算法(SGD)来进行最优化求解,并且使用mini-batch的方法,这样来更新embedding中的参数\(\theta\),让损失函数(NCE loss)尽可能小。这样,每个单词的词向量就会在训练的过程中不断调整,最后会处在一个最合适的语料空间位置。

例如,假设训练第\(t\)步,输入目标单词hope,希望预测出you,选择一个噪声词汇reason。则目标函数如下 \[ J_{NEG}^{(t)}=\ln {Q_\theta(D=1|hope, you)} + \ln{Q_\theta(D=0|hope, reason)} \] 目标是更新embedding的参数\(\theta\)以增大目标值,更新方式是计算损失函数对参数\(\theta\)的导数,使得参数\(\theta\)朝梯度方向进行调整。多次以后,模型就能够很好区别出真实语境单词和噪声词。

构建数据集

先来分析数据,对所有的词汇进行编码。对高频词汇给一个id,对于出现次数很少词汇,id就设置为0。高频是选择出现频率最高的50000个词汇。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
def build_dataset(self, words):
''' 构建数据集
Args:
words: 单词列表
Returns:
word_code: 所有word的编码,top的单词:数量;其余的:0
topword_id: topword-id
id_topword: id-word
topcount: 包含所有word的一个Counter对象
'''
# 获取top50000频数的单词
unk = 'UNK'
topcount = [[unk, -1]]
topcount.extend(
collections.Counter(words).most_common(
self.__vocab_size - 1))
topword_id = {}
for word, _ in topcount:
topword_id[word] = len(topword_id)
# 构建单词的编码。top单词:出现次数;其余单词:0
word_code = []
unk_count = 0
for w in words:
if w in topword_id:
c = topword_id[w]
else:
c = 0
unk_count += 1
word_code.append(c)
topcount[0][1] = unk_count
id_topword = dict(zip(topword_id.values(), topword_id.keys()))
return word_code, topword_id, id_topword, topcount

产生batch训练样本

由于是使用mini-batch的训练方法,所以每次要产生一些样本。对于每个单词,要确定要产生多少个语境单词,和最多可以左右选择多远。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
def generate_batch(self, batch_size, single_num, skip_window, word_code):
'''产生训练样本。Skip-Gram模型,从当前推测上下文
如 i love you. (love, i), (love, you)
Args:
batch_size: 每一个batch的大小,即多少个()
single_num: 对单个单词生成多少个样本
skip_window: 单词最远可以联系的距离
word_code: 所有单词,单词以code形式表示
Returns:
batch: 目标单词
labels: 语境单词
'''
# 条件判断
# 确保每个batch包含了一个词汇对应的所有样本
assert batch_size % single_num == 0
# 样本数量限制
assert single_num <= 2 * skip_window

# batch label
batch = np.ndarray(shape=(batch_size), dtype=np.int32)
labels = np.ndarray(shape=(batch_size, 1), dtype=np.int32)
# 目标单词和相关单词
span = 2 * skip_window + 1
word_buffer = collections.deque(maxlen=span)
for _ in range(span):
word_buffer.append(word_code[self.__data_index])
self.__data_index = (self.__data_index + 1) % len(word_code)

# 遍历batchsize/samplenums次,每次一个目标词汇,一次samplenums个语境词汇
for i in range(batch_size // single_num):
target = skip_window # 当前的单词
targets_to_void = [skip_window] # 已经选过的单词+自己本身
# 为当前单词选取样本
for j in range(single_num):
while target in targets_to_void:
target = random.randint(0, span - 1)
targets_to_void.append(target)
batch[i * single_num + j] = word_buffer[skip_window]
labels[i * single_num + j, 0] = word_buffer[target]
# 当前单词已经选择完毕,输入下一个单词,skip_window单词也成为下一个
self.__data_index = (self.__data_index + 1) % len(word_code)
word_buffer.append(word_code[self.__data_index])
return batch, labels

一些配置信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 频率top50000个单词
vocab_size = 50000
# 一批样本的数量
batch_size = 128
# 将单词转化为稠密向量的维度
embedding_size = 128
# 为单词找相邻单词,向左向右最多能取得范围
skip_window = 1
# 每个单词的语境单词数量
single_num = 2

# 验证单词的数量
valid_size = 16
# 验证单词从频数最高的100个单词中抽取
valid_window = 100
# 从100个中随机选择16个
valid_examples = np.random.choice(valid_window, valid_size, replace=False)
# 负样本的噪声数量
noise_num = 64

计算图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
graph = tf.Graph()
with graph.as_default():
# 输入数据
train_inputs = tf.placeholder(tf.int32, shape=[batch_size])
train_labels = tf.placeholder(tf.int32, shape=[batch_size, 1])
valid_dataset = tf.constant(valid_examples, dtype=tf.int32)

with tf.device('/cpu:0'):
# 随机生成单词的词向量,50000*128
embeddings = tf.Variable(
tf.random_uniform([vocab_size, embedding_size], -1.0, 1.0))
# 查找输入inputs对应的向量
embed = tf.nn.embedding_lookup(embeddings, train_inputs)
nce_weights = tf.Variable(
tf.truncated_normal([vocab_size, embedding_size],
stddev=1.0 / math.sqrt(embedding_size)))
nce_biases = tf.Variable(tf.zeros([vocab_size]))
# 为每个batch计算nceloss
loss = tf.reduce_mean(tf.nn.nce_loss(weights=nce_weights,
biases=nce_biases,
labels = train_labels,
inputs=embed,
num_sampled=noise_num,
num_classes=vocab_size))
# sgd
optimizer = tf.train.GradientDescentOptimizer(1.0).minimize(loss)

# 计算embeddings的L2范式,各元素的平方和然后求平方根,防止过拟合
norm = tf.sqrt(
tf.reduce_sum(
tf.square(embeddings),
axis=1,
keep_dims=True))
# 标准化词向量
normalized_embeddings = embeddings / norm
valid_embeddings = tf.nn.embedding_lookup(
normalized_embeddings, valid_dataset)
# valid单词和所有单词的相似度计算,向量相乘
similarity = tf.matmul(
valid_embeddings,
normalized_embeddings,
transpose_b=True)

init = tf.global_variables_initializer()

训练过程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
num_steps = 100001
with tf.Session(graph=graph) as sess:
init.run()
print('Initialized')
avg_loss = 0
for step in range(num_steps):
batch_inputs, batch_labels = wu.generate_batch(
batch_size, single_num, skip_window, word_code)
feed_dict = {train_inputs: batch_inputs, train_labels: batch_labels}
_, loss_val = sess.run([optimizer, loss], feed_dict=feed_dict)
avg_loss += loss_val

if step % 2000 == 0:
if step > 0:
avg_loss /= 2000
print ("avg loss at step %s : %s" % (step, avg_loss))
avg_loss = 0

if step % 10000 == 0:
# 相似度,16*50000
sim = similarity.eval()
for i in range(valid_size):
valid_word = id_topword[valid_examples[i]]
# 选相似的前8个
top_k = 8
# 排序,获得id
nearest = (-sim[i, :]).argsort()[1:top_k+1]
log_str = "Nearest to %s: " % valid_word
for k in range(top_k):
close_word = id_topword[nearest[k]]
log_str = "%s %s," % (log_str, close_word)
print log_str
final_embeddings = normalized_embeddings.eval()