
JPregel





Developer Docs



API Tutorial

Writing a JPregel program involves subclassing the predefined abstract class Vertex, whose public methods are summarized below:

public abstract class Vertex
{
    public abstract void compute();
    public int getVertexID();
    public int getTotalNumVertices();
    public int getSuperStep();
    public double getValue();
    public void setValue(double newValue);
    public List<Message> getMessages();
    public void sendMessage(Edge e, double msgValue);
    public List<Edge> getEdges();
    public void writeSolution(OutputStream anOutputStream) 
                                         throws IOException;
}

The developer implements the abstract method Vertex.compute(), which is executed at each active vertex in every superstep. The method getMessages() returns the list of messages that other vertices sent to this vertex during the previous superstep. These messages carry the information to be processed in the vertex-centric computation.

Inside the compute() method, the developer can use the other Vertex methods to obtain information about the current vertex and its edges, and to send messages to other vertices.

The method getVertexID() returns the unique identifier of the vertex. The methods getValue() and setValue(double newValue) get and set the vertex value. As of now, class Vertex is not generic: a vertex can store only a double value, and messages between vertices can carry only double values. We plan to move to a generic design once we have a stable end-to-end system in place.

Vertices communicate with one another by sending messages using the sendMessage(Edge e, double msgValue) method. For each call to this method, the vertex passes a message value (a double, as of now) and an outgoing edge that starts at this vertex and ends at a neighbouring vertex. The getEdges() method returns the list of outgoing edges of this vertex in the input graph. The method Edge.getCost() returns the cost of an outgoing edge.

The sendMessage(Edge e, double msgValue) method in the superclass relies on a separate entity called a Communicator, one per compute node, to carry messages between vertices. The Communicator runs as an independent thread that aggregates the outgoing messages of all vertices executing on its compute node. It knows which vertices are assigned to which compute nodes in the cluster. Once all worker threads have completed their computations, it uses a multithreaded design to deliver the messages through parallel RMI calls to the other compute nodes.
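
The aggregation step described above can be sketched as follows. This is a minimal, single-process illustration and not JPregel's actual Communicator: the names CommunicatorSketch, OutMsg, workerFor, groupByWorker and dispatch are hypothetical, the placement rule (vertex ID modulo the number of workers) is an assumption, and a local task stands in for each RMI call.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch; none of these names come from the JPregel code base.
final class CommunicatorSketch {

    // Stand-in for a message addressed to a destination vertex.
    static final class OutMsg {
        final int destVertexID;
        final double value;

        OutMsg(int destVertexID, double value) {
            this.destVertexID = destVertexID;
            this.value = value;
        }
    }

    // Assumed placement rule: vertex v lives on worker (v mod numWorkers).
    static int workerFor(int destVertexID, int numWorkers) {
        return Math.floorMod(destVertexID, numWorkers);
    }

    // Aggregates outgoing messages into one batch per destination worker.
    static Map<Integer, List<OutMsg>> groupByWorker(List<OutMsg> outgoing, int numWorkers) {
        Map<Integer, List<OutMsg>> batches = new HashMap<>();
        for (OutMsg m : outgoing) {
            batches.computeIfAbsent(workerFor(m.destVertexID, numWorkers),
                    k -> new ArrayList<>()).add(m);
        }
        return batches;
    }

    // Sends all batches in parallel and returns the number of messages delivered.
    static int dispatch(Map<Integer, List<OutMsg>> batches) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, batches.size()));
        try {
            List<Future<Integer>> pending = new ArrayList<>();
            for (List<OutMsg> batch : batches.values()) {
                // Local stand-in for one remote (RMI) call per destination worker.
                Callable<Integer> remoteCall = () -> batch.size();
                pending.add(pool.submit(remoteCall));
            }
            int delivered = 0;
            for (Future<Integer> f : pending) {
                delivered += f.get();
            }
            return delivered;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<OutMsg> outgoing = new ArrayList<>();
        for (int v = 0; v < 4; v++) {
            outgoing.add(new OutMsg(v, v + 1.0));
        }
        Map<Integer, List<OutMsg>> batches = groupByWorker(outgoing, 2);
        System.out.println("batches: " + batches.size() + ", delivered: " + dispatch(batches));
    }
}
```

Batching by destination means each compute node is contacted once per superstep rather than once per message, which is presumably the reason for aggregating before sending.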

The method getTotalNumVertices() returns the number of vertices in the input graph. The JPregel system initializes this value in all vertices across all compute nodes before the first superstep. The method getSuperStep() returns the number of the current superstep, starting from 0.

The values associated with a vertex define its state independently of all other vertices in every superstep. This state can be persisted at the end of every superstep, which simplifies the main computation cycle, graph distribution and failure recovery.
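
As an illustration of persisting vertex state at the end of a superstep, the sketch below saves a vertex's state to a byte array with standard Java serialization and restores it. It is only a sketch under assumptions: CheckpointSketch and VertexState are hypothetical names, the fields chosen are a guess at what such a state might contain, and JPregel's real checkpoint format is not described here.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical sketch of per-superstep checkpointing; not JPregel's actual code.
final class CheckpointSketch {

    // Stand-in for the state a vertex carries from one superstep to the next.
    static final class VertexState implements Serializable {
        private static final long serialVersionUID = 1L;
        final int vertexID;
        final int superStep;
        final double value;

        VertexState(int vertexID, int superStep, double value) {
            this.vertexID = vertexID;
            this.superStep = superStep;
            this.value = value;
        }
    }

    // Serializes the state; a real system would write these bytes to durable storage.
    static byte[] save(VertexState state) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(state);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Rebuilds the state from a checkpoint, for example after a worker failure.
    static VertexState restore(byte[] checkpoint) {
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(checkpoint))) {
            return (VertexState) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        VertexState restored = restore(save(new VertexState(7, 3, 2.5)));
        System.out.println(restored.vertexID + " @ superstep " + restored.superStep
                + " = " + restored.value);
    }
}
```

Because a vertex's state does not depend on the in-memory state of any other vertex, a failed worker's vertices can in principle be reloaded from the last checkpoint and resumed from that superstep.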

Once all the supersteps for a particular problem instance are over, each vertex can write its solution to an output stream using the writeSolution(OutputStream anOutputStream) method.
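
A minimal sketch of writing solutions to an output stream is given below. It assumes that several vertices may write to the same stream one after another, so each write flushes but leaves the stream open; whether JPregel shares one stream among vertices is an assumption here. WriteSolutionSketch, writeLine and demo are hypothetical names, and the "vertexID->value" line format is chosen only for illustration.

```java
import java.io.BufferedWriter;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch; not part of the JPregel API.
final class WriteSolutionSketch {

    // Writes one "vertexID->value" line, then flushes without closing the
    // stream so that a later caller can still append to it.
    static void writeLine(OutputStream out, int vertexID, double value) {
        try {
            BufferedWriter writer = new BufferedWriter(
                    new OutputStreamWriter(out, StandardCharsets.UTF_8));
            writer.write(vertexID + "->" + value + "\n");
            writer.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Two vertices append their solutions to one in-memory stream.
    static String demo() {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeLine(out, 0, 0.0);
        writeLine(out, 1, 3.0);
        return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.print(demo());
    }
}
```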


Example 1 : Shortest Paths Problem

To solve the shortest path problem using JPregel, the application programmer extends class Vertex to define class ShortestPathVertex and provides a definition for the method Vertex.compute(), which solves the single source shortest path problem using Dijkstra's algorithm.

In this problem, the value of a vertex at any point in time represents the cost of the tentative shortest path from the source vertex to this vertex. In each superstep, Vertex.compute() reads the messages sent to this vertex in the previous superstep; each message carries a candidate distance from the source through one neighbouring vertex. The method iterates over the messages to find the minimum candidate distance. If that minimum is smaller than the vertex's current value, the vertex sets its value to the minimum and sends, along each outgoing edge, the minimum plus the cost of that edge. This repeats in every superstep.

The vertex votes to halt after the computation and is reactivated by the JPregel system in the next superstep only if it received messages in the previous superstep.

import graphs.Edge;

import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;

import system.JPregelConstants;
import system.Message;
import api.Vertex;

/**
 * 
 * Calculates the single source shortest path from the source vertex to every
 * other vertex in the graph based on Dijkstra's shortest path algorithm
 *
 */
public class ShortestPathVertex extends Vertex {

    private static final long serialVersionUID = -4877455216665842419L;
    
    /*
     * The vertex previous to the current vertex in the shortest path solution
     */
    private int prevVertex;

    public ShortestPathVertex() {
        this.prevVertex = -1;
    }

    private boolean isSourceVertex() {
        // Vertex 0 is treated as the source of all shortest paths
        return this.getVertexID() == 0;
    }

    /*
     * Implementation of compute() for Dijkstra's shortest path
     */
    @Override
    public void compute() {

        double minDist = isSourceVertex() ? 0 : JPregelConstants.INFINITY;
        int newPrevVertexID = -1;
        for (Message aMsg : this.getMessages()) {
            if (aMsg.getMessageValue() < minDist) {
                minDist = aMsg.getMessageValue();
                newPrevVertexID = aMsg.getSourceVertexID();
            }
        }

        if (minDist < this.getValue()) {
            this.prevVertex = newPrevVertexID;
            this.setValue(minDist);
            for (Edge e : this.getEdges()) {
                this.sendMessage(e, minDist + e.getCost());
            }
        }
    }
    
    /*
     * Write the solution to an output stream
     */

    @Override
    public void writeSolution(OutputStream anOutputStream) throws IOException {
        BufferedWriter buffWriter = new BufferedWriter(new OutputStreamWriter(
                anOutputStream));
        buffWriter.write("" + this.getVertexID() + "->" + this.getValue() + ","
                + this.prevVertex + "\n");
        buffWriter.close();
    }

}
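
To see how the supersteps and the halting rule interact, the computation above can be simulated in a single process. The sketch below is not JPregel code: ShortestPathSimulation and its methods are hypothetical, edges are plain {from, to, cost} arrays, Double.POSITIVE_INFINITY stands in for JPregelConstants.INFINITY, and every vertex is assumed to start with that value and to be active in superstep 0.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical single-process simulation of the shortest path supersteps.
final class ShortestPathSimulation {

    // Stand-in for JPregelConstants.INFINITY.
    static final double INFINITY = Double.POSITIVE_INFINITY;

    static List<List<Double>> emptyInboxes(int numVertices) {
        List<List<Double>> inboxes = new ArrayList<>();
        for (int v = 0; v < numVertices; v++) {
            inboxes.add(new ArrayList<>());
        }
        return inboxes;
    }

    // edges[i] = {from, to, cost}; returns the final value of every vertex.
    static double[] run(int numVertices, int source, int[][] edges) {
        double[] value = new double[numVertices];
        Arrays.fill(value, INFINITY);
        List<List<Double>> inbox = emptyInboxes(numVertices);
        boolean firstSuperstep = true;
        boolean messagesInFlight = false;

        while (firstSuperstep || messagesInFlight) {
            List<List<Double>> nextInbox = emptyInboxes(numVertices);
            messagesInFlight = false;
            for (int v = 0; v < numVertices; v++) {
                // A halted vertex is reactivated only if it received messages.
                if (!firstSuperstep && inbox.get(v).isEmpty()) {
                    continue;
                }
                double minDist = (v == source) ? 0 : INFINITY;
                for (double candidate : inbox.get(v)) {
                    minDist = Math.min(minDist, candidate);
                }
                if (minDist < value[v]) {
                    value[v] = minDist;
                    // Relax every outgoing edge of v; delivered next superstep.
                    for (int[] e : edges) {
                        if (e[0] == v) {
                            nextInbox.get(e[1]).add(minDist + e[2]);
                            messagesInFlight = true;
                        }
                    }
                }
            }
            inbox = nextInbox;
            firstSuperstep = false;
        }
        return value;
    }

    public static void main(String[] args) {
        int[][] edges = { {0, 1, 4}, {0, 2, 1}, {2, 1, 2}, {1, 3, 5} };
        System.out.println(Arrays.toString(run(4, 0, edges)));
    }
}
```

On this four-vertex graph, vertex 1 first receives the distance 4 directly from the source and later improves it to 3 through vertex 2, so the run prints [0.0, 3.0, 1.0, 8.0] and stops once no messages remain in flight.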



Example 2 : PageRank Algorithm

To solve the PageRank problem using JPregel, the application programmer extends the Vertex class to create a PageRankVertex class, which defines the method Vertex.compute() to calculate the page rank of a vertex.

At any point in time, the value of a vertex represents the tentative page rank of the vertex. The graph is initialized with each vertex value equal to 1/num_of_vertices. In each of the first 30 supersteps (supersteps 0 to 29), each vertex divides its tentative page rank by its number of outgoing edges and sends the result along each of those edges.

From superstep 1 to 30, each vertex sums the values arriving in its messages and sets its tentative page rank to 0.15/num_of_vertices + 0.85 * sum_of_PRs. After 30 supersteps the vertices vote to halt. The number 30 is an arbitrary choice that keeps this implementation of PageRank simple. In practice, a PageRank computation would run until convergence.

import graphs.Edge;

import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;

import system.Message;

import api.Vertex;

/**
 * 
 * Calculates the page rank of a vertex
 * 
 */
public class PageRankVertex extends Vertex{    
    
    private static final long serialVersionUID = -4444855908940126630L;

    public PageRankVertex(){
        
    }

    /**
     * Defines the compute() method for calculating PageRank
     */
    @Override
    public void compute() {
        
        if(this.getSuperStep()>=1){
            double sum=0;
            for(Message m : this.getMessages()){
                sum+=m.getMessageValue();
            }
            double newPageRank=0.15/this.getTotalNumVertices()+0.85*sum;
            this.setValue(newPageRank);
        }
        
        if(this.getSuperStep()< 30){
            int numEdges=this.getEdges().size();
            for(Edge e : this.getEdges()){
                this.sendMessage(e, this.getValue()/numEdges);
            }
        }
    }
    
    @Override
    public void writeSolution(OutputStream anOutputStream) throws IOException{
        BufferedWriter buffWriter = new BufferedWriter(new OutputStreamWriter(anOutputStream));
        buffWriter.write(""+this.getVertexID()+"->"+this.getValue()+"\n");        
        buffWriter.close();
    }

    
}
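
The update rule used above can be checked with a small single-process simulation. The sketch below is not JPregel code: PageRankSimulation and run are hypothetical names, edges are plain {from, to} arrays, and each vertex's inbox is kept as a running sum rather than a list of messages, which gives the same result because compute() only adds the message values.

```java
import java.util.Arrays;

// Hypothetical single-process simulation of the PageRank supersteps.
final class PageRankSimulation {

    // edges[i] = {from, to}; runs supersteps 0..lastSuperstep inclusive.
    static double[] run(int numVertices, int[][] edges, int lastSuperstep) {
        int[] outDegree = new int[numVertices];
        for (int[] e : edges) {
            outDegree[e[0]]++;
        }
        double[] value = new double[numVertices];
        Arrays.fill(value, 1.0 / numVertices);
        double[] inboxSum = new double[numVertices];

        for (int step = 0; step <= lastSuperstep; step++) {
            if (step >= 1) {
                // New rank from the messages received in the previous superstep.
                for (int v = 0; v < numVertices; v++) {
                    value[v] = 0.15 / numVertices + 0.85 * inboxSum[v];
                }
            }
            double[] nextInboxSum = new double[numVertices];
            if (step < lastSuperstep) {
                // Each vertex spreads its rank evenly over its outgoing edges.
                for (int[] e : edges) {
                    nextInboxSum[e[1]] += value[e[0]] / outDegree[e[0]];
                }
            }
            inboxSum = nextInboxSum;
        }
        return value;
    }

    public static void main(String[] args) {
        int[][] edges = { {0, 1}, {0, 2}, {1, 2}, {2, 0} };
        System.out.println(Arrays.toString(run(3, edges, 30)));
    }
}
```

When every vertex has at least one outgoing edge, as in this example graph, the ranks keep summing to 1 after each superstep, which is a convenient sanity check. A vertex without outgoing edges sends nothing, so its rank is not redistributed and the total drops below 1.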



Future Work

  1. Generic representation of input graphs
  2. Partitioning based on vertex degrees (Achieves better load balancing)
  3. Custom partitioning - Reduces message communication overhead
    Ex: Vertices representing pages of the same site for world wide web graphs are assigned to the same worker
  4. Distributed database to store messages in communicator and checkpoint data
  5. Dedicated machines to detect faults
  6. Multi-threaded design to issue requests to worker machines to dump solutions once the problem is solved









© 2010 Kowshik Prakasam and Manasa Chandrasekhar | Template design by Andreas Viklund