{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# RDD cơ bản\n", "- Programmer chỉ định số lượng partitions.\n", "- Driver tự phân chia partition đến các Workers tương ứng.\n", "- Master parameter chỉ định số lượng workers cụ thể.\n", "\n", "# Các hàm transformations\n", "- map(func): trả về tập dữ liệu phân tán mới bằng cách ánh xạ từng phần tử tập dữ liệu nguồn qua hàm func do programmer định nghĩa.\n", "- filter(func): trả về tập dữ liệu phân tán mới bằng cách lọc ra các phần tử tập dữ liệu nguồn thoả điều kiện hàm func định nghĩa.\n", "- distinct(): trả về tập dữ liệu phân tán mới chỉ chứa các phần tử riêng biệt từ tập dữ liệu nguồn.\n", "- flatMap(func): tương tự như map(), nhưng có thể ánh xạ các phần tử nguồn sang 0 hoặc nhiều phần tử ở tập dữ liệu mới. Hàm func thường trả về kiểu Seg thay vì phần tử đơn lẻ." ] }, { "cell_type": "code", "execution_count": 1, "metadata": { "scrolled": true }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "http://localhost:4040/jobs/\n" ] } ], "source": [ "print \"http://localhost:4040/jobs/\"" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[2, 4, 6, 8]" ] }, "execution_count": 2, "metadata": {}, "output_type": "execute_result" } ], "source": [ "rdd = sc.parallelize([1, 2, 3, 4])\n", "rdd.map(lambda x: x * 2).collect()" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[2, 4]" ] }, "execution_count": 3, "metadata": {}, "output_type": "execute_result" } ], "source": [ "rdd.filter(lambda x: x % 2 == 0).collect()" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[2, 4, 1, 3]" ] }, "execution_count": 4, "metadata": {}, "output_type": "execute_result" } ], "source": [ "rdd = sc.parallelize([1, 4, 2, 2, 3])\n", "rdd.distinct().collect()" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[[1, 6], [2, 7], [3, 8]]" ] }, "execution_count": 5, "metadata": {}, "output_type": "execute_result" } ], "source": [ "rdd = sc.parallelize([1, 2, 3])\n", "rdd.map(lambda x: [x, x + 5]).collect()" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[1, 6, 2, 7, 3, 8]" ] }, "execution_count": 6, "metadata": {}, "output_type": "execute_result" } ], "source": [ "rdd.flatMap(lambda x: [x, x + 5]).collect()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Các hàm actions\n", "- reduce(func): aggregate từng phần tử tập dữ liệu thông qua hàm func, hàm func nhận 2 đối số và trả về 1 giá trị.\n", "- take(n): trả về mảng n phần tử.\n", "- collect(): trả về tất cả các phần tử. CHÚ Ý: phải đảm bảo máy Driver đủ dung lượng để chứa kết quả trả về.\n", "- takeOrdered(n, key=func): trả về n phần tử sắp xếp tăng dần hoặc sắp xếp theo hàm key." ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "6" ] }, "execution_count": 7, "metadata": {}, "output_type": "execute_result" } ], "source": [ "rdd = sc.parallelize([1, 2, 3])\n", "rdd.reduce(lambda a, b: a * b)" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[1, 2]" ] }, "execution_count": 8, "metadata": {}, "output_type": "execute_result" } ], "source": [ "rdd.take(2)" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[1, 2, 3]" ] }, "execution_count": 9, "metadata": {}, "output_type": "execute_result" } ], "source": [ "rdd.collect()" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[5, 3, 2]" ] }, "execution_count": 10, "metadata": {}, "output_type": "execute_result" } ], "source": [ "rdd = sc.parallelize([5, 3, 1, 2])\n", "rdd.takeOrdered(3, lambda s: -1 * s)" ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[1, 2, 3]" ] }, "execution_count": 11, "metadata": {}, "output_type": "execute_result" } ], "source": [ "rdd.takeOrdered(3)" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "5\n" ] } ], "source": [ "lines = sc.textFile(\"sample_text.txt\", 4)\n", "print lines.count()" ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "5\n" ] } ], "source": [ "print lines.count()" ] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "5\n", "5\n" ] } ], "source": [ "lines = sc.textFile(\"sample_text.txt\", 4)\n", "lines.cache()\n", "print lines.count()\n", "print lines.count()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Key-Value RDDs\n", "- Tương tự như Map Reduce, Spark hỗ trợ Key-Value pairs.\n", "- Mỗi phần tử của Pair RDD là một cặp tuple.\n", "## Some Key-Value transformation\n", "- reduceByKey(func): trả về tập dữ liệu phân tán mới (K, V). Trong đó, các giá trị cho từng key được tổng hợp bằng hàm reduce func có dạng (V, V) -> V.\n", "- sortByKey(): trả về tập dữ liệu phân tán mới (K, V) sắp xếp tăng dần theo keys.\n", "- groupByKey(): trả về tập dữ liệu phân tán mới (K, Iterable)." ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[(1, 2), (3, 4)]" ] }, "execution_count": 15, "metadata": {}, "output_type": "execute_result" } ], "source": [ "rdd = sc.parallelize([(1, 2), (3, 4)])\n", "rdd.collect()" ] }, { "cell_type": "code", "execution_count": 16, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[(1, 2), (3, 10)]" ] }, "execution_count": 16, "metadata": {}, "output_type": "execute_result" } ], "source": [ "rdd = sc.parallelize([(1, 2), (3, 4), (3, 6)])\n", "rdd.reduceByKey(lambda a, b: a + b).collect()" ] }, { "cell_type": "code", "execution_count": 17, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[(1, 'a'), (1, 'b'), (2, 'c')]" ] }, "execution_count": 17, "metadata": {}, "output_type": "execute_result" } ], "source": [ "rdd = sc.parallelize([(1, \"a\"), (2, \"c\"), (1, \"b\")])\n", "rdd.sortByKey().collect()" ] }, { "cell_type": "code", "execution_count": 18, "metadata": { "scrolled": true }, "outputs": [ { "data": { "text/plain": [ "[(1, ),\n", " (2, )]" ] }, "execution_count": 18, "metadata": {}, "output_type": "execute_result" } ], "source": [ "rdd.groupByKey().collect()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# X.join(Y)\n", "- Trả về tất cả các phần tử RDD keys khớp với X và Y.\n", "- Mỗi cặp có định dạng (k, (v1, v2)). Trong đó, (k, v1) thuộc X và (k, v2) thuộc Y." ] }, { "cell_type": "code", "execution_count": 19, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[('a', (1, 2)), ('a', (1, 3))]" ] }, "execution_count": 19, "metadata": {}, "output_type": "execute_result" } ], "source": [ "x = sc.parallelize([(\"a\", 1), (\"b\", 4)])\n", "y = sc.parallelize([(\"a\", 2), (\"a\", 3)])\n", "sorted(x.join(y).collect())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# X.leftOuterJoin(Y)\n", "- Với mỗi phần tử (k, v) thuộc X, kết quả trả về có thể là:\n", " - Tất cả các cặp (k, (v, w)) với w thuộc Y.\n", " - Hoặc các cặp (k, (v, None)) nếu không có phần tử nào thuộc Y có key là k." ] }, { "cell_type": "code", "execution_count": 20, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[('a', (1, 2)), ('b', (4, None))]" ] }, "execution_count": 20, "metadata": {}, "output_type": "execute_result" } ], "source": [ "x = sc.parallelize([(\"a\", 1), (\"b\", 4)])\n", "y = sc.parallelize([(\"a\", 2)])\n", "sorted(x.leftOuterJoin(y).collect())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# X.rightOuterJoin(Y)\n", "- Với mỗi phần tử (k, w) thuộc Y, kết quả trả về có thể là:\n", " - Tất cả các cặp (k, (v, w)) với v thuộc X.\n", " - Hoặc các cặp (k, (None, w)) nếu không có phần tử nào thuộc X có key là k." ] }, { "cell_type": "code", "execution_count": 21, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[('a', (1, 2)), ('b', (None, 4))]" ] }, "execution_count": 21, "metadata": {}, "output_type": "execute_result" } ], "source": [ "x = sc.parallelize([(\"a\", 1)])\n", "y = sc.parallelize([(\"a\", 2), (\"b\", 4)])\n", "sorted(x.rightOuterJoin(y).collect())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# X.fullOuterJoin(Y)\n", "- Với mỗi phần tử (k, v) thuộc X, kết quả trả về có thể là:\n", " - Tất cả các cặp (k, (v, w)) với w thuộc Y.\n", " - Hoặc các cặp (k, (v, None)) nếu không có phần tử nào thuộc Y có key là k.\n", "- Với mỗi phần tử (k, w) thuộc Y, kết quả trả về có thể là:\n", " - Tất cả các cặp (k, (v, w)) với v thuộc X.\n", " - Hoặc các cặp (k, (None, w)) nếu không có phần tử nào thuộc X có key là k. " ] }, { "cell_type": "code", "execution_count": 22, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[('a', (1, 2)), ('b', (4, None)), ('c', (None, 8))]" ] }, "execution_count": 22, "metadata": {}, "output_type": "execute_result" } ], "source": [ "x = sc.parallelize([(\"a\", 1), (\"b\", 4)])\n", "y = sc.parallelize([(\"a\", 2), (\"c\", 8)])\n", "sorted(x.fullOuterJoin(y).collect())" ] } ], "metadata": { "kernelspec": { "display_name": "Python 2", "language": "python", "name": "python2" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 2 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython2", "version": "2.7.15" }, "name": "04_spark_essentials", "notebookId": 1227613790179004 }, "nbformat": 4, "nbformat_minor": 1 }