Time for an Article on The Spark SQL Time Function ‘from_utc_timestamp’

TLDR

If your Spark application needs to apply date functions such as 'weekday' to timestamped records, then the timestamps of the records you ingest should ideally be formatted per the ISO 8601 standard, with the time zone specified. Assuming you have some influence over the format of your data source, adhering to this guideline ensures that the timestamp strings you parse are mapped to a consistent internal representation, no matter which default time zone your JVM is set to when you do the parsing. Next, you need to decide whether you want the human-readable, String-formatted result of your date function to depend on your JVM's default time zone, or to have this setting parameterized (the latter being better for testability).

Avoiding Dependencies On JVM Time Zone Setting When Parsing

When working with time I like to express my inputs, and store my intermediate results, in UTC. This means inputs ideally arrive as ISO 8601 formatted strings, and intermediate results are stored either as (i) a Long numeric value interpreted as a number-of-seconds offset from the instant at which the clock struck midnight in Greenwich, England on the first day of the Unix epoch (January 1, 1970), or (ii) as a Java 8 Instant, which also wraps a Long representing the seconds offset from the epoch.
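To make the 'seconds offset from the epoch' idea concrete, here is a small, Spark-free sketch using plain java.time (just an illustration, not something from the Spark API): an ISO 8601 string with an explicit offset pins down a single instant regardless of the JVM's default time zone, and that instant is simply a Long count of seconds since the epoch.

import java.time.{Instant, OffsetDateTime}

// Parse an ISO 8601 string that carries an explicit offset.
val parsed: Instant = OffsetDateTime.parse("1970-01-01T00:00:00+00:00").toInstant

parsed.getEpochSecond        // 0: the Long 'seconds since the epoch' value
Instant.ofEpochSecond(0)     // 1970-01-01T00:00:00Z: and back again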

When initially faced with ingesting timestamped data I opted to use to_utc_timestamp and got results that were the exact opposite of what I expected. I won't go off on a tangent about why this happened; I'll just save you some pain by recommending that you go with this function's inverse, from_utc_timestamp, when parsing timestamp strings. The Scaladoc for this function is:

  /**
   * Given a timestamp like '2017-07-14 02:40:00.0', interprets it as a time in UTC, and renders
   * that time as a timestamp in the given time zone. For example, 'GMT+1' would yield
   * '2017-07-14 03:40:00.0'.
   *
   * @param ts A date, timestamp or string. If a string, the data must be in a format that can be
   *           cast to a timestamp, such as `yyyy-MM-dd` or `yyyy-MM-dd HH:mm:ss.SSSS`
   * @param tz A string detailing the time zone that the input should be adjusted to, such as
   *           `Europe/London`, `PST` or `GMT+5`
   * @return A timestamp, or null if `ts` was a string that could not be cast to a timestamp or
   *         `tz` was an invalid value
   * @group datetime_funcs
   * @since 1.5.0
   */
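Before walking through my own experiment, here is a minimal sketch that just reproduces the Scaladoc's own example, assuming the shell's JVM time zone is set to UTC so that both the parsing of the input string and the rendered output line up with the doc (the variable and column names below are mine):

import org.apache.spark.sql.functions._   // already in scope inside spark-shell

// Interpret the doc's example input as a time in UTC and render it in GMT+1.
val docExample = List("2017-07-14 02:40:00.0").toDF("ts").
      withColumn("in_gmt_plus_1", from_utc_timestamp('ts, "GMT+1"))

docExample.show(false)
// The hour should shift from 02:40 to 03:40, as the Scaladoc describes.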

So, on reading this, I thought that whatever date/time valued string I provided would be interpreted as being in UTC (it says exactly that!), and that I would get the same results irrespective of where in the world I was running my service (i.e., regardless of the default time zone setting of the JVM running the app). So to test things out I launched the Spark shell from where I live (California, GMT-8 at the time of this writing), and I tried a date/time string set to the start of the Unix epoch: '1970-01-01 00:00'. Given this input I expected the Long value of the internal representation of the stored timestamp to be zero, but the value I got was 28800 (exactly eight hours' worth of seconds, which happens to be my machine's offset from UTC), as shown below.

import org.apache.spark.sql.functions._   // redundant inside spark-shell, which imports this automatically
import org.apache.spark.sql.types._

val df = List(("1970-01-01 00:00")).toDF("timestr").
      withColumn("utcAssumeStringUTC", from_utc_timestamp('timestr, "GMT")).
      withColumn("utc_utc_timestamp_as_int", 'utcAssumeStringUTC.cast(IntegerType))

df.show()

// Result:
//  +----------------+-------------------+------------------------+
//  |         timestr| utcAssumeStringUTC|utc_utc_timestamp_as_int|
//  +----------------+-------------------+------------------------+
//  |1970-01-01 00:00|1970-01-01 00:00:00|                   28800|
//  +----------------+-------------------+------------------------+

After fiddling with the inputs a bit and browsing the source I realized that what I needed to do was to use ISO 8601 strings for my timestamps, which seemed to work well, even though the Spark Scaladoc doesn't explicitly mention this. After settling on this format, I ran my scriptlet twice: once with the default time zone for my machine, using this command

> spark-shell  

and once with the JVM time zone set to GMT (Greenwich Mean Time, a.k.a. UTC), using this command.

> spark-shell  --driver-java-options "-Duser.timezone=UTC"

Both times I got a timestamp with a Long value of zero, as expected:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val df = List(("1970-01-01T00:00:00+00")).toDF("timestr").
      withColumn("utcAssumeStringUTC", from_utc_timestamp('timestr, "GMT")).
      withColumn("utc_utc_timestamp_as_int", 'utcAssumeStringUTC.cast(IntegerType))

df.show()

// Result:
// +--------------------+-------------------+------------------------+
// |             timestr| utcAssumeStringUTC|utc_utc_timestamp_as_int|
// +--------------------+-------------------+------------------------+
// |1970-01-01T00:00:...|1970-01-01 00:00:00|                       0|
// +--------------------+-------------------+------------------------+

(A side note on the column utcAssumeStringUTC: when output via show() it renders as a string differently depending on your JVM's default time zone setting, but if you cast it to an integer it will correctly show up as zero.)
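If you want to convince yourself of this, one quick check (reusing the df from the scriptlet above; the column aliases below are mine) is to put the rendered string and the raw numeric value side by side. Only the former depends on the JVM's default time zone:

// The string rendering varies with the JVM's time zone; the epoch value does not.
df.select(
      'utcAssumeStringUTC.cast(StringType).as("rendered_string"),
      'utcAssumeStringUTC.cast(LongType).as("epoch_seconds")   // 0 for this input
   ).show(false)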

Next I applied a weekday transformation (via date_format) to the timestamp column utcAssumeStringUTC (below), and I found that the output again differed depending on the default time zone setting of the JVM running my code. This scriptlet:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val df = List(("1970-01-01T00:00:00+00")).toDF("timestr").
      withColumn("utcAssumeStringUTC", from_utc_timestamp('timestr, "GMT")).
      withColumn("utc_utc_timestamp_as_int", 'utcAssumeStringUTC.cast(IntegerType)).
      withColumn("weekday", date_format($"utcAssumeStringUTC", "EEEE"))

df.show()

gave the result

+--------------------+-------------------+------------------------+---------+
|             timestr| utcAssumeStringUTC|utc_utc_timestamp_as_int|  weekday|
+--------------------+-------------------+------------------------+---------+
|1970-01-01T00:00:...|1969-12-31 16:00:00|                       0|Wednesday|
+--------------------+-------------------+------------------------+---------+

when run from a Spark shell launched with no special JVM options (resulting in a timezone setting of GMT-8 on my machine), and it gave the result

+--------------------+-------------------+------------------------+--------+
|             timestr| utcAssumeStringUTC|utc_utc_timestamp_as_int| weekday|
+--------------------+-------------------+------------------------+--------+
|1970-01-01T00:00:...|1970-01-01 00:00:00|                       0|Thursday|
+--------------------+-------------------+------------------------+--------+

when run from a Spark shell launched like this:

spark-shell  --driver-java-options "-Duser.timezone=UTC"

From an end-user perspective this is typically the behavior I would want. Let's say an event I was processing happened at the start of the Unix epoch. This is a time zone independent concept. At the time of the event I might have been in Greenwich, England, in which case my calendar and wall clock would have shown Thursday, Jan 1, 1970 at 12:00 midnight. But I might have been at home in California, in which case I would have seen Wednesday, Dec 31, 1969 at 4pm. When I display the day of the week that the event happened for a human user, that user will most likely expect a result in the context of the time zone they are currently in.

Now, although our users may expect this, we probably don't want our code to produce different results for the same inputs depending on the default time zone of the JVM running our app. For this reason, I would recommend parameterizing the time zone context that your application uses to display these types of results. A better way to display the day of the week that an event happened is to do one more round of conversion using from_utc_timestamp and the parameterized value of your desired time zone context, like this:

import org.apache.spark.sql.functions._

val timezone = "America/Los_Angeles"

val df = List(("1970-01-01T00:00:00+00")).toDF("timestr").
      withColumn("utcAssumeStringUTC", from_utc_timestamp('timestr, "GMT")).
      withColumn("x", from_utc_timestamp('utcAssumeStringUTC, timezone)).
      withColumn("weekday", date_format($"x", "EEEE"))

df.show()

No matter what time zone is in effect for your JVM, the output will be the same (as shown below). Of course in a real app you would not hard code this value, but instead read it from some type of config file, falling back to the default time zone setting of your JVM (a minimal sketch of that fallback appears after the result table).

+--------------------+-------------------+-------------------+---------+
|             timestr| utcAssumeStringUTC|                  x|  weekday|
+--------------------+-------------------+-------------------+---------+
|1970-01-01T00:00:...|1970-01-01 00:00:00|1969-12-31 16:00:00|Wednesday|
+--------------------+-------------------+-------------------+---------+
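As a final note on the parameterization point, here is a minimal sketch of the 'config value with a fallback' idea. The APP_TIMEZONE environment variable is purely hypothetical (you might equally read the value from a properties file); the fallback is the JVM's default zone.

import java.time.ZoneId

// Hypothetical lookup: use a configured zone if present, otherwise the JVM default.
val timezone: String =
  sys.env.get("APP_TIMEZONE").
      map(tz => ZoneId.of(tz).getId).       // validates the configured value
      getOrElse(ZoneId.systemDefault.getId)

// The timezone value can then be passed to from_utc_timestamp exactly as in the scriptlet above.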