
## Spark Examples

This page collects Spark exercises, most of which were presented or at least sketched in class.

## Count Unusual Values

The problem is to count how many values in the eegsample dataset are not within two standard deviations of the mean. A program for this was developed in class, essentially consisting of the following steps.

```python
rawRDD = sc.textFile("/user/hadoop/eegsample")
wordsRDD = rawRDD.flatMap(lambda line: line.split())
numRDD = wordsRDD.map(lambda word: float(word))
mu = numRDD.mean()
sigma = numRDD.stdev()
wantedRDD = numRDD.filter(lambda e: e<mu-2*sigma or e>mu+2*sigma)
print (wantedRDD.count())
```

This code took around 30 seconds to run. To get something that runs faster, sampling was suggested: by changing the initial line to the following, the program ran quite fast (sampling just 1% of the dataset).

```python
rawRDD = sc.textFile("/user/hadoop/eegsample").sample(False,0.01)
```

Another demonstration for this problem put all the code into a file wanted.py, along with some initial lines to create the Spark Context (sc) variable. This file was copied, using the scp command, to the practice machine. Then the following command was demonstrated:

```
$ spark-submit wanted.py 2> log > hold
```

This command runs wanted.py using spark-submit, but it redirects the many informational (and error) messages to a file named "log", while the printed output goes to a file named "hold". This can be handy because scp can then be used to copy log and/or hold to your local computer for study.

## Bigrams and Trigrams

The bigram counting problem is an example worked out previously in an assignment with MapReduce. How can it be done in Spark? A rather complicated, though correct, idea was sketched in class. Rather than just doing bigrams, this larger example shows how trigrams can be counted using Spark RDD methods; some explanatory notes follow the code.

```python
''' The first eight lines set up the Spark Context and erase any existing HDFS output '''
from pyspark import SparkConf, SparkContext
import subprocess, sys, random
conf = SparkConf()
conf.setMaster("yarn-client")
conf.setAppName("My Letter Counting Application")
conf.set("spark.executor.memory","2g")
sc = SparkContext(conf = conf)
subprocess.call("hdfs dfs -rm -r output",shell=True)

def segsample(anRDD,percent):
    ''' Spark's RDD sample() method just returns randomly chosen elements of
        the RDD (with or without replacement); but what if we would like a
        random sequential portion of an RDD? This function does something like
        that. Input is an RDD and a percentage (number in range 0-100), and the
        function returns a new RDD which preserves the sequential order of the
        elements it returns '''
    N = anRDD.count()
    size = int((percent/100.0)*N)
    Limi = N-size
    minL = random.randint(0,Limi)
    maxL = minL + size
    tempRDD = anRDD.zipWithIndex()
    anRDD.unpersist()
    pairRDD = tempRDD.filter(lambda v: minL<=v[1]<maxL)
    sampRDD = pairRDD.map(lambda v: v[0])
    pairRDD.unpersist()
    return sampRDD

def toWords(hdfsDir,percent=100.0):
    ''' Turn input dataset in HDFS directory into an RDD of words by splitting
        and flattening; optional "percent" parameter will just sample the
        input dataset by the desired percentage '''
    text_file = sc.textFile(hdfsDir)   # splits are in here
    text_file = text_file.map(lambda line: line.encode('ascii','ignore'))  # just use ascii
    if percent < 100.0:
        sampleRDD = segsample(text_file,percent)
    else:
        sampleRDD = text_file
    wordsRDD = sampleRDD.flatMap(lambda line: line.split(" "))
    return wordsRDD

def window(baseRDD):
    ''' Return a new RDD consisting of all possible segments of 3 consecutive
        items; and the new RDD should preserve order also. Thus, if
        baseRDD = ["one","two","three","four","five"] the new RDD will be
        [("one","two","three"),("two","three","four"),("three","four","five")]
        but it's more complicated when the baseRDD has pairs instead of
        words - it still works anyway '''
    def destinations(i):
        ''' LOCAL function destinations(i) - given i, the index of an element
            in some RDD, generate a window of length 3 with i at its center.
            Examples:  4 -> (3,4,5)   98 -> (97,98,99)
                       0 -> (-1,0,1)  # there is no element -1 '''
        left  = [i-j for j in range(1,1+3//2)]   # indices to "left" of i
        right = [i+j for j in range(1,1+3//2)]   # indices to "right" of i
        r = [j for j in left+[i]+right]
        return tuple(r)
    def spread(v):
        ''' LOCAL function spread(v) - given v which is a pair (value,index),
            generate a tuple with pairs (t,v) for each element in
            destinations(index); thus for instance spread(("some",40)) will
            return ( (39,("some",40)), (40,("some",40)), (41,("some",40)) ) '''
        assert len(v) == 2
        value,index = v
        out = [(t,v) for t in destinations(index)]
        return tuple(out)
    def segmented(v):
        ''' LOCAL function used to map collected neighboring values into a
            tuple, where the values are like those generated by the spread(v)
            function; for example:
               (38,[("b",37),("c",39),("d",38)]) -> (38,["b","d","c"])
            Notice how the order in the result comes from the index order of
            the pairs, like in sorted order. '''
        ind,valist = v    # ind is an index number; valist is a list of tuples,
                          # or maybe an iterable of tuples like the comment above says
        # sort with respect to the second item of all the tuples in valist
        sortvalist = sorted(valist,key=lambda x: x[1])
        w = [ a for a,b in sortvalist ]   # just the values please
        return (ind,w)
    # finally we are coding up the window(baseRDD) function
    indRDD = baseRDD.zipWithIndex()     # get ("one",0) ("two",1) etc
    spreadRDD = indRDD.flatMap(lambda v: spread(v))
    rawWinRDD = spreadRDD.groupByKey()  # the window values, but unsorted
    winDowRDD = rawWinRDD.map(lambda v: segmented(v))  # now they are sorted
    return winDowRDD

def trigrams(hdfsloc):
    ''' Count all the trigrams in a dataset of text, using functions defined
        above to do the different parts of extracting trigrams.
        Return the top 10 trigrams and their counts '''
    def justWords(w):
        ''' LOCAL function used in an RDD map(). The situation is that w is an
            RDD element that looks like (38,["b","d","c"]). Except in a few
            cases, there could be one or two words instead of three - normally
            it is three words. The mapping should return "b d c" which is the
            trigram to count later '''
        wordlist = w[1]
        return ' '.join(wordlist)
    wordsRDD = toWords(hdfsloc)
    tripleRDD = window(wordsRDD)            # get elements which are triples
    trigramRDD = tripleRDD.map(justWords)
    trigramRDD = trigramRDD.filter(lambda s: len(s.split())==3)  # throw out bad cases
    # now use the standard word-count pattern for Spark
    tricountRDD = trigramRDD.map(lambda w: (w,1))
    trisumRDD = tricountRDD.reduceByKey(lambda a,b: a+b)
    # sort the RDD in reverse order, most frequent at beginning
    sortedTriRDD = trisumRDD.sortBy(keyfunc=(lambda w: w[1]),ascending=False)
    return sortedTriRDD.take(10)

Z = trigrams("file:///home/herman/Spark/ulysses")
print (Z)
```

The logic of this code is based on something like a join operation. Words of the text are paired with index numbers, then each (word,index) pair is mapped to [ (index-1,(word,index)), (index,(word,index)), (index+1,(word,index)) ]. After this, the combination of flatMap() and groupByKey() builds an RDD whose elements look like (index,[(wordA,indexA),(wordB,indexB),(wordC,indexC)]). Such an element is the material for a trigram: it has three items in the value part, though these may be out of order. By sorting on the indices indexA, indexB, indexC, the three words are put into the correct order, and this messy element can be mapped into a single string trigram. After this step, the usual word-count logic does the rest of the job.
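
To see the shape of the data at each stage, here is a small plain-Python sketch (no Spark; the variable names and the five-word list are only illustrative) that mimics the spread, group, and sort steps:

```python
# Plain-Python mimic of the spread -> groupByKey -> sort pipeline (illustrative only).
words = ["one", "two", "three", "four", "five"]
indexed = list(enumerate(words))                  # [(0,'one'), (1,'two'), ...]

# spread: each (index, word) is sent to window centers index-1, index, index+1
spread = [(t, (w, i)) for i, w in indexed for t in (i - 1, i, i + 1)]

# group by window center (what groupByKey() does in the Spark version)
groups = {}
for t, pair in spread:
    groups.setdefault(t, []).append(pair)

# sort each group by original index and keep only complete windows of 3 words
trigrams = [' '.join(w for w, i in sorted(g, key=lambda p: p[1]))
            for g in groups.values() if len(g) == 3]
print(trigrams)   # ['one two three', 'two three four', 'three four five']
```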

Unfortunately this program is terribly slow. It ran for around five minutes to count all the trigrams.

## Approximate Trigram Counting

Suppose the counts of trigrams don't need to be perfect. One thing to try is the segsample() function defined above: take a random sequential portion of the dataset and run the counting algorithm on that smaller sample, as in the sketch below.
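
For instance, since toWords() in the full program accepts an optional percent argument, a sampled run might look like the following sketch. The helper name trigrams_sampled and the 5% figure are only illustrative; otherwise the pipeline is the same as trigrams() above.

```python
# Sketch: the trigrams() pipeline from above, but on a ~5% sequential sample.
# trigrams_sampled and the 5.0 percentage are illustrative, not from the class demo.
def trigrams_sampled(hdfsloc, percent=5.0):
    wordsRDD = toWords(hdfsloc, percent)          # segsample() kicks in when percent < 100
    tripleRDD = window(wordsRDD)
    trigramRDD = tripleRDD.map(lambda w: ' '.join(w[1])) \
                          .filter(lambda s: len(s.split()) == 3)
    counts = trigramRDD.map(lambda w: (w, 1)).reduceByKey(lambda a, b: a + b)
    return counts.sortBy(keyfunc=(lambda w: w[1]), ascending=False).take(10)

print(trigrams_sampled("file:///home/herman/Spark/ulysses"))
```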

Another idea is to skip over counting trigrams that span partitions. (In fact, the bigram exercise using MapReduce also skipped counting bigrams that span splits.) Just skipping these cases results in a simple program that runs fast. Here is the code:

```python
def toWords(hdfsDir):
    ''' Turn input dataset in HDFS directory into an RDD of words by
        splitting and flattening '''
    text_file = sc.textFile(hdfsDir)   # splits are in here
    text_file = text_file.map(lambda line: line.encode('ascii','ignore'))  # just use ascii
    #text_file = text_file.coalesce(1)      USED FOR DEMONSTRATION PURPOSES IN CLASS
    #text_file = text_file.repartition(4)   JUST USED FOR DEMONSTRATION
    wordsRDD = text_file.flatMap(lambda line: line.split(" "))
    return wordsRDD

windowMemory = []   # this is a list of "previous" words, empty at first

def window(baseRDD):
    ''' Return a new RDD consisting of all possible segments of 3 consecutive
        items. Thus, if baseRDD = ["one","two","three","four","five"]
        the new RDD will be [("one","two","three"),("two","three","four"),
        ("three","four","five")] '''
    def trimap(word):
        ''' LOCAL mapping function used within an RDD map() method,
            to map each word into a list of up to three words. '''
        if len(windowMemory) < 3:
            windowMemory.append(word)
        else:
            windowMemory.pop(0)         # remove first word
            windowMemory.append(word)   # put new word on the end
        return ' '.join(windowMemory)
    tripleRDD = baseRDD.map(trimap)
    return tripleRDD

def trigrams(hdfsloc):
    ''' Count all the trigrams in a dataset of text, using functions defined
        above to do the different parts of extracting trigrams.
        Return the top 10 trigrams and their counts '''
    wordsRDD = toWords(hdfsloc)
    tripleRDD = window(wordsRDD)   # get elements which are triples
    trigramRDD = tripleRDD.filter(lambda w: len(w.split())==3)   # remove bad cases
    # now use the standard word-count pattern for Spark
    tricountRDD = trigramRDD.map(lambda w: (w,1))
    trisumRDD = tricountRDD.reduceByKey(lambda a,b: a+b)
    # sort the RDD in reverse order, most frequent at beginning
    sortedTriRDD = trisumRDD.sortBy(keyfunc=(lambda w: w[1]),ascending=False)
    return sortedTriRDD.take(10)
```

The program uses a trick that is available in Python (it might be harder to do in Scala). Notice the variable windowMemory: it is defined once as an empty list. All the subsequent operations on windowMemory are mutations; because the code only mutates the list and never assigns to the name, Python resolves windowMemory to the module-level (global) list. **The value of windowMemory will persist between calls to trimap.** It takes some knowledge of Python to know about this trick, but it makes possible the same kind of code that was used in the bigram assignment with MapReduce. The words of the text are fed, one by one, to trimap, and trimap emits a string containing three words (except for a few initial cases). This seems perfect! But it is not quite perfect. The map(trimap) method is invoked, in parallel, on multiple cores, one for each partition of the input data. Therefore, trigrams that would cross partitions are not counted.
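
The behavior can be seen with a minimal plain-Python example, independent of Spark: a list defined at module level and only ever mutated inside a function keeps its contents across calls.

```python
# Plain-Python illustration of the trick: the list is mutated, never reassigned,
# so the function keeps using the single module-level list.
memory = []

def remember(word):
    memory.append(word)        # mutation, not assignment
    return list(memory)

print(remember("a"))   # ['a']
print(remember("b"))   # ['a', 'b']  -- contents persisted between calls
```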

## Partition Counting of Trigrams

The explanation of map(trimap) talks about partitions, yet there is no explicit reference to partitions in the code (except for two statements that have been commented out). An equivalent program makes the connection to partitions clear.

```python
windowMemory = []   # this is a list of "previous" words, empty at first

def window(baseRDD):
    ''' Return a new RDD consisting of all possible segments of 3 consecutive
        items. Thus, if baseRDD = ["one","two","three","four","five"]
        the new RDD will be [("one","two","three"),("two","three","four"),
        ("three","four","five")] '''
    def trimap(partitionelements):
        ''' LOCAL mapping function to be invoked through the
            mapPartitions() method. '''
        for word in partitionelements:
            if len(windowMemory) < 3:
                windowMemory.append(word)
            else:
                windowMemory.pop(0)         # remove first word
                windowMemory.append(word)   # put new word on the end
            yield ' '.join(windowMemory)
    return baseRDD.mapPartitions(trimap)
```

The only difference between this and the previous program, which has map(trimap), is that here it is mapPartitions(trimap), and the code for trimap uses the **yield** statement (which is usually skipped in beginning Python courses and textbooks). Whereas the function given to map() receives one RDD element at a time, the function given to mapPartitions(), say trimap(p), receives p, an iterator over all the elements of a partition. Therefore trimap(p) should yield each of the elements it needs to generate rather than return them (a return statement would end execution of trimap too soon).
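
Here is a small hedged sketch of the difference (a toy RDD, assuming an existing SparkContext sc): the function passed to map() sees one element at a time, while the function passed to mapPartitions() sees an iterator over a whole partition and must yield its results.

```python
# Sketch: map() versus mapPartitions() on a toy RDD with 2 partitions.
rdd = sc.parallelize([1, 2, 3, 4], 2)

doubled = rdd.map(lambda x: 2 * x)          # called once per element

def partition_sums(part_iter):              # called once per partition
    total = 0
    for x in part_iter:
        total += x
    yield total                             # yield results; a bare return of a value would stop too soon

print(doubled.collect())                            # [2, 4, 6, 8]
print(rdd.mapPartitions(partition_sums).collect())  # [3, 7] with this partitioning
```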

## Dataframe and CSV

This example was presented in class, based on the government dataset thads.csv introduced in the Spark page.

```python
# run this with spark-submit --packages com.databricks:spark-csv_2.11:1.5.0
import sys   # because using sys.stdout instead of print, just to demo
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Pyspark SQL Demo").getOrCreate()
thads = spark.read.load("/user/hadoop/thads.csv",
                        format='com.databricks.spark.csv',
                        header="true", inferSchema="true")
sys.stdout.write("thads.csv file read into dataframe\n")
sys.stdout.write("Row count = {0}\n".format(thads.count()))
sys.stdout.write("Column names = {0}\n".format(thads.columns))
sys.stdout.write("First row = {0}\n".format(thads.head(1)))
sys.stdout.write("Selected fields, first four rows = {0}\n".format(
    thads.select('CONTROL','TYPE','AGE1','BUILT').show(4)))
sys.stdout.write("Selected fields, summary statistics = {0}\n".format(
    thads.select('CONTROL','TYPE','AGE1','BUILT').describe().show()))
```

The Python code was put into a file thads.py. Because it is laborious to type in the "--packages com.databricks..." part, the in-class demonstration also used a script. The following two lines were put into a file named **ss**.

```sh
#!/usr/bin/sh
spark-submit --packages com.databricks:spark-csv_2.11:1.5.0 $1
```

The first line indicates this is a Unix shell script, and the second line has the placeholder "$1" at the end. After putting those two lines into the file, this command makes the script **ss** executable:

```
$ chmod +x ss
```

Now, from a shell, this command runs the python script:

```
$ ./ss thads.py
```

This runs ss as a command in the current directory, with thads.py as its argument; the argument thads.py is substituted for $1, so what actually runs is equivalent to typing

```
spark-submit --packages com.databricks:spark-csv_2.11:1.5.0 thads.py
```

It is a bit time-consuming to create ss and set things up to use it, though in a debugging cycle of development, such a script would be used many times, and it is helpful to save keystrokes in repetitive work.

## Parallel Prefix Sums

It was shown in a lecture how the ideas of a prefix sum are surprisingly useful for a variety of tasks, including making a cumulative frequency distribution. The question here is: how can this be done using Spark? Would the ideas of the trigram examples above be enough to solve this problem?

The problem is to calculate the sequence x[0], x[0]+x[1], x[0]+x[1]+x[2], ... efficiently and in parallel, where the input is a file containing x[i] for i in [0,n-1]. The program presented here uses some features of Spark not found in the previous examples: sc.broadcast and RDD.mapPartitionsWithIndex, which enable a program to collect limited information from all partitions and then distribute a dictionary of this information to all workers. This way of programming resembles classic parallel programming organized as phases with barrier synchronization.
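
Before the full program, here is a minimal hedged sketch of those two features in isolation (a toy RDD and made-up per-partition data, assuming an existing SparkContext sc): a small dictionary is built at the driver, broadcast to the workers, and read inside mapPartitionsWithIndex().

```python
# Minimal sketch of sc.broadcast() plus mapPartitionsWithIndex() (illustrative data).
rdd = sc.parallelize(range(8), 4)            # toy RDD with 4 partitions

offsets = {0: 0, 1: 100, 2: 200, 3: 300}     # made-up per-partition information
bcast = sc.broadcast(offsets)                # shipped once to each worker

def add_offset(part_no, part_iter):
    off = bcast.value[part_no]               # every partition can read the dictionary
    for x in part_iter:
        yield x + off

print(rdd.mapPartitionsWithIndex(add_offset).collect())
# [0, 1, 102, 103, 204, 205, 306, 307] with this partitioning
```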

The parallel algorithm for parallel prefix is, from a high-level perspective, the same as that used on a synchronous PRAM architecture. There are log(n) phases, each phase accumulating larger sums. There is also a divide-and-conquer theme to the program. Roughly described, it divides the input into two halves, does prefix sums on each half, then gets the last element of the left half and adds that element to everything in the right half. Recursively, to calculate for a half of the input, it is divided into two quarters, and so on -- that's why it is a log(n) phase program. Technically, rather than going top-down (dividing into halves first), the order is reversed: begin with parts of length 2 in the first phase, then parts of length 4 in the second phase, and so on.

The difference between the PRAM-like strategy and the Spark program is that, before the first phase, a prefix-sum is done separately in each partition. This is a fast operation. Then the log(m) phases start, where m is the number of partitions. In each phase, selected partitions get the last element of some previous partition and add that element to everything in their partitions.
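
The phase structure can be simulated with ordinary Python lists standing in for partitions. The sketch below is not the Spark program; it only illustrates the two formulas used later: in phase p, partition i is active when i % 2**p >= 2**(p-1), and an active partition takes the last element of partition (i // 2**p) * 2**p + 2**(p-1) - 1.

```python
# Plain-Python simulation of the partition-phase scheme (lists stand in for partitions).
from math import ceil, log

parts = [[1, 1], [1, 1], [1, 1], [1, 1]]          # toy data: eight 1's in 4 "partitions"

# step 0: each partition does its own local prefix sum
parts = [[sum(p[:j + 1]) for j in range(len(p))] for p in parts]

phases = int(ceil(log(len(parts), 2)))
for phase in range(1, phases + 1):
    d = 2 ** phase
    last = [p[-1] for p in parts]                 # "collect" the last element of each partition
    for i in range(len(parts)):
        if i % d >= d // 2:                       # the active partitions in this phase
            source = (i // d) * d + d // 2 - 1    # last partition of the left half
            parts[i] = [x + last[source] for x in parts[i]]

print(parts)   # [[1, 2], [3, 4], [5, 6], [7, 8]] -- the prefix sums of eight 1's
```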

```python
''' The first seven lines set up the Spark Context '''
from pyspark import SparkConf, SparkContext
import subprocess, sys, random
conf = SparkConf()
conf.setMaster("yarn-client")
conf.setAppName("prefix sums")
conf.set("spark.executor.memory","2g")
sc = SparkContext(conf = conf)

def basePrefix(baseRDD,prefixOp):
    ''' Return an RDD obtained from baseRDD by calculating a prefix-sum,
        using the provided prefixOp, on a partition-basis (each partition
        does its own prefix-sum to get the new RDD) '''
    def prefix_part(part_iter):
        ''' LOCAL function used within a mapPartitions() to carry out
            the prefix-sum calculation '''
        total = None
        for element in part_iter:
            if total == None:
                total = element
                yield element
            else:
                total = prefixOp(total,element)
                yield total
    newRDD = baseRDD.mapPartitions(prefix_part)
    return newRDD

def lastElements(curRDD):
    ''' Return a dictionary which maps partition number to the value of
        the last element in that partition '''
    def selast(part_no,part_iter):
        ''' LOCAL function to yield only the last element in a partition
            (invoked by mapPartitionsWithIndex) '''
        for element in part_iter:
            last = element
        yield (part_no,last)
    finalsRDD = curRDD.mapPartitionsWithIndex(selast)
    return dict(finalsRDD.collect())

def phasePrefix(curRDD,phase_no,prefixOp):
    ''' One phase of a repeated calculation of prefix, across partitions.
        Input: curRDD, the current RDD from the previous phase
        Input: phase_no, presumably in the range [1,maxphase]
        Input: prefixOp, the two-argument operator for prefix-sum
        Output: newRDD for the next iteration
        Note: this function works across iterations by first obtaining a
        dictionary lastElements(curRDD) which maps each partition number to
        the value of the last element in that partition -- this is a
        reasonably small dictionary. Then this dictionary, collected at the
        master machine, is broadcast to all worker machines using
        sc.broadcast(), which hopefully makes the dictionary available to
        each partition running a mapPartitions() method.
        Two critical formulas are used.
        (1) which partitions are "active" in this phase, meaning they will
            add a value to all their elements for the next phase; the formula is
               i in range(number of partitions) provided i%(2**phase_no) >= (2**phase_no/2)
            this formula essentially describes the "right half" of a group of partitions
        (2) there is a formula to find the last partition in the "left half",
            because its last element is needed by the actives
               source = (part_no // 2**phase_no) * 2**phase_no + (2**(phase_no-1)) - 1 '''
    curTotals = lastElements(curRDD)   # get last elements of each partition
    bdict = sc.broadcast(curTotals)    # broadcast dictionary in the cluster
    P = curRDD.getNumPartitions()      # with this many partitions
    d = 2**phase_no
    actives = [ i for i in range(P) if i%d >= d/2 ]
    def sub_prefix(part_no,part_iter):
        ''' LOCAL function used in mapPartitionsWithIndex(), it will add
            the value from the source partition's last element to all
            elements in this partition '''
        source_partition = (part_no // 2**phase_no) * 2**phase_no + (2**(phase_no-1)) - 1
        for element in part_iter:
            if part_no not in actives:
                yield element
            else:
                v = bdict.value[source_partition]
                yield prefixOp(v,element)
    newRDD = curRDD.mapPartitionsWithIndex(sub_prefix)
    return newRDD

def runPhasePrefix(baseRDD,prefixOp):
    from math import log
    # the number of phases is the rounded logarithm, base 2, of the number
    # of partitions in baseRDD; this number of partitions will be constant
    # throughout the calculation
    phases = int( log(baseRDD.getNumPartitions(),2) + 0.5 )
    # before the loop of phases, let each partition run its own
    # individual prefix sum on the elements it has
    curRDD = basePrefix(baseRDD,prefixOp)
    # iterate for i = 1, 2, 3, ... phases
    for i in range(1,phases+1):
        curRDD = phasePrefix(curRDD,i,prefixOp)
    return curRDD

def test():
    sampleRDD = sc.parallelize([1]*1000,128)   # try 128 partitions
    from operator import add
    outputRDD = runPhasePrefix(sampleRDD,add)
    testresult = outputRDD.collect()
    assert sorted(testresult) == range(1,1001)

test()
print ( "**** Done, test passed ****" )
```

## Run Counting (Slow)

One way to look at time series data is to consider the "peaks" and "valleys" the data shows when plotted, though often there is too much data to examine by eye. Therefore researchers have devised functions that measure how often the data swings from low to high, and so on. A primitive example of this is to count how many times the data has a sequence of length k in which the trend is strictly increasing: this measurement is called *uprun-k* counting in the example program shown here. Generally speaking, a graph of uprun-k counts versus k for the same data would be a decreasing curve, because the probability of an uprun-k grows smaller as k grows larger.
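
As a concrete reference point, counting uprun-k over a plain Python list takes only a few lines; this small sketch (not the Spark program) just slides a window of length k over the data.

```python
# Plain-Python reference for uprun-k counting (a sketch, not the Spark program).
def uprun_count(xs, k):
    ''' Count the windows of k consecutive values that are strictly increasing. '''
    return sum(all(xs[i + j] < xs[i + j + 1] for j in range(k - 1))
               for i in range(len(xs) - k + 1))

data = [1.11, -2.3, 0.51, 0.75, 1.81, 2.42, -0.7, -1.3]
print(uprun_count(data, 3))   # 3 (the windows starting at -2.3, 0.51 and 0.75)
```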

The program shown here copies the ideas from the trigram counting example above. It is quite slow because it has a groupByKey() operation, which does a major shuffle of a very large number of (key,value) pairs. When run on a sample dataset of about 52MB of data (actually stored as four files in compressed format, around 3.2MB each) the running time was 338 seconds, and significantly longer when other users were competing for the CPU.

```python
''' The first eight lines set up the Spark Context and erase any existing HDFS output '''
from pyspark import SparkConf, SparkContext
import subprocess, sys, random
conf = SparkConf()
conf.setMaster("yarn-client")
conf.setAppName("runcount computation")
conf.set("spark.executor.memory","2g")
sc = SparkContext(conf = conf)
subprocess.call("hdfs dfs -rm -r output",shell=True)

def prepare_input(hdfsDir):
    ''' This function returns an RDD of floats obtained from the
        provided input directory on HDFS '''
    def convertToFloat(w):
        ''' LOCAL function convertToFloat(w) - given a text string w,
            attempt to convert it into a floating point number, but if
            that fails, just return 0.0 '''
        r = 0.0   # default value
        try:
            r = float(w)
        except ValueError:
            pass
        return r
    text_file = sc.textFile(hdfsDir)   # splits are in here
    text_file = text_file.map(lambda line: line.encode('ascii','ignore'))
    tokens = text_file.flatMap(lambda line: line.split())
    numbers = tokens.map(lambda word: convertToFloat(word))
    return numbers

def window(k,baseRDD):
    ''' Return a new RDD consisting of all possible segments of k consecutive
        items; and the new RDD should preserve order also. Thus, if k=3 and
        baseRDD = ["one","two","three","four","five"] the new RDD will be
        [("one","two","three"),("two","three","four"),("three","four","five")]
        Note: the function likely only works for small values of k larger than 1 '''
    def destinations(i,N,k):
        ''' LOCAL function destinations(i,N) - given N, the number of items in
            some RDD, and i, the index of a row in that RDD, generate a window
            of length k with i at its center (for even k, just round down).
            Examples with k=5:
               (4,99)  -> (2,3,4,5,6)
               (98,99) -> (96,97,98)   # there is no element 99
               (0,99)  -> (0,1,2)      # there is no element -1
            Examples with k=4:
               (4,99)  -> (3,4,5,6)    # like k=5, but truncate '''
        assert k>1
        if i<2:  return ()   # doesn't work for i<2
        if i>=N: return ()   # also out of bounds
        if k%2 > 0:
            left  = [i-j for j in range(1,1+k//2)]   # indices to "left" of i
            right = [i+j for j in range(1,1+k//2)]   # indices to "right" of i
            r = [j for j in left+[i]+right if j>=0 and j<N]
            return tuple(r)
        else:
            # else k is even, so do the same but not so far on the left
            left  = [i-j for j in range(1,k//2)]     # indices to "left" of i
            right = [i+j for j in range(1,1+k//2)]   # indices to "right" of i
            r = [j for j in left+[i]+right if j>=0 and j<N]
            return tuple(r)
    def spread(v,N,k):
        ''' LOCAL function spread(v,N,k) - given N, number of items in some
            RDD, and v which is a pair (value,index), generate a tuple with
            pairs (t,v) for each element in destinations(index,N,k); thus for
            instance spread(("some",40),100,5) will return
               (38,("some",40)),(39,("some",40)),(40,("some",40)),
               (41,("some",40)),(42,("some",40)) '''
        assert len(v) == 2
        value,index = v
        out = [(t,v) for t in destinations(index,N,k)]
        return tuple(out)
    def segmented(v):
        ''' LOCAL function used to map collected neighboring values into a
            tuple, where the values are like those generated by the
            spread(v,N,k) function; for example:
               (38,[("a",40),("b",37),("c",36),("d",38),("e",41)])
                  -> (38,["c","b","d","a","e"])
            Notice how the order in the result comes from the index order
            of the pairs '''
        ind,valist = v
        sortvalist = sorted(valist,key=lambda x: x[1])   # sort by second item
        w = [ a for a,b in sortvalist ]   # just the values please
        return (ind,w)
    N = baseRDD.count()
    indRDD = baseRDD.zipWithIndex()      # get ("one",0) ("two",1) etc
    spreadRDD = indRDD.flatMap(lambda v: spread(v,N,k))
    rawWinRDD = spreadRDD.groupByKey()   # the window values, but unsorted
    winDowRDD = rawWinRDD.map(lambda v: segmented(v))
    return winDowRDD

def upruns(k,winDowRDD):
    ''' With winDowRDD as produced by window(k,baseRDD), make a new RDD with
        only rows consisting of runs that are increasing order elements and
        have exactly k items '''
    def runUp(vlist):
        ''' LOCAL function used to say whether a row has sorted order values -
            Given vlist = [list of elements], return True if the list is in
            increasing order, otherwise return False '''
        if len(vlist) <= 1: return True
        compares = [ vlist[i] < vlist[i+1] for i in range(len(vlist)-1) ]
        return all(compares)
    withKRDD = winDowRDD.filter(lambda v: len(v[1])==k)
    uponlyRDD = withKRDD.filter(lambda v: runUp(v[1]))
    return uponlyRDD

def upruncount(hdfsloc,windowsize):
    ''' Count (and print) how many "up runs" there are with a specified windowsize.
        Input: hdfsloc is the location of the input, e.g. "/user/hadoop/eegsample".
        Input: windowsize is a positive small integer greater than 1.
        Example: lines from hdfsloc contain values like this:
           1.11 -2.3 0.51 0.75 1.81 2.42 -0.7 -1.3
        With windowsize=3, there are these sequences to consider from the input:
           (1.11,-2.3,0.51) (-2.3,0.51,0.75) (0.51,0.75,1.81)
           (0.75,1.81,2.42) (1.81,2.42,-0.7) (2.42,-0.7,-1.3)
        There are 6 sequences derived from the input, and 3 of these 6 are
        increasing sequences (the second, third and fourth). '''
    X = prepare_input(hdfsloc)
    Y = window(windowsize,X)
    Z = upruns(windowsize,Y)
    sys.stdout.write("COUNT = {0}\n".format(Z.count()))

upruncount("/user/hadoop/eegsample",3)
```

## Run Counting (Fast)

An alternative way to explore data is to compute approximate metrics quickly. This idea was shown above for partition counting of trigrams. In this fast version of run counting, the calculation is again partition based. An exact count is done for each partition, and these are summed to get an approximate total. To make the count exact, a correction term is added: it is calculated at the master by collecting the short runs that begin or end at partition boundaries and joining the ones that cross from one partition to another. This is a reasonable idea because the number of partitions is small, so the data collected at the master does not consume much network or memory resource.
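
To make the correction term concrete, here is a tiny plain-Python sketch (made-up values) of how a run that straddles a partition boundary is rebuilt from a tail piece of one partition and a head piece of the next, matched up by the global indices attached with zipWithIndex().

```python
# Sketch of the correction step for k = 3; indices are global positions in the RDD.
# Assume partition 0 ends with 0.1, 0.2 (indices 5, 6) and partition 1 starts with 0.5, 0.9 (indices 7, 8).
k = 3
tails = [(6, [0.2]), (5, [0.1, 0.2])]        # runs ending at the last element of partition 0
heads = [(7, [0.5]), (7, [0.5, 0.9])]        # runs starting at the first element of partition 1

pieces = tails + heads
crossing = 0
for start, seq in pieces:
    other_start = start + len(seq)           # where the matching piece must begin
    other_size = k - len(seq)                # how long the matching piece must be
    for cand_start, cand in pieces:
        if cand_start == other_start and len(cand) == other_size:
            joined = seq + cand              # a full window of k values
            if all(joined[i] < joined[i + 1] for i in range(k - 1)):
                crossing += 1
print(crossing)   # 2 boundary-crossing upruns: (0.2,0.5,0.9) and (0.1,0.2,0.5)
```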

This program ran in under 20 seconds (the slow program took 338 seconds). Both programs calculate the same total for the number of upruns of length k.

```python
''' The first seven lines set up the Spark Context '''
from pyspark import SparkConf, SparkContext
import subprocess, sys, random
conf = SparkConf()
conf.setMaster("yarn-client")
conf.setAppName("Uprun Counter")
conf.set("spark.executor.memory","2g")
sc = SparkContext(conf = conf)

def prepare_input(hdfsDir):
    ''' This function returns an RDD of floats obtained from the
        provided input directory on HDFS '''
    def convertToFloat(w):
        ''' LOCAL function convertToFloat(w) - given a text string w,
            attempt to convert it into a floating point number, but if
            that fails, just return 0.0 '''
        r = 0.0   # default value
        try:
            r = float(w)
        except ValueError:
            pass
        return r
    text_file = sc.textFile(hdfsDir)   # splits are in here
    # next line may not be necessary, getting rid of unicode
    text_file = text_file.map(lambda line: line.encode('ascii','ignore'))
    tokens = text_file.flatMap(lambda line: line.split())
    numbers = tokens.map(lambda word: convertToFloat(word))
    return numbers

def runUp(vlist):
    ''' Given vlist = [list of elements], return True if the list is in
        increasing order, otherwise return False '''
    if len(vlist) <= 1: return True
    compares = [ vlist[i] < vlist[i+1] for i in range(len(vlist)-1) ]
    return all(compares)

def partition_upruns(baseRDD,k):
    ''' Count the runs of length k that are up-sequences, returning an RDD
        of p numbers, where p is the number of partitions in baseRDD '''
    def localRun(part_iter):
        ''' LOCAL function to look at all the sequences of length k within a
            partition, and return a count of how many are increasing sequences '''
        count, sequence = 0, []
        for element in part_iter:
            sequence.append(element)   # add element at end of current sequence
            if len(sequence) < k: continue
            if runUp(sequence): count += 1
            sequence.pop(0)   # remove first element, so next can be added to end
        yield count
    return baseRDD.mapPartitions(localRun)

def tail_sequences(indxRDD,k):
    ''' This function returns an RDD consisting of runs of length 1, 2, 3, ...,
        k-1 which end at the last element of a partition. Thus, for each
        partition, k-1 sequences are generated. Also, each of these sequences
        is put into (key,value) form, where the value is the sequence and the
        key is the index, within baseRDD, where the sequence starts. '''
    def emit_tail(iter):
        ''' LOCAL function for a mapPartitions(), which generates the tail
            runs described above '''
        sequence = []
        for pair in iter:
            sequence.append(pair)
            if len(sequence) > k-1: sequence.pop(0)
        # right here, sequence has k-1 items (unless supersmall RDD)
        for j in range(len(sequence)):
            val,indx = sequence[j]   # get a pair, to see where it starts
            yield (indx,[v for (v,i) in sequence[j:]])
    tailRDD = indxRDD.mapPartitions(emit_tail)
    return tailRDD

def head_sequences(indxRDD,k):
    ''' This function returns an RDD consisting of runs of length 1, 2, 3, ...,
        k-1 which begin at the first element of a partition. Thus, for each
        partition, k-1 sequences are generated. Also, each of these sequences
        is put into (key,value) form, where the value is the sequence and the
        key is the index, within baseRDD, where the sequence starts. '''
    def emit_head(iter):
        ''' LOCAL function for a mapPartitions(), which generates the head
            runs described above '''
        sequence = []
        for pair in iter:
            sequence.append(pair)
            if len(sequence) >= k-1: break
        # right here, sequence has k-1 items (unless supersmall RDD)
        val,start = sequence[0]   # start is index of first element
        for j in range(len(sequence)):
            yield (start,[v for (v,i) in sequence[:j+1]])
    headRDD = indxRDD.mapPartitions(emit_head)
    return headRDD

def left_overs(sequences_with_index,k):
    ''' This function is given some pairs of the form (index,seq) as produced
        by tail_sequences() and head_sequences() above, and it tries to
        construct sequences of length k that logically join together, with
        respect to the baseRDD. Then, for each of these, it counts how many
        are increasing sequences and returns the total. '''
    count = 0
    for (start,seq) in sequences_with_index:
        size = len(seq)
        otherSize = k-size          # this is the other half
        otherStart = start + size   # where other starts
        for (canstart,remainder) in sequences_with_index:
            if len(remainder) == otherSize and canstart == otherStart:
                tryseq = seq + remainder
                if runUp(tryseq): count += 1
    return count

def upruns(k):
    baseRDD = prepare_input("/user/hadoop/eegsample")
    T = partition_upruns(baseRDD,k)
    N = T.sum()   # this is the approximate count
    indxRDD = baseRDD.zipWithIndex()
    V = tail_sequences(indxRDD,k).collect()
    W = head_sequences(indxRDD,k).collect()
    M = left_overs(V+W,k)
    return N+M

# HERE is where to set k
total = upruns(3)
print "****", total
```