利用TensorFlow和神經網絡來處理文本分類問題
在這篇文章中,作者討論了六個關于創(chuàng)建機器學習模型來進行文本分類的主要話題。
- TensorFlow 如何工作
- 機器學習模型是什么
- 神經網絡是什么
- 神經網絡怎樣進行學習
- 如何處理數據并且把它們傳輸給神經網絡的輸入
- 怎樣運行模型并且得到預測結果
作者也提供了可在Jupyter notebook上運行的代碼。我將回顧這六個話題并且與我自己的經驗相結合。
1. TensorFlow 概覽
TensorFlow 是***的開源 AI 庫之一。它的高計算效率,豐富的開發(fā)資源使它被企業(yè)和個人開發(fā)者廣泛采用。在我看來,學習 TensorFlow 的***的方法就是使用它的官網教程(https://www.tensorflow.org/)。在這個網站上,你可以瀏覽「getting started」教程。
我首先將會對 TensorFlow 的基本定義和主要特征進行介紹。張量(Tensor)是一種數據結構,它可以把原始值形成任意的多維數組【1】。張量的級別就是它的維度數。這里,我建議閱讀 Python 的應用編程接口 API,因為它對 TensorFlow 的初學者來說是很友好的。你可以安裝 TensorFlow 并且配置環(huán)境,緊隨官方網站上的指導就可以了。測試你是否成功安裝 TensorFlow 的方法就是導入(import)TensorFlow 庫。在 TensorFlow 中,計算圖(computational graph)是核心部件。數據流程圖形用來代表計算過程。在圖形下,操作(Operation)代表計算單位,張量代表數據單位。為了運行代碼,我們應該對階段函數(Session function)進行初始化。這里是執(zhí)行求和操作的完整代碼。
- #import the library
- import tensorflow as tf
- #build the graph and name as my_graph
- my_graph = tf.Graph()
- #tf.Session encapsulate the environment for my_graph
- with my_graph.as_default():
- x = tf.constant([1,3,6])
- y = tf.constant([1,1,1])
- #add function
- op = tf.add(x,y)
- #run it by fetches
- result = sess.run(fetches=op)
- #print it
- print(result)
你可以看見在 TensorFlow 中編譯是遵循一種模式的,并且很容易被記住。你將會導入庫,創(chuàng)建恒定張量(constant tensors)并且創(chuàng)建圖形。然后我們應該定義哪一個圖將會被在 Session 中使用,并且定義操作單元。最終你可以在 Session 中使用 run() 的方法,并且評估其中參數獲取的每一個張量。
2. 預測模型
預測模型可以很簡單。它把機器學習算法和數據集相結合。創(chuàng)建一個模型的過程程如下圖所示:
我們首先應該找到正確的數據作為輸入,并且使用一些數據處理函數來處理數據。然后,這些數據就可以與機器學習算法結合來創(chuàng)建模型了。在你得到模型后,你可以把模型當做一個預測器并且輸入需要的數據來預測,從而產生結果。整個進程如下圖所示:
在本文中,輸入是文本,輸出結果是類別(category)。這種機器學習算法叫做監(jiān)督學習,訓練數據集是已標注過種類的文本。這也是分類任務,而且是應用神經網絡來進行模型創(chuàng)建的。
3. 神經網絡
神經網絡的主要特征是自學(self-learning),而不是進行明確地程序化。它的靈感來源于人類中樞神經系統(tǒng)。***個神經網絡算法是感知機(Perceptron)。
為了理解神經網絡的工作機制,作者用 TensorFlow 創(chuàng)建了一個神經網絡結構。
(1) 神經網絡結構
這里作者使用了兩個隱蔽層(hidden layers),每一個隱蔽層的職責是把輸入轉換成輸出層可以使用的東西【1】。***個隱蔽層的節(jié)點的數量應該被定義。這些節(jié)點叫做神經元,和權值相乘。訓練階段是為了對這些值進行調節(jié),為了產生一個正確的輸出。網絡也引入了偏差(bias),這就可以讓你向左或向右移動激活函數,從而讓預測結果更加準確【2】。數據還會經過一個定義每個神經元最終輸出的激活函數。這里,作者使用的是修正線性單元(ReLU),可以增加非線性。這個函數被定義為:
- f(x) = max(0,x)(輸出是 x 或 0,無論 x 多大)
對第二個隱蔽層來說,輸入就是***層,函數與***個隱蔽層相同。
對于輸出層,作者使用的是 one-hot 編碼來得到結果。在 one-hot 編碼中,除了其中的一位值為 1 以外,所有的位元(bits)都會得到一個 0 值。這里使用三種類別作為范例,如下圖所示。
我們可以發(fā)現輸出節(jié)點的數量值就是類別的數量值。如果我們想要劃分不同的類別,我們可以使用 Softmax 函數來使每一個單元的輸出轉化成 0 到 1 間的值,并且使所有單元的總和為 1。它將會告訴我們每種類別的概率是多少。
上述過程由下列代碼實現:
- # Network Parameters
- n_hidden_1 = 10 # 1st layer number of features
- n_hidden_2 = 5 # 2nd layer number of features
- n_input = total_words # Words in vocab
- n_classes = 3 # Categories: graphics, space and baseball
- def multilayer_perceptron(input_tensor, weights, biases):
- layer_1_multiplication = tf.matmul(input_tensor, weights['h1'])
- layer_1_addition = tf.add(layer_1_multiplication, biases['b1'])
- layer_1_activation = tf.nn.relu(layer_1_addition)
- # Hidden layer with RELU activation
- layer_2_multiplication = tf.matmul(layer_1_activation, weights['h2'])
- layer_2_addition = tf.add(layer_2_multiplication, biases['b2'])
- layer_2_activation = tf.nn.relu(layer_2_addition)
- # Output layer with linear activation
- out_layer_multiplication = tf.matmul(layer_2_activation, weights['out'])
- out_layer_addition = out_layer_multiplication + biases['out']
- return out_layer_addition
在這里,它調用了 matmul()函數來實現矩陣之間的乘法函數,并調用 add()函數將偏差添加到函數中。
4. 神經網絡是如何訓練的
我們可以看到其中要點是構建一個合理的結構,并優(yōu)化網絡權重的預測。接下來我們需要訓練 TensorFlow 中的神經網絡。在 TensorFlow 中,我們使用 Variable 來存儲權重和偏差。在這里,我們應該將輸出值與預期值進行比較,并指導函數獲得最小損失結果。有很多方法來計算損失函數,由于它是一個分類任務,所以我們應該使用交叉熵誤差。此前 D. McCaffrey[3] 分析并認為交叉熵可以避免訓練停滯不前。我們在這里通過調用函數 tf.nn.softmax_cross_entropy_with_logits() 來使用交叉熵誤差,我們還將通過調用 function: tf.reduced_mean() 來計算誤差。
- # Construct model
- prediction = multilayer_perceptron(input_tensor, weights, biases)
- # Define loss
- entropy_loss = tf.nn.softmax_cross_entropy_with_logits(logits=prediction, labels=output_tensor)
- loss = tf.reduce_mean(entropy_loss)
我們應該找到***值來使輸出誤差最小化。這里我們使用隨機梯度下降(SGD)的方法:
通過多次迭代,我們將會得到接近于全局最小損失的權值。學習速率不應該太大。自適應瞬間評估函數(Adaptive Moment Estimation function)經常用于計算梯度下降。在這個優(yōu)化算法中,對梯度和梯度的二階矩量進行平滑處理【4】。
代碼如下所示,在其它項目中,學習速率可以是動態(tài)的,從而使訓練過程更加迅速。
- learning_rate = 0.001
- # Construct model
- prediction = multilayer_perceptron(input_tensor, weights, biases)
- # Define loss
- entropy_loss = tf.nn.softmax_cross_entropy_with_logits(logits=prediction, labels=output_tensor)
- loss = tf.reduce_mean(entropy_loss)
- optimizer = tf.train.AdamOptimizer(learning_ratelearning_rate=learning_rate).minimize(loss)
5. 數據操作
這一部分對于分類成功也很重要。機器學習的開發(fā)者們需要更加在意數據,這會為你節(jié)省大量時間,并讓結果更加準確,因為這可以讓你無需從頭開始更改配置。在這里,筆者需要指出兩個重點。首先,為每個單詞創(chuàng)建一個索引;然后為每個文本創(chuàng)建一個矩陣,如果單詞在文本中,則值為 1,否則為 0。以下代碼可以幫助你理解這個過程:
- import numpy as np #numpy is a package for scientific computing
- from collections import Counter
- vocab = Counter()
- text = "Hi from Brazil"
- #Get all words
- for word in text.split(' '):
- vocab[word]+=1
- #Convert words to indexes
- def get_word_2_index(vocab):
- word2index = {}
- for i,word in enumerate(vocab):
- word2index[word] = i
- return word2index
- #Now we have an index
- word2index = get_word_2_index(vocab)
- total_words = len(vocab)
- #This is how we create a numpy array (our matrix)
- matrix = np.zeros((total_words),dtype=float)
- #Now we fill the values
- for word in text.split():
- matrix[word2index[word]] += 1
- print(matrix)
- >>> [ 1. 1. 1.]
Python 中的 Counter() 是一個哈希表。當輸入是「Hi from Brazil」時,矩陣是 [1 ,1, 1]。如果輸入不同,比如「Hi」,矩陣會得到不同的結果:
- matrix = np.zeros((total_words),dtype=float)
- text = "Hi"
- for word in text.split():
- matrix[word2index[word.lower()]] += 1
- print(matrix)
- >>> [ 1. 0. 0.]
6. 運行模型,獲得結果
在這一部分里,我們將使用 20 Newsgroups 作為數據集。它包含有關 20 種話題的 18,000 篇文章。我們使用 scilit-learn 庫加載數據。在這里作者使用了 3 個類別:comp.graphics、sci.space 和 rec.sport.baseball。它有兩個子集,一個用于訓練,一個用于測試。下面是加載數據集的方式:
- from sklearn.datasets import fetch_20newsgroups
- categories = ["comp.graphics","sci.space","rec.sport.baseb
- newsgroups_train = fetch_20newsgroups(subset='train', categoriescategories=categories)
- newsgroups_test = fetch_20newsgroups(subset='test', categoriescategories=categories)
它遵循通用的模式,非常易于開發(fā)者使用。
在實驗中,epoch 設定為 10,這意味著會有 10 次正+反向遍歷整個數據集。在 TensorFlow 中,占位符的作用是用作 Feed 的目標,用于傳遞每個運行步驟的數據。
- n_input = total_words # Words in vocab
- n_classes = 3 # Categories: graphics, sci.space and baseball
- input_tensor = tf.placeholder(tf.float32,[None, n_input],name="input")
- output_tensor = tf.placeholder(tf.float32,[None, n_classes],name="output")
我們應該分批訓練數據,因為在測試模型時,我們會用更大的批次來輸入 dict。調用 get_batches() 函數來獲取具有批處理尺寸的文本數。接下來,我們就可以運行模型了。
- training_epochs = 10
- # Launch the graph
- with tf.Session() as sess:
- sess.run(init) #inits the variables (normal distribution, reme
- # Training cycle
- for epoch in range(training_epochs):
- avg_cost = 0.
- total_batch = int(len(newsgroups_train.data)/batch_size)
- # Loop over all batches
- for i in range(total_batch):
- batch_x,batch_y = get_batch(newsgroups_train,i,batch_size)
- # Run optimization op (backprop) and cost op (to get loss value)
- c,_ = sess.run([loss,optimizer], feed_dict={input_tensor: batch_x, output_tensor:batch_y})
在這里我們需要構建測試模型,并計算它的準確性。
- # Test model
- index_prediction = tf.argmax(prediction, 1)
- index_correct = tf.argmax(output_tensor, 1)
- correct_prediction = tf.equal(index_prediction, index_correct)
- # Calculate accuracy
- accuracy = tf.reduce_mean(tf.cast(correct_prediction, "float"))
- total_test_data = len(newsgroups_test.target)
- batch_x_test,batch_y_test = get_batch(newsgroups_test,0,total_test_data)
- print("Accuracy:", accuracy.eval({input_tensor: batch_x_test, output_tensor: batch_y_test}))
然后我們就可以得到結果:
結論
本文介紹了如何使用神經網絡和 TensorFlow 來處理文本分類任務。它介紹了與實驗有關的基礎信息,然而,在我自己運行的時候,效果就沒有作者那么好了。我們或許可以在這個架構的基礎上改進一番,在隱藏層中使用 dropout 肯定會提高準確性。
在運行代碼前,請確認你已安裝了***版本的 TensorFlow。有些時候你可能會無法導入 twenty_newsgroups 數據集。當這種情況發(fā)生時,請使用以下代碼來解決問題。
- # if you didn't download the twenty_newsgroups datasets, it will run with error
- # this logging can help to solve the error
- import logging
- logging.basicConfig()
以下是完整代碼:
- import pandas as pd
- import numpy as np
- import tensorflow as tf
- from collections import Counter
- from sklearn.datasets import fetch_20newsgroups
- # if you didn't download the twenty_newsgroups datasets, it will run with error
- # this logging can help to solve the error
- import logging
- logging.basicConfig()
- categories = ["comp.graphics","sci.space","rec.sport.baseball"]
- newsgroups_train = fetch_20newsgroups(subset='train', categoriescategories=categories)
- newsgroups_test = fetch_20newsgroups(subset='test', categoriescategories=categories)
- print('total texts in train:',len(newsgroups_train.data))
- print('total texts in test:',len(newsgroups_test.data))
- vocab = Counter()
- for text in newsgroups_train.data:
- for word in text.split(' '):
- vocab[word.lower()]+=1
- for text in newsgroups_test.data:
- for word in text.split(' '):
- vocab[word.lower()]+=1
- total_words = len(vocab)
- def get_word_2_index(vocab):
- word2index = {}
- for i,word in enumerate(vocab):
- word2index[word.lower()] = i
- return word2index
- word2index = get_word_2_index(vocab)
- def get_batch(df,i,batch_size):
- batches = []
- results = []
- texts = df.data[i*batch_size:i*batch_size+batch_size]
- categories = df.target[i*batch_size:i*batch_size+batch_size]
- for text in texts:
- layer = np.zeros(total_words,dtype=float)
- for word in text.split(' '):
- layer[word2index[word.lower()]] += 1
- batches.append(layer)
- for category in categories:
- y = np.zeros((3),dtype=float)
- if category == 0:
- y[0] = 1.
- elif category == 1:
- y[1] = 1.
- else:
- y[2] = 1.
- results.append(y)
- return np.array(batches),np.array(results)
- # Parameters
- learning_rate = 0.01
- training_epochs = 10
- batch_size = 150
- display_step = 1
- # Network Parameters
- n_hidden_1 = 100 # 1st layer number of features
- n_hidden_2 = 100 # 2nd layer number of features
- n_input = total_words # Words in vocab
- n_classes = 3 # Categories: graphics, sci.space and baseball
- input_tensor = tf.placeholder(tf.float32,[None, n_input],name="input")
- output_tensor = tf.placeholder(tf.float32,[None, n_classes],name="output")
- def multilayer_perceptron(input_tensor, weights, biases):
- layer_1_multiplication = tf.matmul(input_tensor, weights['h1'])
- layer_1_addition = tf.add(layer_1_multiplication, biases['b1'])
- layer_1 = tf.nn.relu(layer_1_addition)
- # Hidden layer with RELU activation
- layer_2_multiplication = tf.matmul(layer_1, weights['h2'])
- layer_2_addition = tf.add(layer_2_multiplication, biases['b2'])
- layer_2 = tf.nn.relu(layer_2_addition)
- # Output layer
- out_layer_multiplication = tf.matmul(layer_2, weights['out'])
- out_layer_addition = out_layer_multiplication + biases['out']
- return out_layer_addition
- # Store layers weight & bias
- weights = {
- 'h1': tf.Variable(tf.random_normal([n_input, n_hidden_1])),
- 'h2': tf.Variable(tf.random_normal([n_hidden_1, n_hidden_2])),
- 'out': tf.Variable(tf.random_normal([n_hidden_2, n_classes]))
- }
- biases = {
- 'b1': tf.Variable(tf.random_normal([n_hidden_1])),
- 'b2': tf.Variable(tf.random_normal([n_hidden_2])),
- 'out': tf.Variable(tf.random_normal([n_classes]))
- }
- # Construct model
- prediction = multilayer_perceptron(input_tensor, weights, biases)
- # Define loss and optimizer
- loss = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(logits=prediction, labels=output_tensor))
- optimizer = tf.train.AdamOptimizer(learning_ratelearning_rate=learning_rate).minimize(loss)
- # Initializing the variables
- init = tf.initialize_all_variables()
- # Launch the graph
- with tf.Session() as sess:
- sess.run(init)
- # Training cycle
- for epoch in range(training_epochs):
- avg_cost = 0.
- total_batch = int(len(newsgroups_train.data)/batch_size)
- # Loop over all batches
- for i in range(total_batch):
- batch_x,batch_y = get_batch(newsgroups_train,i,batch_size)
- # Run optimization op (backprop) and cost op (to get loss value)
- c,_ = sess.run([loss,optimizer], feed_dict={input_tensor: batch_x,output_tensor:batch_y})
- # Compute average loss
- avg_cost += c / total_batch
- # Display logs per epoch step
- if epoch % display_step == 0:
- print("Epoch:", '%04d' % (epoch+1), "loss=", \
- "{:.9f}".format(avg_cost))
- print("Optimization Finished!")
- # Test model
- correct_prediction = tf.equal(tf.argmax(prediction, 1), tf.argmax(output_tensor, 1))
- # Calculate accuracy
- accuracy = tf.reduce_mean(tf.cast(correct_prediction, "float"))
- total_test_data = len(newsgroups_test.target)
- batch_x_test,batch_y_test = get_batch(newsgroups_test,0,total_test_data)
- print("Accuracy:", accuracy.eval({input_tensor: batch_x_test, output_tensor: batch_y_test}))
【本文是51CTO專欄機構“機器之心”的原創(chuàng)文章,微信公眾號“機器之心( id: almosthuman2014)”】