Spark Windowing and Aggregation Functions for DataFrames

This post provides example usage of the Spark “lag” windowing function, with a full code example of how “lag” can be used to find gaps in dates. Windowing and aggregate functions are similar in that each works on some kind of grouping criteria. The difference is that with aggregates Spark generates a single value for each group, based on some calculation like computing the maximum value (within the group) of some column. Windowing functions also use grouping, but they compute a value for each record in the group.

For example, let’s say you were a sports ball statistician who needs to calculate:

  • for each team in the league: the average number of goals scored by all players on that team
  • for each player on each team: an ordering of players by top scorer, and for each player ‘P’, the delta between ‘P’ and the top scorer.

You’d generate the first stat using groupBy and the ‘avg’ aggregate function. Note that for each (by team) group, you get exactly one value.

Input 
-----

Team    Player      Goals
----    ------      -----

Bears   
        Joe         4  \
        Bob         3    ->   (4+3+2) /3 = 3
        Mike        2  /

Sharks
        Lou         2  \                   
        Pancho      4    ->   (2+4+0) /3 = 2
        Tim         0  /

Output
------

Team    Average
-----   -------
Bears   3
Sharks  2

You could generate the second stat with a window specification like Window.partitionBy("team"), which groups players by team. For each team you would compute the maximum number of goals, and then for each player you’d compute the delta between that player’s goals and the team maximum. The expression to do that would look something like:
withColumn("delta", max($"goals").over(Window.partitionBy("team")) - $"goals")
followed by .orderBy($"team", $"goals".desc) to list each team’s players from top scorer down.

The full code example below uses a different window function (lag), but conceptually, your inputs and outputs for this stat would look something like the following:


    Input 
    -----

    Team    Player      Goals                       
    ----    ------      -----

    Bears   
            Joe         4  \
            Bob         3    ->   max(4,3,2) = 4
            Mike        2  /

    Sharks
            Lou         4  \                   
            Pancho      2    ->   max(4,2,0) = 4
            Tim         0  /

    Output
    ------

    Team    Player      Goals   Delta
    -----   -------     -----   -----
    Bears   Joe         4       0       <--- We have 3 values 
    Bears   Bob         3       1       <--- for each team.  
    Bears   Mike        2       2       <--- One per player, 
    Sharks  Lou         4       0       <--- not one value  
    Sharks  Pancho      2       2       <--- for each team, 
    Sharks  Tim         0       4       <--- as in previous example
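To make the one-value-per-group versus one-value-per-row distinction concrete without a cluster, here is a minimal plain-Scala sketch over an in-memory list, using the numbers from the second diagram. The PlayerGoals case class and the TeamStats helpers are hypothetical names for illustration; they only mimic what groupBy/avg and max(...).over(Window.partitionBy(...)) compute, not how Spark executes them.

```scala
// Hypothetical row type for the toy statistics above.
final case class PlayerGoals(team: String, player: String, goals: Int)

object TeamStats {
  // Aggregate: one output value per group, like groupBy("team").agg(avg("goals")).
  def averageGoalsByTeam(rows: Seq[PlayerGoals]): Map[String, Double] =
    rows.groupBy(_.team).map { case (team, players) =>
      team -> players.map(_.goals).sum.toDouble / players.size
    }

  // Window: one output value per input row, like
  // max("goals").over(Window.partitionBy("team")) - goals, ordered by top scorer.
  def deltaFromTeamMax(rows: Seq[PlayerGoals]): Seq[(PlayerGoals, Int)] = {
    val teamMax: Map[String, Int] =
      rows.groupBy(_.team).map { case (team, players) => team -> players.map(_.goals).max }
    rows
      .sortBy(r => (r.team, -r.goals)) // within each team, top scorer first
      .map(r => r -> (teamMax(r.team) - r.goals))
  }
}
```

With the six players above, averageGoalsByTeam returns one entry per team (Bears -> 3.0, Sharks -> 2.0), while deltaFromTeamMax returns one entry per player, with the same deltas as the output table.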

Full Code Example

Now, let’s look at another example backed by some code. For this one, let’s imagine we are managing our sports ball team and we need each player to regularly certify non-use of anabolic steroids. For a given auditing period we will give any individual player a pass for one lapse (defined as an interval where a previous non-use certification has expired and a new certification has not yet taken effect). Two or more lapses, and Yooouuuu’re Out! We give the complete code listing below, followed by a discussion.

package org


import java.io.PrintWriter

import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.types._



object DateDiffWithLagExample extends App {

  lazy val sparkConf =
    new SparkConf() .setAppName("SparkySpark") .setMaster("local[*]")
  lazy val sparkSession =
    SparkSession .builder() .config(sparkConf).getOrCreate()
  val datafile = "/tmp/spark.lag.demo.txt"

  import DemoDataSetup._
  import org.apache.spark.sql.functions._
  import sparkSession.implicits._

  sparkSession.sparkContext.setLogLevel("ERROR")

  val schema = StructType(
    List(
      StructField(
        "certification_number", IntegerType, false),
      StructField(
        "player_id", IntegerType, false),
      StructField(
        "certification_start_date_as_string", StringType, false),
      StructField(
        "expiration_date_as_string", StringType, false)
    )
  )


  writeDemoDataToFile(datafile)

  val df =
    sparkSession.
      read.format("csv").schema(schema).load(datafile)
  df.show()

  val window =
    Window.partitionBy("player_id")
      .orderBy("expiration_date")
  val identifyLapsesDF = df
    .withColumn(
      "expiration_date",
      to_date($"expiration_date_as_string", "yyyy+MM-dd"))
    .withColumn(
      "certification_start_date",
      to_date($"certification_start_date_as_string", "yyyy+MM-dd"))
    .withColumn(
      "expiration_date_of_previous_as_string",
      lag($"expiration_date_as_string", 1, "9999+01-01" )
        .over(window))
    .withColumn(
      "expiration_date_of_previous",
      to_date($"expiration_date_of_previous_as_string", "yyyy+MM-dd"))
    .withColumn(
      "days_lapsed",
      datediff(
        $"certification_start_date",
        $"expiration_date_of_previous"))
    .withColumn(
      "is_lapsed",
      when(col("days_lapsed") > 0, 1) .otherwise(0))

  identifyLapsesDF.printSchema()
  identifyLapsesDF.show()

  val identifyLapsesOverThreshold =
    identifyLapsesDF.
      groupBy("player_id").
      sum("is_lapsed").where("sum(is_lapsed) > 1")
  identifyLapsesOverThreshold.show()
}


object DemoDataSetup {
  def writeDemoDataToFile(filename: String): PrintWriter = {
    val data =
      """
        |12384,1,2018+08-10,2018+12-10
        |83294,1,2017+06-03,2017+10-03
        |98234,1,2016+04-08,2016+08-08
        |24903,2,2018+05-08,2018+07-08
        |32843,2,2017+04-06,2018+04-06
        |09283,2,2016+04-07,2017+04-07
      """.stripMargin

    // one liner to write string:  not exception or encoding safe. for demo/testing only
    new PrintWriter(filename) { write(data); close }
  }
}

We begin by writing the input data below (only two players) to a file via the
DemoDataSetup.writeDemoDataToFile method and reading it back in. Column names are abbreviated in the listing.

+-------+------+----------+------------+
|cert_id|player|cert_start|cert_expires|
+-------+------+----------+------------+
|  12384|     1|2018+08-10|  2018+12-10|
|  83294|     1|2017+06-03|  2017+10-03|
|  98234|     1|2016+04-08|  2016+08-08|
|  24903|     2|2018+05-08|  2018+07-08|
|  32843|     2|2017+04-06|  2018+04-06|
|   9283|     2|2016+04-07|  2017+04-07|
+-------+------+----------+------------+

Next we construct three DataFrames. The first reads in the data (using a wacky non-standard date format, yyyy+MM-dd, just for kicks). The second uses the window definition below

  val window = 
     Window.partitionBy("player_id") .orderBy("expiration_date")

which groups records by player id, and orders each player’s records from earliest expiration date to latest. For each record, this expression

   .withColumn(
      "expiration_date_of_previous_as_string",
      lag($"expiration_date_as_string", 1, "9999+01-01")
        .over(window))

will ensure that for any given record listing the start date of a certification period we get the expiration date of the previous period. The first record for each player has no predecessor, so lag supplies the default value "9999+01-01"; because that far-future date is later than any start date, the resulting day count is negative and the first record is never flagged. We use ‘datediff’ to calculate the days elapsed between the expiration of the previous cert and the effective start date of the cert for the current record. Then we use when/otherwise to set is_lapsed to 1 for any record where that day count (current start date minus previous expiration date) is greater than zero, and to 0 otherwise.

Finally, we compute a third DataFrame, identifyLapsesOverThreshold, which this time uses an aggregation (as opposed to windowing) function to
group by player id and see if any player’s sum of ‘is_lapsed’ flags is more than one.

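The lone culprit is player 1, who has two lapses and will thus be banished (should have just said No to steroids). Player 2 has a single lapse, between the expiration on 2018-04-06 and the new start on 2018-05-08, and so gets a pass.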

+---------+--------------+
|player_id|sum(is_lapsed)|
+---------+--------------+
|        1|             2|
+---------+--------------+
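The same lapse logic can be traced in plain Scala, which may help when checking the Spark results by hand. This is a minimal sketch under a few assumptions: the Cert case class and LapseSketch object are hypothetical, dates are already parsed into java.time.LocalDate, and a 9999-01-01 sentinel plays the role of lag’s default value.

```scala
import java.time.LocalDate
import java.time.temporal.ChronoUnit

// Hypothetical record type mirroring the demo CSV, with dates already parsed.
final case class Cert(certId: Int, playerId: Int, start: LocalDate, expires: LocalDate)

object LapseSketch {
  // Plays the role of lag's default ("9999+01-01"): it makes the first gap negative.
  private val FarFuture = LocalDate.of(9999, 1, 1)

  // Per player (the window partition), order by expiration date and pair each
  // certification with the previous one's expiration date, as lag(..., 1) does.
  def lapsesPerPlayer(certs: Seq[Cert]): Map[Int, Int] =
    certs.groupBy(_.playerId).map { case (player, rows) =>
      val ordered = rows.sortBy(_.expires.toEpochDay)
      val previousExpirations = FarFuture +: ordered.map(_.expires).init
      val lapses = ordered.zip(previousExpirations).count { case (cert, prevExpires) =>
        // Same sign as datediff(start, prevExpires): positive means a gap in coverage.
        ChronoUnit.DAYS.between(prevExpires, cert.start) > 0
      }
      player -> lapses
    }

  // Counterpart of groupBy("player_id").sum("is_lapsed").where("sum(is_lapsed) > 1").
  def playersOverThreshold(certs: Seq[Cert], allowedLapses: Int = 1): Set[Int] =
    lapsesPerPlayer(certs).collect { case (player, n) if n > allowedLapses => player }.toSet
}
```

Run against the six demo records, lapsesPerPlayer yields two lapses for player 1 and one for player 2, so playersOverThreshold returns only player 1. Note that a certification starting the day before the previous one expires (player 2: 2017-04-06 vs. 2017-04-07) counts as an overlap, not a lapse.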

Can Adding Partitions Improve The Performance of Your Spark Job On Skewed Data Sets?

After reading a number of online articles on how to handle ‘data skew’ in one’s Spark cluster, I ran some experiments on my own ‘single JVM’ cluster to try out one of the techniques mentioned. This post presents the results, but before we get to those, I could not restrain myself from some nitpicking (below) about the definition of ‘skew’. You can safely skip the next section if you just want to get to the Spark techniques.

A Statistical Aside

In statistics, skew describes the asymmetry of a distribution: in a symmetric unimodal distribution the mean, median, and mode all coincide, while in a skewed distribution they generally do not. Many online resources use a conflicting definition of data skew, for example this one, which talks about skew in terms of “some data slices [having] more rows of a table than others”. We can’t use the traditional statistics definition of skew if our concern is unequal distribution of data across the partitions of our Spark tasks.

Consider a degenerate case where you have allocated 100 partitions to process a batch of data, and all the keys in that batch are from the same customer. Then, if we are using a hash or range partitioner, all records would be processed in one partition, while the other 99 would be idle. But clearly in this case the mean (average), the mode (most common value), and the median (the value ‘in the middle’ of the distribution) would all be the same. So, our data is not ‘skewed’ in the traditional sense, but definitely unequally distributed amongst our partitions. Perhaps a better term to use instead of ‘skewed’ would be ‘non-uniform’, but everyone uses ‘skewed’. So, fine. I will stop losing sleep over this and go with the data-processing literature usage of the term.

Techniques for Handling Data Skew

More Partitions

Increasing the number of partitions may cause keys that previously shared a partition to be hashed into different partitions, spreading the work more evenly. However, all records with the same key still land in the same partition, so this will likely not help much when one or relatively few keys dominate the data. The following sections discuss this technique in more detail.

Bump up spark.sql.autoBroadcastJoinThreshold

Increasing the value of this setting (default 10 MB; a value of -1 disables automatic broadcasting) increases the likelihood that the Spark query engine chooses the BroadcastHashJoin strategy for joins in preference to the more shuffle-intensive SortMergeJoin. A broadcast hash join transmits the smaller to-be-joined table to each executor’s memory, then streams the larger table past it and joins row by row, so the larger table never needs to be shuffled. As the size of the smaller table increases, memory pressure also increases, and the viability of this technique decreases.

Iterative (Chunked) Broadcast Join

When your smaller table becomes prohibitively large it might be worth considering the approach of iteratively taking slices of your smaller (but not that small) table, broadcasting those, joining with the larger table, then unioning the result. Here is a talk that explains the details nicely.
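The mechanics of both the broadcast hash join and its chunked variant can be modelled in a few lines of plain Scala. In this sketch (the ChunkedBroadcastJoin object and its methods are hypothetical), hashJoin stands in for what each executor does in a BroadcastHashJoin: build a hash map from the small side and stream the large side past it. chunkedJoin slices the small side, joins each slice, and unions the results. In real Spark code each slice would presumably be wrapped in the broadcast() hint from org.apache.spark.sql.functions and the per-slice results combined with union.

```scala
object ChunkedBroadcastJoin {
  // One broadcast-hash-join step: build a hash map from the small side
  // (what would be broadcast to each executor) and stream the large side past it.
  def hashJoin[K, A, B](large: Seq[(K, A)], small: Seq[(K, B)]): List[(K, (A, B))] = {
    val lookup: Map[K, Seq[B]] =
      small.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
    large.toList.flatMap { case (k, a) => lookup.getOrElse(k, Seq.empty).map(b => (k, (a, b))) }
  }

  // Chunked variant: slice the small side, join each slice separately, then union.
  def chunkedJoin[K, A, B](large: Seq[(K, A)], small: Seq[(K, B)], chunkSize: Int): List[(K, (A, B))] =
    small.grouped(chunkSize).toList.flatMap(chunk => hashJoin(large, chunk))
}
```

For an inner join the union is safe because every (large row, small row) match falls into exactly one slice; outer joins would need extra bookkeeping to avoid emitting unmatched rows once per slice.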

Adding salt

Add ‘salt’ to the keys of your data set by mapping each key to a pair whose first element is the original key, and whose second element is a random integer in some range. For very frequently occurring keys the range would be larger than for keys which occur with average or lower frequency.

Say you had a table with data like the one below:

        customerId  itemOrdered Quantity
            USGOV   a-1         10   // frequently occurring
            USGOV   a-2         44   // frequently occurring
            USGOV   a-5         553  // frequently occurring
            small1  a-1         2
            small1  a-1         4
            small3  a-1         2
            USGOV   a-5         553  // frequently occurring
            small2  a-5         1

And you needed to join to a table of discounts to compute the final price, like this:

        customerId  discountPercent
            USGOV   .010
            small1  .001
            small2  .001
            small3  .002

You would add an additional salt column to both tables, then join on the customerId and the salt, with the modified input to the join appearing as shown below. Note that ‘USGOV’ records used to wind up in one partition, but now, with the salt added to the key, they will likely be spread across up to three partitions (‘salt range’ == 3). The records associated with less frequently occurring keys get only one salt value (‘salt range’ == 1), as there is no need to spread them across different partitions.

            customerId  salt  itemOrdered Quantity 
                USGOV   1     a-1         10   
                USGOV   2     a-2         44   
                USGOV   3     a-5         553  
                small1  1     a-1         2    
                small1  1     a-1         4    
                small3  1     a-1         2     
                USGOV   3     a-5         553
                small2  1     a-5         1

To ensure the join works, the salt column also needs to be added to the smaller table, and for each salt value that a high-frequency key can take we need to add a copy of that key’s record (note there are now three USGOV records). This adds to the size of the smaller table, but often this will be outweighed by the efficiency gained from not having a few partitions loaded up with the majority of the data to be processed.

        customerId  salt discountPercent 
            USGOV   1    .010
            USGOV   2    .010
            USGOV   3    .010
            small1  1    .001
            small2  1    .001 
            small3  1    .002 
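Here is a minimal in-memory sketch of the salting scheme above, in plain Scala so it runs without a cluster. The Order and Discount case classes, the SaltedJoin object, and the hotKeys map (key to salt range) are all hypothetical. In Spark the composite (customerId, salt) key is what would drive the shuffle; here we only check that salting does not change the join result.

```scala
import scala.util.Random

// Hypothetical row types modelled on the order and discount tables above.
final case class Order(customerId: String, item: String, quantity: Int)
final case class Discount(customerId: String, discountPercent: Double)

object SaltedJoin {
  // Salt range per key: hot keys get several salt values, all other keys get one.
  def saltRange(customerId: String, hotKeys: Map[String, Int]): Int =
    hotKeys.getOrElse(customerId, 1)

  def join(orders: Seq[Order], discounts: Seq[Discount],
           hotKeys: Map[String, Int], rng: Random): Seq[(Order, Double)] = {
    // Large side: tag each row with one random salt in 1..saltRange.
    val saltedOrders = orders.map { o =>
      ((o.customerId, 1 + rng.nextInt(saltRange(o.customerId, hotKeys))), o)
    }
    // Small side: one copy of each row per possible salt, so every salted key finds its match.
    val saltedDiscounts: Map[(String, Int), Double] =
      discounts.flatMap { d =>
        (1 to saltRange(d.customerId, hotKeys)).map(salt => (d.customerId, salt) -> d.discountPercent)
      }.toMap
    // Join on the composite (customerId, salt) key.
    saltedOrders.flatMap { case (key, o) => saltedDiscounts.get(key).map(pct => o -> pct) }
  }
}
```

With hotKeys = Map("USGOV" -> 3), each USGOV order gets a salt from 1 to 3, the USGOV discount row is replicated three times, and every other customer keeps the single salt value 1, mirroring the tables above.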

Adding More Partitions: Unhelpful When One Key Dominates

Before we look at code, let’s consider a minimal contrived example where we have a data set of twelve records that needs to be distributed across our cluster, which we will accomplish by ‘mod’ing the key by the number of partitions. First consider a non-skewed data set where no key dominates, and 3 partitions. Partition 0 gets filled with 5 items, partition 1 with 4, and partition 2 with only 3. This imbalance is a result of having very few partitions.

3 partitions: 0, 1, 2
distribute to partition via: key % 3

Uniform Data Set 

    key     partition
*   0       0 
    1       1
    2       2
*   3       0
    4       1
    5       2
*   6       0
    7       1
    8       2
*   9       0
    10      1
*   12      0 

Now, let’s look at two skewed data sets, one in which one key (0) dominates, and another where the skew is the fault of two keys (0 and 12). We again partition by mod’ing by the number of available partitions. In both cases, partition 0 gets flooded with 8 of 12 records, while the other partitions get only 2 records each.

Skewed Data Set  -- One Key (0) Dominates


    key     partition
*   0       0
*   0       0
*   0       0
    1       1
    2       2
*   3       0
    4       1
    5       2
*   6       0
*   0       0
*   0       0
*   0       0




Skewed Data Set  -- No Single Key Dominates (0 & 12 occur most often)

    key     partition
*   0       0
*   0       0
*   0       0
    1       1
    2       2
*   3       0
    4       1
    5       2
*   6       0
*   12      0
*   12      0
*   12      0

Now let’s see what happens when we increase the number of partitions to 11, and distribute records across partitions by mod’ing by that number. In the case where one key (0) dominates, we find that partition 0 still gets 6 out of 12 records. But when the ‘skew’ is spread across not one, but two keys (0 and 12), only 3 out of 12 records end up in partition 0 (the three 12s land together in partition 1, since 12 mod 11 = 1, giving it 4 records). This shows that the more ‘dominance’ is concentrated around a small set of keys (or one key, as often happens with nulls), the less we will benefit by simply adding partitions.

Skewed Data Set  -- One Key (0) Dominates



    key     partition
*   0       0
*   0       0
*   0       0
    1       1
    2       2
    3       3
    4       4
    5       5
    6       6
*   0       0
*   0       0
*   0       0




Skewed Data Set  -- No Single Key Dominates (0 & 12 occur most often)

    key     partition
*   0       0
*   0       0
*   0       0
    1       1
    2       2
    3       3
    4       4
    5       5
    6       6
    12      1    
    12      1
    12      1
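The partition counts in these tables are easy to double-check. The sketch below (ModPartitioning is a hypothetical helper) assigns each key to key % numPartitions and counts records per partition. For non-negative Int keys this is the same assignment Spark’s HashPartitioner makes, since an Int’s hashCode is the value itself.

```scala
object ModPartitioning {
  // Number of records landing in each partition when partition = key % numPartitions.
  def partitionCounts(keys: Seq[Int], numPartitions: Int): Map[Int, Int] =
    keys.groupBy(_ % numPartitions).map { case (partition, ks) => partition -> ks.size }

  // The three 12-record data sets from the tables above.
  val uniform: Seq[Int]     = Seq(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 12)
  val oneDominant: Seq[Int] = Seq(0, 0, 0, 1, 2, 3, 4, 5, 6, 0, 0, 0)
  val twoDominant: Seq[Int] = Seq(0, 0, 0, 1, 2, 3, 4, 5, 6, 12, 12, 12)
}
```

With 3 partitions, both skewed sets put 8 of 12 records in partition 0; with 11 partitions, the one-dominant-key set still puts 6 records there, while the two-key set peaks at 4 (in partition 1).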

Adding More Partitions: A Simple Test Program

The main processing component of the test program I developed to explore what happens with skewed data does a mapPartitionsWithIndex over each partition of a pair RDD, where each pair consists of the key followed by a value (here simply the key again). The keys are generated by the KeyGenerator object, which always generates 100 keys: either 100 keys drawn uniformly at random from the range 1 to 99, or one of two flavors of skewed distribution, both of which start with 45 random keys in the range 1 to 44. The ‘oneKeyDominant’ distribution augments those 45 random keys with 55 0’s, while the ‘not oneKeyDominant’ distribution adds 3 high-frequency keys: 0, 2, and 4, occurring 18, 18, and 19 times, respectively.

At the beginning of the mapPartitionsWithIndex we start a timer so we can see how much time it takes to completely process each partition. As we iterate over each key we call ‘process’ which emulates some complex processing by sleeping for 50 milliseconds.

    def process(key: (Int, Int), index: Int): Unit = {
      println(s"processing $key in partition '$index'")
      Thread.sleep(50)     // Simulate processing w/ delay
    }

    keysRdd.mapPartitionsWithIndex {
      (index, keyzIter) =>
        val start = System.nanoTime()
        // Materialize the partition: iterating keyzIter would otherwise
        // exhaust it, leaving nothing for count() to see.
        val keyz = keyzIter.toList
        keyz.foreach { key => process(key, index) }
        val end = System.nanoTime()
        println(
          s"processing of keys in partition '$index' took " +
            s"${(end - start) / (1000 * 1000)} milliseconds")
        keyz.iterator
    }
      .count()

The complete program is listed below. As long as you use Spark 2.2+ you should be able to run it by copying and pasting it into any existing Spark project. To reproduce the results we report in the next section, you need to manually set the variables numPartitions, useSkewed, and oneKeyDominant before you launch the application.

import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._

import scala.collection.immutable
import scala.util.Random


object KeyGenerator {
  val random = new Random()

  def getKeys(useSkewed: Boolean, 
              oneKeyDominant: Boolean)  : immutable.Seq[Int] = {

    def genKeys(howMany: Int,
                lowerInclusive: Int,
                upperExclusive: Int)  = {
      (1 to howMany).map{ i =>
        lowerInclusive + 
            random.nextInt(upperExclusive - lowerInclusive)
      }
    }

    val keys  =
      if (useSkewed) {
        val skewedKeys =
          if (oneKeyDominant)
            Seq.fill(55)(0)
          else
            Seq.fill(18)(0) ++ Seq.fill(18)(2) ++ Seq.fill(19)(4)

        genKeys(45, 1, 45) ++ skewedKeys
      }
      else {
        genKeys(100, 1, 100)
      }

    System.out.println("keys:" + keys);
    System.out.println("keys size:" + keys.size);

    keys
  }
}

object SaltedToPerfection extends App { 

  import KeyGenerator._
  def runApp(numPartitions: Int, 
             useSkewed: Boolean, 
             oneKeyDominant: Boolean) = {

    val keys: immutable.Seq[Int] = getKeys(useSkewed, oneKeyDominant)
    val keysRdd: RDD[(Int, Int)] =
      sparkSession.sparkContext.
        parallelize(keys).map(key => (key,key)). // to pair RDD
        partitionBy(new HashPartitioner(numPartitions))


    System.out.println("keysRdd.partitioner:" + keysRdd.partitioner)
    System.out.println("keysRdd number of partitions:" + keysRdd.partitions.length)

    def process(key: (Int, Int), index: Int): Unit = {
      println(s"processing $key in partition '$index'")
      Thread.sleep(50)     // Simulate processing w/ delay
    }

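    keysRdd.mapPartitionsWithIndex {
      (index, keyzIter) =>
        val start = System.nanoTime()
        // Materialize the partition: iterating keyzIter would otherwise
        // exhaust it, leaving nothing for count() to see.
        val keyz = keyzIter.toList
        keyz.foreach { key => process(key, index) }
        val end = System.nanoTime()
        println(
          s"processing of keys in partition '$index' took " +
            s"${(end - start) / (1000 * 1000)} milliseconds")
        keyz.iterator
    }
      .count()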
  }


  lazy val sparkConf = new SparkConf()
    .setAppName("Learn Spark")
    .setMaster("local[4]")

  lazy val sparkSession = SparkSession
    .builder()
    .config(sparkConf)
    .getOrCreate()


  val numPartitions = 50
  val useSkewed = true
  val oneKeyDominant = true

  runApp(numPartitions, useSkewed, oneKeyDominant)
  Thread.sleep(1000 * 600)    // 10 minutes sleep to explore with UI
}

Adding More Partitions: Test Results

The results obtained from running our test program agreed with the informal analysis we performed above on the 12-record data sets: increasing the number of partitions is more helpful when more than one key dominates the distribution. When one key dominates, increasing partitions from 4 to 50 reduced job duration by 16% (compare runs 3 and 5), whereas when multiple keys dominate the distribution we saw an improvement of 29% (compare runs 2 and 4).

Run     Partitions      Skew                        Job Duration

1       4               none                        2.057556 s
2       4               multiple dominant keys      3.125907 s
3       4               one dominant key            4.045455 s
4       50              multiple dominant keys      2.217383 s
5       50              one dominant key            3.378734 s



Performance improvements obtained by increasing partitions (4->50)

    one dominant key    
        Elapsed time difference between run 3 and 5
        (4.045455 - 3.378734) / 4.045455  = 16%

    multiple dominant keys
        Elapsed time difference between run 2 and 4
        (3.125907 - 2.217383) / 3.125907  = 29%

Assessing Significant Difference In Pairwise Combinations via the Marascuilo Procedure (in R)

Some time back, while tinkering with R, I coded up a version of the Marascuilo procedure and wrote up the results in a post to my old blog, which I am now resurrecting here. The Marascuilo procedure compares the proportions in a contingency table pair by pair to determine which pairwise differences in proportion are statistically significant. The function I wrote takes every possible pairwise combination of groups and prints Y(es) or N(o) to indicate whether or not the difference in proportions is significant.

For a real world-ish use case, imagine you are managing three hotels: the Grand Plaza (GP), Plaza Royale (PR), and the Plaza Prima (PP). One determinant of service quality at your hotels is the presence or absence of vermin (insects, rodents, etc.) Your staff has conducted a survey of guests at all three hotels in which they were asked, “were you bothered by any vermin during your stay?” You decide to use the Marascuilo Procedure to determine if any one (or more) hotels is/are significantly under-performing other hotels in the ‘infested with vermin’ category.

The R commands below show the frequency table that captures the survey responses, and show how we would invoke our marascuilo function to determine which (if any) hotel’s performance is significantly different from the others. Note that our function outputs the results of three pairwise combinations. This is correct because we have three items and there are 3 choose 2, i.e., three, ways to pick two items from a list of three.

> lines <- "
+          GP    PR     PP
+ Y        128   199    126
+ N        88    33     66 
+ "
> 
> con <- textConnection(lines)
> tablefoo <- read.table(con, header=TRUE)
> close(con)
> 
> 
> marascuilo(tablefoo) 

      pair      abs.diff             critical.range       significant
[1,] "GP | PR" "0.265166028097063"  "0.0992354018215412" "Y"
[2,] "GP | PP" "0.0636574074074074" "0.117201905174372"  "N"
[3,] "PR | PP" "0.201508620689655"  "0.100947721261772"  "Y"

The results indicate no significant difference between the performance of the Grand Plaza (GP) and the Plaza Prima (PP) on the metric in question. However, both the Grand Plaza and the Plaza Prima are shown to differ significantly from the Plaza Royale (PR). The Plaza Royale guests’ proportion of ‘yes’ responses to the vermin question was the highest of the three hotels (199 of 232, about 86%, versus about 59% for GP and 66% for PP). Therefore, if you decide to take any action to address this problem, you should probably start with the Plaza Royale. (A flame thrower might help.)
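For reference, here is the comparison the function performs for each pair, written out as a formula and worked through for the GP vs. PR pair. It follows the code: p_i is the proportion of ‘Y’ answers in column i, n_i is that column’s total, and the chi-squared quantile uses (r-1)(c-1) = 2 degrees of freedom at the default 0.95 confidence level.

```latex
\begin{aligned}
r_{ij} &= \sqrt{\chi^2_{0.95,\,(2-1)(3-1)}}\;
          \sqrt{\frac{p_i(1-p_i)}{n_i} + \frac{p_j(1-p_j)}{n_j}},
\qquad \text{significant if } |p_i - p_j| > r_{ij} \\[4pt]
p_{GP} &= \tfrac{128}{216} \approx 0.5926, \qquad
p_{PR} = \tfrac{199}{232} \approx 0.8578, \qquad
|p_{GP} - p_{PR}| \approx 0.2652 \\[4pt]
r_{GP,PR} &\approx \sqrt{5.991}\,\sqrt{\frac{0.5926 \cdot 0.4074}{216} + \frac{0.8578 \cdot 0.1422}{232}}
           \approx 2.448 \times 0.04054 \approx 0.0992
\end{aligned}
```

Since 0.2652 > 0.0992, the GP vs. PR difference is significant, matching the ‘Y’ in the first row of the output.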

The Code

#   marascuilo - 
# 
#   Perform the Marascuilo procedure on all pairwise combinations of 
#   proportion differences from a contingency table to see which one
#   (if any) is significant.
# 
#   Arguments are:
#
#       dataFrame:
#           a data.frame with named rows and columns. The 
#           names of the groups being compared are assumed to be the columns.
#
#       confidence:
#           the degree of confidence with which to estimate the chi squared constant.
#           the default is .95.
#
marascuilo = function(dataFrame,confidence=.95) {

 chiResult = chisq.test (dataFrame, correct=FALSE )
 xSquared = chiResult$statistic

 # Generate all possible pair-wise combinations of groups
 colNames = names(dataFrame)
 combos = combn(colNames , 2)
 numCombos = dim(combos)[2]  # combos is an array of pairs, we want the length


 # Allocate matrix (initially 0 rows) for results
 results = matrix(nrow=0, ncol=5, dimnames=getResultsColumNames() )

 chiSquaredConstant = calcChiSquaredConstant(dataFrame, confidence)
 for (i in 1: numCombos) { 
   newRow = testSignificanceOfAbsDiffVsCriticalRange(
                        dataFrame, combos, i, chiSquaredConstant ) 
    results = rbind(results, newRow)        # append new row to results
 }


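 # sort results so that the pair differences that most strikingly exceed 
 # the critical range appear toward the top. The matrix holds character
 # strings, so convert to numeric before ordering (largest first).
 ord = order(as.numeric(results[, 'abs.diff-critical.range']), decreasing=TRUE)
 sortedResults = results[ord, , drop=FALSE]
 return (sortedResults)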
}


calcChiSquaredConstant = function(dataFrame,confidence) {
  nRows = dim(dataFrame)[1]  
  nCols = dim(dataFrame)[2]  

  degreesFreedom =  (nRows-1) * (nCols-1) 
  chiSquaredConstant = sqrt( qchisq(confidence,degreesFreedom) )

  return (chiSquaredConstant)
}


getResultsColumNames =  function (numRows) {
   return ( 
        list( 
            c(), 
            c('pair', 'abs.diff', 'critical.range', 'abs.diff-critical.range', 'significant')
        ) 
   )
}

# test significance for ith combination
#
testSignificanceOfAbsDiffVsCriticalRange = function(
                dataFrame, combos, i,  chiSquaredConstant) {

   results = matrix(nrow=1, ncol=5, dimnames=getResultsColumNames() )

   pair1=combos[1,i]
   pair2=combos[2,i]

   # sum column denoted by name 'pair1' into groupTotal1 
   groupTotal1 = sum( dataFrame[ , pair1])  
   groupTotal2 = sum( dataFrame[ , pair2])  # do same thing for pair2... 

   p1 = dataFrame[1, pair1] / groupTotal1 
   p2 = dataFrame[1, pair2] / groupTotal2
   p1Not = (1 - p1)
   p2Not = (1 - p2)

    absDiff = abs( p2  - p1 )

    criticalRange = chiSquaredConstant  * 
                        sqrt(p1*p1Not/groupTotal1 + p2*p2Not/groupTotal2)
    results[1, 'pair'] = paste(pair1,"|",pair2) 
    results[1, 'abs.diff'] = round(absDiff,3)
    results[1, 'critical.range'] = round(criticalRange ,3)
    results[1, 'abs.diff-critical.range'] = round(absDiff - criticalRange ,3)


    if (absDiff > criticalRange) {
        results[1, 'significant'] = 'Y'
    } else {
        results[1, 'significant'] = 'N'
    } 

    return(results)
}

How To Inspect Attribute Info of Nodes in a JQuery Select List

I haven’t done front-end programming for a while, but assuming jQuery is not yet dead, it might be worth resurrecting this post from my old blog for the benefit of those interested in dumping the content of nodes in a jQuery selection. I was never able to find an ‘official’ way to do this kind of debugging, but that doesn’t mean there isn’t one. This post just presents the technique that I ended up using, which boils down to using good old-fashioned DOM functions to figure out a node’s name and attributes.

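The code below uses a jQuery selector to grab all input elements that have an “id” attribute, whose “selected” attribute is “true”, and whose name ends in “man”. We iterate through those nodes (only one, the “letterman” input, is found) and dump out each node’s name, attributes, and inner content using DOM functions. (Because input is a void element in HTML, its inner content is always empty; the “foo” text in the body ends up as a sibling text node.)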

Save this code snippet to a local file and open it in your browser. You should see an ‘HI’ alert, then a ‘got a match’ alert, and finally the ‘got a node’ pop-up with the attributes of the found node enumerated. One caveat: if you see no pop-up at all, this might be due to link rot, since the jQuery URL in the script tag’s src attribute might have gone stale. In this case you’ll find an error in the JavaScript console reading something like “$ is not defined” (which means jQuery is not available). The fix is to update the src attribute to point to a current jQuery distribution.

<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN"
                    "http://www.w3.org/TR/html4/loose.dtd">
<html>
<head>
 <script type="text/javascript"
        src="http://jqueryui.com/latest/jquery-1.3.2.js"></script>

  <script>
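  $(document).ready(function() {

    // Concatenate "| name=value" for every attribute in a DOM NamedNodeMap.
    var gatherAttributes = function(attributes) {
      var attribs = "";
      if (attributes) {
        for (var i = 0; i < attributes.length; i++) {
          attribs = attribs + "| " +
              attributes[i].nodeName + "=" + attributes[i].nodeValue;
        }
      }
      return attribs;
    };

    // Report a node's tag name, its attributes, and any inner HTML.
    var dumpNode = function(node) {
      alert("got a node: " + node.nodeName + " with attributes:  " +
            gatherAttributes(node.attributes) +
            " /  and inner content = " + $(node).html());
    };

    alert("HI");  // confirms that jQuery loaded and the ready handler fired

    // <input> elements that have an id, a selected="true" attribute,
    // and a name ending in "man".
    $("input[selected=true][id][name$='man']").each(function() {
      alert("got a match " + $(this).val());
      dumpNode(this);
    });

  });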

  </script>

</head>
<body>
  <input id="man-news" name="man-news" >        foo1</input>
  <input name="milkman" >                       foo2</input>
  <input id="fetterman" name="new-fetterman" >  foo3</input>
  <input selected="true" id="letterman" name="new-letterman" >  foo3</input>
  <input name="newmilk" >                       foo4</input>
</body>
</html>

Configuring the Xerces XML Parser With Content Model Defaults

My previous post on JSON schema included a slight dig at XML, which perhaps wasn’t really warranted. True, XML is clunkier and more verbose than JSON, but it has its strong points. The clincher for me in past projects has been the superior expressiveness of XML’s schema format, XSD. XSD has quite a few capabilities that JSON schema lacks, such as referential integrity constraints and the ability to specify attribute defaults in one’s content model. This article provides a quick overview of the latter feature via a code sample that illustrates how to configure Apache Xerces. The configuration we present enables you to read in XML content such that it is auto-populated with the proper default values for attributes, even if the original source XML does not define those attributes at all.

Why is this useful? Well, suppose you have developed a content model that allows your users to configure some runtime data. For example, say you have a game with actors that can be animals or people. You define an XSD (schema) which allows game configurers to define a cast of characters for the game using XML, like this:

 <animal name="Rover"/>
 <person name="Bob"/>
 <animal name="Fluffy"/>

Each character type has an associated class which defines the behavior of the character in the game. You provide defaults for each character, but you also allow your game configurers to define and reference their own classes. In typical usage, let’s assume that your configurers will want to go with the defaults. In this case you don’t want them to have to tediously type out the default class name for each character instance. Forcing them to do so would likely result in typos (and ClassNotFoundException errors), and would potentially hinder your ability to refactor the names of your default classes when you release new versions of your game.

So you develop an XSD similar to the one shown below.

<xs:schema
        xmlns="http://com.lackey/dog" targetNamespace="http://com.lackey/dog"
        attributeFormDefault="unqualified"
        elementFormDefault="qualified"
        xmlns:xs="http://www.w3.org/2001/XMLSchema">

  <xs:element name="animal" type="animalType"  />
  <xs:element name="person" type="personType"  />

  <xs:complexType name="animalType">
    <xs:simpleContent>
      <xs:extension base="xs:string">
        <xs:attribute type="xs:string"
            name="name" use="required"/>
        <xs:attribute type="xs:string"
            name="behaviorClass"
            default="com.lackey.animal.behavior.AnimalBehavior"
            use="optional"/>
      </xs:extension>
    </xs:simpleContent>
  </xs:complexType>

  <xs:complexType name="personType">
    <xs:simpleContent>
      <xs:extension base="xs:string">
        <xs:attribute type="xs:string"
            name="name" use="required"/>
        <xs:attribute type="xs:string"
            name="behaviorClass"
            default="com.lackey.animal.behavior.PersonBehavior"
            use="optional"/>
      </xs:extension>
    </xs:simpleContent>
  </xs:complexType>
</xs:schema>

The key feature to note is the definition of the ‘behaviorClass’ attribute for each model type (animalType and personType), which looks like this:

<xs:attribute type="xs:string"
   name="behaviorClass"
   default="com.lackey.animal.behavior.AnimalBehavior"
   use="optional"/>

Users may leave the ‘behaviorClass’ attribute out of their character definitions. If you read and process the XML with a validating parser such as Xerces, configured as shown in the remainder of this article, the parser fills in the behaviorClass attribute with the correct default.
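For comparison, here is a minimal Java sketch of the same effect using the standard JAXP validation API instead of Xerces feature URIs: a compiled `javax.xml.validation.Schema` is attached to the `DocumentBuilderFactory` via `setSchema`, and the JDK's built-in parser (itself derived from Xerces) should then fill in the default while it builds the DOM. This is an alternative to the configuration used in the test later in this article, not a transcription of it; the class name `SchemaDefaultsSketch` and the trimmed, anonymous-type version of the `animal` declaration are assumptions made for brevity.

```java
import java.io.StringReader;

import javax.xml.XMLConstants;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.transform.stream.StreamSource;
import javax.xml.validation.Schema;
import javax.xml.validation.SchemaFactory;

import org.w3c.dom.Document;
import org.xml.sax.InputSource;

// Hypothetical helper: parses XML against an in-memory XSD via the standard JAXP
// setSchema() route, so the validator applies attribute defaults while the DOM is built.
final class SchemaDefaultsSketch {

    // Trimmed, anonymous-type version of the article's animal declaration (assumption).
    static final String XSD =
          "<xs:schema xmlns:xs='http://www.w3.org/2001/XMLSchema'"
        + "    targetNamespace='http://com.lackey/dog' elementFormDefault='qualified'>"
        + "  <xs:element name='animal'>"
        + "    <xs:complexType>"
        + "      <xs:attribute name='name' type='xs:string' use='required'/>"
        + "      <xs:attribute name='behaviorClass' type='xs:string' use='optional'"
        + "          default='com.lackey.animal.behavior.AnimalBehavior'/>"
        + "    </xs:complexType>"
        + "  </xs:element>"
        + "</xs:schema>";

    static Document parse(String xml) {
        try {
            Schema schema = SchemaFactory
                    .newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI)
                    .newSchema(new StreamSource(new StringReader(XSD)));

            DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
            dbf.setNamespaceAware(true); // schema validation requires namespace awareness
            dbf.setSchema(schema);       // validate (and apply defaults) during parsing

            return dbf.newDocumentBuilder().parse(new InputSource(new StringReader(xml)));
        } catch (Exception e) {
            throw new IllegalStateException("could not parse document", e);
        }
    }

    public static void main(String[] args) {
        // behaviorClass is absent from the source document...
        Document doc = parse("<animal xmlns='http://com.lackey/dog' name='rover'/>");
        // ...but should come back populated with the schema default
        System.out.println(doc.getDocumentElement().getAttribute("behaviorClass"));
    }
}
```

A document that omits `behaviorClass` should come back with `com.lackey.animal.behavior.AnimalBehavior` filled in, while an explicit value such as `behaviorClass='my.CustomBehavior'` is left alone. In this sketch the instance document needs no `xsi:schemaLocation` hint, because the schema is handed to the factory directly.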

For example, if your source XML was:

<animal xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://com.lackey/dog /tmp/animal.xsd"
        xmlns="http://com.lackey/dog"
        name="rover"/>

The parser would deliver the following content to you (this is called the “Post-Schema-Validation Infoset”, or PSVI, if you want to explore the theory in more depth):

<animal xmlns="http://com.lackey/dog"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        behaviorClass="com.lackey.animal.behavior.AnimalBehavior"
        name="rover"
        xsi:schemaLocation="http://com.lackey/dog /tmp/animal.xsd"/>
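Once the default has been injected, downstream code reads `behaviorClass` exactly as if the configurer had typed it. If you ever need to tell the two cases apart (for example, to log which characters kept the stock behavior), the DOM's `Attr.getSpecified()` is intended for this: it returns `false` for an attribute whose value came from a default rather than from the instance document. The Java sketch below assumes the JAXP `setSchema` setup rather than the Xerces feature flags used in the test; the class `DefaultedAttributeCheck` and its trimmed schema are hypothetical, and other parser implementations may behave differently.

```java
import java.io.StringReader;

import javax.xml.XMLConstants;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.transform.stream.StreamSource;
import javax.xml.validation.SchemaFactory;

import org.w3c.dom.Attr;
import org.w3c.dom.Element;
import org.xml.sax.InputSource;

// Hypothetical helper: shows whether behaviorClass was written by the configurer
// or supplied by the schema default.
final class DefaultedAttributeCheck {

    // Trimmed animal declaration (assumption): 'behaviorClass' has a default value.
    static final String XSD =
          "<xs:schema xmlns:xs='http://www.w3.org/2001/XMLSchema'"
        + "    targetNamespace='http://com.lackey/dog' elementFormDefault='qualified'>"
        + "  <xs:element name='animal'>"
        + "    <xs:complexType>"
        + "      <xs:attribute name='name' type='xs:string' use='required'/>"
        + "      <xs:attribute name='behaviorClass' type='xs:string' use='optional'"
        + "          default='com.lackey.animal.behavior.AnimalBehavior'/>"
        + "    </xs:complexType>"
        + "  </xs:element>"
        + "</xs:schema>";

    /** Parses the document with schema validation and returns the behaviorClass node. */
    static Attr behaviorClassAttr(String xml) {
        try {
            DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
            dbf.setNamespaceAware(true);
            dbf.setSchema(SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI)
                    .newSchema(new StreamSource(new StringReader(XSD))));
            Element root = dbf.newDocumentBuilder()
                    .parse(new InputSource(new StringReader(xml)))
                    .getDocumentElement();
            return root.getAttributeNode("behaviorClass");
        } catch (Exception e) {
            throw new IllegalStateException("could not parse document", e);
        }
    }

    public static void main(String[] args) {
        Attr defaulted = behaviorClassAttr("<animal xmlns='http://com.lackey/dog' name='rover'/>");
        Attr explicit = behaviorClassAttr(
                "<animal xmlns='http://com.lackey/dog' name='rex' behaviorClass='my.CustomBehavior'/>");
        // getSpecified() should be false only when the value came from the schema default
        System.out.println(defaulted.getValue() + " specified=" + defaulted.getSpecified());
        System.out.println(explicit.getValue() + " specified=" + explicit.getSpecified());
    }
}
```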

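The next chunk of Groovy code is a TestNG unit test that illustrates how to configure Xerces to inject content model defaults. (To keep the test short, the schema embedded in it uses the abbreviated default class names com.lackey.AnimalBehavior and com.lackey.PersonBehavior.)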

import org.testng.annotations.Test
import org.w3c.dom.Document
import org.xml.sax.InputSource
import org.xml.sax.SAXException

import javax.xml.parsers.DocumentBuilder
import javax.xml.parsers.DocumentBuilderFactory
import javax.xml.parsers.ParserConfigurationException
import javax.xml.transform.OutputKeys
import javax.xml.transform.Transformer
import javax.xml.transform.TransformerException
import javax.xml.transform.TransformerFactory
import javax.xml.transform.dom.DOMSource
import javax.xml.transform.stream.StreamResult

public class ParsingTest {


    String xmlDoc =
            """
<animal xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://com.lackey/dog /tmp/animal.xsd"
         xmlns="http://com.lackey/dog"
         name="rover"/>
"""

    String xmlSchema =
            """<?xml version="1.0" encoding="UTF-8"?>
<xs:schema
        xmlns="http://com.lackey/dog" targetNamespace="http://com.lackey/dog"
        attributeFormDefault="unqualified"
        elementFormDefault="qualified"
        xmlns:xs="http://www.w3.org/2001/XMLSchema">

  <xs:element name="animal" type="animalType"  />
  <xs:element name="person" type="personType"  />

  <xs:complexType name="animalType">
    <xs:simpleContent>
      <xs:extension base="xs:string">
        <xs:attribute type="xs:string"
            name="name" use="required"/>
        <xs:attribute type="xs:string"
            name="behaviorClass"
            default="com.lackey.AnimalBehavior"
            use="optional"/>
      </xs:extension>
    </xs:simpleContent>
  </xs:complexType>

  <xs:complexType name="personType">
    <xs:simpleContent>
      <xs:extension base="xs:string">
        <xs:attribute type="xs:string"
            name="name" use="required"/>
        <xs:attribute type="xs:string"
            name="behaviorClass"
            default="com.lackey.PersonBehavior"
            use="optional"/>
      </xs:extension>
    </xs:simpleContent>
  </xs:complexType>

</xs:schema>
"""


    @Test(enabled = true)
    public void testHappyPath() {
        validateXml(xmlDoc)
    }

    private void validateXml(String xmlText) {
        String xmlPath = "/tmp/animal.xml"
        String xsdPath = "/tmp/animal.xsd"

        File xml = new File(xmlPath)
        File xsd = new File(xsdPath)

        // write file content to temp files
        xml.text = xmlText;
        xsd.text = xmlSchema;

        println "path to xml is " + xml.canonicalPath
        println "path to xsd is " + xsd.canonicalPath

        Document doc =
                parseToDom(
                        xmlText,
                        "http://com.lackey/dog", xsd.path)
        System.out.println("doc:" + doc);

        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        printDocument(doc, baos);
        def parsedDoc = baos.toString()
        System.out.println("parsedDoc:" + parsedDoc);
        assert parsedDoc.contains(
                "behaviorClass=\"com.lackey.AnimalBehavior")

    }

    public static Document parseToDom(final String xmlContent,
                                      final String nameSpace,
                                      final String xsdPath)
            throws ParserConfigurationException,
                    SAXException,
                    IOException {
        final DocumentBuilderFactory dbf =
                    DocumentBuilderFactory.newInstance();
        if (null != xsdPath) {
            final File xsd = new File(xsdPath);
            if (!xsd.exists()) {
                throw new IllegalArgumentException(
                        "no xsd found at path: $xsdPath");
            }
            dbf.setNamespaceAware(true);
            dbf.setAttribute(
                    "http://apache.org/xml/features/validation/schema",
                    Boolean.TRUE);
            dbf.setAttribute(
                    "http://xml.org/sax/features/validation",
                    Boolean.TRUE);
            dbf.setAttribute(
                    "http://apache.org/xml/features/validation/schema/normalized-value",
                    Boolean.TRUE);
            dbf.setAttribute(
                    "http://apache.org/xml/features/validation/schema/element-default",
                    Boolean.TRUE);
            dbf.setAttribute(
                    "http://apache.org/xml/properties/schema/external-schemaLocation",
                    nameSpace + " " + xsdPath);
        }

        final DocumentBuilder db = dbf.newDocumentBuilder();
        final InputSource is = new InputSource();
        is.setCharacterStream(new StringReader(xmlContent));
        return db.parse(is);
    }

    public static void printDocument(Document doc,
                                     OutputStream out)
            throws IOException, TransformerException {
        TransformerFactory tf = TransformerFactory.newInstance();
        Transformer transformer = tf.newTransformer();
        transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "no");
        transformer.setOutputProperty(OutputKeys.METHOD, "xml");
        transformer.setOutputProperty(OutputKeys.INDENT, "yes");
        transformer.setOutputProperty(OutputKeys.ENCODING, "UTF-8");
        transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "4");

        transformer.transform(new DOMSource(doc),
                new StreamResult(new OutputStreamWriter(out, "UTF-8")));
    }
}
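One thing the test above does not do is register an `org.xml.sax.ErrorHandler`. As far as I know, with no handler installed a schema violation (say, an `animal` element with no `name`) is merely reported, typically on standard error, and parsing carries on to return a DOM. If you want a misconfigured cast file to be rejected outright, install a handler that rethrows. The Java sketch below shows the idea with the JAXP `setSchema` route; the class name `StrictSchemaParsing` and the trimmed schema are hypothetical. The same `setErrorHandler` call should also apply to the `DocumentBuilder` created in `parseToDom`.

```java
import java.io.StringReader;

import javax.xml.XMLConstants;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.transform.stream.StreamSource;
import javax.xml.validation.SchemaFactory;

import org.w3c.dom.Document;
import org.xml.sax.ErrorHandler;
import org.xml.sax.InputSource;
import org.xml.sax.SAXException;
import org.xml.sax.SAXParseException;

// Hypothetical helper: schema-validating parse that rejects invalid documents
// instead of merely reporting the problems.
final class StrictSchemaParsing {

    // Trimmed animal declaration (assumption): 'name' is required.
    static final String XSD =
          "<xs:schema xmlns:xs='http://www.w3.org/2001/XMLSchema'"
        + "    targetNamespace='http://com.lackey/dog' elementFormDefault='qualified'>"
        + "  <xs:element name='animal'>"
        + "    <xs:complexType>"
        + "      <xs:attribute name='name' type='xs:string' use='required'/>"
        + "      <xs:attribute name='behaviorClass' type='xs:string' use='optional'"
        + "          default='com.lackey.animal.behavior.AnimalBehavior'/>"
        + "    </xs:complexType>"
        + "  </xs:element>"
        + "</xs:schema>";

    // Rethrow validation errors so that parse() fails; warnings are ignored here.
    static final ErrorHandler STRICT = new ErrorHandler() {
        @Override public void warning(SAXParseException e) { }
        @Override public void error(SAXParseException e) throws SAXException { throw e; }
        @Override public void fatalError(SAXParseException e) throws SAXException { throw e; }
    };

    static Document parseStrict(String xml) throws Exception {
        DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
        dbf.setNamespaceAware(true);
        dbf.setSchema(SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI)
                .newSchema(new StreamSource(new StringReader(XSD))));
        DocumentBuilder db = dbf.newDocumentBuilder();
        db.setErrorHandler(STRICT); // turn recoverable validation errors into exceptions
        return db.parse(new InputSource(new StringReader(xml)));
    }

    static boolean isValid(String xml) {
        try {
            parseStrict(xml);
            return true;
        } catch (SAXException e) {
            System.out.println("rejected: " + e.getMessage());
            return false;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(isValid("<animal xmlns='http://com.lackey/dog' name='rover'/>"));
        System.out.println(isValid("<animal xmlns='http://com.lackey/dog'/>")); // no 'name'
    }
}
```

`isValid` should return `true` for the first document and `false` for the second, printing the validator's message about the missing `name` attribute.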

Reducing Integration Hassles With JSON Schema Contracts

I recently worked on a project where the ‘contract’ between service consumers and providers consisted primarily of annotated mock-ups of the JSON responses one would obtain from each of a given service’s end-points. A much better way to express the contract for a service is to use a standard schema format. If you’re stuck with XML, use XML schema. If you are using JSON, there are tools and libraries (presented below) that will help you use JSON schema to express a service’s contract. This article assumes that you have gone through the available JSON schema documentation and have a basic idea of how to use it. It also assumes that you are developing on a JVM-based platform; most of the recipes will be helpful for Java developers (although our example of dynamic schema validation is presented using a bit of Scala).

Why Use JSON Schema As Your Contract?

Suppose you are supporting a JSON-based service, with your contract expressed in some type of “by-example” format rather than the JSON schema standard. Now one of the components consuming your service throws an exception while parsing a response. The developer of that client service comes to you and says “your service has a problem”. Both of you then have to pore over the examples that define your service’s responses and figure out whether the response sent in this instance honors or violates the implicit contract. This is a very manual process with room for mistakes, and at worst it can lead to finger pointing and debates about whether the response is correct. Not fun.

However, if the server and client teams on your project come to agreement on a schema for each JSON response, then the task of figuring out if a given response is correct boils down to simply running a validation tool where the inputs are the response document in question, and the schema to which it must conform. If the validator reports no errors then you are off the hook, with no debate.

JSON Schema Tools

This section describes how to install and use various tools for auto-generation of JSON schema from sample documents, generation of sample instance documents from schema, and schema validation. As long as your environment has Java 1.8, Python 2.7+, and the pip installer, the set-up instructions provided should work on either Linux or Mac (at least they worked for me!)

Auto-generating JSON Schema From Instance Documents

genson is a utility for auto-generating JSON schema from instance documents. It can be installed via the command

    sudo pip install genson==0.1.0   # install it

Next try generating a schema for a simple document.

    echo '{ "foo": 100 }'  > /tmp/foo.json
    cat /tmp/foo.json | genson | tee /tmp/foo.schema 

foo.schema should contain the following content:

    {
      "$schema": "http://json-schema.org/schema#",
      "required": [
        "foo"
      ],
      "type": "object",
      "properties": {
        "foo": {
          "type": "integer"
        }
      }
    }

Sometimes you will be generating multiple schemas from a related set of JSON documents (e.g., you might be starting from a set of sample responses from a legacy service with no defined schema, which you plan to retrofit). In this case you will definitely want to familiarize yourself with the $ref keyword, which lets you refactor commonly occurring fragments of schema code into one place (even a different file).

Generation of Sample Instance Documents From Schema

Once you have a schema you can feed it into a tool, such as this one from Liquid Technologies, to facilitate generation of mock data that you can use for testing.

Command Line Tools For Schema Validation

The best command line tool I have found for JSON schema validation is json-schema-validator. Its current documentation indicates support for JSON Schema draft v4, which is a bit behind the latest draft (draft 7 at the time of this writing). So if you need features from the latest version of the spec in your schemas, take extra care to ensure this tool is right for your needs.

Assuming you have gone through the previous step of installing and testing genson, you can download and verify the validator via the commands below (if you are on a Mac without wget, then please try curl):

wget 'https://bintray.com/fge/maven/download_file?file_path=com%2Fgithub%2Ffge%2Fjson-schema-validator%2F2.2.6%2Fjson-schema-validator-2.2.6-lib.jar' -O /tmp/validator.jar

# now validate your sample document against the schema you created above

cd /tmp ;  java -jar validator.jar /tmp/foo.schema /tmp/foo.json

You should see:

validation: SUCCESS

Now let’s see how the tool reports validation failures. Deliberately mess up your instance document (so it no longer conforms to the schema) via the command:

sed -e 's/foo/zoo/' /tmp/foo.json > /tmp/bad.json

cd /tmp ; java -jar validator.jar /tmp/foo.schema /tmp/bad.json

You should see error output which includes the line:

"message" : "object has missing required properties ([\"foo\"])",

On-the-Fly Schema Validation At Run Time

The json-schema-validator was shown above in command line mode. As a bonus, you can also embed this project’s associated Java library in any of your services that require run-time validation of arbitrary instance documents against a schema. The code snippet below (available as a project here) is written in Scala, but you could easily use the same library from Java projects as well.


import com.fasterxml.jackson.core.JsonParser
import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
import com.github.fge.jackson.JsonLoader
import com.github.fge.jsonschema.core.report.ProcessingReport
import com.github.fge.jsonschema.main.{JsonSchema, JsonSchemaFactory}

object SchemaValidator {
  lazy val mapper: ObjectMapper = new ObjectMapper
  lazy val jsonSchemaFactory: JsonSchemaFactory = JsonSchemaFactory.byDefault
  // loads schema.json from the root of the classpath (e.g. src/main/resources)
  lazy val schemaNode: JsonNode = JsonLoader.fromResource("/schema.json")
  lazy val schema: JsonSchema = jsonSchemaFactory.getJsonSchema(schemaNode)

  def validateWithReport(json: String): Boolean = {
    val bytes: Array[Byte] = json.getBytes("utf-8")
    val parser: JsonParser = mapper.getFactory.createParser(bytes)
    val node: JsonNode = mapper.readTree(parser)
    val validationResult: ProcessingReport = schema.validate(node)
    if (validationResult.isSuccess) {
      true
    } else {
      val errMsg = s"Validation error. Instance=$json, msg=$validationResult"
      println("errMsg:" + errMsg)
      false
    }
  }
}

object FakeGoodWebService {
  def getJsonResponse = """{ "foo": 100 }"""
}

object FakeBadWebService {
  def getJsonResponse = """{ "zoo": 100 }"""
}


object JsonSchemaValidationDemo extends App {
  import SchemaValidator._

  val goodResult = validateWithReport(FakeGoodWebService.getJsonResponse)
  println("result:" + goodResult)

  val badResult = validateWithReport(FakeBadWebService.getJsonResponse)
  println("result:" + badResult)
}


We have stashed the ‘foo’ schema from our previous discussion in src/main/resources/schema.json, and SchemaValidator loads it into its ‘schema’ field the first time that field is used (the fields are lazy vals). JsonSchemaValidationDemo then calls validateWithReport, first with a valid response from a mock of a well-behaved web service, and then with a JSON response from a misbehaving one. The resulting output is shown below.

result:true
errMsg:Validation error. Instance={ "zoo": 100 }, 
    msg=com.github.fge.jsonschema.core.report.ListProcessingReport: failure
--- BEGIN MESSAGES ---
error: object has missing required properties (["foo"])
    level: "error"
    schema: {"loadingURI":"#","pointer":""}
    instance: {"pointer":""}
    domain: "validation"
    keyword: "required"
    required: ["foo"]
    missing: ["foo"]
---  END MESSAGES  ---

result:false

Conclusion

Miscommunication and incorrect assumptions are most likely at what formally trained project managers call “interface points at subsystem boundaries” (you can read more about them here). With the tools above, you can minimize the thrash and churn that tend to occur around these interface points.

License

This work is licensed under the Creative Commons Attribution 4.0 International License. Use as you wish, but if you can, please give attribution to the Data Lackey Labs Blog.