Skip to content

利用tensorflow实现简版word2vec

📅 发表于 2017/07/15
🔄 更新于 2017/07/15
👁️ 次访问
📝 0 字
0 分钟
深度学习
#深度学习
#word2vec
#自然语言处理

参考自Tensorflow的word2vec教程,本文主要包含了自己的实现理解。非常青涩时期写的

相关知识

传统方法

One-Hot Encoder 是一个词对应一个向量,向量中只有一个是1,其余是0,离散表达。$ $

Bag of Words 标识当前单词那一位不是1,而是变成了当前单词的出现次数。

存在的问题 需要大量的维数去表示,编码随机的,没有任何关联的信息。

向量空间模型

Vector Space Models可以把字词转化为连续值,并将意思相近的词被映射到向量空间相近的位置。

VSM在NLP中的一个重要假设是:在相同语境中出现的词,语义也相近。

有如下两种模型

  • 计数模型 统计语料库中相邻出现的词的频率,再把这些计数结果转为小而稠密的矩阵。
  • 预测模型 根据一个词周围相邻的词汇推测出这个词。

Word2Vec

Word2Vec是一种计算非常高效的、可以从原始语料中学习字词空间向量的预测模型。

有如下两种模型

  • CBOW Continuous Bag of Words 从语境推测目标词汇,适合小型数据。如“中国的首都是__”推测出“北京”。把一整段上下文信息当做一个观察对象
  • Skip-Gram 从目标词汇推测语境,适合大型语料。 把每一对上下文-目标词汇当做一个观察对象

Word2Vec的一些优点

  • 连续的词向量能够捕捉到更多的语义和关联信息

  • 意思相近的词语在向量空间中的位置也会比较近。如北京-成都、狗-猫等词汇会分别聚集在一起。

  • 能学会一些高阶语言的抽象概念。如"man-woman"和"king-queen"的向量很相似。

Word2Vec学习的抽象概念

噪声对比训练

神经概率化语言模型通常使用极大似然法进行训练,再使用Softmax函数得到在给出上下文单词h的情况下,目标词wt出现的最大概率,设为P(wt|h)

score(wt,h)为当前词wt和上下文单词h的相容性,通常使用向量积获得。

P(wt|h)=Softmax(score(wi,h))=escore(wi,h)i=1vescore(wi,h)

通过对数似然函数max likelihood来进行训练

Jml=lnP(wt|h)=score(wt,h)lni=1vescore(wi,h)

这个方法看起来可行,但是消耗太大了,因为要对当前h与所有单词w的相容性score(w,h)

在使用word2vec模型中,我们并不需要对所有的特征进行学习。所以在CBOW模型和Skip-Gram模型中,会构造k个噪声单词,而我们只需要从这k个中找出真正目标单词wt即可,使用了一个二分类器(lr)。下面是CBOW模型,对于Skip-Gram模型只需要相反就行了。

Qθ(D=1|w,h)是二元逻辑回归的概率,即在当前条件下出现词语w的概率。

  • θ 输入的embedding vector
  • h 当前上下文
  • d 输入数据集
  • w 目标词汇(就是他出现的概率)

此时,最大化目标函数如下:

JNEG=lnQθ(D=1|wt,h)+i=1klnQθ(D=0|wI,h)k

前半部分为词w出现的概率,后面为k个噪声概率的期望值(如果写法有错误,希望提出,再改啦),有点像蒙特卡洛

负采样Negative Sampling

  • 当模型预测的真实目标词汇wt的概率越高,其他噪声词汇概率越低,模型就得到优化了
  • 用编造的噪声词汇进行训练
  • 计算loss效率非常高,只需要随机选择k个,而不是全部词汇

实现Skip-Gram模型

数据说明

Skip-Gram模型是通过目标词汇预测语境词汇。如数据集如下

python
I hope you always find a reason to smile

从中我们可以得到很多目标单词和所对应的上下文信息(多个单词)。如假设设左右词的窗口距离为1,那么相应的信息如下

python
{'hope':['i', 'you'], 'you':['hope', 'alawys']...}

训练时,希望给出目标词汇就能够预测出语境词汇,所以需要这样的训练数据

python
# 前面是目标单词,后面是语境词汇,实际上相当于数据的label
('hope', 'i')
('hope', 'you')
('you', 'hope')
('you', 'always')

同时在训练时,制造一些随机单词作为负样本(噪声)。我们希望预测的概率分布在正样本上尽可能大,在负样本上尽可能小。

使用随机梯度下降算法(SGD)来进行最优化求解,并且使用mini-batch的方法,这样来更新embedding中的参数θ,让损失函数(NCE loss)尽可能小。这样,每个单词的词向量就会在训练的过程中不断调整,最后会处在一个最合适的语料空间位置。

例如,假设训练第t步,输入目标单词hope,希望预测出you,选择一个噪声词汇reason。则目标函数如下

JNEG(t)=lnQθ(D=1|hope,you)+lnQθ(D=0|hope,reason)

目标是更新embedding的参数θ以增大目标值,更新方式是计算损失函数对参数θ的导数,使得参数θ朝梯度方向进行调整。多次以后,模型就能够很好区别出真实语境单词和噪声词。

构建数据集

先来分析数据,对所有的词汇进行编码。对高频词汇给一个id,对于出现次数很少词汇,id就设置为0。高频是选择出现频率最高的50000个词汇。

python
def build_dataset(self, words):
    ''' 构建数据集
        Args:
            words: 单词列表
        Returns:
            word_code: 所有word的编码,top的单词:数量;其余的:0
            topword_id: topword-id
            id_topword: id-word
            topcount: 包含所有word的一个Counter对象
        '''
    # 获取top50000频数的单词
    unk = 'UNK'
    topcount = [[unk, -1]]
    topcount.extend(
        collections.Counter(words).most_common(
            self.__vocab_size - 1))
    topword_id = {}
    for word, _ in topcount:
        topword_id[word] = len(topword_id)
        # 构建单词的编码。top单词:出现次数;其余单词:0
        word_code = []
        unk_count = 0
        for w in words:
            if w in topword_id:
                c = topword_id[w]
            else:
                c = 0
                unk_count += 1
            word_code.append(c)
    topcount[0][1] = unk_count
    id_topword = dict(zip(topword_id.values(), topword_id.keys()))
    return word_code, topword_id, id_topword, topcount

产生batch训练样本

由于是使用mini-batch的训练方法,所以每次要产生一些样本。对于每个单词,要确定要产生多少个语境单词,和最多可以左右选择多远。

python
def generate_batch(self, batch_size, single_num, skip_window, word_code):
    '''产生训练样本。Skip-Gram模型,从当前推测上下文
    如 i love you. (love, i), (love, you)
    Args:
        batch_size: 每一个batch的大小,即多少个()
        single_num: 对单个单词生成多少个样本
        skip_window: 单词最远可以联系的距离
        word_code: 所有单词,单词以code形式表示
    Returns:
        batch: 目标单词
        labels: 语境单词
    '''
    # 条件判断
    # 确保每个batch包含了一个词汇对应的所有样本
    assert batch_size % single_num == 0
    # 样本数量限制
    assert single_num <= 2 * skip_window

    # batch label
    batch = np.ndarray(shape=(batch_size), dtype=np.int32)
    labels = np.ndarray(shape=(batch_size, 1), dtype=np.int32)
    # 目标单词和相关单词
    span = 2 * skip_window + 1
    word_buffer = collections.deque(maxlen=span)
    for _ in range(span):
        word_buffer.append(word_code[self.__data_index])
        self.__data_index = (self.__data_index + 1) % len(word_code)

    # 遍历batchsize/samplenums次,每次一个目标词汇,一次samplenums个语境词汇
    for i in range(batch_size // single_num):
        target = skip_window                # 当前的单词
        targets_to_void = [skip_window]     # 已经选过的单词+自己本身
        # 为当前单词选取样本
        for j in range(single_num):
            while target in targets_to_void:
                target = random.randint(0, span - 1)
            targets_to_void.append(target)
            batch[i * single_num + j] = word_buffer[skip_window]
            labels[i * single_num + j, 0] = word_buffer[target]
        # 当前单词已经选择完毕,输入下一个单词,skip_window单词也成为下一个
        self.__data_index = (self.__data_index + 1) % len(word_code)
        word_buffer.append(word_code[self.__data_index])
    return batch, labels

一些配置信息

python
# 频率top50000个单词
vocab_size = 50000
# 一批样本的数量
batch_size = 128
# 将单词转化为稠密向量的维度
embedding_size = 128
# 为单词找相邻单词,向左向右最多能取得范围
skip_window = 1
# 每个单词的语境单词数量
single_num = 2

# 验证单词的数量
valid_size = 16
# 验证单词从频数最高的100个单词中抽取
valid_window = 100
# 从100个中随机选择16个
valid_examples = np.random.choice(valid_window, valid_size, replace=False)
# 负样本的噪声数量
noise_num = 64

计算图

python
graph = tf.Graph()
with graph.as_default():
    # 输入数据
    train_inputs = tf.placeholder(tf.int32, shape=[batch_size])
    train_labels = tf.placeholder(tf.int32, shape=[batch_size, 1])
    valid_dataset = tf.constant(valid_examples, dtype=tf.int32)

    with tf.device('/cpu:0'):
        # 随机生成单词的词向量,50000*128
        embeddings = tf.Variable(
            tf.random_uniform([vocab_size, embedding_size], -1.0, 1.0))
        # 查找输入inputs对应的向量
        embed = tf.nn.embedding_lookup(embeddings, train_inputs)
        nce_weights = tf.Variable(
            tf.truncated_normal([vocab_size, embedding_size],
                                stddev=1.0 / math.sqrt(embedding_size)))
        nce_biases = tf.Variable(tf.zeros([vocab_size]))
    # 为每个batch计算nceloss
    loss = tf.reduce_mean(tf.nn.nce_loss(weights=nce_weights,
                                         biases=nce_biases,
                                         labels = train_labels,
                                         inputs=embed,
                                         num_sampled=noise_num,
                                         num_classes=vocab_size))
    # sgd
    optimizer = tf.train.GradientDescentOptimizer(1.0).minimize(loss)

    # 计算embeddings的L2范式,各元素的平方和然后求平方根,防止过拟合
    norm = tf.sqrt(
        tf.reduce_sum(
            tf.square(embeddings),
            axis=1,
            keep_dims=True))
    # 标准化词向量
    normalized_embeddings = embeddings / norm
    valid_embeddings = tf.nn.embedding_lookup(
                                normalized_embeddings, valid_dataset)
    # valid单词和所有单词的相似度计算,向量相乘
    similarity = tf.matmul(
        valid_embeddings,
        normalized_embeddings,
        transpose_b=True)

    init = tf.global_variables_initializer()

训练过程

python
num_steps = 100001
with tf.Session(graph=graph) as sess:
    init.run()
    print('Initialized')
    avg_loss = 0
    for step in range(num_steps):
        batch_inputs, batch_labels = wu.generate_batch(
                batch_size, single_num, skip_window,  word_code)
        feed_dict = {train_inputs: batch_inputs, train_labels: batch_labels}
        _, loss_val = sess.run([optimizer, loss], feed_dict=feed_dict)
        avg_loss += loss_val

        if step % 2000 == 0:
            if step > 0:
                avg_loss /= 2000
            print ("avg loss at step %s : %s" % (step, avg_loss)) 
            avg_loss = 0
        
        if step % 10000 == 0:
            # 相似度,16*50000
            sim = similarity.eval()
            for i in range(valid_size):
                valid_word = id_topword[valid_examples[i]]
                # 选相似的前8个
                top_k = 8
                # 排序,获得id
                nearest = (-sim[i, :]).argsort()[1:top_k+1]
                log_str = "Nearest to %s: " % valid_word
                for k in range(top_k):
                    close_word = id_topword[nearest[k]]
                    log_str = "%s %s," % (log_str, close_word)
                print log_str
    final_embeddings = normalized_embeddings.eval()
总访客数:   ·   总访问量:
PLM's Blog @ 2016 - 2025