摘 要: 為解決AprioriTid算法對大數據執行效率不高的問題,根據Hadoop平臺的MapReduce模型,分析了AprioriTid算法的并行化方法,給出了并行化的主要步驟和Map、Reduce函數的描述。與串行的AprioriTid算法相比,并行算法利用了多個節點的計算能力,縮短了從大數據集中挖掘關聯規則的時間。對并行算法的性能進行了測試,實驗結果表明,并行AprioriTid算法具有較高的執行效率和較好的可擴展性。
關鍵詞: AprioriTid算法;MapReduce;Hadoop;關聯規則
0 引言
AprioriTid算法[1]是AGRAWAL R等人提出的關聯規則挖掘算法,該算法在迭代計算頻繁k-項集的過程中使用Ck代替事務數據庫,通過逐步縮小Ck的規模來減少I/O讀取時間,進而提高算法的執行效率[2]。但是,AprioriTid算法的時間復雜度較高,對大數據而言,該算法在單機環境下的執行時間太長,難以取得較高的執行效率。云計算是解決這個問題的可行方法,由于云計算平臺具有強大的計算能力,突破了單機計算能力的限制,為用戶高效處理大數據提供了支持。MapReduce[3]是云計算環境下被廣泛采用的并行編程模型。本文根據Hadoop平臺[4]的MapReduce模型,給出了一種AprioriTid算法的并行化實現方法。并行算法利用計算機集群的多個節點并行計算候選項集的支持度,解決了單機環境下AprioriTid算法對大數據執行時間過長的問題,提高了挖掘關聯規則的效率。
1 相關知識
1.1 AprioriTid算法分析
AprioriTid算法是在Apriori算法[1]的基礎上改進而來的關聯規則挖掘算法,參考文獻[1]給出了該算法的描述和實例分析。AprioriTid算法計算頻繁-1項集和生成候選k-項集的方法與Apriori算法是相同的,但是在發現頻繁k-項集的過程中采用了Ck代替事務數據庫D。對于所有的候選k-項集Ck,數據庫D中的一個事務t在Ck中對應的記錄表示為:<t.TID,{c∈Ck|c contained in t}。例如:k=2,C2={{1 3},{1 4},{1 5},{3 4},{3 5},{4 5}},D的一個事務t=<100,{1 2 3 4}>,則t在中C2對應的記錄表示為<100,{{1 3},{1 4},{3 4}}>。
AprioriTid算法計算頻繁k-項集(k≥2)的時間復雜度為O(|Ck|×|Ck-1|×|t|),其中,|t|是Ck-1的記錄包含的候選(k-1)-項集的平均個數。在一般情況下,當k的取值較小時,Ck-1的規模會大于事務數據庫的規模,算法的時間復雜度較高??梢姡趩螜C環境下應用AprioriTid算法對大數據挖掘關聯規則難以取得較高的執行效率。在并行條件下,利用多臺計算機的處理能力能很好地解決運行效率低下的問題[5]。如果將Ck-1分解成p個子數據集,由p個節點對子數據集并行計算頻繁k-項集,則在單個節點上的時間復雜度為O(|Ck|×|Ck-1|×|t|/p),這就提高了算法的執行效率,這也是本文實現AprioriTid算法并行化的基本思想。
1.2 Hadoop平臺的MapReduce模型
MapReduce模型的基本思想是將處理的問題分解為映射和歸約操作,由用戶實現的Map和Reduce函數完成大規模的并行化計算[6]。開源的云計算平臺Hadoop實現了MapReduce模型,MapReduce任務在Hadoop平臺上的執行流程如圖1所示。數據文件被切分成多個數據分片存儲在Hadoop分布式文件系統HDFS中,在input階段,MapReduce支持庫將數據分片的記錄解析為<key,value>對輸入Map函數,Map函數對數據分片進行處理,產生一組新的<key,value>對中間結果。在shuffle階段,相同key的value值被合并為values集合,以<key,values>對傳遞給Reduce函數。Reduce函數對<key,values>集合作進一步處理,將最終結果輸出到HDFS中。
在實際的應用中,不同的數據文件通常采用不同的記錄格式,為了將記錄解析成合適的<key,value>對,Hadoop平臺為用戶提供了TextInputFormat、KeyValueTextInputFormat等類,使用這些類可以指定輸入Map函數的key和value。在Hadoop的MapReduce模型中,執行Map函數的各個節點分別處理不同的數據分片,如果所有節點都能夠讀取相同的文件,就需要借助Hadoop的分布式緩存機制[7]。在主程序中將待分發到所有節點的文件設置為分布式緩存文件,各個節點便可以同時讀取這些文件。
2 AprioriTid算法的MapReduce并行化
2.1 AprioriTid算法的并行化方法
AprioriTid算法的執行過程包括兩個階段:首先從事務數據庫中找出所有頻繁1-項集L1,然后采用迭代方法計算所有頻繁k-項集Lk,每次迭代計算的輸入數據是Lk-1和Ck-1,輸出結果是Lk和Ck。按照Hadoop的MapReduce模型,結合分布式緩存機制,這兩個階段都能實現并行化,方法如下:
?。?)將事務數據庫切分成多個數據分片,多個節點并行對數據分片統計各個項的支持度計數,從而實現了L1的并行計算。
?。?)將Lk-1設置為分布式緩存文件,Map函數讀取Lk-1,通過執行Ck=apriori-gen(Lk-1)生成所有候選k-項集Ck,將Ck存放在節點的內存中。將Ck-1切分成多個數據分片,多個節點根據Ck對Ck-1的分片并行統計候選k-項集的支持度計數生成Ck,從而實現了Lk和Ck的并行計算。
2.2 AprioriTid算法并行化的主要步驟
為了便于描述,設定事務數據庫D、Ck、頻繁k-項集Lk在HDFS中的目錄分別為DPath、CPathk、LPathk。在并行算法的執行過程中,需要多個MapReduce作業(Job)才能完成,第k個Job用Jobk表示,算法并行化的主要步驟描述如下。
(1)在主程序中設置Job1的輸入路徑為DPath,輸出路徑為LPath1。
?。?)Map函數讀取D的數據分片,將事務t的所有項ij分解出來,輸出<ij,1>鍵/值對。Reduce函數統計項ij的支持度計數count,將滿足最小支持度閾值minsup的項ij及其支持度計數輸出到LPath1目錄的文件中。Map、Reduce函數描述如下:
map(key=TID,value=t){
for each項ij of t
emit(<ij,1>);
}
reduce(key=ij,values={1,1,…}){
count=|values|; //|values|是集合中1的個數
if (count≥minsup)emit (<ij,count>);
}
?。?)k=2,C1=D。
(4)在主程序中將Lk-1設置為分布式緩存文件,設置Jobk的輸入路徑為CPathk-1,輸出路徑為LPathk。
?。?)在Map函數中定義setup、map和cleanup方法。setup方法讀取Lk-1,執行Ck=apriori-gen(Lk-1),初始化Ck。map方法讀取Ck-1的分片,對于分片的每一個記錄t,根據Ck計算t包含的候選k-項集集合Ct,將Ct添加到Ck中,并將Ct的每一個候選k-項集c以<c,l>鍵/值對傳遞給Reduce函數。cleanup方法將Ck保存到CPathk目錄的一個文件中。Reduce函數統計候選k-項集的支持度計數,將滿足最小支持度閾值的頻繁k-項集及其支持度計數輸出到LPathk目錄的文件中。
Map函數描述如下:
setup(){
從LPathk-1目錄讀取Lk-1;
Ck=apriori-gen(Lk-1);
Ck=;
}
map(key=TID,value=t){
Ct=;//初始化Ct
Ct={c∈Ck|(c-c[k])∈t.set-of-itemsets∧(c-c[k-1])∈t.set-of-itemsets};
if(Ct≠){
Ck+=<TID,Ct>;
for each 候選k-項集c∈Ct
emit(<c,1>);
}
cleanup(){
將Ck作為一個文件保存到CPathk目錄;
}
Reduce函數描述如下:
reduce(key=c,values={1,1,…}){
count=|values|;
if(count≥minsup)emit(<c,count>);
}
?。?)如果Lk=,則算法結束;否則,k=k+1,轉向執行步驟(4)。
3 實驗與結果分析
使用6臺配置相同的計算機和一臺100 M交換機搭建Hadoop集群,操作系統是Ubuntu Linux 12.04,安裝的軟件是Hadoop 1.2.1、JDK 1.7.0_51。從Frequent Itemset Mining Dataset Repository網站(http://fimi.ua.ac.be/data/)下載了6個數據集:chess、mushroom、pumsb_star、kosarak、retail和accidents,由于這些數據集的事務沒有TID,通過編寫程序給事務添加了從0開始順序編號的TID。
3.1算法的執行時間測試
使用Java實現了AprioriTid的串行算法和并行算法,使用4個數據集測試算法的執行時間。由于數據集的稠密程度不同,對這些數據集設置了不同的最小支持度,retail、kosarak、pumsb_star、chess的最小支持度閾值分別設置為0.3%、0.8%、45%、75%。
在單機環境下,串行算法對retail、kosarak、pumsb_star、chess的執行時間分別為1 879 s、721 s、906 s、3 265 s。在Hadoop平臺上,并行AprioriTid算法的執行時間如圖2所示。當節點數為2時,并行算法對4個數據集的執行時間都小于串行算法的執行時間,隨著節點數的增加,并行算法的執行時間不斷減少。由此可見,并行AprioriTid算法能取得較高的執行效率。
3.2 算法的可擴展性測試
使用1個節點對DB規模的數據執行算法的時間表示為t1×DB,使用m個節點對m×DB規模的數據執行算法的時間表示為tm×m×DB,則算法的可擴展性[8]定義為:scaleup(DB,m)=t1×DB/tm×m×DB。
對數據集mushroom和accidents測試并行AprioriTid算法的可擴展性。mushroom、accidents的最小支持度閾值分別設置為25%、70%,當使用m個節點時,將數據集復制m份上傳到HDFS,實驗結果如圖3所示。從實驗結果可以看出,對于兩個數據集,并行AprioriTid算法都取得了較好的可擴展性。
4 結論
AprioriTid算法是一種時間復雜度較高的關聯規則挖掘算法,在單機環境下對大數據的執行效率不高。針對這個問題,本文給出了一種AprioriTid算法的MapReduce并行化方法,該方法利用多個節點對數據分片并行計算候選項集的支持度,縮短了發現頻繁項集的時間。使用多個數據集測試了算法的性能,實驗結果表明,并行AprioriTid算法具有較高的執行效率,適合云計算環境下對大數據挖掘關聯規則。
參考文獻
[1] AGRAWAL R, SRIKANT R. Fast algorithms for mining association rules[C]. Proceedings of the 20th International Conference on Very Large Data Bases. Santiago,Chile, 1994: 487-499.
[2] 向程冠,姜季春,陳梅,等.AprioriTid算法的改進[J].計算機工程與設計,2009,30(15):3581-3583.
[3] DEAN J, GHEMAWAT S. MapReduce: simplified data processing on large clusters[J]. Communications of the ACM, 2008,51(1):107-113.
[4] The apache software foundation. Hadoop[EB/OL]. (2015-07-08) [2015-08-16]. http://hadoop.apache.org/.
[5] 廖寶魁,孫雋楓.基于MapReduce的增量數據挖掘研究[J].微型機與應用,2014,33(1):67-70.
[6] 亢麗蕓,王效岳,白如江.MapReduce原理及其主要實現平臺分析[J].現代圖書情報技術,2012(2):60-67.
[7] CHUCK L. Hadoop實戰[M].韓冀中,譯.北京:人民郵電出版社,2011.
[8] 楊勇,王偉.一種基于MapReduce的并行FP-growth算法[J].重慶郵電大學學報(自然科學版),2013,25(5):651-657.