Cómo crear una canalización de Kubeflow Machine Learning (Parte 1)

Google Cloud anunció recientemente un proyecto de código abierto para simplificar la operacionalización de las tuberías de aprendizaje automático . En este artículo, lo guiaré a través del proceso de tomar un modelo TensorFlow del mundo real existente y operacionalizar la capacitación, evaluación, implementación y reentrenamiento de ese modelo utilizando Kubeflow Pipelines (KFP en este artículo).

El código completo para este artículo está en GitHub.

1. Cree un clúster Kubernetes con tuberías instaladas (una vez)

KFP naturalmente requiere un clúster Kubernetes para ejecutarse. Utilice este comando para iniciar un clúster del motor Kubernetes de Google (GKE) y permitir que KFP administre el clúster ( create_cluster.sh en el repositorio):

 #! / Bin / bash 
  
 CLUSTERNAME = mykfp 
 ZONE = us-central1-b
 gcloud config set compute / zone $ ZONE 
 Los clústeres de contenedores beta de gcloud crean $ CLUSTERNAME  
 --cluster-version 1.11.2-gke.18 --enable -autoupgrade  
 --zone $ ZONE  
 --scopes cloud-platform  
 --enable-cloud-logging  
 --enable-cloud-Monitoring  
 --machine -tipo n1-standard-2  
 --num-nodos 4
 kubectl create clusterrolbinding ml-pipeline-admin-binding --clusterrole = cluster-admin --user = $ (cuenta de obtención de valor de gcloud config)

La creación de un clúster GKE tarda aproximadamente 3 minutos, por lo que navegue hasta la sección GKE de la consola GCP y asegúrese de que el clúster esté listo y listo.

Una vez que el clúster esté activo, instale ML tuberías en el grupo GKE er ( 2_deploy_kubeflow_pipelines.sh en el repositorio):

 #! / bin / bash 
 PIPELINE_VERSION = 0.1.2 
 kubectl create -f  https: //storage.jpg .com / ml-pipeline / release / $ PIPELINE_VERSION / bootstrapper.yaml 

Cambie la versión anterior a la última versión de en la página de la versión . La implementación de este paquete llevará varios minutos. Puede ver el estado del trabajo y esperar a que el número de ejecuciones exitosas cambie a 1. O puede decirle a kubectl que espere a que finalice la creación:

 #! / Bin / bash
 jobname = $ (kubectl get job | tail -1 | awk & # 039; {print $ 1} & # 039;) 
 kubectl wait --for = condition = complete --timeout = 5m $ jobname

Mientras esperaba el software Para completar la instalación, puede continuar leyendo

¡Palabras de moda hasta el final!

2. Descripción de la tubería

Hay un par de formas para crear tuberías. La forma “dev-ops” es usar Python3 y Docker. Una forma más amigable para los científicos de datos es usar los cuadernos Jupyter. En una publicación posterior, te mostraré el método del cuaderno Jupyter, pero en esta publicación, te mostraré el mecanismo Python3-Docker. Puede ser útil entender esto para que sepa qué sucede detrás de la escena cuando toma la ruta de Jupyter.

Lo ilustraré en un modelo de aprendizaje automático para predecir el peso de un bebé. Este modelo tiene los siguientes pasos: (a) extraer datos de BigQuery, transformarlos y escribir los datos transformados en Cloud Storage. (b) Capacite un modelo de TensorFlow Estimator API y realice un ajuste hiperparamétrico del modelo (c) Una vez que se determine la mejor velocidad de aprendizaje, el tamaño del lote, etc., adiestre el modelo por más tiempo y con más datos usando esos parámetros model to Cloud ML Engine.

Aquí está el código de Python que describe la tubería anterior ( mlp_babyweight.py en el repositorio):

 preprocess = dsl.ContainerOp (
 name = 
 ] & # 039; preprocess & # 039; 
   image =  & # 039; gcr.io/cloud-training-demos/babyweight-pipeline-bqtocsv: último & # 039; 
 argumentos = [
     & # 039; - proyecto & # 039; proyecto, 
     & # 039; - modo & # 039;  & # 039; cloud & # 039; 
     & # 039; - bucket & # 039; bucket 
]
 file_outputs = { & # 039; bucket & # 039; :  & # 039; /output.txt' } 
)
 hparam_train = dsl.ContainerOp (
 name =  & # 0 39; hipertrain & # 039; 
   image =  & # 039; gcr.io/cloud-training-demos/babyweight-pipeline-hypertrain: últimos & # 039; 
 argumentos = [
 preprocess.outputs [ & # 039; bucket & # 039; ] 
]
 file_outputs = { & # 039; jobname & # 039; :  & # 039; /output.txt' } 
)
 train_tuned = dsl.ContainerOp (
 name =  & # 039; traintuned & # 039; 
 , 
   imagen =  & # 039; gcr.io/cloud-training-demos/babyweight-pipeline-traintuned-trainer: último & # 039; 
   argumentos = [
 hparam_train. genera [ & # 039; nombre de trabajo & # 039; ]
 bucket 
]
 file_outputs = { & # 039; train & # 039; :  & # 039; /output.txt' } 
) 
 train_tuned.set_memory_request ( & # 039; 2G & # 039; 
 train_tuned.set cpu  & # 039; 1 & # 039; ) [19659007] deploy_cmle = dsl.ContainerOp (
 name =  & # 039; deploycmle & # 039; 
   image =  & # 039; gcr.io/cloud-training-demos/ babyweight-pipeline-deploycmle: últimos & # 039; 
 argumentos = [
 train_tuned.outputs [ & # 039; train & # 039; ] # modeldir 
     ] & # 039; peso del bebé & # 039; 
     & # 039; mlp & # 039; 
  ]
 file_outputs = {
     & # 039; model & # 039; :  & # 039; /model.txt' 
     & # 039; versión & # 039; :  & # 039; /version.txt' 
   } 
) 

Cada uno de los pasos anteriores es un contenedor Docker. La salida de un contenedor Docker es la entrada a otro paso posterior: es un gráfico acíclico dirigido, que cuando se carga en la interfaz de usuario de las tuberías, tendrá este aspecto:

ML Pipeline

(El extremo a extremo completo la aplicación también incluye un quinto paso, para implementar una aplicación web que proporciona una interfaz de usuario al modelo.)

Observe el paso de preproceso:

 preprocess = dsl.ContainerOp (
 name =  & # 039; preprocess & # 039; 
   image =  & # 039; gcr.io/cloud-training-demos/babyweight-pipeline-bqtocsv: más reciente & # 039; 
 argumentos = [
     & # 039; - proyecto & # 039; proyecto, 
     & # 039; - modo & # 039;  & # 039; cloud & # 039 ; 
     & # 039; - bucket & # 039; bucket 
]
 file_outputs = { & # 039; bucket & # 039; :  & # 039; /output.txt' } 
)

Tenga en cuenta que utiliza dos parámetros de entrada: el proyecto y d la cubeta – y pone su salida (la cubeta que contiene los datos preprocesados) en /output.txt.[19659002◆Asíqueelsegundopasoobtieneestenombredecubetautilizandopreprocessoutputs[‘bucket’] y pone su salida (el nombre del trabajo de la prueba de ajuste de hiperparámetro más exitoso) que luego se usa en el tercer paso de la tubería como hparam_train.outputs [‘jobname’].

¡Bastante sencillo!

Por supuesto, esto conlleva dos preguntas: ¿de qué manera el primer paso recibe el proyecto? y el balde que necesita? Segundo, ¿cómo se implementan los pasos individuales?

El proyecto y el grupo se obtienen a través de los parámetros de la tubería:

 def train_and_deploy (
 project = dsl.PipelineParam (name = & # 039; project & # 039 ;, value = & # 039; cloud-training-demos & # 039;), 
 bucket = dsl.PipelineParam (name = & # 039; bucket & # 039 ;, value = & # 039; cloud-training-demos-ml & # 039; ) 
):

Esencialmente, el usuario final proporcionará el proyecto y el paquete cuando se ejecute la canalización. La IU se rellenará previamente con los valores predeterminados que proporcioné anteriormente:

Los parámetros de canalización son proporcionados por el usuario final

El resto de esta sección trata sobre cómo se implementan los pasos individuales. Nuevamente, como mencioné, voy a explicar la forma en que Python3-Docker implementa las cosas. En un post posterior, explicaré el camino de Jupyter. Entonces, si está seguro de que nunca irá a la ruta Docker, solo hojee esta sección o pase a la Sección 3.

2a. Preproceso

En cada paso, debe crear un contenedor Docker. Esencialmente, se trata de un programa independiente (bash, Python, C ++, lo que sea) con todas sus dependencias muy bien especificadas para que KFP pueda ejecutarlas en el clúster.

En mi caso, mi código de preprocesamiento es un programa Python independiente que usa Apache Beam y espera ser ejecutado en Cloud Dataflow. Todo el código está en un solo programa de línea de comandos llamado transform.py :

El código de preprocesamiento está en un solo archivo llamado transform.py

Mi Dockerfile tiene que especificar todas las dependencias. Afortunadamente, KFP tiene un montón de contenedores de muestras y uno de ellos tiene todas las dependencias que necesitaría. Por lo tanto, simplemente heredar de ella. Mi Dockerfile es, por lo tanto, todas las 4 líneas:

 FROM gcr.io/ml-pipeline/ml-pipeline-dataflow-tft:latestorrect19459011◆RUN mkdir / babyweight 
 COPY transform .py / babyweight 
 ENTRYPOINT ["python", "/babyweight/transform.py"]

Esencialmente, copio transform.py en el contenedor Docker y digo que el punto de entrada es para ejecutar ese archivo.

Luego puedo construir mi contenedor Docker y publicarlo en gcr.io en mi proyecto en build.sh :

 CONTAINER_NAME = babyweight-pipeline-bqtocsv
docker compilación -t $ {CONTAINER_NAME}. 
 docker tag $ {CONTAINER_NAME GCR.NG. io / $ {PROJECT_ID} / $ {CONTAINER_NAME}: $ {TAG_NAME} 
 docker push gcr.io/${PROJECT_ID}/${CONTAINER_NAME}:${TAG_NAME Editor19659009IDThis is, of course, the_nombre que especifiqué para el paso de preprocesamiento.

2b. Entrenamiento y ajuste de hiperparámetro en CMLE

Realizaré entrenamiento y ajuste de hiperparámetro en Cloud ML Engine usando bash para enviar el trabajo usando gcloud :

 gcloud ml-engine jobs enviar entrenamiento $ JOBNAME  
 --region = $ REGION  
 --module-name = trainer.task  
 --package-path = $ {CODEDIR} / babyweight / trainer  
 --job-dir = $ OUTDIR  
 --staging-bucket = gs: // $ BUCKET  
 --scale-tier = STANDARD_1  
 --config = hyperparam.yaml  
 --runtime-version = $ TFVERSION  
   - stream-logs   
 -  
 --bucket = $ {BUCKET}  
 --output_dir = $ {OUTDIR}  
 --eval_steps = 10  
 --train_examples = 20000
# escriba el archivo de salida para el siguiente paso en la tubería 
 echo $ JOBNAME> /output.txt[19659009◆Notaqueutilizo-registrosdesecuenciasqueelcomandogcloudesperafinalizaciónytengaencuentaqueescriboelnombredeltrabajoqueformapartedelcontratoconelsiguientepasoenelcódigodecanalización

Esta vez, crearé mi Dockerfile heredando de un contenedor que ya tiene gcloud instalado, y git clonar el repositorio que contiene el código del entrenador real:

 DESDE google / cloud-sdk: la última
 EJECUTE mkdir -p / babyweight / src &&  
 cd / babyweight / src &&  
 git clone  https://github.com/GoogleCloudPlatform/training -data-analyst 
 COPY train.sh hyperparam.yaml ./[19659007◆ENTRYPOINT["bash", "./train.sh"]

2c. Capacitación local en Kubernetes

Los pasos de preprocesamiento e ajuste de hiperparámetros anteriores aprovecharon los servicios administrados. Todo lo que KFP estaba haciendo era enviar los trabajos y permitir que los servicios administrados hicieran lo suyo. ¿Qué hay de hacer algo en el clúster Kubernetes en el que se está ejecutando KFP?

Solo para mezclar un poco las cosas, permítame hacer el siguiente paso (entrenamiento) localmente en el clúster GKE. Para hacer eso, simplemente puedo ejecutar un contenedor de Docker que invoca python directamente:

 NEMBEDS = $ (los trabajos de gcloud ml-engine describen $ HYPERJOB - formato & # 039; valor (trainingOutput.trials.hyperparameters.nembeds.slice ( 0)) & # 039;) 
 TRIALID = $ (los trabajos de gcloud ml-engine describen $ HYPERJOB --format & # 039; valor (trainingOutput.trials.trialId.slice (0)) & # 039;) [19659007] ...
 OUTDIR = gs: // $ {BUCKET} / babyweight / hyperparam / $ TRIALID 
 python3 -m trainer.task  
 --job-dir = $ OUTDIR  
 - -bucket = $ {BUCKET}  
 --output_dir = $ {OUTDIR}  
 --eval_steps = 10  
 --nnsize = $ NNSIZE  
 --batch_size = $ BATCHSIZE  
 --nembeds = $ NEMBEDS  
 --train_examples = 200000

Hay un problema al realizar un trabajo en el clúster. ¿Cómo sabe que el clúster no está realizando alguna tarea que está agotando toda la memoria disponible? La operación de contenedor traintuned “reserva” la cantidad necesaria de memoria y CPU. kubeflow se encargará de programar el trabajo solo cuando los recursos necesarios estén disponibles:

 train_tuned.set_memory_request (& # 039; 2G & # 039;) 
 train_tuned.set_cpu_request (& # 039; 1 & # 039;)

2d. Implementación en Cloud ML Engine

Implementación en Cloud ML Engine también usa gcloud, así que deploy.sh y build.sh en ese directorio deberían parecer bastante familiares:

 gcloud ml-engine las versiones crean $ { MODEL_VERSION}  
 --model $ {MODEL_NAME} - origin $ {MODEL_LOCATION}  
 --runtime-version $ TFVERSION
 echo $ MODEL_NAME> /model.txtcuelgue19459011 busque en $ MODEL_VERSION / version.txt

y

 DE google / cloud-sdk: la última
 RUN mkdir -p / babyweight / src &&  
 cd / babyweight / src &&  
 git clone ] : //github.com/GoogleCloudPlatform/training-data-analyst 
 COPY deploy.sh ./[19659007◆ENTRYPINT["bash", "./deploy.sh"]

Así como entrenamos en el grupo de Kubernetes, también podemos implementar en Kubernetes (ver este componente para un ejemplo ).

2e. Compile DSL

Ahora que se ha creado todo el contenedor de Docker para todos los pasos, podemos enviar la canalización a KFP excepto … que KFP nos pide que compilemos nuestro archivo Python3 de tubería en un lenguaje específico del dominio. Lo hacemos con una herramienta llamada dsl-compile que viene con el SDK de Python3. Entonces, primero instale ese SDK ( 3_install_sdk.sh ):

 pip3 instale python-dateutil  https://storage.googleapis.com/ml-pipeline/release/0.1.2/kfp .tar.gz  - actualizar

Luego, compile el DSL:

 dsl-compile --py mlp_babyweight.py --output mlp_babyweight.tar.gz

Es este el archivo tar que desea subir a la interfaz de usuario de tuberías ML en la Sección 3.

3. Canalización de carga

El servidor de la interfaz de usuario se ejecuta en el clúster GKE en el puerto 80. Haga un reenvío de puertos para que pueda acceder a él desde su computadora portátil como puerto 8085 ( 4_start_ui.sh ):

 export NAMESPACE = kubeflow 
 kubectl port-forward -n $ {NAMESPACE} $ (kubectl get pods -n $ {NAMESPACE} --selector = service = embajador -o jsonpath = & # 039; {. items [0]. metadata.name} & # 039;) 8085: 80

Ahora, abra su navegador en http: // localhost: 8085 / pipeline y cambie a la pestaña Pipelines. Luego, cargue el archivo tar.gz creado en la Sección 2e como canalización.

Esto simplemente hace que el gráfico esté disponible. Probablemente ejecutará esto con varias combinaciones de configuraciones. Entonces, comienza un experimento para mantener todas estas carreras. Nombré mi experimento “blog” y luego creé una carrera. Nombré la ejecución ‘try1’ y la configuré con la canalización que se acaba de cargar:

La tubería ahora comienza a ejecutarse. Puedo ver cada paso que se está ejecutando y los registros de cada uno:

4. Experimentar, desarrollar, volver a entrenar, evaluar …

El código fuente completo incluye algunas cosas que pasé por alto. Obviamente, el código completo no apareció completamente formado y no funcionó la primera vez. En lugar de comenzar la tubería desde cero cada vez, la configuré para que pudiera comenzar en cualquier paso (el paso en el que estaba trabajando).

Sin embargo, cada paso depende de la salida del paso anterior. ¿Como hacer esto? Esta fue mi solución:

 if start_step <= 2: 
     hparam_train = dsl.ContainerOp (
 name = & # 039; hypertrain & # 039;, 
. La imagen debe ser una compilación. -tiempo cadena 
 imagen = & # 039; gcr.io/cloud-training-demos/babyweight-pipeline-hypertrain: último & # 039;, 
 argumentos = [
 preprocess.outputs [& # 039; bucket & # 039;] 
]
 file_outputs = {& # 039; jobname & # 039 ;: & # 039; /output.txt'} 
) 
   else: 
 hparam_train = ObjectDict ({
 & # 039; produce & # 039 ;: {
 & # 039; nombre de trabajo & # 039 ;: & # 039; babyweight_181008_210829 & # 039; 
}) 
]) 
 [19659009] Esencialmente, la operación de contenedor se crea si el start_step ≤ 2. De lo contrario, simplemente creo un diccionario con una salida "conocida" del paso anterior. De esta manera, simplemente puedo establecer que start_step sea 4 para comenzar con el cuarto paso, omitiendo los pasos anteriores. Sin embargo, los pasos posteriores tendrán las entradas que esperan de cualquier paso anterior.

Mi experimento consistió en varias ejecuciones:

Por ejemplo, try1 falló después de ejecutar con éxito 3 pasos, y podría comenzar desde el paso 4. Necesitaba dos trata de obtener el paso 4 a la derecha. Luego, agregué el paso 5 (que implementa una aplicación AppEngine para afrontar el servicio web). Luego, volví y probé diferentes variaciones de la etapa 3.

4b. Recapacitación

Por supuesto, no vas a entrenar a un modelo una vez y lo dejas para siempre (¿verdad?). Querrás volver a entrenar el modelo una vez que tengas más datos. En nuestro modelo de peso bebé, podríamos imaginar volver a entrenar el modelo una vez que tengamos un año más de datos. La forma en que manejo esto en la canalización es solicitar un StartYear como parámetro de entrada:

 def train_and_deploy (
 project = dsl.PipelineParam (name = & # 039; project & # 039 ;, value = & # 039 ; cloud-training-demos & # 039;), 
 bucket = dsl.PipelineParam (name = & # 039; bucket & # 039 ;, value = & # 039; cloud-training-demos-ml & # 039;), 
 startYear = dsl.PipelineParam (name = & # 039; startYear & # 039 ;, value = & # 039; 2000 & # 039;) 
):

El código de preproceso en el paso bqtocsv solo extrae filas en / después del año de inicio:

 WHERE year> = start_year

y nombra los archivos de salida para que no obstruyan la salida anterior:

 os.path.join (OUTPUT_DIR, & # 039; train_ {}. csv & # 039 ;, start_year)

y el entrenamiento ahora sucede en un conjunto de datos más grande porque la coincidencia del patrón es para el tren *. ¡Eso es!

Por supuesto, hay otras formas de manejar el reciclaje. Podríamos tomar el modelo anterior y entrenar solo en los datos más recientes. La semántica como esta no es difícil de incorporar: use convenciones de nomenclatura para ayudar a la tubería a hacer lo correcto.

4c. ¿Evaluación? ¿Cuadernos?

Normalmente, por supuesto, usted no lleva a un modelo entrenado a producción inmediatamente. En su lugar, realizará una evaluación y tal vez pruebas A / B antes de cambiar todo el tráfico.

Además, no se desarrollará en los contenedores Docker. Tuve suerte aquí porque mi modelo babyweight era prácticamente un paquete de Python y bastante conveniente para Dockerize. ¿Qué sucede si realiza todo el desarrollo de su modelo en un cuaderno Jupyter?

Cómo hacer una evaluación y cómo convertir un cuaderno en una canalización son temas para otra publicación de blog. Mira este espacio.

5. ¡Hazlo!

El código completo para este artículo está en GitHub. Clone el repositorio y siga los pasos en README.

También, lea La publicación del blog de Amy Unruh sobre cómo comenzar con las tuberías de Kubeflow.

Dejá un comentario