Spark Examples

This page has exercises, most presented or at least sketched in class.

Count Unusual Values

The problem is to count how many values in the eegsample dataset are not within two standard deviations of the mean. A program for this was developed in class, essentially consisting of the following steps.

  •  rawRDD = sc.textFile("/user/hadoop/eegsample")
     wordsRDD = rawRDD.flatMap(lambda line: line.split())
     numRDD = wordsRDD.map(lambda word: float(word))
     mu = numRDD.mean()
     sigma = numRDD.stdev()
     wantedRDD = numRDD.filter(lambda e: e<mu-2*sigma or e>mu+2*sigma)
     print (wantedRDD.count())

This code took around 30 seconds to run. To get something that runs faster, sampling was suggested: by changing the initial line to the following, the program ran quite fast (sampling just 1% of the dataset).

  •  rawRDD = sc.textFile("/user/hadoop/eegsample").sample(False,0.01)

Another demonstration for this problem put all the code into a file wanted.py, along with some initial lines to create the Spark Context (sc) variable. This file was copied, using the scp command, to the practice machine. Then the following command was demonstrated:

  •  $ spark-submit wanted.py 2> log > hold

This command runs wanted.py using spark-submit, but it sends the many informational (and error) messages to a file named "log" while the printed output goes to a file named "hold". This can be handy because scp can then be used to copy log and/or hold to your local computer for study.

Bigrams and Trigrams

The bigram counting problem is an example worked out previously in an assignment with MapReduce. How can it be done in Spark? A rather complicated, though correct, idea was sketched in class. Rather than just doing bigrams, this larger example shows how trigrams can be counted using Spark RDD methods. It's quite complicated; some explanatory notes follow the code.

  •  '''
     The first eight lines set up the Spark Context
     and erase any existing HDFS output
     '''
     from pyspark import SparkConf, SparkContext 
     import subprocess, sys, random
     conf = SparkConf()
     conf.setMaster("yarn-client")
     conf.setAppName("My Letter Counting Application")
     conf.set("spark.executor.memory","2g")
     sc = SparkContext(conf = conf)
     subprocess.call("hdfs dfs -rm -r output",shell=True)
    
     def segsample(anRDD,percent):
      '''
      Spark's RDD sample() method just returns randomly 
      chosen elements of the RDD (with or without replacement);
      but what if we would like a random sequential portion
      of an RDD? This function does something like that. Input
      is an RDD and a percentage (number in range 0-100), and
      the function returns a new RDD which preserves the 
      sequential order of the elements it returns
      '''
      N = anRDD.count()
      size = int((percent/100.0)*N)
      Limi = N-size
      minL = random.randint(0,Limi)
      maxL = minL + size
      tempRDD = anRDD.zipWithIndex()
      anRDD.unpersist()
      pairRDD = tempRDD.filter(lambda v: minL<=v[1]<maxL)
      sampRDD = pairRDD.map(lambda v: v[0])
      pairRDD.unpersist()
      return sampRDD
    
     def toWords(hdfsDir,percent=100.0):
      '''
      Turn the input dataset in an HDFS directory into an RDD
      of words by splitting and flattening; the optional "percent"
      parameter will just sample the input dataset by the
      desired percentage
      ''' 
      text_file = sc.textFile(hdfsDir) # splits are in here
      text_file = text_file.map(lambda line: line.encode('ascii','ignore')) # just use ascii
      if percent < 100.0:
         sampleRDD = segsample(text_file,percent) 
      else:
         sampleRDD = text_file
      wordsRDD = sampleRDD.flatMap(lambda line: line.split(" ")) 
      return wordsRDD 
    
     def window(baseRDD):
      '''
      Return a new RDD consisting of all possible segments of 3 
      consecutive items; and the new RDD should preserve order 
      also. Thus, if baseRDD = ["one","two","three","four","five"]
      the new RDD will be 
         [("one","two","three"),("two","three","four"),
          ("three","four","five")] 
      but it's more complicated when the baseRDD has pairs instead
      of words - it still works anyway
      '''
      def destinations(i):
        '''
        LOCAL function destinations(i) 
        - given i, the index of an element in some RDD, 
        generate a window of length 3 with i at its center.
        Examples:
          4 -> (3,4,5)       
          98 -> (97,98,99) 
          0 -> (-1,0,1)    # there is no element -1
        '''
        left =  [i-j for j in range(1,1+3//2)]  # indices to "left" of i
        right = [i+j for j in range(1,1+3//2)]  # indices to "right" of i 
        r = [j for j in left+[i]+right] 
        return tuple(r)
    
      def spread(v):
        '''
        LOCAL function spread(v)
        - given v which is a pair (value,index), generate 
        a tuple with pairs (t,v) for each element 
        in destinations(index); thus for instance
        spread(("some",40)) will return 
         ( (39,("some",40)), (40,("some",40)), (41,("some",40)) )
        '''
        assert len(v) == 2
        value,index = v 
        out = [(t,v) for t in destinations(index)]
        return tuple(out)
    
      def segmented(v):
        '''
        LOCAL function used to map collected neighboring values
        into a tuple, where the values are like those generated
        by the spread(v) function; for example:
         (38,[("b",37),("c",39),("d",38)]) -> (38,["b","d","c"]) 
        Notice how the order in the result comes from the index
        order of the pairs, like in sorted order.          
        '''
        ind,valist = v  # ind is an index number; valist is a 
                        # list of tuples, or maybe an iterable
                        # of tuples like the comment above says
        # sort with respect to the second item of all the tuples in valist
        sortvalist = sorted(valist,key=lambda x: x[1]) 
        w = [ a for a,b in sortvalist ] # just the values please
        return (ind,w)
    
      # finally we are coding up the window(baseRDD) function
      indRDD = baseRDD.zipWithIndex() # get ("one",0) ("two",1) etc
      spreadRDD = indRDD.flatMap(lambda v: spread(v))
      rawWinRDD = spreadRDD.groupByKey() # the window values, but unsorted
      winDowRDD = rawWinRDD.map(lambda v: segmented(v)) # now they are sorted
      return winDowRDD
    
     def trigrams(hdfsloc): 
      '''
      Count all the trigrams in a dataset of text, using functions
      defined above to do the different parts of extracting trigrams
    
      Return the top 10 trigrams and their counts
      '''
      def justWords(w):
        '''
        LOCAL function used in an RDD map(). The situation is that
        w is an RDD element that looks like 
           (38,["b","d","c"]) 
        Except in a few cases, there could be one or two words instead
        of three - normally it is three words. The mapping should 
        return "b d c" which is the trigram to count later
        '''
        wordlist = w[1]  
        return ' '.join(wordlist) 
    
      wordsRDD = toWords(hdfsloc)
      tripleRDD = window(wordsRDD) # get elements which are triples 
      trigramRDD = tripleRDD.map(justWords)
      trigramRDD = trigramRDD.filter(lambda s: len(s.split())==3) # throw out bad cases
      # now use the standard word-count pattern for Spark
      tricountRDD = trigramRDD.map(lambda w: (w,1))
      trisumRDD = tricountRDD.reduceByKey(lambda a,b: a+b)
      # sort the RDD in reverse order, most frequent at beginning
      sortedTriRDD = trisumRDD.sortBy(keyfunc=(lambda w: w[1]),ascending=False) 
      return sortedTriRDD.take(10) 
    
     Z = trigrams("file:///home/herman/Spark/ulysses")
     print (Z)

The logic of this code is based on something like a join operation. Words of the text are paired with index numbers, then each (word,index) pair is mapped to [ (index-1,(word,index)) , (index,(word,index)), (index+1,(word,index)) ]. After this, the magic of flatMap() and groupByKey() builds an RDD whose elements look like (index,[(wordA,indexA),(wordB,indexB),(wordC,indexC)]). Such an element is the material for a trigram: it has three items in the value part, though these may be out of order. Fortunately, by sorting on the indices {indexA,indexB,indexC} the three words are put into the correct order, and we can map this messy thing into a single trigram string. After this step, the usual wordcount logic does the rest of the job.
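
To see the intermediate structure on a tiny example, here is a plain-Python sketch (no Spark, invented for illustration) that mirrors the zipWithIndex(), spread(), groupByKey() and segmented() steps:

  •  words = ["one","two","three","four","five"]
     indexed = list(enumerate(words))   # like zipWithIndex, but pairs are (index,word)
     # spread each word to the three window positions it belongs to
     pairs = [ (t,(w,i)) for i,w in indexed for t in (i-1,i,i+1) ]
     # group by window index, like groupByKey()
     groups = {}
     for t,v in pairs:
       groups.setdefault(t,[]).append(v)
     # sort each group by original index and keep just the words, like segmented()
     windows = dict( (t,[w for w,i in sorted(vs,key=lambda x: x[1])]) for t,vs in groups.items() )
     print (windows[2])   # ['two', 'three', 'four']
     print (windows[0])   # ['one', 'two'] -- a short window, filtered out later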

Unfortunately this program is terribly slow. It ran for around five minutes to count all the trigrams.

Approximate Trigram Counting

Suppose the counts of trigrams don't need to be perfect. One thing to try is the segsample() function defined above: it extracts a contiguous portion of the dataset, so the counting algorithm runs on a much smaller sample, as sketched below.
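
For example, the trigrams() function above could pass a sampling percentage to toWords(), which already accepts one. A minimal sketch, as a hypothetical approxTrigrams() that reuses the functions defined earlier:

  •  def approxTrigrams(hdfsloc,percent=1.0):
      '''
      Like trigrams(), but counts only the trigrams found in a random
      contiguous slice (roughly "percent" of the input), via segsample()
      '''
      wordsRDD = toWords(hdfsloc,percent)  # toWords() calls segsample() when percent < 100
      tripleRDD = window(wordsRDD)
      trigramRDD = tripleRDD.map(lambda w: ' '.join(w[1]))
      trigramRDD = trigramRDD.filter(lambda s: len(s.split())==3)
      tricountRDD = trigramRDD.map(lambda w: (w,1))
      trisumRDD = tricountRDD.reduceByKey(lambda a,b: a+b)
      sortedTriRDD = trisumRDD.sortBy(keyfunc=(lambda w: w[1]),ascending=False)
      return sortedTriRDD.take(10)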

Another idea is to skip over counting trigrams that span partitions. (In fact, the bigram exercise using MapReduce also skipped counting bigrams that span splits.) Just skipping these cases results in a simple program that runs fast. Here is the code:

  •  def toWords(hdfsDir):
      '''
      Turn the input dataset in an HDFS directory into an RDD
      of words by splitting and flattening (unlike the earlier
      version, this one takes no sampling percentage)
      ''' 
      text_file = sc.textFile(hdfsDir) # splits are in here
      text_file = text_file.map(lambda line: line.encode('ascii','ignore')) # just use ascii
      #text_file = text_file.coalesce(1)      USED FOR DEMONSTRATION PURPOSES IN CLASS
      #text_file = text_file.repartition(4)   JUST USED FOR DEMONSTRATION
      wordsRDD = text_file.flatMap(lambda line: line.split(" ")) 
      return wordsRDD 
    
     windowMemory = []  # this is a list of "previous" words, empty at first
    
     def window(baseRDD):
      '''
      Return a new RDD consisting of all possible segments of 3 
      consecutive items. Thus, if baseRDD = ["one","two","three","four","five"]
      the new RDD will be 
         [("one","two","three"),("two","three","four"),
          ("three","four","five")] 
      '''
      def trimap(word):
        '''
        LOCAL mapping function used within an RDD map() method,
        to map each word into a list of up to three words.      
        '''
        if len(windowMemory) < 3:
          windowMemory.append(word)
        else:
          windowMemory.pop(0)  # remove first word
          windowMemory.append(word) # put new word on the end
        return ' '.join(windowMemory)
     
      tripleRDD = baseRDD.map(trimap)
      return tripleRDD
    
     def trigrams(hdfsloc): 
      '''
      Count all the trigrams in a dataset of text, using functions
      defined above to do the different parts of extracting trigrams
    
      Return the top 10 trigrams and their counts
      '''
      wordsRDD = toWords(hdfsloc)
      tripleRDD = window(wordsRDD) # get elements which are triples 
      trigramRDD = tripleRDD.filter(lambda w: len(w.split())==3) # remove bad cases
      # now use the standard word-count pattern for Spark
      tricountRDD = trigramRDD.map(lambda w: (w,1))
      trisumRDD = tricountRDD.reduceByKey(lambda a,b: a+b)
      # sort the RDD in reverse order, most frequent at beginning
      sortedTriRDD = trisumRDD.sortBy(keyfunc=(lambda w: w[1]),ascending=False) 
      return sortedTriRDD.take(10) 

The program uses a trick that is available in Python (it might be harder to do in Scala). Notice the variable windowMemory: it is defined once, at the top level, as an empty list. All subsequent operations on windowMemory are mutations. Because trimap never assigns to windowMemory (it only mutates it), Python treats it as the global variable, so its value persists between calls to trimap. It takes some knowledge of Python to know about this trick, but it makes possible the same kind of code that was used in the bigram assignment with MapReduce. The words of the text are fed, one by one, to trimap, and trimap emits a string containing three words (except for a few initial cases). This seems perfect, but it is not quite: the map(trimap) work is carried out in parallel, on multiple cores, one task for each partition of the input data, so trigrams that would cross partition boundaries are not counted.
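
Here is a tiny stand-alone sketch (plain Python, no Spark) of the trick, showing that the list keeps its contents between calls because trimap only mutates it:

  •  windowMemory = []   # defined once, at module level
     
     def trimap(word):
       # no assignment to windowMemory here, only mutations,
       # so Python looks it up as the global variable
       if len(windowMemory) < 3:
         windowMemory.append(word)
       else:
         windowMemory.pop(0)
         windowMemory.append(word)
       return ' '.join(windowMemory)
     
     for w in ["one","two","three","four"]:
       print (trimap(w))
     # prints: one / one two / one two three / two three four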

Partition Counting of Trigrams

The explanation of map(trimap) talks about partitions, yet there is no explicit reference to partitions in the code (except for two statements that have been commented out). An equivalent program makes the connection to partitions clear.

  •  windowMemory = []  # this is a list of "previous" words, empty at first
    
     def window(baseRDD):
      '''
      Return a new RDD consisting of all possible segments of 3 
      consecutive items. Thus, if baseRDD = ["one","two","three","four","five"]
      the new RDD will be 
         [("one","two","three"),("two","three","four"),
          ("three","four","five")] 
      '''
      def trimap(partitionelements):
        '''
        LOCAL mapping function to be invoked through the 
        mapPartitions() method.    
        '''
        for word in partitionelements:
          if len(windowMemory) < 3:
            windowMemory.append(word)
          else:
            windowMemory.pop(0)  # remove first word
            windowMemory.append(word) # put new word on the end
          yield ' '.join(windowMemory)
     
      return baseRDD.mapPartitions(trimap) 

The only difference between this and the previous program, which has map(trimap), is that here it is mapPartitions(trimap), and the code for trimap uses the yield statement (which is usually skipped in beginning Python courses and textbooks). Whereas the function given to map() expects a single element of an RDD, the function given to mapPartitions(), say trimap(p), expects p to be an iterator over all the elements of a partition. Therefore trimap(p) should yield all the elements it needs to generate rather than return them (a return statement would end execution of trimap too soon).
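
A minimal sketch of the difference (invented for illustration, assuming sc is the Spark Context from the earlier examples): map() is handed one element at a time, while mapPartitions() is handed an iterator over a whole partition and the function yields as many results as it likes.

  •  smallRDD = sc.parallelize(range(10),2)   # two partitions, holding 0-4 and 5-9
     
     def running(part_iter):
       total = 0
       for x in part_iter:
         total += x
         yield total     # running total, restarted in each partition
     
     print (smallRDD.map(lambda x: x+1).collect())       # [1, 2, ..., 10]
     print (smallRDD.mapPartitions(running).collect())   # [0, 1, 3, 6, 10, 5, 11, 18, 26, 35]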

Dataframe and CSV

This example was presented in class, based on the government dataset thads.csv introduced in the Spark page.

  •  # run this with spark-submit --packages com.databricks:spark-csv_2.11:1.5.0 
     import sys # because using sys.stdout instead of print, just to demo
     from pyspark.sql import SparkSession
     spark = SparkSession.builder.appName("Pyspark SQL Demo").getOrCreate()
     thads = spark.read.load("/user/hadoop/thads.csv",format='com.databricks.spark.csv',header="true",inferSchema="true")  
     sys.stdout.write("thads.csv file read into dataframe\n")
     sys.stdout.write("Row count = {0}\n".format(thads.count()))
     sys.stdout.write("Column names = {0}\n".format(thads.columns))
     sys.stdout.write("First row = {0}\n".format(thads.head(1)))
     sys.stdout.write("Selected fields, first four rows = {0}\n".format(
       thads.select('CONTROL','TYPE','AGE1','BUILT').show(4)))
     sys.stdout.write("Selected fields, summary statistics = {0}\n".format(
       thads.select('CONTROL','TYPE','AGE1','BUILT').describe().show()))

The Python code was put into a file thads.py. Because it is laborious to type the "--packages com.databricks..." part each time, the in-class demonstration also used a script. The following two lines were put into a file named ss.

  •  #!/usr/bin/sh
     spark-submit --packages com.databricks:spark-csv_2.11:1.5.0 $1 

The first line indicates this is a Unix shell (sh) script, and the second line has the placeholder "$1" at the end of the line. After putting those two lines into the file, this command makes the script ss executable:

  •  $ chmod +x ss

Now, from a shell, this command runs the python script:

  •  $ ./ss thads.py

This is like running ss as a command, in the current directory, with thads.py as the argument to ss; the argument thads.py is substituted for $1, so what actually runs is the same as typing

  •  spark-submit --packages com.databricks:spark-csv_2.11:1.5.0 thads.py

It is a bit time-consuming to create ss and set things up to use it, but in a debugging cycle of development such a script gets used many times, and it is helpful to save keystrokes in repetitive work.

Parallel Prefix Sums

It was shown in a lecture how the ideas of a prefix sum are surprisingly useful for a variety of tasks, such as making a cumulative frequency distribution. The question here is: how can this be done using Spark? Would the ideas of the trigram examples above be enough to solve this problem?

The problem is to calculate the sequence x[0], x[0]+x[1], x[0]+x[1]+x[2], ... efficiently and in parallel, where the input is a file containing the values x[i] for i in [0,n-1]. The program presented here uses some features of Spark not found in the previous examples: sc.broadcast and RDD.mapPartitionsWithIndex, which enable a program to collect limited information from all partitions and then distribute a dictionary of this information to all workers. This way of programming resembles parallel programming that is organized as phases with barrier synchronization.
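
As a warm-up, here is a small sketch of those two features (the offsets are invented for the demo; sc is the Spark Context set up below): sc.broadcast() ships a read-only value to every worker, and mapPartitionsWithIndex() tells each task which partition number it is processing.

  •  lookup = sc.broadcast({0: 100, 1: 200})   # maps partition number to an offset
     
     def addOffset(part_no,part_iter):
       for x in part_iter:
         yield x + lookup.value[part_no]       # every task can read the broadcast value
     
     demoRDD = sc.parallelize([1,2,3,4],2)     # partition 0 holds [1,2], partition 1 holds [3,4]
     print (demoRDD.mapPartitionsWithIndex(addOffset).collect())   # [101, 102, 203, 204]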

The parallel algorithm for parallel prefix is, from a high-level perspective, the same one used on a synchronous PRAM architecture. There are log(n) phases, each phase accumulating larger sums. The program also has a divide-and-conquer theme. Roughly described, it divides the input into two halves, does prefix sums on each half, then gets the last element of the left half and adds that element to everything in the right half. Recursively, to calculate a half of the input, it is divided into two quarters, and so on -- that's why it is a log(n) phase program. Technically, rather than going top-down (dividing into halves first), the order is reversed: begin with parts of length 2 in the first phase, then parts of length 4 in the second phase, and so on.
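
Here is a plain-Python sketch of that phase structure on an ordinary list (invented for illustration; no Spark). Each element starts out as its own trivial prefix sum, and in each phase the last element of a left half is added to everything in the matching right half:

  •  x = [1]*8
     n = len(x)
     part = 2                      # parts of length 2, then 4, then 8, ...
     while part <= n:
       half = part//2
       for start in range(0,n,part):
         carry = x[start+half-1]   # last element of the left half
         for j in range(start+half,start+part):
           x[j] += carry           # add it to everything in the right half
       part *= 2
     print (x)   # [1, 2, 3, 4, 5, 6, 7, 8]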

The difference between the PRAM-like strategy and the Spark program is that, before the first phase, a prefix-sum is done separately in each partition. This is a fast operation. Then the log(m) phases start, where m is the number of partitions. In each phase, selected partitions get the last element of some previous partition and add that element to everything in their partitions.
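
To see which partitions do work in each phase, here is a small plain-Python sketch (not part of the program) for a hypothetical m = 8 partitions; the two formulas are the ones documented inside phasePrefix() below:

  •  from math import log
     m = 8                                   # pretend there are 8 partitions
     phases = int( log(m,2) + 0.5 )
     for phase_no in range(1,phases+1):
       d = 2**phase_no
       actives = [ i for i in range(m) if i%d >= d/2 ]
       sources = dict( (i, (i//d)*d + d//2 - 1) for i in actives )
       print (phase_no, sorted(sources.items()))
     # phase 1: partitions 1,3,5,7 read the last element of 0,2,4,6
     # phase 2: partitions 2,3 read from 1; partitions 6,7 read from 5
     # phase 3: partitions 4,5,6,7 all read from 3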

'''
 The first seven lines set up the Spark Context
'''
from pyspark import SparkConf, SparkContext 
import subprocess, sys, random
conf = SparkConf()
conf.setMaster("yarn-client")
conf.setAppName("prefix sums")
conf.set("spark.executor.memory","2g")
sc = SparkContext(conf = conf)

def basePrefix(baseRDD,prefixOp):
  '''
  Return an RDD obtained from baseRDD by calculating
  a prefix-sum, using the provided prefixOp,
  on a partition-basis (each partition does its own 
  prefix-sum to get the new RDD) 
  '''
  def prefix_part(part_iter):
    '''
    LOCAL function used within a mapPartitions() 
    to carry out the prefix-sum calculation
    '''
    total = None
    for element in part_iter:
    if total is None:
        total = element
        yield element
      else:
        total = prefixOp(total,element)
        yield total

  newRDD = baseRDD.mapPartitions(prefix_part)
  return newRDD

def lastElements(curRDD):
  '''
  Return a dictionary which maps partition number to 
  the value of the last element in that partition
  '''
  def selast(part_no,part_iter):
    '''
    LOCAL function to yield only the last element
    in a partition (invoked by mapPartitionsWithIndex)
    '''
    for element in part_iter: last = element
    yield (part_no,last)

  finalsRDD = curRDD.mapPartitionsWithIndex(selast)
  return dict(finalsRDD.collect())

def phasePrefix(curRDD,phase_no,prefixOp):
  '''
  One phase of a repeated calculation of prefix, across partitions.
  Input: curRDD, the current RDD from the previous phase
  Input: phase_no, presumably in the range [1,maxphase] 
  Input: prefixOp, the two-argument operator for prefix-sum
  Output: newRDD for the next iteration             

  Note: this function works across iterations by first
  obtaining a dictionary lastElements(curRDD) which maps
  each partition number to the value of the last element
  in that partition -- this is a reasonably small dictionary.
  Then this dictionary, collected at the master machine, is
  broadcast to all worker machines using sc.broadcast(),
  which hopefully makes the dictionary available to each
  partition running a mapPartitions() method.

  Two critical formulas are used. 
   (1) which partitions are "active" in this phase, 
       meaning they will add a value to all their
       elements for the next phase; the formula is
         i in range(number of partitions) 
         provided i%(2**phase_no) >= (2**phase_no/2)
       this formula essentially describes the 
       "right half" of a group of partitions
   (2) there is a formula to find the last 
       partition in the "left half", because its 
       last element is needed by the actives
        source = (part_no // 2**phase_no) * 2**phase_no 
                 + (2**(phase_no-1)) - 1
  '''
  curTotals = lastElements(curRDD)  # get last elements of each partition
  bdict = sc.broadcast(curTotals)   # broadcast dictionary in the cluster
  P = curRDD.getNumPartitions()     # with this many partitions
  d = 2**phase_no
  actives = [ i for i in range(P) if i%d >= d/2 ] 
  def sub_prefix(part_no,part_iter):
    '''
    LOCAL function used in mapPartitionsWithIndex(), 
    it will add the value from the source partition's last
    element to all elements in this partition 
    '''
    source_partition = (part_no // 2**phase_no) * 2**phase_no + (2**(phase_no-1)) - 1
    for element in part_iter:  
      if part_no not in actives:    
         yield element
      else:
         v = bdict.value[source_partition]
         yield prefixOp(v,element)

  newRDD = curRDD.mapPartitionsWithIndex(sub_prefix)
  return newRDD

def runPhasePrefix(baseRDD,prefixOp):
  from math import log
  # calculate the number of phases as the (rounded) base-2 logarithm
  # of the number of partitions in baseRDD; this number of partitions
  # will be constant throughout the calculation
  phases = int( log(baseRDD.getNumPartitions(),2) + 0.5 )
  # before the loop of phases, let each partition run its own
  # individual prefix sum on the elements it has
  curRDD = basePrefix(baseRDD,prefixOp)
  # iterate for i = 1, 2, 3, ... phases
  for i in range(1,phases+1): 
    curRDD = phasePrefix(curRDD,i,prefixOp) 
  return curRDD
  
def test():
  sampleRDD = sc.parallelize([1]*1000,128) # try 128 partitions 
  from operator import add
  outputRDD = runPhasePrefix(sampleRDD,add)
  testresult = outputRDD.collect()
  assert sorted(testresult) == list(range(1,1001))

test()

print ( "**** Done, test passed ****" )
