# Transformer

`Transformer` represents a logic unit that executes on logical partitions of the input dataframe. The partitioning logic is not a concern of `Transformer`; it should be specified in a previous step. You do need to understand the concept of partitions in Fugue, so please read the [partition tutorial](./partition.ipynb) first.

**It accepts these input DataFrame types**: `LocalDataFrame`, `pd.DataFrame`, `List[List[Any]]`, `Iterable[List[Any]]`, `EmptyAwareIterable[List[Any]]`, `List[Dict[str, Any]]`, `Iterable[Dict[str, Any]]`, `EmptyAwareIterable[Dict[str, Any]]`

**It accepts these output DataFrame types**: `LocalDataFrame`, `pd.DataFrame`, `List[List[Any]]`, `Iterable[List[Any]]`, `EmptyAwareIterable[List[Any]]`, `List[Dict[str, Any]]`, `Iterable[Dict[str, Any]]`, `EmptyAwareIterable[Dict[str, Any]]`

Notice that `ArrayDataFrame` and other local dataframes can't be used as annotations; you must use `LocalDataFrame`.

`Transformer` requires users to be explicit about the output schema. `*` represents the input dataframe schema, so `*,b:int` means the output has one additional column `b` of type `int`. The schema can be specified by a schema hint, a decorator, or in the Fugue code.

## Why Explicit on Output Schema?

Computing frameworks can usually infer the output schema, but inference is neither reliable nor efficient. To infer the schema, a framework has to process at least one partition of data and derive the schema from the result. But what if a transformer produces inconsistent schemas on different data partitions? What if that partition takes a long time or fails? To avoid these potential correctness and performance issues, Fugue requires explicit output schemas for `Transformer` and `CoTransformer`.
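
The consistency problem can be sketched in plain Python, without Fugue. In this sketch `half` and `infer_schema` are hypothetical helpers, and a partition is modeled as a plain list of dicts. `half` returns an `int` for even inputs and a `float` for odd ones, so the schema inferred from one partition disagrees with the schema inferred from the other.

```python
from typing import Any, Dict, List

Rows = List[Dict[str, Any]]

def half(rows: Rows) -> Rows:
    # Hypothetical transformer: exact halves stay int, others become float
    out = []
    for r in rows:
        b = r["b"]
        out.append({"a": r["a"], "half": b // 2 if b % 2 == 0 else b / 2})
    return out

def infer_schema(rows: Rows) -> Dict[str, str]:
    # Naive inference: look only at the Python types in the first row
    return {name: type(value).__name__ for name, value in rows[0].items()}

# Two logical partitions, keyed by the value of column a
partitions = {0: [{"a": 0, "b": 2}], 1: [{"a": 1, "b": 3}]}
inferred = {key: infer_schema(half(rows)) for key, rows in partitions.items()}
print(inferred)
```

Whichever partition happens to be sampled decides the inferred type of `half`. An explicit schema removes that ambiguity.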

## Native Approach

This is the simplest way, with no dependency on Fugue. You just need acceptable type annotations on the input dataframe and the output. In the native approach, you must specify the schema in the Fugue code.

In [None]:
from typing import Iterable, Dict, Any, List
import pandas as pd

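# add n to column b; the columns are unchanged, so the output schema is "*"
def add(df:pd.DataFrame, n=1) -> pd.DataFrame:
    df["b"]+=n
    return df

# return only the first row of each partition
def get_top(df:Iterable[Dict[str,Any]]) -> Iterable[Dict[str,Any]]:
    yield next(df)
    return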

In [None]:
from fugue import FugueWorkflow

with FugueWorkflow() as dag:
    df = dag.df([[0,1],[0,2],[1,3],[1,1]],"a:int,b:int")
    # without a schema hint you have to specify the schema in Fugue code
    df = df.transform(add, schema="*").transform(add, schema="*", params=dict(n=2))
    # how to define partitions for transformers to operate on
    # get the smallest b of each partition
    df.partition(by=["a"], presort="b").transform(get_top, schema="*").show()
    # get the largest b of each partition
    df.partition(by=["a"], presort="b DESC").transform(get_top, schema="*").show()
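
To make the effect of `partition(by=..., presort=...)` concrete, here is a rough pure-Python sketch of the same idea. It is not how Fugue implements partitioning: `apply_per_partition` is a hypothetical helper, and it runs on the raw sample rows without the two `add` steps. Rows are grouped by the key, each group is sorted, and the function sees one group at a time.

```python
from itertools import groupby
from typing import Any, Callable, Dict, Iterable, Iterator, List

Row = Dict[str, Any]

def get_top(rows: Iterator[Row]) -> Iterable[Row]:
    # Same idea as the transformer above: keep only the first row
    yield next(rows)

def apply_per_partition(
    rows: List[Row],
    by: str,
    presort: str,
    func: Callable[[Iterator[Row]], Iterable[Row]],
    descending: bool = False,
) -> List[Row]:
    result: List[Row] = []
    # groupby only groups adjacent items, so sort by the partition key first
    ordered = sorted(rows, key=lambda r: r[by])
    for _, group in groupby(ordered, key=lambda r: r[by]):
        # presort inside each logical partition before calling the function
        part = sorted(group, key=lambda r: r[presort], reverse=descending)
        result.extend(func(iter(part)))
    return result

rows = [{"a": 0, "b": 1}, {"a": 0, "b": 2}, {"a": 1, "b": 3}, {"a": 1, "b": 1}]
smallest = apply_per_partition(rows, "a", "b", get_top)
largest = apply_per_partition(rows, "a", "b", get_top, descending=True)
print(smallest)
print(largest)
```

Because the presort happens before the function is called, `get_top` itself never needs to know whether it is returning the smallest or the largest row.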

## With Schema Hint

When you need to reuse a transformer multiple times, it's tedious to specify the schema in Fugue code every time. Instead, you can write a schema hint as a comment on top of the function; this doesn't require a Fugue dependency. The following code does the same thing as above, but is noticeably shorter.

In [None]:
from typing import Iterable, Dict, Any, List
import pandas as pd

# schema: *
def add(df:pd.DataFrame, n=1) -> pd.DataFrame:
    df["b"]+=n
    return df

# schema: *
def get_top(df:Iterable[Dict[str,Any]]) -> Iterable[Dict[str,Any]]:
    yield next(df)
    return

In [None]:
from fugue import FugueWorkflow

with FugueWorkflow() as dag:
    df = dag.df([[0,1],[0,2],[1,3],[1,1]],"a:int,b:int")
    df = df.transform(add).transform(add, params=dict(n=2)) # see how parameters are set
    df.partition(by=["a"], presort="b").transform(get_top).show()
    df.partition(by=["a"], presort="b DESC").transform(get_top).show()

### Schema Hint Syntax

The schema hint has a special, flexible syntax that is available ONLY for `Transformer`. Please read the [`Schema.transform` documentation](https://triad.readthedocs.io/en/latest/api/triad.collections.html#triad.collections.schema.Schema.transform) for the detailed syntax; here we only show some examples.

In [None]:
from typing import Iterable, Dict, Any, List
import pandas as pd

# schema: *,c:int
def with_c(df:pd.DataFrame) -> pd.DataFrame:
    df["c"]=1
    return df

# schema: *-b
def drop_b(df:pd.DataFrame) -> pd.DataFrame:
    return df.drop("b", axis=1)

# schema: *~b,c
def drop_b_c_if_exists(df:pd.DataFrame) -> pd.DataFrame:
    return df.drop(["b","c"], axis=1, errors='ignore')

In [None]:
from fugue import FugueWorkflow

with FugueWorkflow() as dag:
    df = dag.df([[0,1],[0,2],[1,3],[1,1]],"a:int,b:int")
    df = df.transform(with_c)
    df.show()
    df = df.transform(drop_b)
    df.show()
    df = df.transform(drop_b_c_if_exists)
    df.show()
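
The three hints above can be read as small rewrite rules on the input schema. The following is a toy pure-Python sketch of that reading, not triad's actual parser. `expand_hint` is a hypothetical helper, schemas are modeled as `name -> type` dicts, and only the forms used above (`*`, `*,col:type`, `*-col`, `*~col`) are handled. It assumes that `-` requires the column to exist while `~` ignores missing columns.

```python
from typing import Dict

def expand_hint(input_schema: Dict[str, str], hint: str) -> Dict[str, str]:
    """Toy expansion of a schema hint against an input schema."""
    hint = hint.replace(" ", "")
    if not hint.startswith("*"):
        # No wildcard: the hint already is the full output schema
        return dict(pair.split(":") for pair in hint.split(","))
    out = dict(input_schema)
    rest = hint[1:]
    if rest.startswith(("-", "~")):
        strict = rest[0] == "-"  # "-" requires the column, "~" does not
        for name in rest[1:].split(","):
            if name in out:
                del out[name]
            elif strict:
                raise KeyError(f"column {name} is not in the input schema")
    elif rest.startswith(","):
        # Append new columns after the input columns
        for pair in rest[1:].split(","):
            name, type_name = pair.split(":")
            out[name] = type_name
    elif rest:
        raise ValueError(f"unsupported hint: {hint}")
    return out

schema = {"a": "int", "b": "int"}
s1 = expand_hint(schema, "*,c:int")   # after with_c
s2 = expand_hint(s1, "*-b")           # after drop_b
s3 = expand_hint(s2, "*~b,c")         # after drop_b_c_if_exists
print(s1, s2, s3)
```

The last step shows why `~` is convenient: `b` is already gone by then, yet the hint is still valid.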

## Decorator Approach

The decorator approach can do everything the schema hint can do; in addition, it can take a function that generates the schema.

In [None]:
import pandas as pd
from fugue import transformer, FugueWorkflow

# df is the input DataFrame, **kwargs are the parameters passed in from Fugue
@transformer(lambda df, **kwargs: df.schema+"c:int") # == @transformer("*,c:int")
def with_c(df:pd.DataFrame) -> pd.DataFrame:
    df["c"]=1
    return df

with FugueWorkflow() as dag:
    df = dag.df([[0,1],[0,2],[1,3],[1,1]],"a:int,b:int")
    df = df.transform(with_c)
    df.show()
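
Why a schema-generating function is useful can be illustrated with a small stand-alone sketch. This is not Fugue's `transformer` decorator: `with_schema` and `resolve_schema` are hypothetical names, and schemas are plain strings. The point is only that a callable spec can depend on runtime parameters, which a fixed string cannot.

```python
from typing import Callable, Union

SchemaSpec = Union[str, Callable[..., str]]

def with_schema(spec: SchemaSpec):
    # Hypothetical decorator: remember the schema spec on the function
    def decorator(func):
        func.schema_spec = spec
        return func
    return decorator

def resolve_schema(func, input_schema: str, **params) -> str:
    spec = func.schema_spec
    if callable(spec):
        # A callable can use the input schema and the runtime parameters
        return spec(input_schema, **params)
    # A string spec: only a single wildcard is handled in this sketch
    return spec.replace("*", input_schema, 1)

@with_schema("*,c:int")
def with_c(rows):
    return [dict(r, c=1) for r in rows]

@with_schema(lambda schema, **params: f"{schema},{params['name']}:int")
def with_named(rows, name):
    # The new column's name is only known at call time
    return [dict(r, **{name: 1}) for r in rows]

fixed = resolve_schema(with_c, "a:int,b:int")
dynamic = resolve_schema(with_named, "a:int,b:int", name="x")
print(fixed)
print(dynamic)
```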

## Interface Approach

All the previous methods are just wrappers of the interface approach. They cover most use cases and simplify usage. But in certain cases you should implement the interface, for example:

* Your output schema needs partition information, such as partition keys, schema, and current values of the keys
* You have an expensive but common initialization step for processing each logical partition; this should happen once, when the physical partition is initialized

The biggest advantage of the interface approach is that you can customize initialization at the physical partition level, and you have all the up-to-date context variables to use.

In the interface approach, type annotations are not necessary, but it's still good practice to have them.

The following case focuses on performance comparison. To see how to use context variables, read the [CoTransformer example](./cotransformer.ipynb#Interface-Approach).

In [None]:
from fugue import Transformer, FugueWorkflow, PandasDataFrame, DataFrame, LocalDataFrame
from triad.collections import Schema
from time import sleep
import pandas as pd
import numpy as np

def expensive_init(sec=5):
    sleep(sec)

def helper(ct=20) -> pd.DataFrame:
    np.random.seed(0)
    return pd.DataFrame(np.random.randint(0,10,size=(ct, 3)), columns=list('abc'))

class Median(Transformer):
    # this is invoked on driver side
    def get_output_schema(self, df):
        return df.schema + (self.params.get_or_throw("col", str),float)

    # on initialization of the physical partition
    def on_init(self, df: DataFrame) -> None:
        self.col = self.params.get_or_throw("col", str)
        expensive_init(self.params.get("sec",0))

    def transform(self, df):
        pdf = df.as_pandas()
        pdf[self.col]=pdf["b"].median()
        return PandasDataFrame(pdf)


with FugueWorkflow() as dag:
    dag.create(helper).partition(by=["a"]).transform(Median, params={"col":"m", "sec": 1}).show(rows=100)

Notice that we set `self.col` in `on_init`. It's better to set such member variables in `on_init` or `transform` than in `get_output_schema`: anything set in `get_output_schema` has to be serialized and sent to every worker when a distributed engine is used, and serialization can fail for some value types.

To show the benefit of `on_init`, we also create an interfaceless version (which is a lot simpler), but it has to call `expensive_init` inside the function, once for each logical partition. Also, in the `run` function, we set `num=2` to show the effect. So for the `Median` transformer, `expensive_init` is called at most twice, while for `median` it is called many more times.

Note that the numbers may be off if you run this on Binder.

In [None]:
from fugue_spark import SparkExecutionEngine
from timeit import timeit

# schema: *, m:double
def median(df:pd.DataFrame, sec=0) -> pd.DataFrame:
    expensive_init(sec)
    df["m"]=df["b"].median()
    return df

def run(engine, interfaceless, sec):
    with FugueWorkflow(engine) as dag:
        df = dag.create(helper)
        if interfaceless:
            df.partition(by=["a"], num=2).transform(median, params={"sec": sec}).show(rows=100)
        else:
            df.partition(by=["a"], num=2).transform(Median, params={"col":"m", "sec": sec}).show(rows=100)
 
engine = SparkExecutionEngine()
print(timeit(lambda: run(engine, True, 1), number=1))
print(timeit(lambda: run(engine, False, 1), number=1))
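
The expected difference can also be reasoned about without Spark by counting initialization calls. The sketch below is only a model. `count_inits` is a hypothetical helper, and the even split of ten keys across two physical partitions is an assumption, since the real assignment depends on the engine and the data.

```python
from typing import List

def count_inits(physical_partitions: List[List[int]], per_logical: bool) -> int:
    calls = 0
    for logical_keys in physical_partitions:
        if not per_logical:
            calls += 1  # interface style: on_init once per physical partition
        for _ in logical_keys:
            if per_logical:
                calls += 1  # plain function: init on every logical partition
    return calls

# Assumed layout: ten logical partitions (keys 0..9) on two physical partitions
physical = [[0, 2, 4, 6, 8], [1, 3, 5, 7, 9]]
interface_calls = count_inits(physical, per_logical=False)
function_calls = count_inits(physical, per_logical=True)
print(interface_calls, function_calls)
```

Under this assumed layout the interface version pays the initialization cost twice and the plain function pays it ten times. The measured wall time will differ from these counts because physical partitions can run in parallel.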