1. Connected component: a maximal connected subgraph of an undirected graph is called a connected component of that graph.
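To make the definition concrete, here is a minimal single-machine sketch in plain Java (no Flink). It uses breadth-first search, which is a different technique from the delta iteration discussed in this post: starting from each unvisited vertex it collects everything reachable (the maximal connected subgraph) and labels every member with the smallest vertex ID in it. The class name `ComponentsBfs` and the array-based input are illustrative assumptions, and every edge endpoint is assumed to appear in the vertex list.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: labels each vertex of an undirected graph with the smallest
// vertex ID of its connected component, found by breadth-first search.
class ComponentsBfs {

    static Map<Long, Long> label(long[] vertices, long[][] edges) {
        // undirected adjacency list: every edge is recorded in both directions
        Map<Long, List<Long>> adj = new HashMap<>();
        for (long v : vertices) {
            adj.put(v, new ArrayList<>());
        }
        for (long[] e : edges) {
            adj.get(e[0]).add(e[1]);
            adj.get(e[1]).add(e[0]);
        }
        Map<Long, Long> component = new TreeMap<>();
        for (long start : vertices) {
            if (component.containsKey(start)) {
                continue; // already part of a component found earlier
            }
            // collect the maximal set of vertices reachable from start
            List<Long> members = new ArrayList<>();
            Deque<Long> queue = new ArrayDeque<>();
            component.put(start, start); // marks start as visited; final label set below
            queue.add(start);
            long min = start;
            while (!queue.isEmpty()) {
                long v = queue.poll();
                members.add(v);
                min = Math.min(min, v);
                for (long n : adj.get(v)) {
                    if (!component.containsKey(n)) {
                        component.put(n, n);
                        queue.add(n);
                    }
                }
            }
            // every member of the component gets the component's smallest ID
            for (long m : members) {
                component.put(m, min);
            }
        }
        return component;
    }

    public static void main(String[] args) {
        long[] vertices = {1, 2, 3, 4, 5};
        long[][] edges = {{1, 2}, {2, 3}, {4, 5}};
        System.out.println(label(vertices, edges)); // prints {1=1, 2=1, 3=1, 4=4, 5=4}
    }
}
```

The graph above has two components, {1, 2, 3} and {4, 5}; labeling by the smallest ID is the same convention the Flink job uses for its Component-ID.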
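2. Implementing the connected components algorithm in Flink. In this example, the smaller component value is passed on to the vertices connected to it, and the computation is implemented with a delta (incremental) iteration.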
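The propagation idea can be tried out on a single machine before reading the Flink code. The sketch below is plain Java, not the Flink API, and the class name `DeltaIterationSim` is hypothetical. A map plays the role of the solution set (Vertex-ID to Component-ID), a second map holding only the changed vertices plays the workset, and each pass of the loop is one superstep: join the workset with the edges in both directions, keep the minimum candidate per target vertex, and accept it only if it is smaller than the current value. The loop stops when the workset is empty or after `maxIterations` passes. Every edge endpoint is assumed to appear in the vertex list.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical single-machine simulation of connected components with a delta iteration;
// it follows the same steps as the Flink job but uses plain Java collections.
class DeltaIterationSim {

    static Map<Long, Long> run(long[] vertices, long[][] edges, int maxIterations) {
        // solution set: Vertex-ID -> Component-ID, initialized to (id, id)
        Map<Long, Long> solution = new TreeMap<>();
        for (long v : vertices) {
            solution.put(v, v);
        }
        // every vertex counts as changed at the start, so workset == solution set
        Map<Long, Long> workset = new HashMap<>(solution);

        for (int i = 0; i < maxIterations && !workset.isEmpty(); i++) {
            // join workset with the undirected edges; keep the minimum candidate per target
            Map<Long, Long> minCandidate = new HashMap<>();
            for (long[] e : edges) {
                for (int dir = 0; dir < 2; dir++) { // dir == 1 uses the inverted edge
                    long src = e[dir];
                    long dst = e[1 - dir];
                    Long comp = workset.get(src);
                    if (comp != null) {
                        minCandidate.merge(dst, comp, Long::min);
                    }
                }
            }
            // keep only candidates smaller than the current solution-set value;
            // these changes are both the solution-set delta and the next workset
            Map<Long, Long> changes = new HashMap<>();
            for (Map.Entry<Long, Long> c : minCandidate.entrySet()) {
                if (c.getValue() < solution.get(c.getKey())) {
                    changes.put(c.getKey(), c.getValue());
                }
            }
            solution.putAll(changes);
            workset = changes;
        }
        return solution;
    }

    public static void main(String[] args) {
        long[] vertices = {1, 2, 3, 4};
        long[][] edges = {{4, 3}, {3, 2}, {2, 1}};
        System.out.println(run(vertices, edges, 10)); // prints {1=1, 2=1, 3=1, 4=1}
    }
}
```

For the path 4-3-2-1 the workset shrinks from 4 vertices to 3, 2, 1 and finally 0, so each superstep touches fewer vertices; that shrinking workset is what a delta iteration exploits. With `maxIterations = 2` the same input stops early and vertex 4 still carries component 2, so the iteration limit must cover the largest number of hops between a vertex and the smallest ID in its component.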
import java.util.LinkedList;
import java.util.List;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

/**
 * Default vertex and edge data sets for the connected components example.
 */
public class ConnectedComponentsData {

    public static final long[] VERTICES = new long[] {
            1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16};

    public static DataSet<Long> getDefaultVertexDataSet(ExecutionEnvironment env) {
        List<Long> verticesList = new LinkedList<Long>();
        for (long vertexId : VERTICES) {
            verticesList.add(vertexId);
        }
        return env.fromCollection(verticesList);
    }

    public static final Object[][] EDGES = new Object[][] {
            new Object[]{1L, 2L},
            new Object[]{2L, 3L},
            new Object[]{2L, 4L},
            new Object[]{3L, 5L},
            new Object[]{6L, 7L},
            new Object[]{8L, 9L},
            new Object[]{8L, 10L},
            new Object[]{5L, 11L},
            new Object[]{11L, 12L},
            new Object[]{10L, 13L},
            new Object[]{9L, 14L},
            new Object[]{13L, 14L},
            new Object[]{1L, 15L},
            new Object[]{16L, 1L}
    };

    public static DataSet<Tuple2<Long, Long>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
        List<Tuple2<Long, Long>> edgeList = new LinkedList<Tuple2<Long, Long>>();
        for (Object[] edge : EDGES) {
            edgeList.add(new Tuple2<Long, Long>((Long) edge[0], (Long) edge[1]));
        }
        return env.fromCollection(edgeList);
    }
}
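Before running the job it helps to know what the default data should produce. The sketch below is plain Java and uses union-find, a different technique from Flink's delta iteration, purely as a cross-check. It copies the 16 default vertices and 14 default edges so it runs standalone, and labels each vertex with the smallest ID in its component. The class name `ExpectedComponents` is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical cross-check: union-find over the default example data,
// labeling each vertex with the smallest vertex ID of its component.
class ExpectedComponents {

    // same values as ConnectedComponentsData.VERTICES and EDGES, copied to run standalone
    static final long[] VERTICES = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16};
    static final long[][] EDGES = {
        {1, 2}, {2, 3}, {2, 4}, {3, 5}, {6, 7}, {8, 9}, {8, 10},
        {5, 11}, {11, 12}, {10, 13}, {9, 14}, {13, 14}, {1, 15}, {16, 1}
    };

    // follow parent links up to the root of x's set
    static long find(Map<Long, Long> parent, long x) {
        while (parent.get(x) != x) {
            x = parent.get(x);
        }
        return x;
    }

    // merge two sets; the smaller root wins, so every root stays the minimum of its set
    static void union(Map<Long, Long> parent, long a, long b) {
        long ra = find(parent, a);
        long rb = find(parent, b);
        if (ra < rb) {
            parent.put(rb, ra);
        } else if (rb < ra) {
            parent.put(ra, rb);
        }
    }

    static Map<Long, Long> compute() {
        Map<Long, Long> parent = new HashMap<>();
        for (long v : VERTICES) {
            parent.put(v, v);
        }
        for (long[] e : EDGES) {
            union(parent, e[0], e[1]);
        }
        Map<Long, Long> label = new TreeMap<>();
        for (long v : VERTICES) {
            label.put(v, find(parent, v));
        }
        return label;
    }

    public static void main(String[] args) {
        // prints {1=1, 2=1, 3=1, 4=1, 5=1, 6=6, 7=6, 8=8, 9=8, 10=8, 11=1, 12=1, 13=8, 14=8, 15=1, 16=1}
        System.out.println(compute());
    }
}
```

So the default data has three components, labeled 1 (vertices 1 to 5, 11, 12, 15, 16), 6 (vertices 6 and 7) and 8 (vertices 8, 9, 10, 13, 14). No vertex is more than 5 hops from the smallest ID in its component, so the default 10 iterations should be enough for the Flink job to print the same (vertex, component) pairs, in arbitrary order.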
2.2 Algorithm implementation (the code is commented in detail)
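import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.util.Collector;

/**
 * @Description:
 * Implements the connected components algorithm with a delta iteration.
 * Initially, the algorithm assigns each vertex a unique ID. In each step, a vertex picks the minimum
 * of its own ID and its neighbors' IDs as its new ID and tells its neighbors about its new ID. When
 * the algorithm has completed, all vertices in the same component have the same ID.
 * A vertex whose component ID did not change does not need to propagate its information in the next
 * step, so the algorithm is easily expressed as a delta iteration. Here the solution set holds the
 * vertices with their current component ID, and the workset holds the changed vertices. Because all
 * vertices count as changed at the start, the initial workset and the initial solution set are identical.
 * In addition, the delta to the solution set is also the next workset.
 * Input files are plain text files and must be formatted as follows:
 * Vertices are represented as IDs and separated by newline characters.
 * For example, "1\n2\n12\n42\n63" gives five vertices (1), (2), (12), (42), and (63).
 * Edges are represented as pairs of vertex IDs separated by a space character; edges are separated by newline characters.
 * For example, "1 2\n2 12\n1 12\n42 63" gives four (undirected) edges (1)-(2), (2)-(12), (1)-(12), and (42)-(63).
 * Usage: ConnectedComponents --vertices <path> --edges <path> --output <path> --iterations <n>
 * If no parameters are provided, the program runs with the default data from {@link ConnectedComponentsData} and 10 iterations.
 */
public class ConnectedComponents {

    private static DataSet<Long> getVertexDataSet(ParameterTool params, ExecutionEnvironment env) {
        if (params.has("vertices")) {
            return env.readCsvFile(params.get("vertices")).types(Long.class)
                    .map(new MapFunction<Tuple1<Long>, Long>() {
                        @Override
                        public Long map(Tuple1<Long> value) throws Exception {
                            return value.f0;
                        }
                    });
        } else {
            System.out.println("Executing Connected Components example with default vertices data set.");
            System.out.println("Use --vertices to specify file input.");
            return ConnectedComponentsData.getDefaultVertexDataSet(env);
        }
    }

    private static DataSet<Tuple2<Long, Long>> getEdgeDataSet(ParameterTool params, ExecutionEnvironment env) {
        if (params.has("edges")) {
            return env.readCsvFile(params.get("edges")).fieldDelimiter(" ").types(Long.class, Long.class);
        } else {
            System.out.println("Executing Connected Components example with default edges data set.");
            System.out.println("Use --edges to specify file input.");
            return ConnectedComponentsData.getDefaultEdgeDataSet(env);
        }
    }

    public static void main(String[] args) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(args);
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        final int maxIterations = params.getInt("iterations", 10);
        // make parameters available in the web interface
        env.getConfig().setGlobalJobParameters(params);
        // read vertex and edge data
        DataSet<Long> vertices = getVertexDataSet(params, env);
        DataSet<Tuple2<Long, Long>> edges = getEdgeDataSet(params, env).flatMap(new UndirectEdge());
        // assign the initial components (equal to the vertex id)
        DataSet<Tuple2<Long, Long>> verticesWithInitialId =
                vertices.map(new DuplicateValue<Long>());
        // open a delta iteration: the initial workset is the same data set as the initial solution set,
        // at most maxIterations supersteps are run, and the solution set is keyed on field 0 (Vertex-ID)
        DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
                verticesWithInitialId.iterateDelta(verticesWithInitialId, maxIterations, 0);
        // apply the step logic: join with the edges, select the minimum neighbor, update if the component of the candidate is smaller
        DataSet<Tuple2<Long, Long>> changes = iteration.getWorkset().join(edges).where(0).equalTo(0)
                .with(new NeighborWithComponentIDJoin())
                .groupBy(0).aggregate(Aggregations.MIN, 1)
                .join(iteration.getSolutionSet()).where(0).equalTo(0)
                .with(new ComponentIdFilter());
        // close the delta iteration (delta and new workset are identical)
        DataSet<Tuple2<Long, Long>> result = iteration.closeWith(changes, changes);
        // emit result
        if (params.has("output")) {
            result.writeAsCsv(params.get("output"), "\n", " ");
            // execute program
            env.execute("Connected Components Example");
        } else {
            System.out.println("Printing result to stdout. Use --output to specify output path.");
            // print() triggers execution on its own, so env.execute() is not called in this branch
            result.print();
        }
    }

    /**
     * Makes edges undirected by emitting, for each input edge, the edge itself and an inverted version.
     * Because the graph is undirected, each edge tuple is also emitted reversed so that every endpoint
     * of every edge appears in the first field (f0) of some Tuple2. After the original and the reversed
     * tuples are combined, f0 covers every vertex that has at least one edge, so the next step can join on it.
     */
    public static final class UndirectEdge implements FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
        Tuple2<Long, Long> invertedEdge = new Tuple2<Long, Long>();

        @Override
        public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
            invertedEdge.f0 = value.f1;
            invertedEdge.f1 = value.f0;
            out.collect(value);
            out.collect(invertedEdge);
        }
    }

    /**
     * Function that turns a value into a 2-tuple where both fields are that value.
     * Each vertex is mapped to (id, id), i.e. its Component-ID is initialized with its own ID.
     * The result is a (Vertex-ID, Component-ID) pair; the Component-ID is the value that is compared and propagated.
     */
    public static final class DuplicateValue<T> implements MapFunction<T, Tuple2<T, T>> {
        @Override
        public Tuple2<T, T> map(T value) throws Exception {
            return new Tuple2<T, T>(value, value);
        }
    }

    /**
     * UDF that joins a (Vertex-ID, Component-ID) pair that represents the current component that
     * a vertex is associated with, with a (Source-Vertex-ID, Target-Vertex-ID) edge. The function
     * produces a (Target-Vertex-ID, Component-ID) pair.
     * Joining on Vertex-ID = Source-Vertex-ID offers the vertex's component ID to its neighbor as a
     * candidate new component value. Next, the candidates are reduced to the minimum per neighbor and
     * compared with the neighbor's own current component value; only the smaller one is kept, and it
     * propagates further through the delta iteration. This part is a bit mind-bending: the main purpose
     * of this step is to propagate component IDs.
     */
    public static final class NeighborWithComponentIDJoin
            implements JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
        @Override
        public Tuple2<Long, Long> join(Tuple2<Long, Long> vertexWithComponent, Tuple2<Long, Long> edge) {
            return new Tuple2<Long, Long>(edge.f1, vertexWithComponent.f1);
        }
    }

    /**
     * Emit the candidate (Vertex-ID, Component-ID) pair if and only if the
     * candidate component ID is less than the vertex's current component ID.
     * The (Target-Vertex-ID, Component-ID) pairs from the previous step are compared with the current
     * entries in the solution set and only the smaller ones are kept; the delta-iteration part
     * (updating the solution set and feeding the next workset) is handled by the framework.
     */
    public static final class ComponentIdFilter
            implements FlatJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
        @Override
        public void join(Tuple2<Long, Long> candidate, Tuple2<Long, Long> old, Collector<Tuple2<Long, Long>> out) {
            if (candidate.f1 < old.f1) {
                out.collect(candidate);
            }
        }
    }
}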
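The input format described in the class comment (one vertex ID per line; one space-separated pair of vertex IDs per edge line) and the effect of `UndirectEdge` can be checked without a cluster. The sketch below is plain Java, not Flink's CSV reader: it parses the two example strings from the comment and then emits every edge in both directions, as `UndirectEdge` does. The class and method names are hypothetical, and it assumes well-formed input (no validation of malformed lines).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the plain-text input format plus the UndirectEdge step.
class InputFormatSketch {

    // one vertex ID per line, e.g. "1\n2\n12\n42\n63"
    static List<Long> parseVertices(String text) {
        List<Long> vertices = new ArrayList<>();
        for (String line : text.split("\n")) {
            if (!line.trim().isEmpty()) {
                vertices.add(Long.parseLong(line.trim()));
            }
        }
        return vertices;
    }

    // one edge per line: two vertex IDs separated by a single space, e.g. "1 2\n2 12"
    static List<long[]> parseEdges(String text) {
        List<long[]> edges = new ArrayList<>();
        for (String line : text.split("\n")) {
            if (line.trim().isEmpty()) {
                continue;
            }
            String[] fields = line.trim().split(" ");
            edges.add(new long[] {Long.parseLong(fields[0]), Long.parseLong(fields[1])});
        }
        return edges;
    }

    // same effect as UndirectEdge: emit each edge as-is, then inverted
    static List<long[]> undirect(List<long[]> edges) {
        List<long[]> out = new ArrayList<>();
        for (long[] e : edges) {
            out.add(new long[] {e[0], e[1]});
            out.add(new long[] {e[1], e[0]});
        }
        return out;
    }

    public static void main(String[] args) {
        List<Long> vertices = parseVertices("1\n2\n12\n42\n63");
        List<long[]> edges = undirect(parseEdges("1 2\n2 12\n1 12\n42 63"));
        System.out.println(vertices); // prints [1, 2, 12, 42, 63]
        for (long[] e : edges) {
            System.out.println("(" + e[0] + ", " + e[1] + ")");
        }
    }
}
```

The 4 input edges become 8 directed pairs, so every endpoint appears in field 0 at least once; that is what lets the workset join on field 0 reach every neighbor of a changed vertex.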
Implementing ConnectedComponents (connected components) in Flink: the delta iteration algorithm explained in detail
