diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
index 869caa3..fa8a619 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
@@ -148,6 +148,17 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
   def partitionBy(partitionStrategy: PartitionStrategy, numPartitions: Int): Graph[VD, ED]
 
   /**
+   * Repartitions the edges in the graph according to `partitionStrategy`.
+   *
+   * @param partitionStrategy the partitioning strategy to use when partitioning the edges
+   *   in the graph
+   * @param numPartitions the number of edge partitions in the new graph
+   * @param degreeCutoff the vertex degree threshold used by degree-aware strategies (e.g. the
+   *   hybrid cuts) to decide whether to choose a vertex cut or an edge cut for an edge;
+   *   other strategies ignore it
+   * @return a new graph with the same vertices and edges, repartitioned by `partitionStrategy`
+   */
+  def partitionBy(partitionStrategy: PartitionStrategy, numPartitions: Int, degreeCutoff: Int)
+    : Graph[VD, ED]
+
+  /**
    * Transforms each vertex attribute in the graph using the map function.
    *
    * @note The new graph has the same structure. As a consequence the underlying index structures
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
index 70a7592..8f3c2b0 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/PartitionStrategy.scala
@@ -16,6 +16,8 @@
  */
 package org.apache.spark.graphx
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.rdd.RDD
 
 /**
  * Represents the way edges are assigned to edge partitions based on their source and destination
@@ -24,6 +26,27 @@ package org.apache.spark.graphx
 trait PartitionStrategy extends Serializable {
   /** Returns the partition number for a given edge. */
   def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID
+
+  /** Returns the edges of `graph` keyed by the partition to which each edge is assigned. */
+  def getPartitionedEdgeRDD[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED],
+      numPartitions: PartitionID, degreeCutoff: Int): RDD[(PartitionID, (VertexId, VertexId, ED))]
+}
+
+/**
+ * A [[PartitionStrategy]] that provides a default implementation of `getPartitionedEdgeRDD`,
+ * keying every edge by the partition returned by `getPartition` and ignoring `degreeCutoff`.
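+ *
+ * For illustration only (not part of this change): a strategy that reuses the default edge
+ * placement needs to define just `getPartition`; the `ModuloCut` object below is hypothetical:
+ * {{{
+ * case object ModuloCut extends DefaultPartitionStrategy {
+ *   override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID =
+ *     (math.abs(src + dst) % numParts).toInt
+ * }
+ * }}}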
+ */
+trait DefaultPartitionStrategy extends PartitionStrategy {
+  override def getPartitionedEdgeRDD[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED],
+      numPartitions: PartitionID, degreeCutoff: Int)
+    : RDD[(PartitionID, (VertexId, VertexId, ED))] = {
+    graph.edges.map { e =>
+      val part: PartitionID = getPartition(e.srcId, e.dstId, numPartitions)
+      (part, (e.srcId, e.dstId, e.attr))
+    }
+  }
+
+  override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
+    // Simple hash of the two endpoint IDs; concrete strategies are expected to override this.
+    val mixingPrime: VertexId = 1125899906842597L
+    (math.abs((src + dst) * mixingPrime) % numParts).toInt
+  }
 }
 
 /**
@@ -71,7 +94,7 @@
    * method where the last column can have a different number of rows than the others while still
    * maintaining the same size per block.
    */
-  case object EdgePartition2D extends PartitionStrategy {
+  case object EdgePartition2D extends DefaultPartitionStrategy {
     override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
       val ceilSqrtNumParts: PartitionID = math.ceil(math.sqrt(numParts)).toInt
       val mixingPrime: VertexId = 1125899906842597L
@@ -98,7 +121,7 @@
    * Assigns edges to partitions using only the source vertex ID, colocating edges with the same
    * source.
    */
-  case object EdgePartition1D extends PartitionStrategy {
+  case object EdgePartition1D extends DefaultPartitionStrategy {
     override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
       val mixingPrime: VertexId = 1125899906842597L
       (math.abs(src * mixingPrime) % numParts).toInt
@@ -110,7 +133,7 @@
    * Assigns edges to partitions by hashing the source and destination vertex IDs, resulting in a
    * random vertex cut that colocates all same-direction edges between two vertices.
    */
-  case object RandomVertexCut extends PartitionStrategy {
+  case object RandomVertexCut extends DefaultPartitionStrategy {
     override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
       math.abs((src, dst).hashCode()) % numParts
     }
@@ -122,7 +145,7 @@
    * direction, resulting in a random vertex cut that colocates all edges between two vertices,
    * regardless of direction.
    */
-  case object CanonicalRandomVertexCut extends PartitionStrategy {
+  case object CanonicalRandomVertexCut extends DefaultPartitionStrategy {
     override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
       if (src < dst) {
         math.abs((src, dst).hashCode()) % numParts
@@ -132,12 +155,183 @@
     }
   }
 
+  /**
+   * Assigns edges to partitions using only the destination vertex ID, colocating edges with the
+   * same destination.
+   */
+  case object EdgePartition1DByDst extends DefaultPartitionStrategy {
+    override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
+      val mixingPrime: VertexId = 1125899906842597L
+      (math.abs(dst * mixingPrime) % numParts).toInt
+    }
+  }
+
+  /**
+   * Assigns edges by hashing the source vertex ID, except that edges whose source has an
+   * out-degree greater than `degreeCutoff` are assigned by their destination instead, so that
+   * high-degree source vertices are cut across partitions.
+   */
+  case object HybridSrc extends DefaultPartitionStrategy {
+    override def getPartitionedEdgeRDD[VD: ClassTag, ED: ClassTag](
+        graph: Graph[VD, ED], numPartitions: PartitionID, degreeCutoff: Int)
+      : RDD[(PartitionID, (VertexId, VertexId, ED))] = {
+      val mixingPrime: VertexId = 1125899906842597L
+      // Pair each edge with the out-degree of its source vertex.
+      val edgesWithDegree = graph.edges.map(e => (e.srcId, (e.dstId, e.attr)))
+        .join(graph.outDegrees)
+      edgesWithDegree.map { case (srcId, ((dstId, attr), degree)) =>
+        val part: PartitionID = if (degree > degreeCutoff) {
+          (math.abs(dstId * mixingPrime) % numPartitions).toInt
+        } else {
+          (math.abs(srcId * mixingPrime) % numPartitions).toInt
+        }
+        (part, (srcId, dstId, attr))
+      }
+    }
+  }
+
+  /**
+   * Assigns edges by hashing the destination vertex ID, except that edges whose destination has
+   * an in-degree greater than `degreeCutoff` are assigned by their source instead, so that
+   * high-degree destination vertices are cut across partitions.
+   */
+  case object HybridDst extends DefaultPartitionStrategy {
+    override def getPartitionedEdgeRDD[VD: ClassTag, ED: ClassTag](
+        graph: Graph[VD, ED], numPartitions: PartitionID, degreeCutoff: Int)
+      : RDD[(PartitionID, (VertexId, VertexId, ED))] = {
+      val mixingPrime: VertexId = 1125899906842597L
+      // Pair each edge with the in-degree of its destination vertex.
+      val edgesWithDegree = graph.edges.map(e => (e.dstId, (e.srcId, e.attr)))
+        .join(graph.inDegrees)
+      edgesWithDegree.map { case (dstId, ((srcId, attr), degree)) =>
+        val part: PartitionID = if (degree > degreeCutoff) {
+          (math.abs(srcId * mixingPrime) % numPartitions).toInt
+        } else {
+          (math.abs(dstId * mixingPrime) % numPartitions).toInt
+        }
+        (part, (srcId, dstId, attr))
+      }
+    }
+  }
+
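+  /**
+   * Greedily colocates all the edges of each source vertex. Assuming an initial hash placement
+   * of each source's destinations, the source is assigned to the partition its destination
+   * neighbors overlap with most, discounted by the square root of the partition's current load.
+   *
+   * Illustrative example (not part of this patch): with 2 partitions, a source whose
+   * destinations hash 5-to-partition-0 and 1-to-partition-1, while the partitions already hold
+   * 25 and 0 edges, scores 5 - sqrt(25) = 0 against 1 - sqrt(0) = 1, so the lighter partition 1
+   * wins even though fewer neighbors reside there.
+   */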
+  case object GreedySrc extends DefaultPartitionStrategy {
+    override def getPartitionedEdgeRDD[VD: ClassTag, ED: ClassTag](
+        graph: Graph[VD, ED], numPartitions: PartitionID, degreeCutoff: Int)
+      : RDD[(PartitionID, (VertexId, VertexId, ED))] = {
+      val mixingPrime: VertexId = 1125899906842597L
+
+      // Group the destination IDs of the edges leaving each source vertex.
+      val groupedSrc = graph.edges.map(e => (e.srcId, e.dstId)).groupByKey()
+
+      // Assuming an initial hash partitioning by dstId, count, for each source, how many of its
+      // destinations fall into each partition.
+      val srcEdgeCount = groupedSrc.map { case (src, dsts) =>
+        val srcOverlap = new Array[Long](numPartitions)
+        dsts.foreach { dst => srcOverlap((math.abs(dst * mixingPrime) % numPartitions).toInt) += 1 }
+        (src, srcOverlap)
+      }
+
+      // Tracks the load on each partition as edges are greedily assigned. NOTE: this array is
+      // captured by the closure below and updated independently within each task, so the load
+      // estimate is a per-task heuristic rather than a globally consistent count.
+      val currentLoad = new Array[Long](numPartitions)
+
+      val srcAssignment = srcEdgeCount.map { case (src, overlaps) =>
+        // Start from the hash-based assignment of this source ID.
+        var part: PartitionID = (math.abs(src * mixingPrime) % numPartitions).toInt
+
+        // Pick the partition that this vertex's neighbors overlap with most, discounting
+        // partitions that are already heavily loaded.
+        var mostOverlap: Double = overlaps(part) - math.sqrt(1.0 * currentLoad(part))
+        for (cur <- 0 until numPartitions) {
+          val overlap: Double = overlaps(cur) - math.sqrt(1.0 * currentLoad(cur))
+          if (overlap > mostOverlap) {
+            part = cur
+            mostOverlap = overlap
+          }
+        }
+
+        // All the edges of this source vertex go to the chosen partition, so add its whole edge
+        // count to that partition's load.
+        currentLoad(part) += overlaps.sum
+        (src, part)
+      }
+
+      // Join each source's chosen partition back onto its edges.
+      val partitionedEdges = graph.edges.map(e => (e.srcId, (e.dstId, e.attr)))
+        .join(srcAssignment)
+      partitionedEdges.map { case (src, ((dst, attr), part)) =>
+        (part, (src, dst, attr))
+      }
+    }
+  }
+
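+  /**
+   * The mirror image of [[GreedySrc]]: greedily colocates all the edges of each destination
+   * vertex, scoring candidate partitions by source-neighbor overlap discounted by the square
+   * root of the partition's current load. The illustrative scoring example on [[GreedySrc]]
+   * applies here with the roles of sources and destinations swapped.
+   */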
+  case object GreedyDst extends DefaultPartitionStrategy {
+    override def getPartitionedEdgeRDD[VD: ClassTag, ED: ClassTag](
+        graph: Graph[VD, ED], numPartitions: PartitionID, degreeCutoff: Int)
+      : RDD[(PartitionID, (VertexId, VertexId, ED))] = {
+      val mixingPrime: VertexId = 1125899906842597L
+
+      // Group the source IDs of the edges entering each destination vertex.
+      val groupedDst = graph.edges.map(e => (e.dstId, e.srcId)).groupByKey()
+
+      // Assuming an initial hash partitioning by srcId, count, for each destination, how many of
+      // its sources fall into each partition.
+      val dstEdgeCount = groupedDst.map { case (dst, srcs) =>
+        val dstOverlap = new Array[Long](numPartitions)
+        srcs.foreach { src => dstOverlap((math.abs(src * mixingPrime) % numPartitions).toInt) += 1 }
+        (dst, dstOverlap)
+      }
+
+      // Tracks the load on each partition as edges are greedily assigned; as in GreedySrc, each
+      // task updates its own copy of this array, so the estimate is heuristic.
+      val currentLoad = new Array[Long](numPartitions)
+
+      val dstAssignment = dstEdgeCount.map { case (dst, overlaps) =>
+        // Start from the hash-based assignment of this destination ID.
+        var part: PartitionID = (math.abs(dst * mixingPrime) % numPartitions).toInt
+
+        // Pick the partition that this vertex's neighbors overlap with most, discounting
+        // partitions that are already heavily loaded.
+        var mostOverlap: Double = overlaps(part) - math.sqrt(1.0 * currentLoad(part))
+        for (cur <- 0 until numPartitions) {
+          val overlap: Double = overlaps(cur) - math.sqrt(1.0 * currentLoad(cur))
+          if (overlap > mostOverlap) {
+            part = cur
+            mostOverlap = overlap
+          }
+        }
+
+        // All the edges of this destination vertex go to the chosen partition, so add its whole
+        // edge count to that partition's load.
+        currentLoad(part) += overlaps.sum
+        (dst, part)
+      }
+
+      // Join each destination's chosen partition back onto its edges.
+      val partitionedEdges = graph.edges.map(e => (e.dstId, (e.srcId, e.attr)))
+        .join(dstAssignment)
+      partitionedEdges.map { case (dst, ((src, attr), part)) =>
+        (part, (src, dst, attr))
+      }
+    }
+  }
+
   /** Returns the PartitionStrategy with the specified name. */
   def fromString(s: String): PartitionStrategy = s match {
     case "RandomVertexCut" => RandomVertexCut
     case "EdgePartition1D" => EdgePartition1D
+    case "EdgePartition1DByDst" => EdgePartition1DByDst
     case "EdgePartition2D" => EdgePartition2D
     case "CanonicalRandomVertexCut" => CanonicalRandomVertexCut
+    case "HybridSrc" => HybridSrc
+    case "HybridDst" => HybridDst
+    case "GreedySrc" => GreedySrc
+    case "GreedyDst" => GreedyDst
     case _ => throw new IllegalArgumentException("Invalid PartitionStrategy: " + s)
   }
 }
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
index da95314..4d867aa 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
@@ -26,7 +26,7 @@ import org.apache.spark.storage.StorageLevel
 import org.apache.spark.graphx._
 import org.apache.spark.graphx.impl.GraphImpl._
 import org.apache.spark.graphx.util.BytecodeUtils
-
+import org.apache.spark.Logging
 
 /**
  * An implementation of [[org.apache.spark.graphx.Graph]] to support computation on graphs.
@@ -38,8 +38,8 @@ import org.apache.spark.graphx.util.BytecodeUtils
 class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
     @transient val vertices: VertexRDD[VD],
     @transient val replicatedVertexView: ReplicatedVertexView[VD, ED])
-  extends Graph[VD, ED] with Serializable {
-
+  extends Graph[VD, ED] with Serializable with Logging {
+  val DEFAULT_DEGREE_CUTOFF: Int = 70
+
   /** Default constructor is provided to support serialization */
   protected def this() = this(null, null)
@@ -97,14 +97,18 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
     partitionBy(partitionStrategy, edges.partitions.size)
   }
 
+  override def partitionBy(partitionStrategy: PartitionStrategy, numPartitions: Int)
+    : Graph[VD, ED] = {
+    partitionBy(partitionStrategy, numPartitions, DEFAULT_DEGREE_CUTOFF)
+  }
+
   override def partitionBy(
-      partitionStrategy: PartitionStrategy, numPartitions: Int): Graph[VD, ED] = {
+      partitionStrategy: PartitionStrategy, numPartitions: Int, degreeCutoff: Int)
+    : Graph[VD, ED] = {
     val edTag = classTag[ED]
     val vdTag = classTag[VD]
-    val newEdges = edges.withPartitionsRDD(edges.map { e =>
-      val part: PartitionID = partitionStrategy.getPartition(e.srcId, e.dstId, numPartitions)
-      (part, (e.srcId, e.dstId, e.attr))
-    }
+    val newEdges = edges.withPartitionsRDD(
+      partitionStrategy.getPartitionedEdgeRDD(this, numPartitions, degreeCutoff)
       .partitionBy(new HashPartitioner(numPartitions))
       .mapPartitionsWithIndex( { (pid, iter) =>
         val builder = new EdgePartitionBuilder[ED, VD]()(edTag, vdTag)
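Usage sketch (illustrative only; `graph` stands for any existing Graph[VD, ED]):

    // Degree-aware hybrid partitioning over 8 partitions: edges whose source has
    // out-degree > 70 are hashed by destination, all others by source.
    val byHybrid = graph.partitionBy(PartitionStrategy.fromString("HybridSrc"), 8, 70)

    // Existing two-argument call sites keep working and pick up DEFAULT_DEGREE_CUTOFF.
    val byDefault = graph.partitionBy(PartitionStrategy.EdgePartition2D, 8)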