{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# The Flint Time Series Library\n", "[Flint](https://github.com/twosigma/flint) is a time series library for Apache Spark. The ability to analyze time series data at scale is critical for the success of finance and IoT applications based on Spark.\n", "Flint is [Two Sigma's](http://opensource.twosigma.com/) implementation of highly optimized time series operations in Spark.\n", "It performs truly parallel and rich analyses on time series data by taking advantage of the natural ordering in time series data to provide locality-based optimizations.\n", "\n", "Flint is an open source library for Spark based around the `TimeSeriesRDD`, a time series aware data structure, and a collection of time series utility and analysis functions that use `TimeSeriesRDD`s.\n", "Unlike `DataFrame` and `Dataset`, Flint's `TimeSeriesRDD`s can leverage the existing ordering properties of datasets at rest and the fact that almost all data manipulations and analysis over these datasets respect their temporal ordering properties.\n", "It differs from other time series efforts in Spark in its ability to efficiently compute across panel data or on large scale high frequency data." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%classpath config resolver jitpack.io https://jitpack.io" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%classpath add mvn\n", "com.github.twosigma flint master-SNAPSHOT\n", "org.apache.spark spark-sql_2.11 2.2.1\n", "org.apache.spark spark-mllib_2.11 2.2.1\n", "org.scalanlp breeze_2.10 0.13.2" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "//Creates spark session\n", "\n", "import com.twosigma.flint.timeseries.CSV\n", "import org.apache.spark.sql.SparkSession\n", "\n", "val spark = SparkSession.builder()\n", " .appName(\"Simple Application\")\n", " .master(\"local[4]\")\n", " .config(\"spark.ui.enabled\", \"false\")\n", " .getOrCreate()\n", "\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true }, "outputs": [], "source": [ "//Creates a TimeSeriesRDD from a CSV file\n", "\n", "val tsRdd = CSV.from(\n", " spark.sqlContext,\n", " \"../resources/data/flint-demo.csv\",\n", " header = true,\n", " dateFormat = \"yyyyMMdd HH:mm:ss.SSS\",\n", " sorted = true\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "//Basic operations\n", "\n", "import org.apache.spark.sql.types.IntegerType\n", "import org.apache.spark.sql.types.DoubleType\n", "import org.apache.spark.sql.Row\n", "\n", "def changeTimeFunction(id: Int, time: Long) : Long = {\n", " return if (id == 3) time + 25 else time\n", "}\n", "\n", "val priceAsInteger = tsRdd.cast(\"price\" -> IntegerType)\n", "val filteredRowsByPrice = tsRdd.keepRows { row: Row => row.getAs[Double](\"price\") > 4.0 }\n", "val timeColumnOnly = tsRdd.keepColumns(\"time\")\n", "val withoutIdColumn = tsRdd.deleteColumns(\"id\")\n", "val renamedColumns = tsRdd.renameColumns(\"id\" -> \"ticker\", \"price\" -> \"highPrice\")\n", "val updatedTimeColumn = tsRdd.setTime {\n", " row: Row =>\n", " changeTimeFunction(row.getAs[Int](\"id\"), row.getAs[Long](\"time\"))\n", "}\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "//Crate columns\n", "val newHighPriceColumn = tsRdd.addColumns(\n", " \"highPrice\" -> DoubleType -> {\n", " r: Row => r.getAs[Double](\"price\") + 1.5\n", " }\n", ")\n", "\n", "val results = tsRdd.addColumnsForCycle(\n", " \"adjustedPrice\" -> DoubleType -> { rows: Seq[Row] =>\n", " rows.map { row => (row, row.getAs[Double](\"price\") * rows.size) }.toMap\n", " }\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true }, "outputs": [], "source": [ "//Group functions\n", "import com.twosigma.flint.timeseries.Windows\n", "val groupedByCycle = tsRdd.groupByCycle()\n", "\n", "val intervals = tsRdd\n", ".keepRows { row: Row => row.getAs[Long](\"time\") % 100 == 0 }\n", ".keepRows { row: Row => row.getAs[Int](\"id\") == 3}\n", ".keepColumns(\"time\")\n", "\n", "val groupedByInterval = tsRdd.groupByInterval(intervals)\n", "val groupedByWindows = tsRdd.addWindows(Windows.pastAbsoluteTime(\"1000ns\"))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "//Temporal join functions\n", "val leftTSRdd = tsRdd.keepRows { row: Row => row.getAs[Long](\"time\") % 100 == 0 }\n", ".keepColumns(\"time\", \"price\")\n", "val rightTSRdd = tsRdd.keepRows { row: Row => row.getAs[Long](\"time\") % 100 != 0 }\n", ".keepColumns(\"time\", \"id\")\n", "\n", "val leftJoin = leftTSRdd.leftJoin(rightTSRdd, tolerance = \"50ns\")\n", "val futureLeftJoin = leftTSRdd.futureLeftJoin(rightTSRdd, tolerance = \"50ns\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "//Summarize functions\n", "import com.twosigma.flint.timeseries.Summarizers\n", "val summarizedCycles = tsRdd.summarizeCycles(Summarizers.sum(\"price\"))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "//stat.regression\n", "\n", "import breeze.linalg.DenseVector\n", "import org.apache.spark.mllib.random.RandomRDDs\n", "import com.twosigma.flint.math.stats.regression.WeightedLabeledPoint\n", "import com.twosigma.flint.math.stats.regression.OLSMultipleLinearRegression\n", "\n", "// Generate a random data set from a linear model with beta = [1.0, 2.0] and intercept = 3.0\n", "val data = WeightedLabeledPoint.generateSampleData(spark.sparkContext, DenseVector(1.0, 2.0), 3.0)\n", "\n", "// Fit the data using the OLS linear regression.\n", "val model = OLSMultipleLinearRegression.regression(data)\n", "\n", "// Retrieve the estimate beta and intercept.\n", "val denseVector = model.estimateRegressionParameters" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "spark.close()" ] } ], "metadata": { "kernelspec": { "display_name": "Scala", "language": "scala", "name": "scala" }, "language_info": { "codemirror_mode": "text/x-scala", "file_extension": ".scala", "mimetype": "", "name": "Scala", "nbconverter_exporter": "", "version": "2.11.11" } }, "nbformat": 4, "nbformat_minor": 2 }