Analyzing Data with Spark

Times are getting tough now. Attacks from the mutants are on the rise all over the country, and Division 3 has not handled this problem effectively with the previous applications. Since the applications were not properly preventing or stopping attacks in a timely manner, Division 3 now wants to dive back into data analytics to learn how to prevent the attacks. In this lesson, we will go over how to use Apache Spark to analyze and visualize the data from the Mutant Monitoring system.

As our mutant data increases in size over time, we need to be able to quickly analyze it using a cluster of machines, and that is where Apache Spark comes in. Apache Spark is a distributed system and unified analytics engine for large-scale data processing.

Spark

So what is Spark, exactly? It is many things – but first and foremost, it is a platform for executing distributed data processing computations. Spark provides both the execution engine – which is to say that it distributes computations across different physical nodes – and the programming interface: a high-level set of APIs for many different tasks.

Spark includes several modules, all of which are built on top of the same abstraction: the Resilient Distributed Dataset (RDD). In this lesson, we will see how to use Spark together with Scylla to analyze the Mutant Monitoring data more effectively and efficiently.

This topic is explored in depth in a series of blog posts that starts here.

Setting up the Scylla Cluster

In this lesson, we will require a three-node Scylla cluster with some data.

Follow this procedure to remove previous clusters and set up a new cluster.

The next step is to create the tracking system keyspace and table that will allow us to keep track of the following mutant metrics:

  • Name
  • Timestamp
  • Location
  • Speed
  • Heat
  • Telepathy powers

First, let’s create the tracking keyspace. If you are not already in cqlsh, connect first:

docker exec -it mms_scylla-node1_1 cqlsh
CREATE KEYSPACE tracking WITH REPLICATION = { 'class' : 'NetworkTopologyStrategy','DC1' : 3};

Now we can create the tracking_data table:

USE tracking;
CREATE TABLE tracking_data (
    first_name text,
    last_name text,
    timestamp timestamp,
    location varchar,
    speed double,
    heat double,
    telepathy_powers int,
    primary key((first_name), timestamp))
    WITH CLUSTERING ORDER BY (timestamp DESC)
    AND COMPACTION = {'class': 'DateTieredCompactionStrategy',
    'base_time_seconds': 3600,
    'max_sstable_age_days': 1};

Now that the table is created, let’s insert data into the tracking_data table for Jim Jeffries:

INSERT INTO tracking.tracking_data ("first_name","last_name","timestamp","location","speed","heat","telepathy_powers") VALUES ('Jim','Jeffries','2017-11-1 08:05+0000','New York',1.0,3.0,17) ;
INSERT INTO tracking.tracking_data ("first_name","last_name","timestamp","location","speed","heat","telepathy_powers") VALUES ('Jim','Jeffries','2017-11-2 09:05+0000','New York',2.0,4.0,27) ;
INSERT INTO tracking.tracking_data ("first_name","last_name","timestamp","location","speed","heat","telepathy_powers") VALUES ('Jim','Jeffries','2017-11-2 10:05+0000','New York',3.0,7.0,37) ;
INSERT INTO tracking.tracking_data ("first_name","last_name","timestamp","location","speed","heat","telepathy_powers") VALUES ('Jim','Jeffries','2017-11-3 10:22+0000','New York',4.0,12.0,47) ;
INSERT INTO tracking.tracking_data ("first_name","last_name","timestamp","location","speed","heat","telepathy_powers") VALUES ('Jim','Jeffries','2017-11-7 11:05+0000','New York',4.0,9.0,87) ;
INSERT INTO tracking.tracking_data ("first_name","last_name","timestamp","location","speed","heat","telepathy_powers") VALUES ('Jim','Jeffries','2017-11-11 12:05+0000','New York',4.0,24.0,57) ;

Let’s also add data for Bob Loblaw:

INSERT INTO tracking.tracking_data ("first_name","last_name","timestamp","location","speed","heat","telepathy_powers") VALUES ('Bob','Loblaw','2017-11-1 08:05+0000','Cincinatti',2.0,9.0,5) ;
INSERT INTO tracking.tracking_data ("first_name","last_name","timestamp","location","speed","heat","telepathy_powers") VALUES ('Bob','Loblaw','2017-11-1 09:05+0000','Cincinatti',4.0,1.0,10) ;
INSERT INTO tracking.tracking_data ("first_name","last_name","timestamp","location","speed","heat","telepathy_powers") VALUES ('Bob','Loblaw','2017-11-3 10:05+0000','Cincinatti',6.0,1.0,15) ;
INSERT INTO tracking.tracking_data ("first_name","last_name","timestamp","location","speed","heat","telepathy_powers") VALUES ('Bob','Loblaw','2017-11-5 10:22+0000','Cincinatti',8.0,3.0,6) ;
INSERT INTO tracking.tracking_data ("first_name","last_name","timestamp","location","speed","heat","telepathy_powers") VALUES ('Bob','Loblaw','2017-11-9 11:05+0000','Cincinatti',10.0,2.0,3) ;
INSERT INTO tracking.tracking_data ("first_name","last_name","timestamp","location","speed","heat","telepathy_powers") VALUES ('Bob','Loblaw','2017-11-11 12:05+0000','Cincinatti',12.0,10.0,60) ;

Running Spark

We’ll run Spark using Docker, through the provided docker-compose.yaml file.

If you’re still in cqlsh, exit first:

exit

Browse to the scylla-code-samples/mms/spark directory and run the following command:

docker-compose up -d spark-master spark-worker

After Docker pulls the images, Spark should be up and running, composed of a master process and a worker process. We’ll see what these are in the next section. To use Spark, we’ll launch the Spark shell inside the master container:

docker-compose exec spark-master spark-shell --conf spark.driver.host=spark-master

It takes a short while to start, but eventually, you should see the Spark shell’s scala> prompt.

As the prompt line hints, this is a Scala REPL, preloaded with some helpful Spark objects. Aside from interacting with Spark, you can evaluate any Scala expression that uses the standard library data types.
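For example, you can evaluate ordinary Scala right at the prompt; this small snippet (unrelated to Spark or Scylla) simply averages a list:

// Plain Scala in the REPL; no Spark involved
val speeds = List(1.0, 2.0, 3.0, 4.0)
speeds.sum / speeds.size  // 2.5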

To read more about the Spark Architecture and RDDs, check out this blog post. 

The DataStax Spark/Cassandra Connector

The DataStax Spark/Cassandra connector is an open-source project that allows us to import data stored in Cassandra into Spark RDDs, and to write Spark RDDs back to Cassandra tables.

Since Scylla is compatible with Cassandra’s protocol, we can seamlessly use the connector with it.

We’ll need to make sure the connector ends up on the Spark REPL’s classpath and configure it with Scylla’s hostname, so we’ll exit the shell (using Ctrl + C) and re-launch it in the Spark master’s container, this time with the addition of the --packages and --conf arguments:

docker-compose exec spark-master spark-shell \
   --conf spark.driver.host=spark-master \
   --conf spark.cassandra.connection.host=scylla-node1 \
   --packages datastax:spark-cassandra-connector:2.4.0-s_2.11,commons-configuration:commons-configuration:1.10

The shell should now download the required dependencies and make them available on the classpath. After the shell’s prompt shows up, test that everything worked correctly by making the required imports, loading the table as an RDD and running .count() on the RDD:

import com.datastax.spark.connector._
import org.apache.spark.sql.cassandra._
val trackingRdd = sc.cassandraTable("tracking","tracking_data")
trackingRdd.count()

The call to count should result in 12 (or more, if you’ve inserted more data than listed in the example!).

The imports that we’ve added bring in syntax enrichments to the standard Spark data types, making the interaction with Scylla more ergonomic. You’ve already seen one of those enrichments: sc.cassandraTable is a method added to SparkContext for conveniently creating a specialized RDD backed by a Scylla table.

The type of that specialized RDD is CassandraTableScanRDD[CassandraRow]. As hinted by the type, it represents a scan of the underlying table. 

Under the hood, the call to .count() translates to the following query in Scylla:

SELECT * FROM tracking.tracking_data

The entire table is loaded into the Spark executors, and the rows are counted afterward by Spark. If this seems inefficient to you – it is! You can also use the .cassandraCount() method on the RDD, which will execute the count directly on Scylla.
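For example, the following performs the same count, but inside Scylla, so only the final number travels back to Spark:

// Pushes the counting down to Scylla instead of loading every row into Spark
trackingRdd.cassandraCount()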

The element contained in the RDD is of type CassandraRow. This is a wrapper class for a sequence of untyped data, with convenience getters for retrieving fields by name and casting to the required type.

Here’s a short example of interacting with those rows:

val row = trackingRdd.first
row.getString("first_name")
row.getDouble("heat")

This will extract the first row from the RDD, and then read the first_name and heat fields from it.

We’ve already seen cassandraCount, which delegates the work of counting to Scylla. Similarly, we have the where method, which allows you to specify a CQL predicate to be appended to the query. A mutant traveling at high speed while running a high temperature is very suspicious. To check whether the mutant Bob moved at a speed higher than 9 MPH while having a temperature higher than 6 degrees, we would run:

val filteredRdd =
  trackingRdd.where("first_name = ? AND heat >= ? AND speed >= ?", "Bob", 6.0, 9.0)
filteredRdd.count()

The benefit of the where method compared to applying a filter transformation on an RDD can be drastic. With the filter transformation, the Spark executors must read the entire table from Scylla, whereas with where, only the matching data is read.
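Here is a sketch of the two approaches, with roughly the CQL the connector generates for each shown in the comments (the exact queries may also carry token-range restrictions and ALLOW FILTERING, depending on the connector version):

// Pushed down: the where predicate is sent to Scylla, which returns only matching rows.
// Generated query, roughly:
//   SELECT * FROM tracking.tracking_data
//   WHERE first_name = 'Bob' AND heat >= 6.0 AND speed >= 9.0
val pushedDown =
  trackingRdd.where("first_name = ? AND heat >= ? AND speed >= ?", "Bob", 6.0, 9.0)

// Filtered in Spark: a full scan, roughly:
//   SELECT * FROM tracking.tracking_data
// Every row is shipped to the Spark executors and filtered there.
val scannedAndFiltered = trackingRdd.filter { row =>
  row.getString("first_name") == "Bob" &&
  row.getDouble("heat") >= 6.0 &&
  row.getDouble("speed") >= 9.0
}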

The pushed-down query is much more efficient! Spark also allows for performing advanced queries that are impossible to perform directly in Scylla. Learn more here.
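As one sketch of such a query: the average heat per location requires grouping by a regular (non-key) column, which plain CQL does not allow, but it takes only a few lines in Spark, and the result can be written back through the connector. Note that the tracking.heat_by_location table used here is hypothetical; it would have to be created first, with location text as the primary key and an avg_heat double column:

// Average heat per location, computed in Spark
import com.datastax.spark.connector._

val avgHeatByLocation = sc.cassandraTable("tracking", "tracking_data")
  .map(row => (row.getString("location"), row.getDouble("heat")))
  .groupByKey()
  .mapValues(heats => heats.sum / heats.size)

// Write the aggregated result back to Scylla (assumes the hypothetical
// tracking.heat_by_location table already exists)
avgHeatByLocation.saveToCassandra("tracking", "heat_by_location",
  SomeColumns("location", "avg_heat"))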

Summary

Division 3 was not taking the proper actions to analyze the Mutant Monitoring data effectively and has now taken steps to do better in the future. This lesson has been a high-level overview of Spark, the Spark/Cassandra connector, and how to work with Spark and Scylla. Apache Spark is a powerful distributed data analytics system that can interact with data from Scylla. Division 3 now has a smooth and powerful solution for analyzing the data from the Mutant Monitoring System. Please be safe out there and continue to monitor the mutants so we can protect as many lives as possible.
