Spark Examples

This page has exercises, most of which were presented or at least sketched in class.

Count Unusual Values

The problem is to count how many values in the eegsample dataset are not within two standard deviations of the mean. A program for this was developed in class, essentially consisting of the following steps.
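
The steps were roughly like the following sketch (assuming the pyspark shell, so sc is already defined, the data at /user/hadoop/eegsample, and that every token in the file parses as a float; the in-class version may have differed in details):

values = ( sc.textFile("/user/hadoop/eegsample")     # location is an assumption
             .flatMap(lambda line: line.split())
             .map(float) )
stats = values.stats()                  # a StatCounter with mean and stdev
m, s = stats.mean(), stats.stdev()
unusual = values.filter(lambda x: abs(x - m) > 2*s)
print ( "unusual count: {0}".format(unusual.count()) )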

This code took around 30 seconds to run. To get something that runs faster, sampling was suggested: by changing the initial line to the following, the program ran quite fast (sampling just 1% of the dataset).
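
Using the RDD sample() method, the changed line would look something like this (False means sampling without replacement):

values = ( sc.textFile("/user/hadoop/eegsample")
             .sample(False, 0.01)                    # keep roughly 1% of the lines
             .flatMap(lambda line: line.split())
             .map(float) )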

Another demonstration for this problem put all the code into a file wanted.py, along with some initial lines to create the Spark Context (sc) variable. This file was copied, using the scp command, to the practice machine. Then the following command was demonstrated:
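
The command was a spark-submit invocation with both output streams redirected, roughly like:

spark-submit wanted.py > hold 2> log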

What this command does is run wanted.py using spark-submit, but it puts the many informational (and error) messages into a file named "log", while the printed output goes to a file named "hold". This can be handy because scp can then be used to copy log and/or hold to your local computer for study.

Bigrams and Trigrams

The bigram counting problem is an example worked out previously in an assignment with MapReduce. How can it be done in Spark? A rather complicated, though correct, idea was sketched in class. Rather than just doing bigrams, this large example shows how trigrams can be counted using Spark RDD methods. It's quite complicated; some explanatory notes follow the code.
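
The sketch below follows the description in the notes after it (it assumes the pyspark shell, so sc exists, and a placeholder input location /user/hadoop/sample; the in-class version may have differed):

words = sc.textFile("/user/hadoop/sample").flatMap(lambda line: line.split())
indexed = words.zipWithIndex()                      # (word, index) pairs
# send each (word,index) pair to its own index and to both neighbors
spread = indexed.flatMap(lambda wi: [ (wi[1]-1, wi), (wi[1], wi), (wi[1]+1, wi) ])
groups = spread.groupByKey()        # (index, [(wordA,indexA),(wordB,indexB),(wordC,indexC)])
# sort each group by index, keep only complete groups of three, join into a string
trigrams = ( groups.map(lambda kv: sorted(kv[1], key=lambda p: p[1]))
                   .filter(lambda triple: len(triple) == 3)
                   .map(lambda triple: ' '.join(w for (w,i) in triple)) )
# the usual wordcount logic finishes the job
counts = trigrams.map(lambda t: (t,1)).reduceByKey(lambda a,b: a+b)
print ( counts.take(10) )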

The logic of this code is based on something like a join operation. Words of the text are paired with index numbers, then each (word,index) pair is mapped to [ (index-1,(word,index)) , (index,(word,index)), (index+1,(word,index)) ]. After this, the magic of flatMap() and groupBy() builds an RDD in which there are elements (index,[(wordA,indexA),(wordB,indexB),(wordC,indexC)]). Such an element is the material to make a trigram: it has three items in the value part, though these may be out of order. Fortunately, by sorting on {indexA,indexB,indexC} the three words are put into the correct order, and we can map this messy thing into a single string trigram. After this step, the usual wordcount logic does the rest of the job.

Unfortunately this program is terribly slow. It ran for around five minutes to count all the trigrams.

Approximate Trigram Counting

Suppose the counts of trigrams don't need to be perfect. One thing to try is the segsample() function defined above: it will take some portion of the dataset, and the counting algorithm then runs on a small sample.

Another idea is to skip over counting trigrams that span partitions. (In fact, the bigram exercise using MapReduce also skipped counting bigrams that span splits.) Just skipping these cases results in a simple program that runs fast. Here is the code:
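
(What follows is a reconstruction: the input location is a placeholder, sc is assumed to exist as in the pyspark shell, and the two commented-out statements mentioned in the next section are omitted.)

words = sc.textFile("/user/hadoop/sample").flatMap(lambda line: line.split())

windowMemory = []   # defined once; everything afterwards only mutates it

def trimap(word):
  windowMemory.append(word)         # mutation, not assignment
  if len(windowMemory) > 3:
    windowMemory.pop(0)             # keep only the last three words seen
  return ' '.join(windowMemory)     # shorter than three words in the initial cases

trigrams = words.map(trimap).filter(lambda t: t.count(' ') == 2)  # drop the initial cases
counts = trigrams.map(lambda t: (t,1)).reduceByKey(lambda a,b: a+b)
print ( counts.count() )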

The program uses a trick available in Python (it might be hard to do the same in Scala). Notice the variable windowMemory: it is defined once as an empty list, and all subsequent operations on it are mutations. Because there is no assignment statement for windowMemory inside trimap (only mutations), Python looks the name up in the global scope, so the same list is used on every call and its value persists between calls to trimap. It takes some knowledge of Python to know about this trick, but it makes possible the same kind of code that was used in the bigram assignment with MapReduce. The words of the text are fed, one by one, to trimap, and trimap emits a string containing three words (except for some initial cases). This seems perfect! But it is not quite perfect. The map(trimap) operation is invoked, in parallel, on multiple cores, one for each partition of the input data, and each partition works with its own copy of windowMemory. Therefore, trigrams that would cross partitions are not counted.

Partition Counting of Trigrams

The explanation of map(trimap) talks about partitions, yet there is no explicit reference to partitions in the code (except for two statements that have been commented out). An equivalent program makes the connection to partitions clear.
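
Again as a sketch, with the same placeholder input and setup as before:

words = sc.textFile("/user/hadoop/sample").flatMap(lambda line: line.split())

windowMemory = []   # as before, defined once and only mutated

def trimap(part_iter):
  # part_iter is an iterator over ALL the words of one partition
  for word in part_iter:
    windowMemory.append(word)
    if len(windowMemory) > 3: windowMemory.pop(0)
    if len(windowMemory) == 3:
      yield ' '.join(windowMemory)   # one trigram per complete window

trigrams = words.mapPartitions(trimap)
counts = trigrams.map(lambda t: (t,1)).reduceByKey(lambda a,b: a+b)
print ( counts.count() )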

The only difference between this and the previous program, which has map(trimap), is that here it is mapPartitions(trimap) and the code for trimap uses the yield statement (which is usually skipped in beginning Python courses and textbooks). Whereas the function given to map() receives one element of the RDD at a time, the function given to mapPartitions(), say trimap(p), receives p, an iterator over all the elements of a partition. Therefore trimap(p) should yield each of the elements it needs to generate rather than return them (a return statement would end execution of trimap too soon).

Dataframe and CSV

This example was presented in class, based on the government dataset thads.csv introduced in the Spark page.
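
The code in thads.py is not reproduced exactly; a sketch of reading the CSV into a Dataframe with the spark-csv package might look like the following (the location of thads.csv and the two lines at the end are assumptions):

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
conf = SparkConf()
conf.setMaster("yarn-client")
conf.setAppName("thads csv example")
sc = SparkContext(conf = conf)
sqlContext = SQLContext(sc)
df = ( sqlContext.read.format('com.databricks.spark.csv')
                 .options(header='true', inferSchema='true')
                 .load('thads.csv') )
df.printSchema()          # show the column names and types
print ( df.count() )      # how many rows were read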

The python code was put into a file thads.py. Because it is laborious to type in the "--packages com.databricks..." part, the in-class demonstration also used a script. The following two lines were put into a file named ss.
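
They would have looked something like this (the exact spark-csv package coordinates used in class are an assumption):

#!/bin/bash
spark-submit --packages com.databricks:spark-csv_2.10:1.5.0 $1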

The first line indicates this is a Bash (Unix shell) script, and the second line has the placeholder "$1" at the end of the line. After putting those two lines into the file, this command makes the script ss executable:
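
The command is the usual chmod:

chmod +x ss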

Now, from a shell, this command runs the python script:
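
Presumably it was:

./ss thads.py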

This is like running ss as a command, in the current directory, with thads.py as argument to ss; the argument thads.py will be substituted for $1 so that what actually runs is like typing in
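
With the stand-in package coordinates from the ss sketch above, that would be:

spark-submit --packages com.databricks:spark-csv_2.10:1.5.0 thads.py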

It is a bit time-consuming to create ss and set things up to use it, though in a debugging cycle of development, such a script would be used many times, and it is helpful to save keystrokes in repetitive work.

Parallel Prefix Sums

It was shown in a lecture how the ideas of a prefix sum are surprisingly useful for a variety of tasks, including making a cumulative frequency distribution. The question here is: how can this be done using Spark? Would the ideas of the trigram examples above be enough to solve this problem?

The problem is to calculate the sequence x[0], x[0]+x[1], x[0]+x[1]+x[2], ... efficiently and in parallel, where the input is a file containing x[i], where i ranges over [0,n-1]. The program presented here uses some features of Spark not found in the previous examples: sc.broadcast and RDD.mapPartitionsWithIndex, which enable a program to collect limited information from all partitions, then distribute a dictionary of this information to all workers. This way of programming resembles typical asynchronous parallel programming, which is organized as phases with barrier synchronization.

The parallel algorithm for prefix sums is, from a high-level perspective, the same as the one used on a synchronous PRAM architecture. There would be log(n) phases, each phase accumulating larger sums. There is also a divide-and-conquer theme to the program. Roughly described, it divides the input into two halves, does prefix sums on each half, then gets the last element of the left half and adds that element to everything in the right half. Recursively, to calculate for a half of the input, it is divided into two quarters, and so on -- that's why it is a log(n) phase program. Technically, rather than going top-down (dividing into halves first), the order is reversed: begin with parts of length 2 in the first phase, then parts of length 4 in the second phase, and so on.

The difference between the PRAM-like strategy and the Spark program is that, before the first phase, a prefix-sum is done separately in each partition. This is a fast operation. Then the log(m) phases start, where m is the number of partitions. In each phase, selected partitions get the last element of some previous partition and add that element to everything in their partitions.

'''
 The first seven lines set up the Spark Context
'''
from pyspark import SparkConf, SparkContext 
import subprocess, sys, random
conf = SparkConf()
conf.setMaster("yarn-client")
conf.setAppName("prefix sums")
conf.set("spark.executor.memory","2g")
sc = SparkContext(conf = conf)

def basePrefix(baseRDD,prefixOp):
  '''
  Return an RDD obtained from baseRDD by calculating
  a prefix-sum, using the provided prefixOp,
  on a partition-basis (each partition does its own 
  prefix-sum to get the new RDD) 
  '''
  def prefix_part(part_iter):
    '''
    LOCAL function used within a mapPartitions() 
    to carry out the prefix-sum calculation
    '''
    total = None
    for element in part_iter:
      if total == None: 
        total = element
        yield element
      else:
        total = prefixOp(total,element)
        yield total

  newRDD = baseRDD.mapPartitions(prefix_part)
  return newRDD

def lastElements(curRDD):
  '''
  Return a dictionary which maps partition number to 
  the value of the last element in that partition
  '''
  def selast(part_no,part_iter):
    '''
    LOCAL function to yield only the last element
    in a partition (invoked by mapPartitionsWithIndex) 
    '''
    for element in part_iter: last = element
    yield (part_no,last)

  finalsRDD = curRDD.mapPartitionsWithIndex(selast)
  return dict(finalsRDD.collect())

def phasePrefix(curRDD,phase_no,prefixOp):
  '''
  One phase of a repeated calculation of prefix sums, across partitions.
  Input: curRDD, the current RDD from the previous phase
  Input: phase_no, presumably in the range [1,maxphase] 
  Input: prefixOp, the two-argument operator for prefix-sum
  Output: newRDD for the next iteration             

  Note: this function works across iterations by first 
  obtaining a dictionary lastElements(curRDD) which maps 
  each partition number to the value of the last element 
  in that partition -- this is a reasonably small dictionary.
  Then this dictionary, collected at the master machine, is 
  broadcast to all worker machines using sc.broadcast(), 
  which hopefully makes the dictionary available to each 
  partition running a mapPartitions() method.

  Two critical formulas are used. 
   (1) which partitions are "active" in this phase, 
       meaning they will add a value to all their
       elements for the next phase; the formula is
         i in range(number of partitions) 
         provided i%(2**phase_no) >= (2**phase_no/2)
       this formula essentially describes the 
       "right half" of a group of partitions
   (2) there is a formula to find the last 
       partition in the "left half", because its 
       last element is needed by the actives
        source = (part_no // 2**phase_no) * 2**phase_no 
                 + (2**(phase_no-1)) - 1
  '''
  curTotals = lastElements(curRDD)  # get last elements of each partition
  bdict = sc.broadcast(curTotals)   # broadcast dictionary in the cluster
  P = curRDD.getNumPartitions()     # with this many partitions
  d = 2**phase_no
  actives = [ i for i in range(P) if i%d >= d/2 ] 
  def sub_prefix(part_no,part_iter):
    '''
    LOCAL function used in mapPartitionsWithIndex(), 
    it will add the value from the source partition's last
    element to all elements in this partition 
    '''
    source_partition = (part_no // 2**phase_no) * 2**phase_no + (2**(phase_no-1)) - 1
    for element in part_iter:  
      if part_no not in actives:    
         yield element
      else:
         v = bdict.value[source_partition]
         yield prefixOp(v,element)

  newRDD = curRDD.mapPartitionsWithIndex(sub_prefix)
  return newRDD

def runPhasePrefix(baseRDD,prefixOp):
  from math import log, ceil
  # calculate the number of phases as the rounded-up logarithm, base 2,
  # of the number of partitions in baseRDD; this number of partitions
  # will be constant throughout the calculation
  phases = int( ceil( log(baseRDD.getNumPartitions(),2) ) )
  # before the loop of phases, let each partition run its own
  # individual prefix sum on the elements it has
  curRDD = basePrefix(baseRDD,prefixOp)
  # iterate for i = 1, 2, 3, ... phases
  for i in range(1,phases+1): 
    curRDD = phasePrefix(curRDD,i,prefixOp) 
  return curRDD
  
def test():
  sampleRDD = sc.parallelize([1]*1000,128) # try 128 partitions 
  from operator import add
  outputRDD = runPhasePrefix(sampleRDD,add)
  testresult = outputRDD.collect()
  assert sorted(testresult) == range(1,1001)

test()

print ( "**** Done, test passed ****" )

Run Counting (Slow)

One way to look at time series data is to consider the "peaks" and "valleys" the data has when it is plotted visually, though often this is too much to examine by hand. Therefore researchers have devised functions that measure how often data swings from low to high, and so on. A primitive example of this is to count how many times the data has a sequence of length k in which the trend is strictly increasing: this measurement we call uprun-k counting in the example program shown here. Generally speaking, a graph of uprun-k counts versus k for the same data would be a decreasing curve, because the probability of an uprun-k grows smaller as k grows larger.

The program shown here copies the ideas from the trigram counting example above. It is quite slow because it has a groupByKey() operation, which does a major shuffle of a great many (key,value) pairs. When run on a sample dataset of about 52MB of data (actually stored as four files in compressed format, around 3.2MB each), the running time was 338 seconds, and significantly longer when there were other users competing for the CPU.

'''
 The first eight lines set up the Spark Context
 and erase any existing HDFS output
'''
from pyspark import SparkConf, SparkContext 
import subprocess, sys, random
conf = SparkConf()
conf.setMaster("yarn-client")
conf.setAppName("runcount computation")
conf.set("spark.executor.memory","2g")
sc = SparkContext(conf = conf)
subprocess.call("hdfs dfs -rm -r output",shell=True)

def prepare_input(hdfsDir):
  '''
  This function returns an RDD of floats obtained
  from the provided input directory on HDFS
  '''
  def convertToFloat(w):
    '''
    LOCAL function convertToFloat(w)
    - given a text string w, attempt to convert it into
    a floating point number, but if that fails, just
    return 0.0
    '''
    r = 0.0  # default value
    try: 
      r = float(w)
    except ValueError:
      pass
    return r
  text_file = sc.textFile(hdfsDir) # splits are in here
  text_file = text_file.map(lambda line: line.encode('ascii','ignore'))
  tokens = text_file.flatMap(lambda line: line.split()) 
  numbers = tokens.map(lambda word: convertToFloat(word))       
  return numbers 

def window(k,baseRDD):
  '''
  Return a new RDD consisting of all possible segments of k 
  consecutive items; and the new RDD should preserve order 
  also. Thus, if k=3 and baseRDD = ["one","two","three","four","five"]
  the new RDD will be 
     [("one","two","three"),("two","three","four"),
      ("three","four","five")] 
  Note: the function likely only works for small values of k
  larger than 1
  '''
  def destinations(i,N,k):
    '''
    LOCAL function destinations(i,N,k) 
    - given N, the number of items in some RDD, and i,
    the index of a row in that RDD, generate a window of 
    length k with i at its center (for even k, just round down).
    Examples with k=5:
      (4,99) -> (2,3,4,5,6)       
      (98,99) -> (96,97,98) # there is no element 99
      (0,99) -> (0,1,2)     # there is no element -1
    Examples with k=4:
      (4,99) -> (3,4,5,6)   # like k=5, but truncate
    '''
    assert k>1 
    if i<0 or i>=N: return () # out of bounds
    if k%2 > 0:
      left = [i-j for j in range(1,1+k//2)]  # indices to "left" of i
      right = [i+j for j in range(1,1+k//2)] # indices to "right" of i 
      r = [j for j in left+[i]+right if j>=0 and j<N] 
      return tuple(r)
    else:
      # else k is even, so do the same but not so far on the left
      left = [i-j for j in range(1,k//2)]      # indices to "left" of i
      right = [i+j for j in range(1,1+k//2)]   # indices to "right" of i 
      r = [j for j in left+[i]+right if j>=0 and j<N] 
      return tuple(r)
  def spread(v,N,k):
    '''
    LOCAL function spread(v,N,k)
    - given N, number of items in some RDD, and v 
    which is a pair (value,index), generate 
    a tuple with pairs (t,v) for each element 
    in destinations(index,N,k); thus for instance
    spread(("some",40),100,5) will return 
     (38,("some",40)),(39,("some",40)),(40,("some",40)),
     (41,("some",40)),(42,("some",40))
    '''
    assert len(v) == 2
    value,index = v 
    out = [(t,v) for t in destinations(index,N,k)]
    return tuple(out)
  def segmented(v):
    '''
    LOCAL function used to map collected neighboring values
    into a tuple, where the values are like those generated
    by the spread(v,N,k) function; for example:
     (38,[("a",40),("b",37),("c",36),("d",38),("e",41)]) 
     -> (38,["c","b","d","a","e"]) 
    Notice how the order in the result comes from the index
    order of the pairs
    '''
    ind,valist = v
    sortvalist = sorted(valist,key=lambda x: x[1]) # sort by second item
    w = [ a for a,b in sortvalist ] # just the values please
    return (ind,w)

  N = baseRDD.count()
  indRDD = baseRDD.zipWithIndex() # get ("one",0) ("two",1) etc
  spreadRDD = indRDD.flatMap(lambda v: spread(v,N,k))
  rawWinRDD = spreadRDD.groupByKey() # the window values, but unsorted
  winDowRDD = rawWinRDD.map(lambda v: segmented(v))
  return winDowRDD

def upruns(k,winDowRDD):  
  '''
  With winDowRDD as produced by window(k,baseRDD),  
  make a new RDD with only rows consisting of runs that
  are increasing order elements and have exactly k items
  '''
  def runUp(vlist):
    '''
    LOCAL function used to say whether a row has sorted order values
    - Given vlist = [list of elements], return True if the list 
      is in increasing order, otherwise return False
    '''
    if len(vlist) <= 1: return True 
    compares = [ vlist[i] < vlist[i+1] for i in range(len(vlist)-1) ]
    return all(compares)

  withKRDD = winDowRDD.filter(lambda v: len(v[1])==k)
  uponlyRDD = withKRDD.filter(lambda v: runUp(v[1]))
  return uponlyRDD 

def upruncount(hdfsloc,windowsize):
  '''
  Count how many "up runs" there are with a specified windowsize
  and print the count (expressed as a fraction of all the windows,
  this could also be reported as a percent in the range 0.00 to 100.00).

  Input: hdfsloc is the location of the input, e.g. "/user/hadoop/eegsample".
  Input: windowsize is a positive small integer greater than 1.

  Example: lines from hdfsloc are like this:
  
     1.11 -2.3 0.51 0.75 
     1.81 2.42 -0.7 -1.3

  With windowsize=3, there are these sequences to consider
  from the input:

     (1.11,-2.3,0.51) 
     (-2.3,0.51,0.75)
     (0.51,0.75,1.81)
     (0.75,1.81,2.42)
     (1.81,2.42,-0.7)
     (2.42,-0.7,-1.3)
 
  We can see there are 6 sequences derived from the input,
  and only 2 of these 6 are increasing sequences (the third
  and the fourth). With these inputs, upruncount() prints 
  COUNT = 2, which amounts to 33.33 as a percent.
  '''
  X = prepare_input(hdfsloc)
  Y = window(windowsize,X) 
  Z = upruns(windowsize,Y) 
  sys.stdout.write("COUNT = {0}\n".format(Z.count()))

upruncount("/user/hadoop/eegsample",3)

Run Counting (Fast)

An alternative way to explore data is to get approximate metrics that can be computed quickly. This idea was shown above for partition counting of trigrams. In this fast version of run counting, the calculation is again partition based. An exact count is done for each partition, and these are summed to get an approximate total. To get an exact count, a correction term is added. This correction term is calculated at the master by collecting all the runs that cross from one partition to another. This is reasonable because the number of partitions is small, so the data collected at the master won't use much network or memory.

This program ran in under 20 seconds (the slow program took 338 seconds). Both programs calculate the same total for the number of upruns of length k.

'''
 The first seven lines set up the Spark Context
'''
from pyspark import SparkConf, SparkContext 
import subprocess, sys, random
conf = SparkConf()
conf.setMaster("yarn-client")
conf.setAppName("Uprun Counter")
conf.set("spark.executor.memory","2g")
sc = SparkContext(conf = conf)

def prepare_input(hdfsDir):
  '''
  This function returns an RDD of floats obtained
  from the provided input directory on HDFS
  '''
  def convertToFloat(w):
    '''
    LOCAL function convertToFloat(w)
    - given a text string w, attempt to convert it into
    a floating point number, but if that fails, just
    return 0.0
    '''
    r = 0.0  # default value
    try: 
      r = float(w)
    except ValueError:
      pass
    return r

  text_file = sc.textFile(hdfsDir) # splits are in here
  # next line may not be necessary, getting rid of unicode
  text_file = text_file.map(lambda line: line.encode('ascii','ignore'))
  tokens = text_file.flatMap(lambda line: line.split()) 
  numbers = tokens.map(lambda word: convertToFloat(word))       
  return numbers 

def runUp(vlist):
  '''
  Function used to say whether a list has its values in sorted order
  - Given vlist = [list of elements], return True if the list 
  is in increasing order, otherwise return False
  '''
  if len(vlist) <= 1: return True 
  compares = [ vlist[i] < vlist[i+1] for i in range(len(vlist)-1) ]
  return all(compares)

def partition_upruns(baseRDD,k):
  '''
  Count the runs of length k that are up-sequences, 
  returning an RDD of p numbers, where p is the number
  of partitions in baseRDD
  '''
  def localRun(part_iter):
    '''
    LOCAL function to look at all the sequences of length k
    within a partition, and return a count of how many are
    increasing sequences
    '''
    count, sequence = 0, []
    for element in part_iter:
      sequence.append(element) # add element at end of current sequence
      if len(sequence) < k: continue
      if runUp(sequence): count += 1   
      sequence.pop(0) # remove first element, so next can be added to end
    yield count
  return baseRDD.mapPartitions(localRun)

def tail_sequences(indxRDD,k):
  '''
  This function returns an RDD consisting of runs of 
  length 1, 2, 3, ..., k-1 which end at the last element 
  of a partition. Thus, for each partition, k-1 sequences
  are generated. Also, each of these sequences is put into
  (key,value) form, where the value is the sequence and 
  the key is the index, within baseRDD, where the sequence
  starts.
  '''
  def emit_tail(iter):
    '''
    LOCAL function for a mapPartitions(), which generates 
    the tail runs described above (the window length k comes 
    from the enclosing scope)
    '''
    sequence = []
    for pair in iter:
      sequence.append(pair)
      if len(sequence) > k-1: sequence.pop(0)

    # right here, sequence has k-1 items (unless supersmall RDD)
    for j in range(len(sequence)):
      val,indx = sequence[j]  # get a pair, to see where it starts
      yield (indx,[v for (v,i) in sequence[j:]]) 

  tailRDD = indxRDD.mapPartitions(emit_tail)
  return tailRDD

def head_sequences(indxRDD,k):
  '''
  This function returns an RDD consisting of runs of 
  length 1, 2, 3, ..., k-1 which begin at the first element 
  of a partition. Thus, for each partition, k-1 sequences
  are generated. Also, each of these sequences is put into
  (key,value) form, where the value is the sequence and 
  the key is the index, within baseRDD, where the sequence
  starts.
  '''
  def emit_head(iter):
    '''
    LOCAL function for a mapPartitions(), which generates 
    the head runs described above (the window length k comes 
    from the enclosing scope)
    '''
    sequence = []
    for pair in iter:
      sequence.append(pair)
      if len(sequence) >= k-1: break

    # right here, sequence has k-1 items (unless supersmall RDD)
    val,start = sequence[0] # start is index of first element
    for j in range(len(sequence)):
      yield (start,[v for (v,i) in sequence[:j+1]]) 

  headRDD = indxRDD.mapPartitions(emit_head)
  return headRDD

def left_overs(sequences_with_index,k):
  '''
  This function is given some pairs of the form (index,seq)
  as produced by tail_sequences() and head_sequences() above,
  and it tries to construct sequences of length k that 
  logically join together, with respect to the baseRDD.
  Then, for each of these, it counts how many are increasing
  sequences and returns the total.
  '''
  count = 0
  for (start,seq) in sequences_with_index:
    size = len(seq)  
    otherSize = k-size  # this is the other half
    otherStart = start + size # where other starts
    for (canstart,remainder) in sequences_with_index:
      if len(remainder) == otherSize and canstart == otherStart:
         tryseq = seq + remainder 
         if runUp(tryseq): count += 1
  return count
   
def upruns(k):
  baseRDD = prepare_input("/user/hadoop/eegsample")
  T = partition_upruns(baseRDD,k)
  N = T.sum() # this is the approximate count
  indxRDD = baseRDD.zipWithIndex()
  V = tail_sequences(indxRDD,k).collect()
  W = head_sequences(indxRDD,k).collect()
  M = left_overs(V+W,k)
  return N+M  

# HERE is where to set k
total = upruns(3)
print "****", total

Read Data via RDD into Dataframe

This example sketches the idea of handling data that consists of groups of lines, rather than one logical unit of data per line. The idea is to use mapPartitions() to handle, on a per-partition basis, the raw input fed into that partition, with a custom function that reads multiple lines and assembles a logical unit of data from them. This example goes further: a pyspark.sql.types.Row object is created for each unit, and these are used to make a Dataframe.

The program uses some poor style and has bugs; it was meant to illustrate a point in class.

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession 
from pyspark.sql.types import Row
import subprocess, sys, random
conf = SparkConf()
sc = SparkContext(conf = conf)
spark =  SparkSession.builder.getOrCreate()

def getaddresses(partition_iterator):
  """
  This function will be called via the mapPartitions() method,
  to iterate through all the lines of a chunk of input on one
  partition. The format of the input looks like this:

    Name
    Telephone (this is an optional line)
    Address (one or more lines)
    City, State, Zip

  The logic of the code is basically a state-machine, which 
  advances from the state of having no address to having a 
  more complete address, and when it has an entire address,
  it yields a Row object for that address. 

  The outcome of mapPartitions(getaddresses) will be that a 
  new RDD partition will be built consisting of Row objects
  as elements of the new RDD
  """
  Name, Telephone, Address, City, State, Zip = None, None, None, None, None, None
  for line in partition_iterator:
    line = line.decode("utf8").encode("Ascii","ignore")
    if line.startswith("#"): continue  # ignore comment lines
    if line.strip() == '':   continue  # ignore blank lines 
    if Name == None: 
      Name = line.strip()  # remove leading and trailing blanks
      continue             # then get next line
    # we have the name
    if Telephone == None:   
      temp = line.split()  # get all the parts of a phone number
      isNumber = True 
      for word in temp:
        isNumber = isNumber and word.isdigit() # found this trick in Python documentation
      if isNumber:         # true if all parts are numeric
        Telephone = line.strip()
        continue
    # figure out if this line ends with something looking like a zip code
    temp = line.strip().split()
    lastword = temp[-1]    # lastword might be a zip

    # past the telephone, could be address  
    if Address == None and not (lastword.isdigit() and len(lastword)>=5):
      Address = line.strip()
      continue
    if not lastword.isdigit():
      Address = ' / ' + line.strip()  # multiline address
      continue

    # format should be City, State Zipcode
    Zip = lastword
    State = temp[-2]   # grab the state 
    City = ''          # accumulate the city name (can be multiple words)
    for word in temp:
      if word.endswith(","):
        City += word
        break
      City += word

    # Now we have it all, except maybe Telephone is missing
    if Telephone == None: Telephone = ''

    # make a Row object with all the parts
    NewRow = Row(NAME=Name,TELEPHONE=Telephone,
                  ADDRESS=Address,CITY=City,STATE=State,ZIP=Zip)
    
    # yield the new Row object and reset all the parts for next group of lines
    yield NewRow
    Name, Telephone, Address, City, State, Zip = None, None, None, None, None, None

# Test on a local file addresses.txt 
baseRDD = sc.textFile("file:///home/herman/Spark/addresses.txt") 
rowsRDD = baseRDD.mapPartitions(getaddresses)
addressDataFrame = spark.createDataFrame(rowsRDD)
display = addressDataFrame.show()
print(display)
