最近,研究了一下pagerank算法,收获颇丰,所以花一些时间整理一下相关知识。本文提到的代码都是可以执行的,如果有任何问题,欢迎留言交流。
本文结构如下:
-
pagerank算法介绍
–算法背景和原理
–算法局限性
–算法改进 -
分布式实现pagerank
–计算转移矩阵
–循环计算pagerank
–规范化计算结果
–关于代码的补充
PageRank算法介绍
算法背景和原理
- 1、pagerank的江湖地位
提起pagerank算法,大家一定不陌生,这是Google发家的关键法宝之一,虽然现在看来算法过于简单,但是在当年,这个算法绝对是谷歌网页排名的扛把子!
pagerank算法的原理很精辟!我们假设有一个无聊的人,开始网上冲浪,他没有任何浏览倾向,他会随机点击网页上的任何链接,一直点下去。那么,经过无数的这种场景,被点击的多的网页当然是谷歌认为更重要的网页。换句话说,被链接链出的次数越多,被点击的可能性越大,被权重越大的网页链接,那么自己的权重也越大。关于这个问题,我们看下面这个图片:
B被链接的最多,权重也最大。但是我们发现C只被B链接了,而E被很多页面链接了,反而C的权重更大,就是因为链接C的是权重为38.4的B,而链接E的都是一些权重很小的网页。
- 2、由网页关联到矩阵表示
我们举一个简单的案例,本文也会根据这个案例来计算pagerank值!
图中存在4个网页,他们的链接情况很明显,比如网页1链出到网页2和网页3,同时,网页3和网页4链接到网页1。关于这个场景,我们可以得到一个矩阵如下:
针对这个矩阵中的元素a[i][j](注:此处i和j代表从1开始编号),我们表达的意思是第j个网页链出到第i个网页。那么第一行的1/2也就是代表第4个网页有链接到第1个网页。1/2代表什么呢?我们将一个网页所拥有的可以链出的权重假设为“1”,那么如果一共链出2个网页,每一个则可以分配1/2,这就是1/2的由来!
- 3、pagerank值求解
此处初始化,每一个网页的权重都是1,完成页面的第一次权重分配,产生新的页面权重。过程如下:
至于矩阵和向量的乘法方法,此处不再介绍。选取矩阵的第一行和向量乘法来介绍。第一个网页的权重:0* 1+0* 1+1* 1+1/2*1 = 3/2,矩阵的第一行代表的是每一个页面对页面1的权重分配占各页面自身的比重,然后叉乘向量,则代表页面1得到的权重总和。
在此基础上,我们可以不停地左乘M,直到权重变化微小,即可停止。(注:关于这个操作的收敛性,此处不做证明)
算法局限性和解决方法
- 1、网页出现无链出情况
如果一个网页不存在任何链出的链接,那么我称之为“无链出”。回到我们提到的任意浏览页面的场景,到了无链出页面,操作只能停止了。并且,存在这种情况的话,我们如果还是按照上面的方法不断左乘M的话,会出现所有页面权重趋于0的情况。
比如,这种情况。网页2只有链入,没有链出。那么此时的M为:
我们发现M的第二列全是0,也就代表网页2对任何其他页面权重都没有贡献。针对这种情况,我们模拟一个循环50次的左乘M,结果如下:
不难看到,50次循环后,4个网页的权重基本都是0了。 - 2、网页存在自链
第二种异常出现在网页有自链的情况。有一些想做网页推广的人,将自己网页的链接加入到当前网页,打算这样提高网页排名,比如下面的网页2:
网页2不对其他网页做贡献,反而自己链出自己,这种情况下的转移矩阵就是这样:
我们可以看到第二列的1在第二行。出现这种情况的话,经过一定数量的左乘之后,结果是只有网页2权重不趋于0,其余页面权重都趋于0。我们同样做一个实验,如下:
很容易看到,50此迭代后,只有网页2的权重没有接近0。
关于算法局限性的改进
针对以上两种异常情形,科学家给出了一个很巧妙的解决方案,思想是这样的!还是那个人在无聊地浏览网页,但是他现在有了一些思想,他不但会随意点击网页的链接,他还可能随意在浏览器随意输入一个合法的网页地址,直接调转到其他网页。科学家定义这个继续点击下去的概率为阻尼系数d,也就是(1-d)的可能是不再继续点击下去了,而是自己输入网址跳转到其他网页。
官方对于d多采用0.85,我们相信权威!观察上面的方程,第一部分加入了系统d,然后给每一个页面的权重都加入了一个“(1-d)*1/网页总数”。解释第二部分,一个人不想继续点击链接了,他输入其他合法网址,输入的这个网址是网址集合中的任何一个,那么每一个的概率是1/网页总数,然后乘以概率0.15。那么为什么每一个页面加上的是0.15而不是0.15/4,拿网页1做说明,因为4个页面都有0.15的可能跳转到网页1,那么4个页面对页面1的贡献权重总数是0.15。
分布式实现pagerank
上面我们介绍了这个算法的提出背景和理论讲解,我们惊叹于这么牛叉的算法,原来思想这么简单。实际应用中,10亿网页组成10亿*10亿的大矩阵,肯定不是一个电脑就能计算的,分布式解决是必然的。
下面把我实现算法采用的环境先介绍一下:
运行环境:
Hadoop 2.6.0-cdh5.15.1
jdk1.8
工具:
maven3(Intellij IDEA自带的)
git
编辑器:
Intellij IDEA
介绍一下我的项目结构:
解决方案分三个模块,第一个模块会根据网页的链接情况计算出转移矩阵;第二个模块会根据转移矩阵和个页面的权重值(初始化权重都为1),不断计算新的权重,新权重和旧权重做比较,直到每一个页面权重变化小于0.5(这个值是我自己定义的),停止迭代;第三个模块完成结果的规范化,使结果权重和为1。
- 1、 计算转移矩阵
page.txt的内容如下:
我们需要实现的过程如下:
最右边的矩形中的内容解读:第一列为网页编号,第二列的内容用逗号分隔,我们拿第一行为代表。它代表页面1在初始化权重为1的情况下,对页面1、2、3、4的权重贡献。关键代码部分,我定义了一个类:PreDealer,其中包含两个内部类:PreMapper extends Mapper,PreReducer extends Reducer,这两个类分别实现上图的两个步骤。
package com.google.pagerank.example;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.URI;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Properties;
/**
* 此函数为数据的预处理,最终输出结果为概率矩阵
*/
public class PreDealer {
private static int pageNum;//试验页面的总数
private static float d;//阻尼系数
private static Job job;
private static Configuration configuration;
public PreDealer(int pageNum, float d, Configuration configuration)throws Exception{
this.pageNum = pageNum;
this.d = d;
this.configuration = configuration;
this.job = Job.getInstance(configuration);
}
public static class PreMapper extends Mapper<LongWritable, Text, Text, Text>{
/**
*
* @param key
* @param value 每一行的格式是:1,2,以逗号分隔,代表1到2有链接
* @param context 返回的数据格式是:1 2,代表一个链接的起止
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] values = value.toString().split(",");//逗号分隔一行数据
context.write(new Text(values[0]),new Text(values[1]));
}
}
public static class PreReducer extends Reducer<Text, Text, Text, Text>{
/**
*
* @param key key代表有发出链接的网页代号
* @param values 可以链接到的页面组成的串
* @param context 输出目标是以网页代号为key,可达页面的概率组成的以逗号分隔的串
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
//首先,初始化一行float组成的数组,容量为页面总数
float[] outArray = new float[pageNum];
Arrays.fill(outArray, (1-d)/pageNum);//自由跳转产生的概率
int outNum = 0;//链出网页数
float[] outArrayTrue = new float[pageNum];//存放遍历的values中的链出的网页
Iterator<Text> itr = values.iterator();
while(itr.hasNext()){
int pageIndex = Integer.parseInt(itr.next().toString());
outArrayTrue[pageIndex-1] = 1;//对应编号设置为1
outNum ++;//链出总数加一
}
StringBuilder stringBuilder = new StringBuilder();
for(int i=0; i<outArrayTrue.length; i++){//拼接结果
stringBuilder.append(",").append((outArrayTrue[i]/outNum)*d + outArray[i]);//链出项加上随机量
}
context.write(key, new Text(stringBuilder.toString().substring(1)));
}
}
public static boolean runPre() throws Exception{
job.setJarByClass(PreDealer.class);
job.setMapperClass(PreMapper.class);
job.setReducerClass(PreReducer.class);
//设置自定义分区规则
job.setMapOutputValueClass(Text.class);
job.setMapOutputKeyClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
Properties properties = ParamUtils.getProperties();
FileSystem fileSystem = FileSystem.get(new URI(properties.getProperty(Constants.HDFS_URI)),
configuration,"hadoop");
Path path = new Path(properties.getProperty(Constants.MATRIX_OUTPUT_PATH) +
properties.getProperty(Constants.MATRIX_OUTPUT_FILE));
if(fileSystem.exists(path)){
fileSystem.delete(path, true);
}
FileInputFormat.setInputPaths(job, new Path(properties.getProperty(Constants.PAGE_INPUT_PATH)));
FileOutputFormat.setOutputPath(job, path);
boolean mark = job.waitForCompletion(true);
return mark;
//System.exit(mark ? 0 : 1);
}
}
- 2、循环计算pagerank
在第一步我们得到了转移矩阵,那么我们就要根据实际的权重计算每个页面重分配后的权重。我最终得到的结果是:
此部分的数据流程如下:
代码部分,我定义了一个类Cycler,完成整个循环计算过程的业务,其中包括两个内部类,CycleMapper extends Mapper,CycleReducer extends Reducer,分别完成上图的map和reduce两个阶段。
下面的代码中,循环计算部分的逻辑是:每次计算出一个新的权重分配,就会和旧的权重分配进行对比,如果每一个权重差值都小于0.5,才退出迭代,否则的话,使用新的迭代结果替换旧的权重集。
package com.google.pagerank.example;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.*;
import java.URI;
import java.util.*;
/**
* 此类代表循环体
*/
public class Cycler {
private static int pageNum;//试验页面的总数
private static Configuration configuration;
public Cycler(int pageNum, Configuration configuration){
this.pageNum = pageNum;
this.configuration = configuration;
}
public static class CycleMapper extends Mapper<LongWritable, Text, Text, Text>{
private String filePath = "";
@Override
protected void setup(Context context) throws IOException, InterruptedException {
FileSplit fileSplit = (FileSplit) context.getInputSplit();
filePath = fileSplit.getPath().toString();
}
/**
*
* @param key
* @param value 概率矩阵的一行、或中间计算结果的PageRank(或初始值)
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
if(filePath.contains("out")){//概率矩阵
//value的形式如:1 0.01,0.27,0.27,0.27..
String[] values = value.toString().split("\t");
String[] vvs = values[1].split(",");
for(int i=1; i<=vvs.length; i++){
String k = i + "";
String v = "M:" + values[0] + "," + vvs[i-1];//代表第values[0]个页面对第k个页面的权重贡献
context.write(new Text(k), new Text(v));
}
}else if(filePath.contains("pr")){//pagerank值
//value的形式:1 1,第一个页面的pagerank为1
String[] values = value.toString().split("\t");
for(int i=1; i<=pageNum; i++){
context.write(new Text(i+""), new Text("P:" +values[0] + "," +values[1]));
}
}
}
}
public static class CycleReducer extends Reducer<Text, Text, Text, Text>{
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
Iterator<Text> iterator = values.iterator();
Map<String, String> mapM = new HashMap<String, String>();
Map<String, String> mapP = new HashMap<String, String>();
while (iterator.hasNext()){
String value = iterator.next().toString();
String[] valuess = value.split(":");
String[] vvs = valuess[1].split(",");//若为M,vvs[0]代表对哪个页面做贡献,vvs[1]代表贡献值;
//若为P,vvs[0]代表哪个页面做贡献,vvs[1]代表贡献值;
if(value.startsWith("M")){//概率贡献
mapM.put(vvs[0], vvs[1]);
}else if(value.startsWith("P")){//上一阶段的Pangerank
mapP.put(vvs[0], vvs[1]);
}
}
float sum = 0f;
for(int i=1; i<=pageNum; i++){
float m = Float.parseFloat(mapM.get(i+""));
float p = Float.parseFloat(mapP.get(i+""));
sum += m * p;
}
context.write(key, new Text(sum + ""));
}
}
public static boolean runCycler() throws Exception{
//不停进行循环,知道计算两次计算结果差值课忽略
boolean mark = false;
while(true) {
Job job = Job.getInstance(configuration);
job.setJarByClass(Cycler.class);
job.setMapperClass(CycleMapper.class);
job.setReducerClass(CycleReducer.class);
//设置自定义分区规则
job.setMapOutputValueClass(Text.class);
job.setMapOutputKeyClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
Properties properties = ParamUtils.getProperties();
FileSystem fileSystem = FileSystem.get(new URI(properties.getProperty(Constants.HDFS_URI)),
configuration, "hadoop");
Path path = new Path(properties.getProperty(Constants.PAGERANK_OUTPUT_PATH) +
properties.getProperty(Constants.PAGERANK_OUTPUT_FILE));
if (fileSystem.exists(path)) {
fileSystem.delete(path, true);
}
FileOutputFormat.setOutputPath(job, path);
FileInputFormat.setInputPaths(job, new Path(properties.getProperty(Constants.PAGERANK_INPUT_PATH)),
new Path(properties.getProperty(Constants.MATRIX_INPUT_PATH)));
mark = job.waitForCompletion(true);
//System.exit(mark ? 0 : 1);
//读取 /pagerank/pagerank/pagerank.out 和 /pagerank/pr/pagerank.txt,对比每个页面pagerank差值,如果都小于0.01
// 则停止计算;否则,使用/pagerank/pagerank/pagerank.out替换/pagerank/pr/pagerank.txt
System.out.println("--------------------"+mark);
boolean stop = false;
while (true) {
if (mark) {
FSDataInputStream outFile = fileSystem.open(new Path(properties.getProperty(Constants.PAGERANK_OUTPUT_PATH)
+properties.getProperty(Constants.PAGERANK_OUTPUT_FILE)+"/part-r-00000"));
FSDataInputStream prFile = fileSystem.open(new Path(properties.getProperty(Constants.PAGERANK_INPUT_PATH)));
BufferedReader readerOut = new BufferedReader(new InputStreamReader(outFile));
BufferedReader readerPr = new BufferedReader(new InputStreamReader(prFile));
Map<String, String> mapM = new HashMap<String, String>();//存储M文件的内容映射
Map<String, String> mapP = new HashMap<String, String>();//存储pr文件的内容映射
String lineOut = "";
String linePr = "";
while ((lineOut = readerOut.readLine()) != null) {
String[] outS = lineOut.split("\t");
mapM.put(outS[0], outS[1]);
}
while ((linePr = readerPr.readLine()) != null) {
String[] outP = linePr.split("\t");
mapP.put(outP[0], outP[1]);
}
boolean tag = true;
for (int i = 1; i <= pageNum; i++) {
if (Math.abs(Float.parseFloat(mapM.get(i+"")) - Float.parseFloat(mapP.get(i+""))) > 0.1) {
tag = false;
break;
}
}
if (tag) {//如果不存在差值大于0.01的,那么结束总循环
stop = true;
break;
} else {//替换文件
FSDataOutputStream writeFile = fileSystem.create(new Path(properties.getProperty(Constants.PAGERANK_INPUT_PATH)));//如果文件存在,直接覆盖
Set<String> keySet = mapM.keySet();
Iterator<String> keyItr = keySet.iterator();//pr集的key
while (keyItr.hasNext()) {
String key = keyItr.next();
String value = mapM.get(key);
System.out.println("key:"+key+";value:"+value);
writeFile.write((key + "\t" + value + "\n").getBytes());
}
writeFile.close();
break;
}
}
}
if(stop)
break;
}
return mark;
}
}
- 3、规范化计算结果
在此部分我们需要实现对第二步计算的权重集的规范化,使总和为1。数据流程如下:
map阶段,key全部设置为Text(“1”),就是为了在reduce阶段全都分发到一个节点进行计算。
package com.google.pagerank.example;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.URI;
import java.util.*;
/**
* 此类用于对循环计算的PageRank进行规范化,是他们和为1
*/
public class Formatter {
private static Job job;
private static Configuration configuration;
public Formatter(Configuration configuration) throws Exception{
this.configuration = configuration;
this.job = Job.getInstance(configuration);
}
public static class FormatMapper extends Mapper<LongWritable, Text, Text, Text>{
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//System.out.println(value.toString());
context.write(new Text("1"), value);
}
}
public static class FormatReducer extends Reducer<Text, Text, Text, Text>{
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
Iterator<Text> itr = values.iterator();
float sum = 0;
Map<String,String> mapV = new HashMap<String,String>();
while (itr.hasNext()){
String[] valuess = itr.next().toString().split("\t");
mapV.put(valuess[0], valuess[1]);
sum += Float.parseFloat(valuess[1]);
}
Set<String> setK = mapV.keySet();
Iterator<String> itr2 = setK.iterator();
while(itr2.hasNext()){
String k = itr2.next();
String v = mapV.get(k);
context.write(new Text(k), new Text(Float.parseFloat(v)/sum+""));
}
}
}
public static boolean runFormat() throws Exception{
job.setJarByClass(Formatter.class);
job.setMapperClass(FormatMapper.class);
job.setReducerClass(FormatReducer.class);
//设置自定义分区规则
job.setMapOutputValueClass(Text.class);
job.setMapOutputKeyClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
Properties properties = ParamUtils.getProperties();
FileSystem fileSystem = FileSystem.get(new URI(properties.getProperty(Constants.HDFS_URI)),
configuration,"hadoop");
Path path = new Path(properties.getProperty(Constants.FORMAT_OUTPUT_PATH) +
properties.getProperty(Constants.FORMAT_OUTPUT_FILE));
if(fileSystem.exists(path)){
fileSystem.delete(path, true);
}
FileInputFormat.setInputPaths(job, new Path(properties.getProperty(Constants.FORMAT_INPUT_PATH)));
FileOutputFormat.setOutputPath(job, path);
boolean mark = job.waitForCompletion(true);
return mark;
//System.exit(mark ? 0 : 1);
}
}
- 4、关于代码的补充
除了上面三个主要部分,具体的驱动程序如下:
package com.google.pagerank.example;
import org.apache.hadoop.conf.Configuration;
/**
* 此类为整个项目的入口,完成参数设置和程序启动
*/
public class Driver {
public static void main(String[] args) throws Exception {
//初始化windows环境下运行hadoop实例环境
Configuration configuration = new Configuration();
configuration.set("fs.defaultFS","hdfs://192.168.43.26:8020");
System.setProperty("HADOOP_USER_NAME","hadoop");
System.setProperty("hadoop.home.dir", "E:\\BaiduNetdiskDownload\\新建文件夹\\hadoop-2.6.0-cdh5.15.1");
//Job job = Job.getInstance(configuration);
//PreDealer preDealer = new PreDealer(4, 0.85f, configuration);
//preDealer.runPre();
//Cycler cycler = new Cycler(4, configuration);
//cycler.runCycler();
Formatter formatter = new Formatter(configuration);
formatter.runFormat();
}
}
关于驱动类部分,针对三个模块的执行,我是按照分别注释两个,按顺序运行一个执行的,因为mapreduce任务对于我们的调用函数是异步的,并且很复杂,此模块可以引入任务调度功能,此处不再扩展。
同时,此项目的常量定义如下:
#数据源,存储了页面直接的连接关系
PAGE_INPUT_PATH = /pagerank/page.txt
#预处理得到概率矩阵的存放路径
MATRIX_OUTPUT_PATH = /pagerank/matrix/
MATRIX_OUTPUT_FILE = matrix.out
#循环体中使用的概率矩阵,也就是预处理处的输出
MATRIX_INPUT_PATH = /pagerank/matrix/matrix.out/part-r-00000
#预设值的pagerank值
PAGERANK_INPUT_PATH = /pagerank/pr/pagerank.txt
#每次循环体的输出
PAGERANK_OUTPUT_PATH = /pagerank/pagerank/
PAGERANK_OUTPUT_FILE = pagerank.out
#规范化的输入文件
FORMAT_INPUT_PATH = /pagerank/pagerank/pagerank.out/part-r-00000
#规范化输出结果
FORMAT_OUTPUT_PATH = /pagerank/format/
FORMAT_OUTPUT_FILE = pagerank_format.out
HDFS_URI = hdfs://192.168.43.26:8020
PREDEALER_CLASS = com.google.pagerank.example.PreDealer
CYCLER_CLASS = com.google.pagerank.example.Cycler
FORMATTER_CLASS = com.google.pagerank.example.Formatter
一个常量工具类和常量名称类如下:
package com.google.pagerank.example;
import java.io.IOException;
import java.util.Properties;
public class ParamUtils {
private static Properties properties = new Properties();
static {
try {
properties.load(ParamUtils.class.getClassLoader().getResourceAsStream("pagerank.properties"));
} catch (IOException e) {
e.printStackTrace();
}
}
public static Properties getProperties(){
return properties;
}
}
package com.google.pagerank.example;
public class Constants {
public static String PAGE_INPUT_PATH = "PAGE_INPUT_PATH";
public static String MATRIX_OUTPUT_PATH = "MATRIX_OUTPUT_PATH";
public static String MATRIX_OUTPUT_FILE = "MATRIX_OUTPUT_FILE";
public static String MATRIX_INPUT_PATH = "MATRIX_INPUT_PATH";
public static String PAGERANK_INPUT_PATH = "PAGERANK_INPUT_PATH";
public static String PAGERANK_OUTPUT_PATH = "PAGERANK_OUTPUT_PATH";
public static String PAGERANK_OUTPUT_FILE = "PAGERANK_OUTPUT_FILE";
public static String FORMAT_INPUT_PATH = "FORMAT_INPUT_PATH";
public static String FORMAT_OUTPUT_PATH = "FORMAT_OUTPUT_PATH";
public static String FORMAT_OUTPUT_FILE = "FORMAT_OUTPUT_FILE";
public static String HDFS_URI = "HDFS_URI";
public static String CYCLER_CLASS = "CYCLER_CLASS";
public static String PREDEALER_CLASS = "PREDEALER_CLASS";
public static String FORMATTER_CLASS = "FORMATTER_CLASS";
}
至此,此项目介绍完毕,代码我自己都运行过,没有问题。希望大家一起交流!(注:github就不分享了,因为我所有的项目都在一个大项目中,不方便全部分享,请见谅!但是,需要具体代码,可以联系我!)
更多推荐
PageRank介绍与分布式实现
发布评论