Creare Una Piccola Webapp Per Clustering Delle Vocali

Table of Contents

Essenziali per una webapp minimalista

Questo primo articolo tratterà principalmente come gestire la creazione di modelli, i file tabulati caricati e impostare una semplice dashboard.

In questi articoli utilizzerò:

  • il package Dash di python per poter creare la dashboard interattiva,
  • plotly per creare i grafici,
  • Docker per creare un container che sarà poi hostato serverless da un cloud provider,
  • gcloud command line tool per poter deployare il mio container su cloud run.

Cosa farà questa webapp

Questa webapp permetterà di caricare file tabulari, come csv, xls, ecc… contenenti le formanti delle vocali (F1, F2, F3 ed F4), ed effettuare clustering unsupervised delle vocali scegliendo tra vari modelli disponibili e impostando i parametri come si desidera. Il risultato finale sarà questo:

vowelclusteringapp.png

L'intero codice usato per la webapp si può trovare qui: https://github.com/andcarnivorous/vowel-cluster-dashapp

Mentre un'istanza della webapp è disponibile qui: https://vowel-cluster-j7bugp5hfa-ew.a.run.app

Classe per gestire i dati e applicare clustering

Per prima cosa creiamo una classe chiamata Modeller, questa classe creerà un modello usando il tipo di modello scelto e i dati forniti.

I modelli scelti per questa webapp saranno KMeans, Agglomerative Clustering e DBSCAN.

Creiamo la nostra classe, creando mappe dei modelli e dei loro parametri, così da poter gestire la selezione del modello da parte dell'utente e poter anche assicurarci che i parametri ricevuti siano sempre corretti e pertinenti al giusto modello.

import pandas as pd
import numpy as np
import plotly.express as px

from sklearn.preprocessing import StandardScaler
from sklearn.cluster import KMeans
from sklearn.cluster import AgglomerativeClustering, DBSCAN
from time import time


class Modeller:

    models = {"KMeans": KMeans,
              "AgglomerativeClustering": AgglomerativeClustering,
              "DBSCAN": DBSCAN}
    params_map = {"KMeans": {"n_clusters": None, "n_init": None, "max_iter": None},
                  "AgglomerativeClustering": {"n_clusters": None},
                  "DBSCAN": {"eps": None, "min_samples": None}}

    def __init__(self, dataframe, model_type, **params):
        self.dataframe = dataframe
        self.model_type = model_type
        self.params = {k: v for k, v in params.items() if k in self.params_map[model_type]}


I **params ricevuti all'instanzazione del modello saranno praticamente i parametri, e con quelli presenti nell'attributo della classe ci assicureremo di non includere altri parametri non pertinenti che potrebbero causare errori.

Adesso creiamo un metodo per scegliere dal dataframe ricevuto solo le due colonne selezionate (potremmo fare clustering in più di due dimensioni, ma poi diventa difficile da plottare e più costoso da computare). Creeremo anche un metodo che applicherà lo z-score alle formanti, così da normalizzare i nostri dati.

def subset_dataframe(self, columns):
    if all(col in self.dataframe.columns for col in columns):
        try:
            X = self.dataframe.loc[:, columns]
        except KeyError:
            raise KeyError(f"Columns {columns} not present")
        return X

    if all(col.lower() in self.dataframe.columns for col in columns):
        try:
            columns = [col.lower() for col in columns]
            X = self.dataframe.loc[:, columns]
        except KeyError:
            raise KeyError(f"Columns {columns} not present")
        return X
    else:
        raise KeyError(f"Columns {columns} are not present in dataframe")

@staticmethod
def standardize(X):
    X_standardized = StandardScaler().fit_transform(X)
    return X_standardized

Finalmente, creiamo una metodo che prenderà le colonne richieste, applicherà il metodo di normalizzazione, creerà l'instanza del modello sklearn e fara fitting dei dati, restituendo le labels dei gruppi trovati.

def set_up_model(self, columns):
    if any(param is None for k, param in self.params.items()):
        return None
    X = self.subset_dataframe(columns)
    X_standardized = self.standardize(X)
    model = self.models[self.model_type]
    t0 = time()
    model = model(**self.params).fit(X_standardized)
    timing = time() - t0
    model.execution_time = timing
    self.dataframe[self.model_type] = model.labels_
    self.dataframe[self.model_type] = self.dataframe[self.model_type].astype("category")
    fig = px.scatter(self.dataframe, x=columns[0], y=columns[1], color=self.model_type)
    return fig

Per comodità, farò sì che il metodo set_up_model abbia come return il grafico già generato, ma sarebbe meglio usare una funzione in utils per questa azione o creare una classe che si prenda cura della generazione dei grafici.

La Dashboard

Devo ammettere che il codice che si usa per creare dashboard con dash lo trovo abbastanza brutto, tra indentazione e parentesi sembra di star programmando qualcosa che può confondere ancor più di Lisp, ma permette in generale di creare velocemente dashboard semplici e, grazie al supporto di plotly, è facile creare grafici e mostrarli direttamente e renderli anche interattivi.

Creiamo un'instanza della dashboard ed usiamo app.server che servirà per quando hosteremo l'app.

import base64
import io
import pandas as pd
import dash
import dash_bootstrap_components as dbc
import dash_core_components as dcc
import dash_html_components as html
from dash.dependencies import Input, Output, State

from modeller import Modeller
from configs import models, params, all_params, limits, columns

app = dash.Dash(__name__, external_stylesheets=[dbc.themes.LUX])
server = app.server

Per il layout, potete far riferimento alla repo, non ha molto senso andarci passo per passo attraverso.

Ciò che ci interessa sono i callback, che vengono utilizzati da dash per rendere i grafici interattivi.

Prima di tutto abbiamo bisogno di un callback per fare parsing dei file che saranno caricati:

@app.callback(Output('output-data-upload', 'children'),
              [Input('upload-data', 'contents'), Input("model-type", "value")] + inputs +
              [State('upload-data', 'filename'),
               State('upload-data', 'last_modified')])
def update_output(list_of_contents, model_type, n_init, max_iter, n_clusters, eps, min_samples, column1, column2, list_of_names, list_of_dates):
    if list_of_contents is not None:
        if column1 == column2:
            return html.P("Select different columns!")

        contents = list_of_contents[0]
        content_type, content_string = contents.split(',')
        decoded = base64.b64decode(content_string)
        try:
            if 'csv' in content_type:
                # Assume that the user uploaded a CSV file
                df = pd.read_csv(
                    io.StringIO(decoded.decode('utf-8')))
            elif 'xls' in content_type or list_of_names and list_of_names[0].endswith("xls"):
                # Assume that the user uploaded an excel file
                df = pd.read_excel(io.BytesIO(decoded))
            elif 'xlsx' in content_type or list_of_names and list_of_names[0].endswith("xlsx"):
                # Assume that the user uploaded an excel file
                df = pd.read_excel(io.BytesIO(decoded), engine="openpyxl")
            elif "text" in content_type:
                df = pd.read_fwf(
                    io.StringIO(decoded.decode('utf-8')))
        except Exception as e:
            print(e)
            return html.Div([
                'There was an error processing this file.'
            ])
        if len(df.columns) > 8:
            return html.P("Dataframe has too many columns!")
        model = Modeller(df, model_type, **dict(n_init=n_init, max_iter=max_iter,
                                                n_clusters=n_clusters, eps=eps,
                                                min_samples=min_samples))
        try:
            labels = model.set_up_model([column1, column2])
        except Exception as e:
            return html.P(str(e))
        if not labels:
            return html.P("Please, fill in all the parameters in green.")
        return dcc.Graph(id="chart", figure=labels,
                         style={'display': 'block',
                                'height': 600,
                                'width': 900,
                                'margin-left': 'auto',
                                'margin-right': 'auto'})
    return html.P("")

Abbiamo poi bisogno di un callback per gestire i parametri, vogliamo rendere accessibili solo quelli relativi al modello scelto, e un semplice callback per uploadare i file:


@app.callback(Output('output-params', 'children'),
              Input('model-type', 'value'))
def update_params(model_type):
    if model_type:
        parameters = all_params
        htmlparams = html.Div(
            [
                dcc.Input(
                    id=f"input_{k}",
                    type="number",
                    placeholder=f"{k}:{v}", max=limits[k],
                    readOnly=True if k not in params[model_type] else False,
                    style={
                        "color": "red" if k not in params[model_type] else "green"}
                )
                for k, v in parameters.items()
            ]
        )
    return htmlparams


@app.callback(Output('upload-data', 'children'),
              [Input('upload-data', 'contents')] +
              [State('upload-data', 'filename'), State('upload-data', 'last_modified')])
def upload_box(list_of_contents, list_of_names, list_of_dates):
    if list_of_contents is not None:
        contents = list_of_contents[0]
        content_type, content_string = contents.split(',')
        return html.P(f"{list_of_names[0]}")
    else:
        return html.Div(['Drag and Drop or ', html.A('Select Files')])


Con queste tre callback e la classe Modeller abbiamo praticamente la nostra webapp pronta.

Per provarla, basta aggiungere


if __name__ == '__main__':
    app.run_server(debug=False, host="0.0.0.0", port="8080")


alla fine del file ed eseguire il file usando il comando python3 main.py.

Date: 2021-03-08 Mon 00:00

Author: Andrew

Other posts