标签:stdout mtu 反转 next turn convert 路径 basic 三元组
* @Author: xu.dm
* @Date: 2019/7/4 21:31
* @Description: 三角枚举算法
* 三角枚举是在图(数据结构)中找到紧密连接的部分的预处理步骤。三角形由三条边连接,三条边相互连接。
* 该算法的工作原理如下:它将所有共享一个共同顶点的边(edge)分组,并构建三元组,即由两条边连接的顶点三元组。
* 最后,通过join操作连接原数据和三元组,过滤所有三元组,去除不存在第三条边(闭合三角形)的三元组。
* 输入文件是纯文本文件,必须格式如下:
* 边缘表示为顶点ID的对,由空格字符分隔。边线由换行符分隔。
* 例如,"1 2\n2 12\n1 12\n42 63"给出包括三角形的四个(无向)边(1) - (2),(2) - (12),(1) - (12)和(42) - (63)
* (1)
* / * (2)-(12)
* 用法:EnumTriangleBasic --edges --output
* 如果未提供参数,则使用{@link EnumTrianglesData}中的默认数据运行程序。
public class EnumTriangles {
public static void main(String args[]) throws Exception{
// Checking input parameters
final ParameterTool params = ParameterTool.fromArgs(args);
// set up execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// make parameters available in the web interface
// read input data
DataSet edges;
if (params.has("edges")) {
edges = env.readCsvFile(params.get("edges"))
.fieldDelimiter(" ")
.includeFields(true, true)
.types(Integer.class, Integer.class)
.map(new TupleEdgeConverter());
} else {
System.out.println("Executing EnumTriangles example with default edges data set.");
System.out.println("Use --edges to specify file input.");
edges = EnumTrianglesData.getDefaultEdgeDataSet(env);
// project edges by vertex id
// 对Edge里V1,V2排序,让小的在前
DataSet edgesById = edges
.map(new EdgeByIdProjector());
DataSet triangles = edgesById
// build triads
.sortGroup(EnumTrianglesDataTypes.Edge.V2, Order.ASCENDING)
.reduceGroup(new TriadBuilder())
.where(EnumTrianglesDataTypes.Triad.V2, EnumTrianglesDataTypes.Triad.V3)
.equalTo(EnumTrianglesDataTypes.Edge.V1, EnumTrianglesDataTypes.Edge.V2)
// filter triads
.with(new TriadFilter());
// emit result
if (params.has("output")) {
triangles.writeAsCsv(params.get("output"), "\n", ",");
// execute program
env.execute("Basic Triangle Enumeration Example");
} else {
System.out.println("Printing result to stdout. Use --output to specify output path.");
public static class TupleEdgeConverter implements MapFunction, EnumTrianglesDataTypes.Edge>{
private final EnumTrianglesDataTypes.Edge outEdge = new EnumTrianglesDataTypes.Edge();
public EnumTrianglesDataTypes.Edge map(Tuple2 value) throws Exception {
return outEdge;
* Projects an edge (pair of vertices) such that the id of the first is smaller than the id of the second.
* 转换一对边,如果first大于second,则互相交换
* */
private static class EdgeByIdProjector implements MapFunction{
public EnumTrianglesDataTypes.Edge map(EnumTrianglesDataTypes.Edge value) throws Exception {
//flip vertices 反转顶点
return value;
* Builds triads (triples of vertices) from pairs of edges that share a vertex.
* The first vertex of a triad is the shared vertex, the second and third vertex are ordered by vertexId.
* Assumes that input edges share the first vertex and are in ascending order of the second vertex.
* 构造triads(三位一体结构),即经过分组排序后,拥有共同顶点的edge,按顺序(分组后顶点需要升序)组合成三体
* 构成Triad的第一个顶点是分组id,也是分组里所有对子(edge)共享的id,第二和第三个顶点是分组的对子(edge)里的第二个顶点,edge他们是按升序排列。
* 分组结构类似如下,s1就是共享的顶点,f1到f3是按升序排列的对子(s1-f1),(s1-f2),(f1-f3)。
* f1
* /
* s1 一 f2
* * f3
private static class TriadBuilder
implements GroupReduceFunction{
private final List vertices = new ArrayList();
private final EnumTrianglesDataTypes.Triad outTriad = new EnumTrianglesDataTypes.Triad();
public void reduce(Iterable values, Collector out) throws Exception {
final Iterator edges = values.iterator();
// clear vertex list
// read first edge
EnumTrianglesDataTypes.Edge firstEdge = edges.next();
// build and emit triads
Integer higherVertexId = edges.next().getSecondVertex();
//combine vertex with all previously read vertices
for(Integer lowVertexId:vertices){
* Filters triads (three vertices connected by two edges) without a closing third edge.
* 过滤没有闭合边的三体
* */
private static class TriadFilter
implements JoinFunction{
public EnumTrianglesDataTypes.Triad join(EnumTrianglesDataTypes.Triad first, EnumTrianglesDataTypes.Edge second) throws Exception {
return first;
public class EnumTrianglesDataTypes {
* A POJO storing two vertex IDs.
public static class Edge extends Tuple2 {
private static final long serialVersionUID = 1L;
public static final int V1 = 0;
public static final int V2 = 1;
public Edge() {}
public Edge(final Integer v1, final Integer v2) {
public Integer getFirstVertex() {
return this.getField(V1);
public Integer getSecondVertex() {
return this.getField(V2);
public void setFirstVertex(final Integer vertex1) {
this.setField(vertex1, V1);
public void setSecondVertex(final Integer vertex2) {
this.setField(vertex2, V2);
public void copyVerticesFromTuple2(Tuple2 t) {
public void copyVerticesFromEdgeWithDegrees(EdgeWithDegrees ewd) {
public void flipVertices() {
Integer tmp = this.getFirstVertex();
* A POJO storing three vertex IDs.
public static class Triad extends Tuple3 {
private static final long serialVersionUID = 1L;
public static final int V1 = 0;
public static final int V2 = 1;
public static final int V3 = 2;
public Triad() {}
public void setFirstVertex(final Integer vertex1) {
this.setField(vertex1, V1);
public void setSecondVertex(final Integer vertex2) {
this.setField(vertex2, V2);
public void setThirdVertex(final Integer vertex3) {
this.setField(vertex3, V3);
* A POJO storing two vertex IDs with degree.
public static class EdgeWithDegrees extends Tuple4 {
private static final long serialVersionUID = 1L;
public static final int V1 = 0;
public static final int V2 = 1;
public static final int D1 = 2;
public static final int D2 = 3;
public EdgeWithDegrees() { }
public Integer getFirstVertex() {
return this.getField(V1);
public Integer getSecondVertex() {
return this.getField(V2);
public Integer getFirstDegree() {
return this.getField(D1);
public Integer getSecondDegree() {
return this.getField(D2);
public void setFirstVertex(final Integer vertex1) {
this.setField(vertex1, V1);
public void setSecondVertex(final Integer vertex2) {
this.setField(vertex2, V2);
public void setFirstDegree(final Integer degree1) {
this.setField(degree1, D1);
public void setSecondDegree(final Integer degree2) {
this.setField(degree2, D2);
public void copyFrom(final EdgeWithDegrees edge) {
public class EnumTrianglesData {
public static final Object[][] EDGES = {
{1, 2},
{1, 3},
{1, 4},
{1, 5},
{2, 3},
{2, 5},
{3, 4},
{3, 5},
{3, 7},
{3, 8},
{5, 6},
{7, 8}
public static DataSet getDefaultEdgeDataSet(ExecutionEnvironment env) {
List edges = new ArrayList();
for (Object[] e : EDGES) {
edges.add(new EnumTrianglesDataTypes.Edge((Integer) e[0], (Integer) e[1]));
return env.fromCollection(edges);
flink 实现三角枚举EnumTriangles算法详解
标签:stdout mtu 反转 next turn convert 路径 basic 三元组