Spark Examples

This page has exercises, most presented or at least sketched in class.

Count Unusual Values

The problem is to count how many values in the eegsample dataset are not within two standard deviations of the mean. A program for this was developed in class, essentially consisting of the following steps.
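
A minimal sketch of those steps, assuming eegsample is a text file of whitespace-separated numbers readable with sc.textFile() (the class version may have differed in details):

valuesRDD = sc.textFile("eegsample") \
              .flatMap(lambda line: line.split()) \
              .map(float)
mean = valuesRDD.mean()       # built-in mean of a numeric RDD
stddev = valuesRDD.stdev()    # built-in standard deviation
unusual = valuesRDD.filter(lambda v: abs(v-mean) > 2*stddev).count()
print ( unusual )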

This code took around 30 seconds to run. To get something that runs faster, sampling was suggested: by changing the initial line to the following, the program ran quite fast (sampling just 1% of the dataset).
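
A plausible form of that changed line, under the same assumptions as the sketch above, uses RDD.sample() to keep about 1% of the data:

valuesRDD = sc.textFile("eegsample") \
              .sample(False, 0.01) \
              .flatMap(lambda line: line.split()) \
              .map(float)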

Another demonstration for this problem put all the code into a file wanted.py, along with some initial lines to create the Spark Context (sc) variable. This file was copied, using the scp command, to the practice machine. Then the following command was demonstrated:
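
Reconstructed from the description in the next paragraph (the command shown in class may have differed in details):

spark-submit wanted.py > hold 2> log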

What this command does is run wanted.py using spark-submit, but it puts the many informational (and error) messages into a file named "log", whereas the printed output goes to a file named "hold". This can be handy because scp can be used to copy log and/or hold to your local computer for study.

Bigrams and Trigrams

The bigram counting problem is an example worked out previously in an assignment with MapReduce. How can it be done in Spark? A rather complicated, though correct, idea was sketched in class. Rather than just doing bigrams, this larger example shows how trigrams can be counted using Spark RDD methods. It is quite involved; some explanatory notes follow the code.
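
A sketch of the idea, reconstructed from the notes that follow, assuming the text is in a file named book.txt; groupByKey() plays the role of the groupBy() mentioned in the notes:

textRDD = sc.textFile("book.txt")
wordsRDD = textRDD.flatMap(lambda line: line.split())

# pair each word with its position in the text
indexedRDD = wordsRDD.zipWithIndex()          # elements are (word,index)

# send each (word,index) to three keys: its own index and its neighbors'
def spread(pair):
  (word,index) = pair
  return [ (index-1,(word,index)), (index,(word,index)), (index+1,(word,index)) ]

groupedRDD = indexedRDD.flatMap(spread).groupByKey()

# keep only the groups with three members, sort each group by index,
# and join the three words into a single trigram string
def makeTrigram(group):
  (key,values) = group
  triple = sorted(values, key=lambda wi: wi[1])
  return " ".join( w for (w,i) in triple )

trigramsRDD = groupedRDD.filter(lambda g: len(g[1]) == 3).map(makeTrigram)

# the usual wordcount logic finishes the job
countsRDD = trigramsRDD.map(lambda t: (t,1)).reduceByKey(lambda a,b: a+b)
print ( countsRDD.count() )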

The logic of this code is based on something like a join operation. Words of the text are paired with index numbers, then each (word,index) pair is mapped to [ (index-1,(word,index)) , (index,(word,index)), (index+1,(word,index)) ]. After this, the magic of flatMap() and groupBy() builds an RDD in which there are elements (index,[(wordA,indexA),(wordB,indexB),(wordC,indexC)]). Such an element is the material to make a trigram: it has three items in the value part, though these may be out of order. Fortunately, by sorting on {indexA,indexB,indexC} the three words are put into the correct order, and we can map this messy thing into a single string trigram. After this step, the usual wordcount logic does the rest of the job.

Unfortunately this program is terribly slow. It ran for around five minutes to count all the trigrams.

Approximate Trigram Counting

Suppose the counts of trigrams don't need to be perfect. One thing to try is using the segsample() function defined above. It will take some portion of the dataset and run the counting algorithm on that smaller sample.
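
A guess at the shape of such a sampling step, assuming segsample() is a thin wrapper around RDD.sample() applied to the lines of the text:

def segsample(textRDD, fraction=0.01):
  # hypothetical definition: keep roughly the given fraction of the
  # text's lines, sampled without replacement
  return textRDD.sample(False, fraction)

sampledTextRDD = segsample(sc.textFile("book.txt"))
# ... then run the trigram-counting steps above on sampledTextRDD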

Another idea is to skip over counting trigrams that span partitions. (In fact, the bigram exercise using MapReduce also skipped counting bigrams that span splits.) Just skipping these cases results in a simple program that runs fast. Here is the code:
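
A sketch following the description in the next paragraph, assuming the text is in a file named book.txt; the two commented-out lines stand in for the partition-related statements mentioned in the section after this one:

windowMemory = []     # defined once; everything afterwards only mutates it

def trimap(word):
  # remember the most recent words; because windowMemory is never
  # re-assigned inside trimap, Python uses the global list, and its
  # contents persist between calls
  windowMemory.append(word)
  if len(windowMemory) > 3:
    windowMemory.pop(0)               # keep only the last three words
  if len(windowMemory) < 3:
    return ""                         # initial cases: no trigram yet
  return " ".join(windowMemory)

textRDD = sc.textFile("book.txt")
wordsRDD = textRDD.flatMap(lambda line: line.split())
# print ( wordsRDD.getNumPartitions() )
# wordsRDD = wordsRDD.repartition(8)
trigramsRDD = wordsRDD.map(trimap).filter(lambda t: t != "")
countsRDD = trigramsRDD.map(lambda t: (t,1)).reduceByKey(lambda a,b: a+b)
print ( countsRDD.count() )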

The program uses a trick that is available in Python (it might be hard to do in Scala). Notice the variable windowMemory: it is defined once as an empty list, and all the subsequent operations on it are mutations. Because they are mutations only (no statement inside trimap assigns to windowMemory), Python does not create a local variable and instead uses the global windowMemory, whose value persists between calls to trimap. It takes some knowledge of Python to recognize this trick, but it makes possible the same kind of code that was used in the bigram assignment with MapReduce. The words of the text are fed, one by one, to trimap, and trimap emits a string containing three words (except for some initial cases). This seems perfect, but it is not quite. Understand that map(trimap) is invoked, in parallel, on multiple cores, one for each partition of the input data. Therefore, trigrams that would cross partitions are not counted.

Partition Counting of Trigrams

The explanation of map(trimap) talks about partitions, yet there is no explicit reference to partitions in the code (except for two statements that have been commented out). An equivalent program makes the connection to partitions clear.
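
A sketch of the equivalent program, under the same assumptions as before:

windowMemory = []     # as before, defined once and only mutated

def trimap(part_iter):
  # part_iter is an iterator over every element of one partition
  for word in part_iter:
    windowMemory.append(word)
    if len(windowMemory) > 3:
      windowMemory.pop(0)
    if len(windowMemory) == 3:
      yield " ".join(windowMemory)    # yield, not return, so the loop continues

textRDD = sc.textFile("book.txt")
wordsRDD = textRDD.flatMap(lambda line: line.split())
trigramsRDD = wordsRDD.mapPartitions(trimap)
countsRDD = trigramsRDD.map(lambda t: (t,1)).reduceByKey(lambda a,b: a+b)
print ( countsRDD.count() )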

The only difference between this and the previous program, which has map(trimap), is that here it is mapPartitions(trimap) and the code for trimap uses the yield statement (which is usually skipped in beginning Python courses and textbooks). Whereas map(v) expects v to be an element of an RDD, the method given to mapPartitions, say trimap(p), expects p to be an iterator over all the elements of a partition. Therefore trimap(p) should yield each element it needs to generate rather than return it (a return statement would end execution of trimap too soon).

Dataframe and CSV

This example was presented in class, based on the government dataset thads.csv introduced in the Spark page.
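
A minimal sketch of what thads.py could contain, assuming the spark-csv package is supplied with --packages and thads.csv is reachable by the cluster (the application name and option values are assumptions):

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

conf = SparkConf()
conf.setMaster("yarn-client")
conf.setAppName("THADS CSV Example")
sc = SparkContext(conf = conf)
sqlContext = SQLContext(sc)

# read the CSV into a DataFrame using the com.databricks spark-csv package
df = sqlContext.read.format("com.databricks.spark.csv") \
               .options(header="true", inferSchema="true") \
               .load("thads.csv")

df.printSchema()
print ( df.count() )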

The Python code was put into a file thads.py. Because it is laborious to type in the "--packages com.databricks..." part, the in-class demonstration also used a script. The following two lines were put into a file named ss.
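
Roughly (the exact com.databricks package coordinate below is an assumption; use whatever was shown in class):

#!/bin/bash
spark-submit --packages com.databricks:spark-csv_2.11:1.5.0 $1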

The first line indicates this is a Bash (Unix shell) script, and the second line has the placeholder "$1" at the end of the line. After putting those two lines into a file, this command makes the script ss executable:
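
chmod +x ss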

Now, from a shell, this command runs the python script:
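
./ss thads.py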

This is like running ss as a command, in the current directory, with thads.py as argument to ss; the argument thads.py will be substituted for $1 so that what actually runs is like typing in
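
spark-submit --packages com.databricks:spark-csv_2.11:1.5.0 thads.py

(Again, the package coordinate shown is the assumed one from the sketch of ss above.)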

It is a bit time-consuming to create ss and set things up to use it, though in a debugging cycle of development, such a script would be used many times, and it is helpful to save keystrokes in repetitive work.

Parallel Prefix Sums

It was shown in a lecture how the ideas of a prefix sum are surprisingly useful for a variety of tasks, even related to the task of making a cumulative frequency distribution. The question here is, how can this be done using Spark? Would the ideas of the trigram examples above be enough to solve this problem?

The problem is to calculate the sequence x[0], x[0]+x[1], x[0]+x[1]+x[2], ... efficiently and in parallel, where the input is a file containing x[i] for i in [0,n-1]. The program presented here uses some features of Spark not found in the previous examples: sc.broadcast and RDD.mapPartitionsWithIndex, which enable a program to collect limited information from all partitions, then distribute a dictionary of this information to all workers. This way of programming resembles typical asynchronous parallel programming organized into phases with barrier synchronization.

The parallel algorithm for parallel prefix is, from a high-level perspective, the same one used on a synchronous PRAM architecture. There would be log(n) phases, each phase accumulating larger sums. There is also a divide-and-conquer theme to the program. Roughly described, it divides the input into two halves, does prefix sums on each half, then gets the last element of the left half and adds that element to everything in the right half. Recursively, to calculate for a half of the input, it is divided into two quarters, and so on -- that's why it is a log(n)-phase program. Technically, rather than going top-down (dividing into halves first), the order is reversed: begin with parts of length 2 in the first phase, then parts of length 4 in the second phase, and so on.

The difference between the PRAM-like strategy and the Spark program is that, before the first phase, a prefix-sum is done separately in each partition. This is a fast operation. Then the log(m) phases start, where m is the number of partitions. In each phase, selected partitions get the last element of some previous partition and add that element to everything in their partitions.
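
As a small added illustration of these formulas (not taken from the lecture), the snippet below prints, for 8 partitions, which partitions are active in each phase and which partition each one takes its value from:

P = 8
for phase_no in range(1,4):
  d = 2**phase_no
  actives = [ i for i in range(P) if i%d >= d/2 ]
  sources = [ (i//d)*d + d//2 - 1 for i in actives ]
  print ( phase_no, list(zip(actives,sources)) )

# phase 1: partitions 1,3,5,7 add the last elements of partitions 0,2,4,6
# phase 2: partitions 2,3 add from partition 1; partitions 6,7 add from partition 5
# phase 3: partitions 4,5,6,7 all add the last element of partition 3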

'''
 The first few lines set up the Spark Context
'''
from pyspark import SparkConf, SparkContext 
import subprocess, sys, random
conf = SparkConf()
conf.setMaster("yarn-client")
conf.setAppName("My Letter Counting Application")
conf.set("spark.executor.memory","2g")
sc = SparkContext(conf = conf)

def basePrefix(baseRDD,prefixOp):
  '''
  Return an RDD obtained from baseRDD by calculating
  a prefix-sum, using the provided prefixOp,
  on a partition-basis (each partition does its own 
  prefix-sum to get the new RDD) 
  '''
  def prefix_part(part_iter):
    '''
    LOCAL function used within a mapPartitions() 
    to carry out the prefix-sum calculation
    '''
    total = None
    for element in part_iter:
      if total is None: 
        total = element
        yield element
      else:
        total = prefixOp(total,element)
        yield total

  newRDD = baseRDD.mapPartitions(prefix_part)
  return newRDD

def lastElements(curRDD):
  '''
  Return a dictionary which maps partition number to 
  the value of the last element in that partition
  '''
  def selast(part_no,part_iter):
    '''
    LOCAL function to yield only the last element
    in a partition (invoked by mapPartitionsWithIndex) 
    '''
    for element in part_iter: last = element
    yield (part_no,last)

  finalsRDD = curRDD.mapPartitionsWithIndex(selast)
  return dict(finalsRDD.collect())

def phasePrefix(curRDD,phase_no,prefixOp):
  '''
  One phase of a repeated calculation of prefix, across partitions.
  Input: curRDD, the current RDD from the previous phase
  Input: phase_no, presumably in the range [1,maxphase] 
  Input: prefixOp, the two-argument operator for prefix-sum
  Output: newRDD for the next iteration             

  Note: this function works across iterations by first 
  obtaining a dictionary lastElements(curRDD) which maps 
  each partition number to the value of the last element 
  in that partition -- this is a reasonably small dictionary.
  Then this dictionary, collected at the master machine, is 
  broadcast to all worker machines using sc.broadcast(), 
  which hopefully makes the dictionary available to each 
  partition running a mapPartitions() method.

  Two critical formulas are used. 
   (1) which partitions are "active" in this phase, 
       meaning they will add a value to all their
       elements for the next phase; the formula is
         i in range(number of partitions) 
         provided i%(2**phase_no) >= (2**phase_no/2)
       this formula essentially describes the 
       "right half" of a group of partitions
   (2) there is a formula to find the last 
       partition in the "left half", because its 
       last element is needed by the actives
        source = (part_no // 2**phase_no) * 2**phase_no 
                 + (2**(phase_no-1)) - 1
  '''
  curTotals = lastElements(curRDD)  # get last elements of each partition
  bdict = sc.broadcast(curTotals)   # broadcast dictionary in the cluster
  P = curRDD.getNumPartitions()     # with this many partitions
  d = 2**phase_no
  actives = [ i for i in range(P) if i%d >= d/2 ] 
  def sub_prefix(part_no,part_iter):
    '''
    LOCAL function used in mapPartitionsWithIndex(), 
    it will add the value from the source partition's last
    element to all elements in this partition 
    '''
    source_partition = (part_no // 2**phase_no) * 2**phase_no + (2**(phase_no-1)) - 1
    for element in part_iter:  
      if part_no not in actives:    
         yield element
      else:
         v = bdict.value[source_partition]
         yield prefixOp(v,element)

  newRDD = curRDD.mapPartitionsWithIndex(sub_prefix)
  return newRDD

def runPhasePrefix(baseRDD,prefixOp):
  from math import log, ceil
  # the number of phases is the base-2 logarithm, rounded up, of the
  # number of partitions in baseRDD; this number of partitions
  # will be constant throughout the calculation
  phases = int( ceil( log(baseRDD.getNumPartitions(),2) ) )
  # before the loop of phases, let each partition run its own
  # individual prefix sum on the elements it has
  curRDD = basePrefix(baseRDD,prefixOp)
  # iterate for i = 1, 2, 3, ... phases
  for i in range(1,phases+1): 
    curRDD = phasePrefix(curRDD,i,prefixOp) 
  return curRDD
  
def test():
  sampleRDD = sc.parallelize([1]*1000,128) # try 128 partitions 
  from operator import add
  outputRDD = runPhasePrefix(sampleRDD,add)
  testresult = outputRDD.collect()
  assert sorted(testresult) == list(range(1,1001))

test()

print ( "**** Done, test passed ****" )