标签:对比 from rate nta 包括 return asc 计划 字段
1、Transitive Closure是翻译闭包传递?我觉得直译不准确,意译应该是传递特性直至特性关闭,也符合本例中传递路径,寻找路径可达,直到可达路径不存在(即关闭)。
2、代码很简单,里面有些概念直指核心原理,详细看注释。
/**
* @Author: xu.dm
* @Date: 2019/7/3 11:41
* @Version: 1.0
* @Description: 传递闭包算法,本例中就是根据成对路径,查找和生成新的可达路径
* 例如:1-2,2-4这两对数据,可以得出新的可达路径1-4。
*
* 迭代算法步骤:
* 1、获取成对数据集edges,里面包括路径对,比如 1->2,2->4,2->5等,如果是无向边,还可以反转数据集union之前的数据。本例按有向边处理
* 2、生成迭代头paths可迭代数据集
* 3、用paths和原始数据集edges做join连接,找出头尾相连的数据nextPaths,即类似1->2,2->4这种,然后生成新的路径1->4。
* 4、新的路径集nextPaths和迭代头数据集paths进行并集操作,即union操作,生成新的nextPaths,这个时候它包含了新旧两种数据
* 在这里总是nextPaths>=paths
* 5、去重操作,第一次迭代不会重复,但是第二次迭代开始就会有重复数据,通过groupBy全字段,去分组第一条即可达到去重效果
* 6、以上核心迭代体完成,后面需要形成迭代闭环,确定迭代退出条件
* 7、退出原理:每次迭代完成后,需要检查是否新的路径产生,如果没有则表示迭代可以结束
* 8、可达寻路步骤完成后,通过对比nextPaths和paths,如果nextPaths>paths,表示有新路径生成,需要继续迭代,直到nextPaths=paths
* 9、这里有一个迭代重要的概念,paths和nextPaths是通过迭代闭环不断更新的
* 10、本例中迭代头和迭代尾的数据流向:paths->nextPaths->paths.
* 11、本例通过bulk迭代方式实现了delta迭代的效果
**/
public class TransitiveClosureNaive {
public static void main(String args[]) throws Exception {
// Checking input parameters
final ParameterTool params = ParameterTool.fromArgs(args);
// set up execution environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// make parameters available in the web interface
env.getConfig().setGlobalJobParameters(params);
final int maxIterations = params.getInt("iterations", 10);
DataSet> edges;
if(params.has("edges")){
edges = env.readCsvFile(params.get("edges")).fieldDelimiter(" ").types(Long.class, Long.class);
}else {
System.out.println("Executing TransitiveClosureNaive example with default edges data set.");
System.out.println("Use --edges to specify file input.");
edges = ConnectedComponentsData.getDefaultEdgeDataSet(env);
}
IterativeDataSet> paths = edges.iterate(maxIterations);
DataSet> nextPaths = paths
.join(edges)
.where(1)
.equalTo(0)
.with(new JoinFunction, Tuple2, Tuple2>() {
/**
left: Path (z,x) - 通过z可达x
right: Edge (x,y) - 通过x可达y
out: Path (z,y) - 最终输出z可达y
*/
@Override
public Tuple2 join(Tuple2 left, Tuple2 right) throws Exception {
return new Tuple2(left.f0,right.f1);
}
})
//类似withForwardedFieldsFirst这种无损转发语义声明,是可选项,有助于提高flink优化器生成更高效的执行计划
//转发第一个输入Tuple2中的第一个字段,转发第二个输入Tuple2中的第二个字段
.withForwardedFieldsFirst("0").withForwardedFieldsSecond("1")
//合并原有的路径
.union(paths)
//这里的groupBy两个fields是打算给reduceGroup去重使用
.groupBy(0,1)
.reduceGroup(new GroupReduceFunction, Tuple2>() {
@Override
public void reduce(Iterable> values, Collector> out) throws Exception {
out.collect(values.iterator().next());
}
})
.withForwardedFields("0;1");
//对比paths以及新生成的nextPaths,获取nextPaths中比paths多的路径
//从上面的算子可以得知,nextPaths总是大于或等于paths
DataSet> newPaths = paths
.coGroup(nextPaths)
.where(0).equalTo(0)
.with(new CoGroupFunction, Tuple2, Tuple2>() {
Set> prevSet = new HashSet();
@Override
public void coGroup(Iterable> prevPaths, Iterable> nextPaths, Collector> out) throws Exception {
for(Tuple2 prev:prevPaths){
prevSet.add(prev);
}
//检查有没有新的数据产生,如果有就继续迭代,否则迭代终止
for(Tuple2 next:nextPaths){
if(!prevSet.contains(next)){
out.collect(next);
}
}
}
}).withForwardedFieldsFirst("0").withForwardedFieldsSecond("0");
//迭代尾,在这里形成闭环,nextPaths是反馈通道,nextPaths数据集被重新传递到迭代头paths里,然后通过迭代算子不断执行。
//newPaths为空或者迭代达到最大次数,迭代结束。newPaths这里表示是否有新的路径。
//数据集迭代环:paths->nextPaths->paths
DataSet> transitiveClosure = paths.closeWith(nextPaths, newPaths);
// emit result
if (params.has("output")) {
transitiveClosure.writeAsCsv(params.get("output"), "\n", " ");
// execute program explicitly, because file sinks are lazy
env.execute("Transitive Closure Example");
} else {
System.out.println("Printing result to stdout. Use --output to specify output path.");
transitiveClosure.print();
}
}
}
3、原始数据
public class ConnectedComponentsData {
public static final long[] VERTICES = new long[] {
1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16};
public static DataSet getDefaultVertexDataSet(ExecutionEnvironment env) {
List verticesList = new LinkedList();
for (long vertexId : VERTICES) {
verticesList.add(vertexId);
}
return env.fromCollection(verticesList);
}
public static final Object[][] EDGES = new Object[][] {
new Object[]{1L, 2L},
new Object[]{2L, 3L},
new Object[]{2L, 4L},
new Object[]{3L, 5L},
new Object[]{6L, 7L},
new Object[]{8L, 9L},
new Object[]{8L, 10L},
new Object[]{5L, 11L},
new Object[]{11L, 12L},
new Object[]{10L, 13L},
new Object[]{9L, 14L},
new Object[]{13L, 14L},
new Object[]{1L, 15L},
new Object[]{16L, 1L}
};
public static DataSet> getDefaultEdgeDataSet(ExecutionEnvironment env) {
List> edgeList = new LinkedList>();
for (Object[] edge : EDGES) {
edgeList.add(new Tuple2((Long) edge[0], (Long) edge[1]));
}
return env.fromCollection(edgeList);
}
}
flink Transitive Closure算法,实现寻找新的可达路径
标签:对比 from rate nta 包括 return asc 计划 字段
原文地址:https://www.cnblogs.com/asker009/p/11131069.html