Running Graph Analytics with Spark GraphFrames: A Simple Example

Let me jump right to the point: in the open-source community it is not often that a new software distribution/package/library arrives accompanied by documentation sufficient for the needs of its diverse users. And while immense effort is often put into tackling this problem, there will always be a group of users out there with needs too specialised for that documentation to be of any help. At the same time, misreading, or sometimes simply neglecting to read, the existing documentation leads to many problems that take valuable time to resolve. To cut a long story short, at the end of the day, as soon as something is out, all we want to do is just "get our hands dirty with it". Much like James Bond with a new Aston Martin.

"Really Q, who has time for manuals!?" (frankly, if we did, Python may have never existed). 

Anyhow, this is exactly why I immensely appreciate anyone who dedicates some of their time to putting together a short blog post with simple but detailed instructions on setting up and configuring a new release, as well as on running a basic POC that relies on it. That's my main drive for writing this blog post. So, below, I will cover how to set up PyCharm to run a standalone Spark application using GraphFrames, which was recently released for Apache Spark's 2.0 distribution.

Download Apache Spark

First things first. You need to download the latest distribution of Apache Spark (2.0.0). Truth be told, the GraphFrames project is compatible with any Spark 1.4+, but, at least as far as DataFrames are concerned, the more recent Spark versions bring significant speed improvements.

It does not really matter where you decide to save Spark, but for the purposes of this post let us assume that the directory path is:

/Users/yourUserName/Spark/spark-2.0.0-bin-hadoop2.7

Download & Install GraphFrames

Next you need to download and install GraphFrames. On a side note, one may wonder at this point why GraphFrames is not already integrated into the existing Apache Spark distribution. Well, at the moment GraphFrames is a collaborative effort among UC Berkeley, MIT, and Databricks. It is true that two of these are amongst the official Apache Spark developers (the Apache Software Foundation, UC Berkeley AMPLab, and Databricks), but this does not change the fact that GraphFrames is a project in its infancy, and it will take some time for it to be broadly accepted by the community and thus integrated into the core Spark distribution.

You can find the latest distribution of the GraphFrames package here. Download the zip file for Version: 0.2.0-spark2.0-s_2.11. Again, it does not matter where this is stored, but let's assume that we choose:

/Users/yourUserName/graphframes

Next, follow these steps:

  1. Open a terminal and navigate to the .../graphframes directory above.
  2. You should notice that there is a build directory with an sbt file that needs to be run in order to compile the project. While still in the .../graphframes directory, run:
    $ ./build/sbt assembly
If all goes well you should see something like the following in your terminal:
Successful build...

Next you need to navigate to /Users/yourUserName/graphframes/python/graphframes, where you should see the following files:

It's worth reminding the reader at this point that Python compiles a script to bytecode before the interpreter executes it. That's what those *.pyc files are.

Following on, we need to copy this directory (.../graphframes/python/graphframes) into a specific Spark directory. That would be:

/Users/yourUserName/Spark/spark-2.0.0-bin-hadoop2.7/python/pyspark
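If you would rather script that copy than do it by hand, here is a minimal sketch using Python's shutil; the source and destination paths are simply the ones assumed in this post, so adjust them to your own layout.

import os
import shutil

# Paths assumed in this post -- adjust to your own layout
GRAPHFRAMES_SRC = "/Users/yourUserName/graphframes/python/graphframes"
PYSPARK_DST = ("/Users/yourUserName/Spark/spark-2.0.0-bin-hadoop2.7"
               "/python/pyspark/graphframes")

# Copy the graphframes Python package into pyspark (copytree requires that
# the destination does not already exist, hence the check)
if not os.path.isdir(PYSPARK_DST):
    shutil.copytree(GRAPHFRAMES_SRC, PYSPARK_DST)

# List the copied files to confirm the package is in place
print(os.listdir(PYSPARK_DST))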
 

If you have done everything as described, you should be able to see something like this in your Spark directory:

Now this may seem like a quick work-around, but after experimenting with the alternatives I think it is good enough to satisfy most needs. If you prefer, you can configure PyCharm to import the package with:

Run -> Edit configuration -> Choose configuration -> Select configuration tab -> Choose Environment variables -> Add:

"PYSPARK_SUBMIT_ARGS"

with 

"--packages graphframes:graphframes:0.2.0-spark2.0-s_2.11 pyspark-shell"

or by including the following lines at the beginning of your code:

import os
os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--packages graphframes:graphframes:0.2.0-spark2.0-s_2.11 pyspark-shell"
)

(Both require that the graphframes.jar is saved into /Users/yourUserName/Spark/spark-2.0.0-bin-hadoop2.7/jars.) With either of these options, though, the editor won't be able to resolve the graphframes dependencies and will keep throwing "unresolved" warnings, which is very irritating, at least to me. Your code should run fine in either case.

The Dataset: A Gene's Association Graph (GAG)

That's it! We are ready to get down to coding. So let me introduce our dataset. We will be importing a small graph (a bit over 1.5 MB) stored in a .csv file containing 50,000 rows and three columns of data. On inspection, the file looks like this:

Gene associations...

where the OFFICIAL_SYMBOL columns represent genes and EXPERIMENTAL_SYSTEM denotes their association type (you can find the dataset here, along with code for loading it into Neo4j).
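If you want a quick look at the file before wiring anything into Spark, a minimal sketch with pandas follows; note that DATA_PATH and FILE_NAME are placeholders for wherever (and under whatever name) you have saved the .csv, and they are also the names the loading code further down assumes.

import pandas as pd

# Placeholder paths -- point these at your copy of the gene-association .csv
USER_FILE_PATH = "/Users/yourUserName"
DATA_PATH = "/Data"                   # hypothetical folder, adjust as needed
FILE_NAME = "/gene_associations.csv"  # hypothetical file name, adjust as needed

# Peek at the first few rows and the three columns we care about
SAMPLE = pd.read_csv(USER_FILE_PATH + DATA_PATH + FILE_NAME, nrows=5)
print(SAMPLE[['OFFICIAL_SYMBOL_A', 'OFFICIAL_SYMBOL_B', 'EXPERIMENTAL_SYSTEM']])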

Initiating Spark

To run Spark from PyCharm you need to specify the $SPARK_HOME directory from your code and append to it the path to Py4J, which enables the Python interpreter to dynamically access Java objects in the JVM. This is done as follows:

# IMPORT LIBS -------------------------------------------------------------------------------------#
import os
import sys
import pandas as pd  # needed further down to load the .csv in chunks

# IMPORT SPARK ------------------------------------------------------------------------------------#
# Path to Spark source folder
USER_FILE_PATH = "/Users/yourUserName"
SPARK_PATH = "/Spark"
SPARK_FILE = "/spark-2.0.0-bin-hadoop2.7"
SPARK_HOME = USER_FILE_PATH + SPARK_PATH + SPARK_FILE
os.environ['SPARK_HOME'] = SPARK_HOME

# Append py4j to Python Path
sys.path.append(SPARK_HOME + "/python")
sys.path.append(SPARK_HOME + "/python" + "/lib/py4j-0.10.1-src.zip")

Next import the following packages:

try:
    from pyspark import SparkContext
    from pyspark import SparkConf
    from pyspark.sql import SQLContext
    from pyspark.graphframes import graphframe as GF

except ImportError as ex:
    print "Can not import Spark Modules", ex
    sys.exit(1)

Following on, configure Spark according to your needs and instantiate a SparkContext object (SC). Finally, use the SC object to instantiate an SQLContext object (SQL_CONTEXT).

# Configure spark properties
CONF = (SparkConf()
        .setMaster("local")
        .setAppName("My app")
        .set("spark.executor.memory", "10g")
        .set("spark.executor.instances", "4"))

# Instantiate SparkContext object
SC = SparkContext(conf=CONF)

# Instantiate SQL_SparkContext object
SQL_CONTEXT = SQLContext(SC)

Loading the Data

We will use the pandas package to load the dataset as a dataframe (USER_FILE_PATH, DATA_PATH and FILE_NAME below should together point to wherever you have saved the .csv). Though not strictly necessary in this case, the code below accounts for cases where the file is too big to be loaded in one go and performs the loading in chunks. We first load the genes with the following code. Notice that since the .csv file contains relationships, genes under the OFFICIAL_SYMBOL_A header may appear multiple times, so duplicates need to be removed. Also, make sure to name the dataframe's single column "id"; otherwise you will get an exception when you try to instantiate the GraphFrame based on it.

# Read csv file and load as DataFrame
GENES = pd.read_csv(USER_FILE_PATH + DATA_PATH + FILE_NAME,
                    usecols=['OFFICIAL_SYMBOL_A'],
                    low_memory=True,
                    iterator=True,
                    chunksize=1000)

# Concatenate chunks into list & convert to DataFrame
GENES_DF = pd.DataFrame(pd.concat(list(GENES), ignore_index=True))

# Remove duplicates
GENES_DF_CLEAN = GENES_DF.drop_duplicates(keep='first')

# Name Columns
GENES_DF_CLEAN.columns = ['id']

# Output DataFrame
print GENES_DF_CLEAN

Once the dataframe has been created, we then need to instantiate a Spark dataframe that will serve as our Vertices. 

# Create vertices
VERTICES = SQL_CONTEXT.createDataFrame(GENES_DF_CLEAN)

# Show some vertices
print VERTICES.take(5)

Next it's the relationships' turn. We repeat the same process as before, with the difference that we now import all three columns. Strictly speaking, this step is not really necessary: one could load everything in one go the first time and then SELECT different column dataframes, thus efficiently differentiating between vertices and edges. Still, I am repeating the code here for the sake of presentation. Also, even though it is not strictly required in our case, make sure that once you are done with the loading process you get rid of anything that no longer needs to be kept in memory, e.g. run:

del GENES_DF_CLEAN, GENES_DF, GENES

Side note: The del statement does not directly reclaim memory. It only removes a reference, which decrements the reference count on the value; if that count drops to zero, the memory can be reclaimed. In practice, CPython will usually reclaim the memory immediately, so there is no need to wait for the garbage collector to run.
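As a tiny illustration of that point (a toy sketch, unrelated to the genes data), sys.getrefcount lets you watch the count go up and down:

import sys

A_LIST = [1, 2, 3]
B_REF = A_LIST                    # a second reference to the same list
print(sys.getrefcount(A_LIST))    # note: the count includes getrefcount's own temporary reference

del B_REF                         # drops one reference; the list itself survives via A_LIST
print(sys.getrefcount(A_LIST))    # one lower than before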

Here is the code for loading the edges:

# Read csv file and load as DataFrame
EDGES = pd.read_csv(USER_FILE_PATH + DATA_PATH + FILE_NAME,
                    usecols=['OFFICIAL_SYMBOL_A', 'OFFICIAL_SYMBOL_B', 'EXPERIMENTAL_SYSTEM'],
                    low_memory=True,
                    iterator=True,
                    chunksize=1000)

# Concatenate chunks into list & convert to DataFrame
EDGES_DF = pd.DataFrame(pd.concat(list(EDGES), ignore_index=True))

# Name Columns
EDGES_DF.columns = ["src", "dst", "rel_type"]

# Output DataFrame
print EDGES_DF

# Create edges
EDGES = SQL_CONTEXT.createDataFrame(EDGES_DF)

# Show some edges
print EDGES.take(5)

Next we finally create the graph:

GENES_GRAPH = GF.GraphFrame(VERTICES, EDGES)

Running Analytics

Once the GraphFrame has been instantiated, there are many interesting out-of-the-box analytics you can try. I list some of them below:

print "Vertex in-Degree -----------------------------------------------------------------------"
GENES_GRAPH.inDegrees.sort('inDegree', ascending=False).show()
print "Vertex out-Degree ----------------------------------------------------------------------"
GENES_GRAPH.outDegrees.sort('outDegree', ascending=False).show()
print "Vertex degree --------------------------------------------------------------------------"
GENES_GRAPH.degrees.sort('degree', ascending=False).show()
print "Triangle Count -------------------------------------------------------------------------"
RESULTS = GENES_GRAPH.triangleCount()
RESULTS.select("id", "count").show()
print "Label Propagation ----------------------------------------------------------------------"
GENES_GRAPH.labelPropagation(maxIter=10).show()     # Convergence is not guaranteed
print "PageRank -------------------------------------------------------------------------------"
GENES_GRAPH.pageRank(resetProbability=0.15, tol=0.01)\
  .vertices.sort('pagerank', ascending=False).show()
print "Find Shortest Paths w.r.t. Landmarks ---------------------------------------------------"
SHORTEST_PATH = GENES_GRAPH.shortestPaths(landmarks=["ARF3", "MAP2K4"])
SHORTEST_PATH.select("id", "distances").show()
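Since the edges also carry the association type, you can slice things further with plain DataFrame operations, or run a breadth-first search between two genes. A short sketch along those lines, assuming the graph built above (the two gene names are just the landmarks used earlier):

print "Edge counts per association type -------------------------------------------------------"
GENES_GRAPH.edges.groupBy("rel_type").count().orderBy("count", ascending=False).show()
print "BFS from ARF3 to MAP2K4 ----------------------------------------------------------------"
GENES_GRAPH.bfs("id = 'ARF3'", "id = 'MAP2K4'", maxPathLength=3).show()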

That's all for now... 

That's all for now then. I have created a repository here, which you can clone and use to run the above example without too much fuss. I promise to come back soon with some more advanced analytics you can try, probably using motifs, as well as with ways to visualise the results of those analytics. After all, in the world of graphs there is nothing more expressive than a visual snapshot, right?