Su flujo de trabajo actual probablemente encadena varias funciones como en el siguiente ejemplo. Aunque es rápido, es probable que tenga muchos problemas:
- No se escala bien a medida que agrega complejidad.
- Tiene que hacer un seguimiento manual de qué funciones se ejecutaron con qué parámetro.
- Usted tiene que mantener manualmente un seguimiento de dónde se guardan los datos.
- Es difícil de leer.
import pandas as pd import sklearn.svm, sklearn.metrics def get_data (): datos = download_data () data = clean_data (data) data.to_pickle ('data.pkl') Defrproceso (datos): datos = apply_function (datos) datos de retorno # parámetros de flujo reload_source = True do_preprocess = True # Ejecutar flujo de trabajo if reload_source: obtener datos() df_train = pd.read_pickle ('data.pkl') if do_preprocess: df_train = preprocess (df_train) model = sklearn.svm.SVC () model.fit (df_train.iloc [:,:-1]df_train ['y']) print (sklearn.metrics.accuracy_score (df_train ['y']model.predict (df_train.iloc [:,:-1])))
¿Qué hacer al respecto?
En lugar de las funciones de cadena lineal, El código de ciencia de datos se escribe mejor como un conjunto de tareas con dependencias entre ellos. Ese es su flujo de trabajo de ciencia de datos debe ser un DAG.
Entonces, en lugar de escribir una función que hace:
def process_data (datos, parámetro): si parametro: datos = do_stuff (datos) más: datos = do_other_stuff (datos) data.to_pickle ('data.pkl') devolver datos
Es mejor escribir tareas que se pueden encadenar como DAG:
class TaskProcess (d6tflow.tasks.TaskPqPandas): # definir formato de salida def requiere (uno mismo): devuelve TaskGetData () # define dependencia def run (self): data = self.input (). load () # cargar datos de entrada data = do_stuff (data) # proceso de datos self.save (data) # save output data
Los beneficios de hacer esto son:
- Todas las tareas siguen el mismo patrón, sin importar cuán complejo sea su flujo de trabajo
- Tiene una entrada escalable
requiere ()
y función de procesamientoejecutada ()
- Puede cargar y guardar datos rápidamente sin tener que codificar los nombres de archivo
- Si la tarea de entrada no está completa se ejecutará automáticamente
- Si los datos de entrada o los parámetros cambian, la función se ejecutará automáticamente
Un ejemplo de DAG de aprendizaje automático
A continuación se muestra un ejemplo estilizado de un flujo de aprendizaje automático que se expresa como un DAG. Al final, solo necesitas ejecutar TaskTrain () y automáticamente sabrá qué dependencias ejecutar. Para ver un ejemplo completo, vea https://github.com/d6t/d6tflow/blob/master/docs/example-ml.md
import las pandas by pd import sklearn, sklearn.svm import d6tflow import luigi # definir flujo de trabajo class TaskGetData (d6tflow.tasks.TaskPqPandas): # save dataframe as parquet def run (self): datos = download_data () data = clean_data (data) self.save (data) # guarda rápidamente el marco de datos class TaskPreprocess (d6tflow.tasks.TaskCachePandas): # guardar datos en la memoria do_preprocess = luigi.BoolParameter (default = True) # parámetro para el preprocesamiento sí / no def requiere (uno mismo): return TaskGetData () # define dependencia def run (self): df_train = self.input (). load () # carga rápidamente los datos requeridos si self.do_preprocess: df_train = preprocess (df_train) self.save (df_train) class TaskTrain (d6tflow.tasks.TaskPickle): # guardar la salida como pickle do_preprocess = luigi.BoolParameter (por defecto = True) def requiere (uno mismo): return TaskPreprocess (do_preprocess = self.do_preprocess) def run (self): df_train = self.input (). load () model = sklearn.svm.SVC () model.fit (df_train.iloc [:,:-1]df_train ['y']) self.save (modelo) # Comprobar dependencias de tareas y su estado de ejecución d6tflow.preview (TaskTrain ()) '' ' └─ - [TaskTrain-{'do_preprocess': 'True'} (PENDING)] └─ - [TaskPreprocess-{'do_preprocess': 'True'} (PENDING)] └─ - [TaskGetData-{} (PENDING)] '' ' # Ejecutar la tarea de entrenamiento modelo incluyendo dependencias d6tflow.run (TaskTrain ()) '' ' ===== Luigi Execution Summary ===== Programado 3 tareas de las cuales: * 3 corrió con éxito: - 1 TaskGetData () - 1 TaskPreprocess (do_preprocess = True) - 1 TaskTrain (do_preprocess = True) '' ' # Cargar el resultado de la tarea en el marco de datos pandas y el objeto modelo para la evaluación del modelo model = TaskTrain (). output (). load () df_train = TaskPreprocess (). output (). load () print (sklearn.metrics.accuracy_score (df_train ['y']model.predict (df_train.iloc [:,:-1])))) # 0.9733333333333334
Conclusión
Escribir el código de aprendizaje automático como una serie lineal de funciones probablemente crea muchos problemas de flujo de trabajo. Debido a las complejas dependencias entre las diferentes tareas de ML, es mejor escribirlas como un DAG. https://github.com/d6t/d6tflow lo hace muy fácil. Alternativamente, puede usar luigi y flujo de aire pero están más optimizados para ETL que la ciencia de datos.
Bio: Norman Niemer es el científico jefe de datos en un gran gestor de activos donde ofrece información sobre inversiones basada en datos. Tiene una maestría en ingeniería financiera de Columbia University y una licenciatura en banca y finanzas de Cass Business School (Londres).
Original . Publicado de nuevo con permiso.