Su código de aprendizaje automático es malo?

Su flujo de trabajo actual probablemente encadena varias funciones como en el siguiente ejemplo. Aunque es rápido, es probable que tenga muchos problemas:

  • No se escala bien a medida que agrega complejidad.
  • Tiene que hacer un seguimiento manual de qué funciones se ejecutaron con qué parámetro.
  • Usted tiene que mantener manualmente un seguimiento de dónde se guardan los datos.
  • Es difícil de leer.
import pandas as pd
import sklearn.svm, sklearn.metrics

def get_data ():
    datos = download_data ()
    data = clean_data (data)
    data.to_pickle ('data.pkl')

Defrproceso (datos):
    datos = apply_function (datos)
    datos de retorno

# parámetros de flujo
reload_source = True
do_preprocess = True

# Ejecutar flujo de trabajo
if reload_source:
    obtener datos()

df_train = pd.read_pickle ('data.pkl')
if do_preprocess:
    df_train = preprocess (df_train)
model = sklearn.svm.SVC ()
model.fit (df_train.iloc [:,:-1]df_train ['y'])
print (sklearn.metrics.accuracy_score (df_train ['y']model.predict (df_train.iloc [:,:-1])))

¿Qué hacer al respecto?

En lugar de las funciones de cadena lineal, El código de ciencia de datos se escribe mejor como un conjunto de tareas con dependencias entre ellos. Ese es su flujo de trabajo de ciencia de datos debe ser un DAG.

Entonces, en lugar de escribir una función que hace:

 def process_data (datos, parámetro):

    si parametro:
        datos = do_stuff (datos)
    más:
        datos = do_other_stuff (datos)

    data.to_pickle ('data.pkl')
    devolver datos

Es mejor escribir tareas que se pueden encadenar como DAG:

 class TaskProcess (d6tflow.tasks.TaskPqPandas): # definir formato de salida

    def requiere (uno mismo):
        devuelve TaskGetData () # define dependencia

    def run (self):
        data = self.input (). load () # cargar datos de entrada
        data = do_stuff (data) # proceso de datos
        self.save (data) # save output data

Los beneficios de hacer esto son:

  • Todas las tareas siguen el mismo patrón, sin importar cuán complejo sea su flujo de trabajo
  • Tiene una entrada escalable requiere () y función de procesamiento ejecutada ()
  • Puede cargar y guardar datos rápidamente sin tener que codificar los nombres de archivo
  • Si la tarea de entrada no está completa se ejecutará automáticamente
  • Si los datos de entrada o los parámetros cambian, la función se ejecutará automáticamente

Un ejemplo de DAG de aprendizaje automático

A continuación se muestra un ejemplo estilizado de un flujo de aprendizaje automático que se expresa como un DAG. Al final, solo necesitas ejecutar TaskTrain () y automáticamente sabrá qué dependencias ejecutar. Para ver un ejemplo completo, vea https://github.com/d6t/d6tflow/blob/master/docs/example-ml.md

 import las pandas by pd
import sklearn, sklearn.svm
import d6tflow
import luigi

# definir flujo de trabajo
class TaskGetData (d6tflow.tasks.TaskPqPandas): # save dataframe as parquet

    def run (self):
        datos = download_data ()
        data = clean_data (data)
        self.save (data) # guarda rápidamente el marco de datos

class TaskPreprocess (d6tflow.tasks.TaskCachePandas): # guardar datos en la memoria
    do_preprocess = luigi.BoolParameter (default = True) # parámetro para el preprocesamiento sí / no

    def requiere (uno mismo):
        return TaskGetData () # define dependencia

    def run (self):
        df_train = self.input (). load () # carga rápidamente los datos requeridos
        si self.do_preprocess:
            df_train = preprocess (df_train)
        self.save (df_train)

class TaskTrain (d6tflow.tasks.TaskPickle): # guardar la salida como pickle
    do_preprocess = luigi.BoolParameter (por defecto = True)

    def requiere (uno mismo):
        return TaskPreprocess (do_preprocess = self.do_preprocess)

    def run (self):
        df_train = self.input (). load ()
        model = sklearn.svm.SVC ()
        model.fit (df_train.iloc [:,:-1]df_train ['y'])
        self.save (modelo)

# Comprobar dependencias de tareas y su estado de ejecución
d6tflow.preview (TaskTrain ())

'' '
└─ - [TaskTrain-{'do_preprocess': 'True'} (PENDING)]
   └─ - [TaskPreprocess-{'do_preprocess': 'True'} (PENDING)]
      └─ - [TaskGetData-{} (PENDING)]
'' '

# Ejecutar la tarea de entrenamiento modelo incluyendo dependencias
d6tflow.run (TaskTrain ())

'' '
===== Luigi Execution Summary =====

Programado 3 tareas de las cuales:
* 3 corrió con éxito:
    - 1 TaskGetData ()
    - 1 TaskPreprocess (do_preprocess = True)
    - 1 TaskTrain (do_preprocess = True)
'' '

# Cargar el resultado de la tarea en el marco de datos pandas y el objeto modelo para la evaluación del modelo
model = TaskTrain (). output (). load ()
df_train = TaskPreprocess (). output (). load ()
print (sklearn.metrics.accuracy_score (df_train ['y']model.predict (df_train.iloc [:,:-1]))))
# 0.9733333333333334

Conclusión

Escribir el código de aprendizaje automático como una serie lineal de funciones probablemente crea muchos problemas de flujo de trabajo. Debido a las complejas dependencias entre las diferentes tareas de ML, es mejor escribirlas como un DAG. https://github.com/d6t/d6tflow lo hace muy fácil. Alternativamente, puede usar luigi y flujo de aire pero están más optimizados para ETL que la ciencia de datos.

Bio: Norman Niemer es el científico jefe de datos en un gran gestor de activos donde ofrece información sobre inversiones basada en datos. Tiene una maestría en ingeniería financiera de Columbia University y una licenciatura en banca y finanzas de Cass Business School (Londres).

Original . Publicado de nuevo con permiso.

Dejá un comentario