Realtime Streaming Data Pipeline using Google Cloud Platform and Bokeh

This post aims to build a real-time streaming data pipeline and a simple dashboard to visualise the streaming data. At Data Reply, we specialise in Google Cloud Platform (GCP), so I decided to use GCP to build this pipeline.

Data Source

Twitter is a readily available source of high-volume streaming data, so I decided to use the Twitter Streaming API for this tutorial.

Objective

Build a data pipeline that streams tweets in real time and creates a predefined time window (10 seconds for our purposes) that:

  • Stores the raw tweets in a persistent data warehouse that can be used for analytical purposes.
  • Counts the number of tweets in that window containing a string (defined through a command-line argument) and publishes the count to a messaging service, from which the visualisation tool will pull the results every X seconds.

Tools

Based on the objectives described above, the tools that I will use for this tutorial are:

  • Cloud Pub/Sub: A fully-managed real-time messaging service that allows you to send and receive messages between independent applications.
  • Cloud Dataflow: A fully-managed service to transform and enrich data in stream (real time) and batch (historical) modes to reliably build your ETL pipelines.
  • BigQuery: Google's fully managed, petabyte-scale data warehouse for analytics.
  • Compute Engine: The Infrastructure-as-a-Service component of Google Cloud Platform, offering high-performance virtual machines.
  • Bokeh: A visualization library for Python delivering high-performance interactivity for large or streamed datasets.

 

Architecture

streaming-data-pipeline-architecture-gcp.png

Basic idea

  • The aim is to first have a Python process running on a Compute Engine instance that continuously receives data from Twitter and publishes it to a Cloud Pub/Sub topic.
  • A Cloud Dataflow streaming job then continuously pulls the tweets from Pub/Sub and groups them into windows of X seconds (10-second windows, as mentioned earlier).
  • Every 10 seconds, the raw tweets in the window are stored in BigQuery for persistence and future analysis. At the same time, the number of tweets in the window containing a user-defined string is counted and the result is published to a different Pub/Sub topic.
  • To visualise the aggregated results per time window, we create a Bokeh dashboard that runs in another Python process on the same Compute Engine instance.

Setting up Compute Engine Instance

This step is optional. If you don't want to use Compute Engine, you can run the Python processes described above on your local machine.

  • Create a Compute Engine instance with CentOS 7 image
  • SSH into the VM and install/update the development tools by running the following commands:
sudo yum -y update
sudo yum install -y wget git
sudo yum groupinstall -y "Development Tools"
sudo yum install -y zlib-devel bzip2-devel openssl-devel ncurses-devel sqlite-devel readline-devel tk-devel gdbm-devel db4-devel libpcap-devel xz-devel expat-devel
  • Install the pip package management system, the Python header files (python-devel) needed to build Python extensions, and upgrade the Python setup tools:
sudo yum -y install python-pip python-wheel python-devel 
sudo pip install --upgrade pip
sudo pip install --upgrade setuptools

Twitter to Pub/Sub

The first step in the data pipeline is receiving data from Twitter. We will use the tweepy Python package to access the Twitter Streaming API. Just run the following command to install tweepy:

pip install tweepy

Now create an application on the Twitter Application Management page and note down the Consumer Key, Consumer Secret, Access Token and Access Token Secret. We will use these to authorise the Twitter Streaming API from tweepy.

Now create a StreamListener class that takes a list of keywords used to filter the real-time tweet stream. Override the default on_status method to cherry-pick the metadata required from each tweet and publish the tweet to Pub/Sub.

Here I filter tweets that contain 'Royal Wedding', '#RoyalWedding', 'Prince Harry' or 'Meghan Markle'. If you run this code after setting the Twitter App credential variables and GCP_PROJECT_NAME, PUBSUB_TOPIC_NAME and TOTAL_TWEETS, it will start receiving tweets and publishing them to the Pub/Sub topic defined by PUBSUB_TOPIC_NAME. If you change batch_size to 50, the Python process will collect 50 tweets and publish them to Pub/Sub as one batch instead of publishing each tweet individually.
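The full listener is not reproduced here, but its batching logic can be sketched in plain Python. The names extract_fields and TweetBuffer are illustrative, not part of tweepy; in the real listener, on_status would call buffer.add(status), and the publish callback would wrap the Google Cloud Pub/Sub client.

```python
import json


def extract_fields(status):
    """Cherry-pick an illustrative subset of metadata from a raw tweet dict."""
    return {
        "id": status["id_str"],
        "text": status["text"],
        "user": status["user"]["screen_name"],
        "created_at": status["created_at"],
    }


class TweetBuffer:
    """Collects tweets and hands them to a publish callback in batches."""

    def __init__(self, publish, batch_size=1):
        self.publish = publish          # e.g. a wrapper around the Pub/Sub client
        self.batch_size = batch_size
        self._batch = []

    def add(self, status):
        self._batch.append(extract_fields(status))
        if len(self._batch) >= self.batch_size:
            # Serialise the whole batch as a single Pub/Sub message payload.
            self.publish(json.dumps(self._batch).encode("utf-8"))
            self._batch = []
```

With batch_size=50, fifty tweets are accumulated before a single publish call; with the default of 1, every tweet is published as soon as it arrives.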

Stream Processing in Dataflow 

dataflow-ui.png

While I was writing this post, the Dataflow Python SDK for streaming was still experimental, so I used Java to write the Dataflow code. The steps carried out (as shown in the image above) are as follows:

  1. Read messages from Pub/Sub
  2. Extract the individual tweets from each Pub/Sub message (each message contains X tweets, depending on the batch_size parameter set in the Twitter-to-Pub/Sub step) and create a PCollection of tweets.
  3. Apply a time window of 10 seconds.
  4. Here we need to do two things:
    • Persist the raw data to a data warehouse: store the tweets in BigQuery via streaming inserts.
    • Check whether each tweet contains the string passed as an input argument to Dataflow. Count the total number of matching tweets in the time window and send the results to a different Pub/Sub topic.
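The production pipeline performs this windowed count with the Dataflow (Apache Beam) Java SDK, but the core transform can be sketched in plain Python. This is a minimal sketch, assuming each tweet is reduced to a (timestamp, text) pair; the function and variable names are illustrative only.

```python
from collections import defaultdict

WINDOW_SECONDS = 10


def window_start(timestamp, size=WINDOW_SECONDS):
    """Map a Unix timestamp to the start of its fixed-size window."""
    return int(timestamp) // size * size


def count_matches(tweets, term):
    """tweets: iterable of (timestamp, text) pairs.

    Returns a dict mapping each window start to the number of tweets
    in that window whose text contains `term` (case-insensitive).
    """
    counts = defaultdict(int)
    for ts, text in tweets:
        if term.lower() in text.lower():
            counts[window_start(ts)] += 1
    return dict(counts)
```

In Beam terms this corresponds to assigning each element to a fixed 10-second window, filtering on the search term, and applying a per-window count.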
Data persisted in BigQuery by streaming inserts from the Dataflow job

Pub/Sub to Bokeh Dashboard

The last step is building a dashboard that can handle streaming data. After extensively searching for tools that allow building a dashboard on a streaming input source, I came across Bokeh, which supports streaming data sources.

You first need to create a ColumnDataSource. By using its stream method, Bokeh sends only the new data to the browser instead of the entire dataset. The stream method takes a new_data parameter containing a dict that maps column names to sequences of data to be appended to the respective columns. It also takes an optional rollover argument, which is the maximum length of data to keep (data from the beginning of the column is discarded). The default rollover value of None allows the data to grow unbounded.
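To make the stream/rollover semantics concrete, here is a plain-Python sketch that mimics what ColumnDataSource.stream does to its column data. This is a simplification for illustration, not Bokeh's actual implementation:

```python
def stream(source, new_data, rollover=None):
    """Append new_data (dict of column -> sequence) to source.

    If rollover is set, keep only the most recent `rollover` items per
    column, discarding data from the beginning of the column.
    """
    for col, values in new_data.items():
        source[col] = list(source.get(col, [])) + list(values)
        if rollover is not None:
            # Drop the oldest entries so the column holds at most `rollover` items.
            source[col] = source[col][-rollover:]
    return source
```

In the dashboard itself you would call source.stream(new_data, rollover=...) on a real ColumnDataSource from a periodic callback that pulls the latest window count from Pub/Sub.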

To access the code for the Bokeh dashboard, click here.

The final dashboard, shown below, updates every 10 seconds with new data points:

bokeh-streaming-dashboard-pubsub.png

Future Work

  • Use Kubernetes (via Google Kubernetes Engine) instead of a VM so that the two Python processes can be isolated in two different pods.
  • Deploy the Bokeh dashboard to App Engine.
  • Extend the Dataflow and Bokeh dashboard code to visualise more than one entity, for example the number of tweets mentioning Boris Johnson and Theresa May on a single Bokeh dashboard, which would make them easy to compare.