Time for an Article on The Spark SQL Time Function ‘from_utc_timestamp’

TLDR

If your Spark application needs to apply date functions such as ‘weekday’ to timestamped records, then the timestamps of the records you ingest should ideally be formatted per the ISO 8601 standard, with the time zone specified (e.g., ‘1970-01-01T00:00:00+00:00’). Assuming you have some influence over the format of your data source, adhering to this guideline ensures that the timestamp strings you parse map to a consistent internal representation, no matter which default time zone your JVM is set to when you do the parsing. Next, you need to decide whether you want the human-readable, String-formatted result of your date function to depend on your JVM’s default time zone, or to parameterize this setting (the latter being better for testability).

Avoiding Dependencies On JVM Time Zone Setting When Parsing

When working with time I like to express my inputs, and store my intermediate results, in UTC. This means inputs ideally arrive as ISO 8601 formatted strings, and intermediate results are stored either as (i) a Long numeric value interpreted as a number-of-seconds offset from the instant at which the clock struck midnight in Greenwich, England on the first day of the Unix epoch, January 1, 1970, or (ii) a Java 8 Instant, which likewise wraps the seconds offset from the epoch.
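For example, here are those two intermediate representations side by side, using the plain Java 8 time API (no Spark required); the variable names are just for illustration:

import java.time.Instant

// A Long holding the number of seconds elapsed since the epoch began in Greenwich ...
val epochOffsetSeconds: Long = 0L

// ... and the equivalent java.time.Instant wrapping that same offset.
val epochInstant: Instant = Instant.ofEpochSecond(epochOffsetSeconds)

println(epochInstant)                 // 1970-01-01T00:00:00Z
println(epochInstant.getEpochSecond)  // 0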

When initially faced with ingesting timestamped data I opted to use to_utc_timestamp and got results that were the exact opposite of what I expected. Rather than go off on a tangent about why this happened, I will just save you some pain by recommending that you go with this function’s inverse, from_utc_timestamp, when parsing timestamp strings. The Scaladoc for this function is:

  /**
   * Given a timestamp like '2017-07-14 02:40:00.0', interprets it as a time in UTC, and renders
   * that time as a timestamp in the given time zone. For example, 'GMT+1' would yield
   * '2017-07-14 03:40:00.0'.
   *
   * @param ts A date, timestamp or string. If a string, the data must be in a format that can be
   *           cast to a timestamp, such as `yyyy-MM-dd` or `yyyy-MM-dd HH:mm:ss.SSSS`
   * @param tz A string detailing the time zone that the input should be adjusted to, such as
   *           `Europe/London`, `PST` or `GMT+5`
   * @return A timestamp, or null if `ts` was a string that could not be cast to a timestamp or
   *         `tz` was an invalid value
   * @group datetime_funcs
   * @since 1.5.0
   */

So, on reading this, I thought that whatever date/time string I provided would be interpreted as being in UTC (it says exactly that!), and that I would get the same results irrespective of where in the world I was running my service (i.e., regardless of the default time zone setting of the JVM running the app). To test things out I launched the Spark shell from where I live (California, GMT-8 at the time of this writing), and I tried a date/time string set to the start of the Unix epoch: ‘1970-01-01 00:00’. Given this input I expected the Long value of the internal representation of the stored timestamp to be zero, but the value I got was 28800 (eight hours’ worth of seconds), as shown below.

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._   // these (and spark.implicits._) are already in scope in spark-shell

val df = List(("1970-01-01 00:00")).toDF("timestr").
  withColumn("utcAssumeStringUTC", from_utc_timestamp('timestr, "GMT")).
  withColumn("utc_utc_timestamp_as_int", 'utcAssumeStringUTC.cast(IntegerType))

df.show()

// Result:
//  +----------------+-------------------+------------------------+
//  |         timestr| utcAssumeStringUTC|utc_utc_timestamp_as_int|
//  +----------------+-------------------+------------------------+
//  |1970-01-01 00:00|1970-01-01 00:00:00|                   28800|
//  +----------------+-------------------+------------------------+

After fiddling with the inputs a bit and browsing the source, I realized that what I needed to do was use ISO 8601 strings for my timestamps. This worked well, even though the Spark Scaladoc doesn’t explicitly mention it. After settling on this format, I ran my scriptlet twice: once with the default time zone for my machine, launching the shell with this command

> spark-shell  

and once for GMT (Greenwich Mean Time, a.k.a. UTC) with this command.

> spark-shell  --driver-java-options "-Duser.timezone=UTC"

Both times I got a timestamp with a Long value of zero, as expected:

import org.apache.spark.sql.types._

val df = List(("1970-01-01T00:00:00+00")).toDF("timestr").
  withColumn("utcAssumeStringUTC", from_utc_timestamp('timestr, "GMT")).
  withColumn("utc_utc_timestamp_as_int", 'utcAssumeStringUTC.cast(IntegerType))
df.show()

// Result:
// +--------------------+-------------------+------------------------+
// |             timestr| utcAssumeStringUTC|utc_utc_timestamp_as_int|
// +--------------------+-------------------+------------------------+
// |1970-01-01T00:00:...|1970-01-01 00:00:00|                       0|
// +--------------------+-------------------+------------------------+

(A side note on the column utcAssumeStringUTC: when output via show(), its string rendering will differ depending on your JVM’s default time zone setting. But if you cast it to an integer it will correctly show up as zero.)

Next I applied a weekday transformation (date_format with the "EEEE" pattern) to the timestamp column utcAssumeStringUTC (below), and I found that the output again differed depending on the default time zone setting of the JVM running my code. This scriptlet:

import org.apache.spark.sql.types._

val df = List(("1970-01-01T00:00:00+00")).toDF("timestr").
  withColumn("utcAssumeStringUTC", from_utc_timestamp('timestr, "GMT")).
  withColumn("utc_utc_timestamp_as_int", 'utcAssumeStringUTC.cast(IntegerType)).
  withColumn("weekday", date_format($"utcAssumeStringUTC", "EEEE"))
df.show()

gave the result

+--------------------+-------------------+------------------------+---------+
|             timestr| utcAssumeStringUTC|utc_utc_timestamp_as_int|  weekday|
+--------------------+-------------------+------------------------+---------+
|1970-01-01T00:00:...|1969-12-31 16:00:00|                       0|Wednesday|
+--------------------+-------------------+------------------------+---------+

when run from a Spark shell launched with no special JVM options (resulting in a timezone setting of GMT-8 on my machine), and it gave the result

+--------------------+-------------------+------------------------+--------+
|             timestr| utcAssumeStringUTC|utc_utc_timestamp_as_int| weekday|
+--------------------+-------------------+------------------------+--------+
|1970-01-01T00:00:...|1970-01-01 00:00:00|                       0|Thursday|
+--------------------+-------------------+------------------------+--------+

when run from a Spark shell launched like this:

spark-shell  --driver-java-options "-Duser.timezone=UTC"

From an end-user perspective this is typically the behavior I would want. Let’s say an event I was processing happened at the start of the Unix epoch. This is a time zone independent concept. At the time of the event I might have been in Greenwich, England, in which case my calendar and wall clock would have shown Thursday, Jan 1, 1970 at 12:00 midnight. But I might have been at home in California, in which case I would have seen Wednesday, Dec 31, 1969 at 4pm. When I display the day of the week on which the event happened to a human user, that user will most likely expect a result in the context of the time zone they are currently in.
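To make that concrete, here is the same instant (the start of the epoch) rendered in two different time zone contexts using the plain Java 8 time API, outside of Spark (day names assume an English default locale):

import java.time.{Instant, ZoneId}
import java.time.format.DateTimeFormatter

val epoch = Instant.ofEpochSecond(0L)   // one time zone independent instant
val fmt   = DateTimeFormatter.ofPattern("EEEE yyyy-MM-dd HH:mm")

// The same instant, viewed from two different wall clocks:
println(fmt.withZone(ZoneId.of("UTC")).format(epoch))                 // Thursday 1970-01-01 00:00
println(fmt.withZone(ZoneId.of("America/Los_Angeles")).format(epoch)) // Wednesday 1969-12-31 16:00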

Now, although our users may expect this, we probably don’t want our code to produce different results for the same inputs depending on the default time zone of the JVM running our app. For this reason, I would recommend parameterizing the time zone context that your application uses to display these types of results. A better way to display the day of the week on which an event happened is to do one more round of conversion using from_utc_timestamp and the parameterized value of your desired time zone context, like this:

val timezone = "America/Los_Angeles"

import org.apache.spark.sql.types._

val df = List(("1970-01-01T00:00:00+00")).toDF("timestr").
  withColumn("utcAssumeStringUTC", from_utc_timestamp('timestr, "GMT")).
  withColumn("x", from_utc_timestamp('utcAssumeStringUTC, timezone)).
  withColumn("weekday", date_format($"x", "EEEE"))

df.show()

No matter what time zone is in effect for your JVM, the output will be the same (as shown below). Of course, in a real app you would not hard-code this value, but instead read it from some type of config file, with a fallback to the default time zone setting of your JVM; a sketch of that appears after the output.

+--------------------+-------------------+-------------------+---------+
|             timestr| utcAssumeStringUTC|                  x|  weekday|
+--------------------+-------------------+-------------------+---------+
|1970-01-01T00:00:...|1970-01-01 00:00:00|1969-12-31 16:00:00|Wednesday|
+--------------------+-------------------+-------------------+---------+
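Here is a minimal sketch of that config-with-fallback idea. The property name app.display.timezone is made up for illustration; in a real app the value would come from whatever configuration mechanism you use:

import java.util.TimeZone
import org.apache.spark.sql.functions._

// Hypothetical property; fall back to the JVM's default time zone if it isn't set.
val timezone: String =
  sys.props.getOrElse("app.display.timezone", TimeZone.getDefault.getID)

val df = List(("1970-01-01T00:00:00+00")).toDF("timestr").
  withColumn("utcAssumeStringUTC", from_utc_timestamp('timestr, "GMT")).
  withColumn("x", from_utc_timestamp('utcAssumeStringUTC, timezone)).
  withColumn("weekday", date_format($"x", "EEEE"))

df.show()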

Assessing Significant Difference In Pairwise Combinations via the Marascuilo Procedure (in R)

Some time back, while tinkering with R, I coded up a version of the Marascuilo procedure and wrote up the results in a post on my old blog, which I am now resurrecting here. As you may know, the Marascuilo procedure is used to analyze the differences between pairs of proportions in a contingency table and determine which of those differences are statistically significant. The function I wrote takes every possible pairwise combination and prints Y(es) or N(o) to indicate whether or not the difference in proportions for that pair is statistically significant.
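Concretely, for each pair of groups i and j the procedure declares the difference in proportions significant when the absolute difference exceeds a critical range; this is exactly the comparison the code at the end of this post performs:

|p_i - p_j| > r_{ij}, \qquad r_{ij} = \sqrt{\chi^2_{1-\alpha,\,df}} \cdot \sqrt{\frac{p_i(1-p_i)}{n_i} + \frac{p_j(1-p_j)}{n_j}}

Here p_i is the proportion of ‘yes’ responses in group i, n_i is that group’s total response count, and \chi^2_{1-\alpha,\,df} is the chi-squared quantile at the chosen confidence level for the degrees of freedom of the contingency table (qchisq(confidence, degreesFreedom) in the code below).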

For a real-world-ish use case, imagine you are managing three hotels: the Grand Plaza (GP), the Plaza Royale (PR), and the Plaza Prima (PP). One determinant of service quality at your hotels is the presence or absence of vermin (insects, rodents, etc.). Your staff has conducted a survey of guests at all three hotels in which they were asked, “Were you bothered by any vermin during your stay?” You decide to use the Marascuilo procedure to determine whether any one (or more) of the hotels is significantly under-performing the others in the ‘infested with vermin’ category.

The R commands below show the frequency table that captures the survey responses, and how we would invoke our marascuilo function to determine which (if any) hotel’s performance is significantly different from the others. Note that our function outputs the results of three pairwise comparisons. This is correct because we have three items, and there are 3 choose 2, i.e., three, ways to pick two items from a list of three.

> lines <- "
+          GP    PR     PP
+ Y        128   199    126
+ N        88    33     66 
+ "
> 
> con <- textConnection(lines)
> tablefoo <- read.table(con, header=TRUE)
> close(con)
> 
> 
> marascuilo(tablefoo) 

      pair      abs.diff             critical.range       significant
[1,] "GP | PR" "0.265166028097063"  "0.0992354018215412" "Y"
[2,] "GP | PP" "0.0636574074074074" "0.117201905174372"  "N"
[3,] "PR | PP" "0.201508620689655"  "0.100947721261772"  "Y"

The results indicate no significant difference between the performance of the Grand Plaza (GP) and the Plaza Prima (PP) on the metric in question. However, both the Grand Plaza and the Plaza Prima are shown to differ significantly from the Plaza Royale (PR). The Plaza Royale guests’ proportion of ‘yes’ responses to the vermin question was the highest of the three hotels. Therefore, if you decide to take any action to address this problem, you should probably start with the Plaza Royale. (A flame thrower might help.)

The Code

#   marascuilo - 
# 
#   Perform the Marascuilo procedure on all pairwise combinations of 
#   proportion differences from a contingency table to see which one
#   (if any) is significant.
# 
#   Arguments are:
#
#       dataFrame:
#           a data.frame with named rows and columns. The 
#           names of the groups being compared are assumed to be the columns.
#
#       confidence:
#           the degree of confidence with which to estimate the chi squared constant.
#           the default is .95.
#
marascuilo = function(dataFrame,confidence=.95) {

 chiResult = chisq.test (dataFrame, correct=FALSE )
 xSquared = chiResult$statistic

 # Generate all possible pair-wise combinations of groups
 colNames = names(dataFrame)
 combos = combn(colNames , 2)
 numCombos = dim(combos)[2]  # combos is a 2 x numCombos matrix; we want the number of pairs (columns)


 # Allocate matrix (initially 0 rows) for results
 results = matrix(nrow=0, ncol=5, dimnames=getResultsColumNames() )

 chiSquaredConstant = calcChiSquaredConstant(dataFrame, confidence)
 for (i in 1: numCombos) { 
   newRow = testSignificanceOfAbsDiffVsCriticalRange(
                        dataFrame, combos, i, chiSquaredConstant ) 
    results = rbind(results, newRow)        # append new row to results
 }


 # sort results so that the pair differences that most strikingly exceed 
 # the critical range appear toward the top.
 # (values are stored as strings, so convert to numeric and sort in decreasing order)
 sortedResults = results[ order( as.numeric(results[,'abs.diff-critical.range']), decreasing=TRUE ), ]
 return (sortedResults )
}


calcChiSquaredConstant = function(dataFrame,confidence) {
  nRows = dim(dataFrame)[1]  
  nCols = dim(dataFrame)[2]  

  degreesFreedom =  (nRows-1) * (nCols-1) 
  chiSquaredConstant = sqrt( qchisq(confidence,degreesFreedom) )

  return (chiSquaredConstant)
}


getResultsColumNames =  function () {
   return ( 
        list( 
            c(), 
            c('pair', 'abs.diff', 'critical.range', 'abs.diff-critical.range', 'significant')
        ) 
   )
}

# test significance for ith combination
#
testSignificanceOfAbsDiffVsCriticalRange = function(
                dataFrame, combos, i,  chiSquaredConstant) {

   results = matrix(nrow=1, ncol=5, dimnames=getResultsColumNames() )

   pair1=combos[1,i]
   pair2=combos[2,i]

   # sum column denoted by name 'pair1' into groupTotal1 
   groupTotal1 = sum( dataFrame[ , pair1])  
   groupTotal2 = sum( dataFrame[ , pair2])  # do same thing for pair2... 

   p1 = dataFrame[1, pair1] / groupTotal1 
   p2 = dataFrame[1, pair2] / groupTotal2
   p1Not = (1 - p1)
   p2Not = (1 - p2)

    absDiff = abs( p2  - p1 )

    criticalRange = chiSquaredConstant  * 
                        sqrt(p1*p1Not/groupTotal1 + p2*p2Not/groupTotal2)
    results[1, 'pair'] = paste(pair1,"|",pair2) 
    results[1, 'abs.diff'] = round(absDiff,3)
    results[1, 'critical.range'] = round(criticalRange ,3)
    results[1, 'abs.diff-critical.range'] = round(absDiff - criticalRange ,3)


    if (absDiff > criticalRange) {
        results[1, 'significant'] = 'Y'
    } else {
        results[1, 'significant'] = 'N'
    } 

    return(results)
}