In [None]:
import json
import os

from pipeline.backend.config import Backend, WorkMode
from pipeline.backend.pipeline import PipeLine
from pipeline.component import DataIO
from pipeline.component import HeteroLR
from pipeline.component import Intersection
from pipeline.component import Reader
from pipeline.interface import Data
from pipeline.interface import Model
from pipeline.runtime.entity import JobParameters

In [None]:
# Base directory under which the example CSVs live (the upload cell reads
# <DATA_BASE>/examples/data/*.csv). Defaults to the original hard-coded location;
# set the FATE_DATA_BASE environment variable to point at a different installation.
DATA_BASE = os.environ.get("FATE_DATA_BASE", "/data/projects/fate")

## Prepare and Upload Data

In [None]:
# Runtime and party settings for the upload job.
guest = 10000
backend = Backend.EGGROLL
work_mode = WorkMode.CLUSTER

# Partition count passed to every uploaded table.
partition = 4

# Destination table name & namespace for each CSV.
guest_train_data = {"name": "breast_hetero_guest", "namespace": "experiment"}
host_train_data = {"name": "breast_hetero_host", "namespace": "experiment"}

# The upload job only involves the guest party, which is also the initiator.
pipeline_upload = PipeLine().set_initiator(role="guest", party_id=guest).set_roles(guest=guest)

# Register each source CSV (under DATA_BASE) with the table it is stored as.
for upload_csv, upload_table in (("breast_hetero_guest.csv", guest_train_data),
                                 ("breast_hetero_host.csv", host_train_data)):
    pipeline_upload.add_upload_data(file=os.path.join(DATA_BASE, f"examples/data/{upload_csv}"),
                                    table_name=upload_table["name"],
                                    namespace=upload_table["namespace"],
                                    head=1, partition=partition)

# Upload all registered data.
# NOTE(review): drop=1 presumably replaces an existing table of the same name -- confirm in FATE upload docs.
pipeline_upload.upload(work_mode=work_mode, backend=backend, drop=1)

## Define the Training Components

In [None]:
# parties config -- every role is mapped to the same party ID in this demo
guest = 10000
host = 10000
arbiter = 10000

# runtime config -- re-declared here (as the party IDs are) so that this section and the
# fit/predict cells after it do not raise NameError when the upload cell is skipped
backend = Backend.EGGROLL
work_mode = WorkMode.CLUSTER

# specify input data name & namespace in database
guest_train_data = {"name": "breast_hetero_guest", "namespace": "experiment"}
host_train_data = {"name": "breast_hetero_host", "namespace": "experiment"}

# NOTE(review): the evaluation tables are the very same tables used for training, so the
# validation results of hetero_lr_0 reflect fit on training data, not held-out performance.
guest_eval_data = {"name": "breast_hetero_guest", "namespace": "experiment"}
host_eval_data = {"name": "breast_hetero_host", "namespace": "experiment"}

# initialize pipeline
pipeline = PipeLine()

# set job initiator
pipeline.set_initiator(role="guest", party_id=guest)
# set participants information
pipeline.set_roles(guest=guest, host=host, arbiter=arbiter)

# define Reader components: reader_0 reads the training tables, reader_1 the evaluation tables
reader_0 = Reader(name="reader_0")
reader_0.get_party_instance(role="guest", party_id=guest).component_param(table=guest_train_data)
reader_0.get_party_instance(role="host", party_id=host).component_param(table=host_train_data)

reader_1 = Reader(name="reader_1")
reader_1.get_party_instance(role="guest", party_id=guest).component_param(table=guest_eval_data)
reader_1.get_party_instance(role="host", party_id=host).component_param(table=host_eval_data)

# define DataIO components; dataio_1 is not configured here because it is wired
# to reuse dataio_0's model when the pipeline is composed
dataio_0 = DataIO(name="dataio_0")
dataio_1 = DataIO(name="dataio_1")

# only the guest's data carries the label
dataio_0_guest_party_instance = dataio_0.get_party_instance(role="guest", party_id=guest)
dataio_0_guest_party_instance.component_param(with_label=True, output_format="dense")
dataio_0.get_party_instance(role="host", party_id=host).component_param(with_label=False)

# define Intersection components (one per data stream)
intersection_0 = Intersection(name="intersection_0")
intersection_1 = Intersection(name="intersection_1")

# define HeteroLR component
hetero_lr_0 = HeteroLR(name="hetero_lr_0", early_stop="weight_diff", learning_rate=0.15,
                       optimizer="rmsprop", max_iter=10, early_stopping_rounds=2,
                       validation_freqs=1)

## Compose and Run the Training Pipeline

In [None]:
# add components to pipeline, in order of task execution; each call declares its upstream inputs
pipeline.add_component(reader_0)
pipeline.add_component(reader_1)
pipeline.add_component(dataio_0, data=Data(data=reader_0.output.data))
# dataio_1 replicates dataio_0's model, so evaluation data is transformed the same way
pipeline.add_component(dataio_1, data=Data(data=reader_1.output.data), model=Model(dataio_0.output.model))
# set data input sources of intersection components
pipeline.add_component(intersection_0, data=Data(data=dataio_0.output.data))
pipeline.add_component(intersection_1, data=Data(data=dataio_1.output.data))
# train hetero_lr_0 on intersection_0's output and validate on intersection_1's output
pipeline.add_component(hetero_lr_0, data=Data(train_data=intersection_0.output.data,
                                              validate_data=intersection_1.output.data))

# compile pipeline once finished adding modules; this forms the conf and dsl files for the job
pipeline.compile()

# fit model
job_parameters = JobParameters(backend=backend, work_mode=work_mode)
pipeline.fit(job_parameters)

# query component summary (json is imported in the import cell at the top of the notebook)
print(json.dumps(pipeline.get_component("hetero_lr_0").get_summary(), indent=4))

## Build and Run the Prediction Pipeline

In [None]:
# Prediction: mark which trained components (and their models) the predict job reuses.
pipeline.deploy_component([dataio_0, intersection_0, hetero_lr_0])

# Reader supplying the data to score -- the same tables as the evaluation set.
reader_2 = Reader(name="reader_2")
reader_2.get_party_instance(role="guest", party_id=guest).component_param(table=guest_eval_data)
reader_2.get_party_instance(role="host", party_id=host).component_param(table=host_eval_data)

# Assemble the prediction pipeline: first the reader, then the deployed training
# pipeline, with reader_2's output routed into dataio_0's input.
predict_pipeline = PipeLine()
predict_pipeline.add_component(reader_2)
predict_pipeline.add_component(
    pipeline,
    data=Data(predict_input={pipeline.dataio_0.input.data: reader_2.output.data}),
)

# Run the prediction job with the same job parameters used for training.
predict_pipeline.predict(job_parameters)