{ "cells": [ { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import os\n", "\n", "from pipeline.backend.config import Backend, WorkMode\n", "from pipeline.backend.pipeline import PipeLine\n", "from pipeline.component import DataIO\n", "from pipeline.component import HeteroLR\n", "from pipeline.component import Intersection\n", "from pipeline.component import Reader\n", "from pipeline.interface import Data\n", "from pipeline.interface import Model\n", "from pipeline.runtime.entity import JobParameters" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "DATA_BASE = \"/data/projects/fate\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Prepare and Upload Data" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "guest = 10000\n", "backend = Backend.EGGROLL\n", "work_mode = WorkMode.CLUSTER\n", "\n", "partition = 4\n", "\n", "guest_train_data = {\"name\": \"breast_hetero_guest\", \"namespace\": f\"experiment\"}\n", "\n", "host_train_data = {\"name\": \"breast_hetero_host\", \"namespace\": f\"experiment\"}\n", "\n", "pipeline_upload = PipeLine().set_initiator(role=\"guest\", party_id=guest).set_roles(guest=guest)\n", "# add upload data info\n", "# original csv file path\n", "pipeline_upload.add_upload_data(file=os.path.join(DATA_BASE, \"examples/data/breast_hetero_guest.csv\"),\n", " table_name=guest_train_data[\"name\"], # table name\n", " namespace=guest_train_data[\"namespace\"], # namespace\n", " head=1, partition=partition)\n", "\n", "pipeline_upload.add_upload_data(file=os.path.join(DATA_BASE, \"examples/data/breast_hetero_host.csv\"),\n", " table_name=host_train_data[\"name\"],\n", " namespace=host_train_data[\"namespace\"],\n", " head=1, partition=partition)\n", "\n", "# upload all data\n", "pipeline_upload.upload(work_mode=work_mode, backend=backend, drop=1)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Define Components of the Training" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# parties config\n", "guest = 10000\n", "host = 10000\n", "arbiter = 10000\n", "\n", "# specify input data name & namespace in database\n", "guest_train_data = {\"name\": \"breast_hetero_guest\", \"namespace\": \"experiment\"}\n", "host_train_data = {\"name\": \"breast_hetero_host\", \"namespace\": \"experiment\"}\n", "\n", "guest_eval_data = {\"name\": \"breast_hetero_guest\", \"namespace\": \"experiment\"}\n", "host_eval_data = {\"name\": \"breast_hetero_host\", \"namespace\": \"experiment\"}\n", "\n", "# initialize pipeline\n", "pipeline = PipeLine()\n", "\n", "# set job initiator\n", "pipeline.set_initiator(role=\"guest\", party_id=guest)\n", "# set participants information\n", "pipeline.set_roles(guest=guest, host=host, arbiter=arbiter)\n", "\n", "# define Reader components to read in data\n", "reader_0 = Reader(name=\"reader_0\")\n", "# configure Reader for guest\n", "reader_0.get_party_instance(role=\"guest\", party_id=guest).component_param(table=guest_train_data)\n", "# configure Reader for host\n", "reader_0.get_party_instance(role=\"host\", party_id=host).component_param(table=host_train_data)\n", "\n", "reader_1 = Reader(name=\"reader_1\")\n", "reader_1.get_party_instance(role=\"guest\", party_id=guest).component_param(table=guest_eval_data)\n", "reader_1.get_party_instance(role=\"host\", party_id=host).component_param(table=host_eval_data)\n", "\n", "# define DataIO 
components\n", "dataio_0 = DataIO(name=\"dataio_0\")\n", "dataio_1 = DataIO(name=\"dataio_1\")\n", "\n", "# get DataIO party instance of guest\n", "dataio_0_guest_party_instance = dataio_0.get_party_instance(role=\"guest\", party_id=guest)\n", "# configure DataIO for guest\n", "dataio_0_guest_party_instance.component_param(with_label=True, output_format=\"dense\")\n", "# get and configure DataIO party instance of host\n", "dataio_0.get_party_instance(role=\"host\", party_id=host).component_param(with_label=False)\n", "\n", "# define Intersection components\n", "intersection_0 = Intersection(name=\"intersection_0\")\n", "intersection_1 = Intersection(name=\"intersection_1\")\n", "\n", "# define HeteroLR component\n", "hetero_lr_0 = HeteroLR(name=\"hetero_lr_0\", early_stop=\"weight_diff\", learning_rate=0.15, optimizer=\"rmsprop\", max_iter=10, early_stopping_rounds=2,validation_freqs=1)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Compose Pipeline of Training and Execute" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# add components to pipeline, in order of task execution\n", "pipeline.add_component(reader_0)\n", "pipeline.add_component(reader_1)\n", "pipeline.add_component(dataio_0, data=Data(data=reader_0.output.data))\n", "# set dataio_1 to replicate model from dataio_0\n", "pipeline.add_component(dataio_1, data=Data(data=reader_1.output.data), model=Model(dataio_0.output.model))\n", "# set data input sources of intersection components\n", "pipeline.add_component(intersection_0, data=Data(data=dataio_0.output.data))\n", "pipeline.add_component(intersection_1, data=Data(data=dataio_1.output.data))\n", "# set train & validate data of hetero_lr_0 component\n", "pipeline.add_component(hetero_lr_0, data=Data(train_data=intersection_0.output.data, validate_data=intersection_1.output.data))\n", "\n", "# compile pipeline once finished adding modules, this step will form conf and dsl files for running job\n", "pipeline.compile()\n", "\n", "# fit model\n", "job_parameters = JobParameters(backend=backend, work_mode=work_mode)\n", "pipeline.fit(job_parameters)\n", "# query component summary\n", "import json\n", "print(json.dumps(pipeline.get_component(\"hetero_lr_0\").get_summary(), indent=4))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Define Pipeline of Prediction and Execute" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# predict\n", "# deploy required components\n", "pipeline.deploy_component([dataio_0, intersection_0, hetero_lr_0])\n", "\n", "# initiate predict pipeline\n", "predict_pipeline = PipeLine()\n", "\n", "reader_2 = Reader(name=\"reader_2\")\n", "reader_2.get_party_instance(role=\"guest\", party_id=guest).component_param(table=guest_eval_data)\n", "reader_2.get_party_instance(role=\"host\", party_id=host).component_param(table=host_eval_data)\n", "# add data reader onto predict pipeline\n", "predict_pipeline.add_component(reader_2)\n", "# add selected components from train pipeline onto predict pipeline\n", "# specify data source\n", "predict_pipeline.add_component(pipeline,\n", " data=Data(predict_input={pipeline.dataio_0.input.data: reader_2.output.data}))\n", "# run predict model\n", "predict_pipeline.predict(job_parameters)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, 
"language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.7" } }, "nbformat": 4, "nbformat_minor": 4 }