In [1]:
import java.sql.{Date,Timestamp}

In [2]:
spark.version

2.4.8

## to_date

In [3]:
import org.apache.spark.sql.functions.to_date

// Sample purchases whose date is stored as a yyyy-MM-dd string,
// ordered by device and then by purchase_date.
val df = Seq(
    ("notebook","2019-01-01"),
    ("notebook", "2019-01-10"),
    ("small_phone", "2019-01-15"),
    ("small_phone", "2019-01-30")
).toDF("device", "purchase_date").sort("device","purchase_date")

// Inspect the column types before any conversion (both are strings here).
df.dtypes

df = [device: string, purchase_date: string]


Array((device,StringType), (purchase_date,StringType))

In [4]:
// Overwrite purchase_date with to_date(...) (no pattern given) and re-check the types:
// the column changes from StringType to DateType.
df.withColumn("purchase_date",to_date($"purchase_date")).dtypes

Array((device,StringType), (purchase_date,DateType))

## to_date, custom format

In [5]:
import org.apache.spark.sql.functions.to_date

// Same kind of sample data, but the dates are day/month/year strings.
// Note: purchase_date is still a string here, so the sort compares text,
// not calendar dates.
val df = Seq(
    ("notebook","27/12/2019"),
    ("notebook", "01/12/2019"),
    ("small_phone", "23/01/2019"),
    ("small_phone", "27/12/2019")
).toDF("device", "purchase_date").sort("device","purchase_date")

df = [device: string, purchase_date: string]


[device: string, purchase_date: string]

In [6]:
%%dataframe
df

device,purchase_date
notebook,01/12/2019
notebook,27/12/2019
small_phone,23/01/2019
small_phone,27/12/2019


In [7]:
%%dataframe
// The strings are day/month/year, so the pattern is passed explicitly as the second argument.
df.withColumn("purchase_date",to_date($"purchase_date", "dd/MM/yyyy"))

device,purchase_date
notebook,2019-12-01
notebook,2019-12-27
small_phone,2019-01-23
small_phone,2019-12-27


## to_timestamp

In [8]:
// Sample purchases whose time is stored as a "yyyy-MM-dd HH:mm:ss" string,
// ordered by device and then by purchase_time.
val df = Seq(
    ("notebook","2019-01-01 00:00:00"),
    ("notebook", "2019-01-10 13:00:00"),
    ("small_phone", "2019-01-15 12:00:00"),
    ("small_phone", "2019-01-30 09:30:00")
).toDF("device", "purchase_time").sort("device","purchase_time")

df = [device: string, purchase_time: string]


[device: string, purchase_time: string]

In [9]:
df.dtypes

Array((device,StringType), (purchase_time,StringType))

In [10]:
import org.apache.spark.sql.functions.to_timestamp

In [11]:
// Overwrite purchase_time with to_timestamp(...) (no pattern given) and re-check the types:
// the column changes from StringType to TimestampType.
df.withColumn("purchase_time",to_timestamp($"purchase_time")).dtypes

Array((device,StringType), (purchase_time,TimestampType))

## to_timestamp custom format

In [12]:
// Sample purchases whose time is a "day/month/year hour:minute" string.
// purchase_time is a string at this point, so the sort compares text.
val df = Seq(
    ("notebook","27/12/2019 12:00"),
    ("notebook", "01/12/2019 00:00"),
    ("small_phone", "23/01/2019 12:00"),
    ("small_phone", "27/12/2019 12:00")
).toDF("device", "purchase_time").sort("device","purchase_time")

// Confirm both columns are strings before converting.
df.dtypes

df = [device: string, purchase_time: string]


Array((device,StringType), (purchase_time,StringType))

In [13]:
%%dataframe
// Counter-example: calling to_timestamp without a pattern on day/month/year strings
// does not raise an error; purchase_time simply comes back empty for every row.
df.withColumn("purchase_time",to_timestamp($"purchase_time"))

device,purchase_time
notebook,
notebook,
small_phone,
small_phone,


In [14]:
%%dataframe
// Supplying a pattern that matches the input (day/month/year hour:minute) makes the
// conversion succeed; seconds are not in the input and show up as 00 in the result.
df.withColumn("purchase_time",to_timestamp($"purchase_time","d/M/y H:m"))

device,purchase_time
notebook,2019-12-01 00:00:00.0
notebook,2019-12-27 12:00:00.0
small_phone,2019-01-23 12:00:00.0
small_phone,2019-12-27 12:00:00.0


## timestamp to date

In [15]:
// Sample purchases built from java.sql.Timestamp values, so purchase_time is a real
// timestamp column and the sort is on timestamp values rather than on text.
val df = Seq(
    ("notebook",Timestamp.valueOf("2019-01-29 12:00:00")),
    ("notebook", Timestamp.valueOf("2019-01-01 00:00:00")),
    ("small_phone", Timestamp.valueOf("2019-01-15 23:00:00")),
    ("small_phone", Timestamp.valueOf("2019-01-01 09:00:00"))
).toDF("device", "purchase_time").sort("device","purchase_time")

// Confirm purchase_time is TimestampType.
df.dtypes

df = [device: string, purchase_time: timestamp]


Array((device,StringType), (purchase_time,TimestampType))

In [16]:
%%dataframe
// to_date applied to a timestamp column keeps only the date part;
// the result goes into a new purchase_date column next to the original.
df.withColumn("purchase_date",to_date($"purchase_time"))

device,purchase_time,purchase_date
notebook,2019-01-01 00:00:00.0,2019-01-01
notebook,2019-01-29 12:00:00.0,2019-01-29
small_phone,2019-01-01 09:00:00.0,2019-01-01
small_phone,2019-01-15 23:00:00.0,2019-01-15


## date to timestamp with zero hours

In [17]:
import java.sql.Date
import org.apache.spark.sql.functions.to_timestamp

// Sample purchases built from java.sql.Date values, so purchase_date is a real date column.
val df = Seq(
    ("notebook",Date.valueOf("2019-01-29")),
    ("notebook", Date.valueOf("2019-01-01")),
    ("small_phone", Date.valueOf("2019-01-15")),
    ("small_phone", Date.valueOf("2019-01-01"))
).toDF("device", "purchase_date").sort("device","purchase_date")

df = [device: string, purchase_date: date]


[device: string, purchase_date: date]

In [18]:
%%dataframe
df

device,purchase_date
notebook,2019-01-01
notebook,2019-01-29
small_phone,2019-01-01
small_phone,2019-01-15


In [19]:
%%dataframe
// to_timestamp applied to a date column yields that date at 00:00:00;
// the result goes into a new purchase_time column.
df.withColumn("purchase_time",to_timestamp($"purchase_date"))

device,purchase_date,purchase_time
notebook,2019-01-01,2019-01-01 00:00:00.0
notebook,2019-01-29,2019-01-29 00:00:00.0
small_phone,2019-01-01,2019-01-01 00:00:00.0
small_phone,2019-01-15,2019-01-15 00:00:00.0


## custom date/timestamp formatting

In [20]:
import java.sql.Timestamp
import org.apache.spark.sql.functions.date_format

// Sample purchases with a real timestamp column, used to demonstrate date_format.
val df = Seq(
    ("notebook",Timestamp.valueOf("2019-01-29 12:00:00")),
    ("notebook", Timestamp.valueOf("2019-01-01 00:00:00")),
    ("small_phone", Timestamp.valueOf("2019-01-15 23:00:00")),
    ("small_phone", Timestamp.valueOf("2019-01-01 09:00:00"))
).toDF("device", "purchase_time").sort("device","purchase_time")

df = [device: string, purchase_time: timestamp]


[device: string, purchase_time: timestamp]

In [21]:
%%dataframe
// date_format renders the timestamp as text using the given pattern;
// "y-MM" produces year and two-digit month, e.g. 2019-01.
df.withColumn("formatted_purchase_time",date_format($"purchase_time","y-MM"))

device,purchase_time,formatted_purchase_time
notebook,2019-01-01 00:00:00.0,2019-01
notebook,2019-01-29 12:00:00.0,2019-01
small_phone,2019-01-01 09:00:00.0,2019-01
small_phone,2019-01-15 23:00:00.0,2019-01


## current timestamp

In [35]:
import org.apache.spark.sql.functions.current_timestamp

// Minimal single-column dataframe; the values are placeholders for the current_timestamp demo.
val df = Seq(
    ("foo"), 
    ("bar"), 
    ("baz") 
).toDF("col1")

df = [col1: string]


[col1: string]

In [36]:
%%dataframe
%%scan

// Add a "now" column holding the current timestamp; in the output shown,
// every row carries the same value.
df
  .withColumn("now", current_timestamp)

col1,now
foo,2022-06-12 01:51:20.683
bar,2022-06-12 01:51:20.683
baz,2022-06-12 01:51:20.683


## current date

In [37]:
import org.apache.spark.sql.functions.current_date

// Minimal single-column dataframe; the values are placeholders for the current_date demo.
val df = Seq(
    ("foo"), 
    ("bar"), 
    ("baz") 
).toDF("col1")

df = [col1: string]


[col1: string]

In [38]:
%%dataframe
%%scan

// Add a "today" column holding the current date (no time component).
df
  .withColumn("today", current_date)

col1,today
foo,2022-06-12
bar,2022-06-12
baz,2022-06-12


## hour

In [39]:
import org.apache.spark.sql.functions.hour

// Timestamps kept as plain strings (with milliseconds) to show that hour()
// can be applied without an explicit to_timestamp conversion.
val df = Seq(
    ("foo", "2019-01-01 01:00:00.000"), 
    ("bar", "2019-01-01 12:30:00.000"), 
    ("baz", "2019-01-01 23:01:00.000") 
).toDF("col1", "some_timestamp")

df = [col1: string, some_timestamp: string]


[col1: string, some_timestamp: string]

In [40]:
%%dataframe
df

col1,some_timestamp
foo,2019-01-01 01:00:00.000
bar,2019-01-01 12:30:00.000
baz,2019-01-01 23:01:00.000


In [41]:
%%dataframe
%%scan
// Extract the hour of day (0-23) as an integer; some_timestamp is a string column here
// and is used directly, without converting it first.
df
  .withColumn("hour", hour($"some_timestamp"))

col1,some_timestamp,hour
foo,2019-01-01 01:00:00.000,1
bar,2019-01-01 12:30:00.000,12
baz,2019-01-01 23:01:00.000,23


## beginning of week

In [42]:
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions._

// Dummy dataframe for testing the week-start calculation: three yyyy-MM-dd date strings,
// two of them on either side of a year boundary.
val df = Seq(
    ("2018-12-28"), 
    ("2019-01-01"), 
    ("2019-01-04") 
).toDF("source_date")

df = [source_date: string]


[source_date: string]

In [43]:
%%dataframe
df

source_date
2018-12-28
2019-01-01
2019-01-04


In [44]:
/**
 * Turns an abbreviated day name (like "Wed") into its zero-based position in a
 * Sunday-first week: Sun = 0, Mon = 1, ..., Sat = 6.
 *
 * The index doubles as "days elapsed since the start of the week", so subtracting
 * it from a date yields the Sunday that begins that date's week.
 *
 * Only the seven English three-letter abbreviations listed below are recognised.
 *
 * @param col column holding the abbreviated day name
 * @return column with the day index; null when the input is null, and also null for
 *         any other unrecognised value (no `otherwise` branch is defined)
 */
def dayNameToIndex(col: Column) : Column = {
    when(col.isNull, null)
       .when(col === "Sun", 0)
       .when(col === "Mon", 1)
       .when(col === "Tue", 2)
       .when(col === "Wed", 3)
       .when(col === "Thu", 4)
       .when(col === "Fri", 5)
       // Saturday is the 7th day, i.e. index 6. The previous value (7) made a
       // Saturday resolve to the prior Saturday instead of the prior Sunday.
       .when(col === "Sat", 6)
}

dayNameToIndex: (col: org.apache.spark.sql.Column)org.apache.spark.sql.Column


In [45]:
%%dataframe
// Compute the first day (Sunday) of the week each source_date falls in.
//  1. date_format(..., "E") gives the abbreviated day name, which dayNameToIndex
//     converts to the number of days since the start of the week.
//  2. date_sub subtracts that many days. We need to use expr here because the
//     number of days to subtract is a column value, not a constant.
//  3. day_index is only a scratch column, so it is dropped from the result.
df
  .withColumn("day_index", dayNameToIndex(date_format(col("source_date"), "E")))
  .withColumn("week_start", expr("date_sub(source_date, day_index)"))
  .drop("day_index")

source_date,week_start
2018-12-28,2018-12-23
2019-01-01,2018-12-30
2019-01-04,2018-12-30
