Analytics para millones de eventos que tenemos al día por eso en este artículo describimos cómo organizamos Kafka, Dataflow y BigQuery juntos para ingerir y transformar una gran cantidad de eventos.
Cuando se agregan restricciones de escala y latencia, la conciliación y la reordenación se convierten en un desafío, así es como lo abordamos.
La parte 2 de este articulo se centra en cómo usamos y abusamos de Redshift para proporcionar esta información a nuestros usuarios finales.
En la publicidad digital, las operaciones diarias generan una gran cantidad de eventos que debemos rastrear para informar de manera transparente el rendimiento de la campaña. Estos eventos provienen de:
- Interacciones de los usuarios con los anuncios, enviados por el navegador. Estos eventos se denominan eventos de seguimiento y pueden ser estándar (inicio, finalización, pausa, reanudación, etc.) o eventos personalizados provenientes de creatividades interactivas creadas . Recibimos alrededor de 10 mil millones de eventos de seguimiento al día.
- Los eventos provienen de nuestros back-ends, con respecto a los detalles de las subastas de anuncios en su mayor parte (procesos de licitación en tiempo real). Generamos más de 60 mil millones de estos eventos diariamente, antes del muestreo, y deberíamos duplicar este número en 2018.
En el artículo nos centramos en el seguimiento de los eventos, ya que se encuentran en la ruta más crítica de nuestro negocio.
Los eventos de seguimiento son enviados por el navegador a través de HTTP a un componente dedicado que, entre otras cosas, los pone en un tema Kafka . Analytics es uno de los consumidores de estos eventos (más sobre esto más adelante).
Contamos con un equipo de Analytics cuya misión es cuidar estos eventos y se define de la siguiente manera:
- Ingerimos la creciente cantidad de troncos,
- Los transformamos en datos orientados al negocio,
- Que servimos de manera eficiente y adaptada a cada audiencia.
Para cumplir con esta misión, construimos y mantenemos un conjunto de herramientas de procesamiento y tuberías. Debido al crecimiento orgánico de la compañía y los requisitos de nuevos productos, desafiamos regularmente nuestra arquitectura .
Por qué nos mudamos a BigQuery analytics
En 2016, nuestra pila de Analytics se basaba en una arquitectura lambda(Storm, Spark, Cassandra) y tuvimos varios problemas:
- La escala de datos hizo imposible tenerlo todo en una sola tabla de Cassandra, lo que impidió una consulta cruzada eficiente.
- Era una infraestructura compleja con duplicación de código para las capas de lotes y de velocidad, lo que nos impide lanzar nuevas funciones de manera eficiente.
- Al final fue difícil de escalar y no era rentable,
En ese momento teníamos varias opciones posibles. Primero, podríamos haber construido un lambda mejorado, pero solo habría pospuesto los problemas que enfrentábamos.
Consideramos varias alternativas prometedoras como Druid y BigQuery . Finalmente, decidimos migrar a BigQuery debido a su gran conjunto de características.
Con BigQuery analytics somos capaces de:
- Trabajar con eventos crudos,
- Utilice SQL como un lenguaje de procesamiento de datos eficiente,
- Utilice BigQuery como el motor de procesamiento,
- Facilita el acceso explicativo a los datos (en comparación con Spark SQL o Hive),
Gracias a un plan de tarifa plana , nuestro uso intensivo (consulta y almacenamiento) es rentable.
Sin embargo, nuestro contexto técnico no era ideal para BigQuery. Queríamos usarlo para almacenar y transformar todos nuestros eventos provenientes de múltiples temas de Kafka. No pudimos alejar nuestros clusters de Kafka de AWS ni usar Pub / Sub , el equivalente administrado de Kafka en GCP, ya que estos clusters son utilizados por algunos de nuestros componentes de publicación de anuncios también alojados en AWS. Como resultado, tuvimos que enfrentar los desafíos de operar una infraestructura multi-cloud.
Hoy en día, BigQuery es nuestro sistema de almacenamiento de datos , donde nuestros eventos de seguimiento se reconcilian con otras fuentes de datos.
Ingestión
Cuando se trata de eventos de seguimiento, el primer problema que enfrenta es el hecho de que tiene que procesarlos sin orden, con retrasos desconocidos .
La diferencia entre la hora en que realmente ocurrió el evento (hora del evento) y la hora en que el sistema (tiempo de procesamiento) observa el evento varía desde el milisegundo hasta varias horas. Estos grandes retrasos no son tan raros y pueden ocurrir cuando el usuario pierde su conexión o activa el modo de vuelo entre las sesiones de navegación.
Para obtener más información sobre los desafíos del procesamiento de datos en tiempo real, recomendamos echar un vistazo a Google Cloud Next ’17 talk «Cómo avanzar y retroceder entre el procesamiento por lotes y por flujo ».
La dura realidad del streaming.
Dataflow es un sistema de transmisión administrado diseñado para abordar los desafíos que enfrentamos con la naturaleza caótica de los eventos . Dataflow tiene un modelo unificado de transmisión y programación por lotes, siendo la transmisión la característica principal.
Nos vendieron según las promesas de Dataflow y probamos con franqueza el modo de transmisión. Desafortunadamente, después de abrirlo al tráfico de producción real, tuvimos una sorpresa desagradable: los costos de inserción de BigQuery .
Nuestras estimaciones se basaron en el tamaño de los datos comprimidos (es decir, el volumen real de bytes que atraviesa la red) y no en el tamaño del formato de datos sin procesar de BigQuery. Afortunadamente, ahora esto está documentado para cada tipo de datos para que pueda hacer los cálculos.
En aquel entonces, habíamos subestimado este costo adicional en un factor de 100, lo que casi duplicó el costo de todo nuestro proceso de ingesta (Dataflow + BigQuery).
También enfrentamos otras limitaciones como el límite de velocidad de 100,000 eventos / s , que estaba peligrosamente cerca de lo que estábamos haciendo.
La buena noticia es que hay una manera de evitar completamente la limitación de inserciones de transmisión: carga por lotes en BigQuery .
Idealmente, nos hubiera gustado utilizar Dataflow en modo de transmisión con BigQuery en modo de proceso por lotes. En ese momento no había un sumidero por lotes de BigQuery para flujos de datos ilimitadosdisponibles en el SDK de flujo de datos.
Luego consideramos desarrollar nuestro propio sumidero personalizado. Desafortunadamente, era imposible agregar un sumidero personalizado a un flujo de datos ilimitado en ese momento (ver Planes de flujo de datos para agregar soporte para sumideros personalizados que escriben datos ilimitados en una versión futura ; ahora es posible que Beam sea el SDK oficial de flujo de datos) .
No teníamos más remedio que cambiar nuestro trabajo de flujo de datos al modo por lotes . Gracias al modelo unificado de Dataflow, solo fue una cuestión de unas pocas líneas de código.
Y afortunadamente para nosotros, pudimos permitirnos el retraso adicional en el procesamiento de datos introducido por el cambio al modo por lotes.
En el futuro, nuestra arquitectura de ingesta actual se basa en Scio , una API de Scala para Dataflow abierta de Spotify. Como se dijo anteriormente, Dataflow soporta de forma nativa Pub / Sub, pero la integración de Kafka fue menos madura.
Tuvimos que extender Scio para habilitar la persistencia del punto de control de compensación y habilitar un paralelismo eficiente.
Analytics: Un pipeline de micro lotes.
Nuestra arquitectura resultante es una cadena de trabajos por lotes de flujo de datos de 30 minutos, programados secuencialmente para leer un tema de Kafka y escribir en BigQuery utilizando trabajos de carga.
Una de las claves fue encontrar la duración ideal del lote . Descubrimos que existe un punto ideal para lograr la mejor compensación entre costo y rendimiento de lectura (por lo tanto, latencia). La variable a ajustar es la duración de la fase de lectura de Kafka.
Para terminar con la duración del lote completo, debe agregar la fase de escritura a BigQuery (no proporcional, pero estrechamente vinculada a la duración de la lectura), y una constante que es la duración de arranque y apagado.
Cabe destacar :
- Una fase de lectura demasiado corta reducirá la proporción entre las fases de lectura y no lectura. En un mundo ideal, una proporción de 1: 1 significa que debes poder leer tan rápido como estás escribiendo. En el ejemplo anterior, tenemos una fase de lectura de 20 minutos, para un lote de 30 minutos (relación 3: 2). Esto significa que debemos poder leer 1.5 veces más rápido de lo que escribimos. Una pequeña proporción significa una necesidad de instancias más grandes.
- Una fase de lectura demasiado larga simplemente aumentará la latencia entre el momento en que ocurrió el evento y cuando está disponible en BigQuery.
Analytics: La optimización del rendimiento
Los trabajos de flujo de datos se inician secuencialmente por razones de simplicidad y una administración de fallas más sencilla.
Es un intercambio de latencia que estamos dispuestos a tomar. Si un trabajo falla, simplemente volvemos a la última compensación Kafka comprometida.
Tuvimos que modificar la topología de nuestros clusters Kafka y aumentar el número de particiones para poder desapilar mensajes más rápido. Dependiendo de las transformaciones que realice en Dataflow, lo más probable es que el factor limitante sea la capacidad de procesamiento o el rendimiento de la red.
Para un paralelismo eficiente, siempre debe intentar mantener un número de subprocesos de la CPU que es un divisor del número de particiones que tiene (corolario: es bueno tener un número de particiones Kafka que sea un número altamente compuesto ).
En el raro caso de retrasos, podemos ajustar el trabajo con secuencias de lectura más largas. Al usar lotes más grandes, también podemos ponernos al día con el retraso a expensas de la latencia.
Para manejar la mayoría de las situaciones , dimensionamos Dataflow para poder leer 3 veces más rápido que el ritmo real. Una lectura de 20 minutos con una sola instancia de n1-highcpu-16 puede desapilar 60 minutos de mensajes.
En nuestro caso de uso, terminamos con una latencia de diente de sierra que oscila entre 3 min (duración mínima de la fase de escritura BQ) y 30 min (duración total de un trabajo).
Analytics: Transformación
Los datos sin procesar son inevitablemente voluminosos, tenemos demasiados eventos y no podemos consultarlos como están.
Necesitamos agregar estos datos sin procesar para mantener un bajo tiempo de lectura y volúmenes compactos. Así es como lo hacemos en BigQuery:
A diferencia de los procesos ETL tradicionales donde los datos se transformanantes de cargarlos , elegimos almacenarlos primero (ELT) , en un formato sin procesar.
Tiene dos ventajas principales:
- Nos permite tener acceso a todos y cada uno de los eventos sin procesar para fines de análisis y depuración,
- Simplifica toda la cadena al permitir que BigQuery realice las transformaciones con un dialecto SQL simple pero poderoso.
Nos hubiera gustado escribir directamente en la tabla de eventos en bruto que se divide diariamente. No pudimos porque un lote de flujo de datos debe definirse con un destino específico (tabla o partición) y podría incluir datos destinados a diferentes particiones. Resolvemos este problema cargando cada lote en una tabla temporal y luego comenzamos a transformarlo.
Para cada una de estas tablas temporales por lotes, ejecutamos un conjunto de transformaciones, materializadas como consultas SQL que dan salida a otras tablas.
Una de estas transformaciones simplemente agrega todos los datos a la gran tabla de eventos sin procesar, particionada por día.
Otra de estas transformaciones es el resumen : una agregación de los datos, dado un conjunto de dimensiones. Todas estas transformaciones son idempotentes y pueden volver a ejecutarse de manera segura en caso de error o necesidad de reprocesamiento de datos.
Rollups
Consultar la tabla de eventos sin procesar directamente es bueno para fines de depuración y análisis profundo, pero es imposible lograr un rendimiento aceptable al consultar una tabla de esta escala, sin mencionar el costo de dicha operación.
Para darle una idea, esta tabla solo tiene una retención de 4 meses, contiene 1 billón de eventos, para un tamaño cercano a 250 TB .
En el ejemplo anterior, contamos con 3 recuentos de eventos: Hora , ID de anuncio , ID de sitio web . Los eventos también son pivotados y transformados en columnas. El ejemplo muestra una reducción de tamaño de 2.5x, mientras que la realidad está más cerca de 70x .
En el contexto masivamente paralelo de BigQuery, el tiempo de ejecución de las consultas no se ve muy afectado, la mejora se mide en la cantidad de espacios utilizados.
Los rollups también nos permiten dividir los datos en partes pequeñas: los eventos se agrupan en tablas pequeñas para una hora determinada (hora del evento, no el tiempo de procesamiento).
Por lo tanto, si necesita consultar los datos durante una hora determinada, consultará una sola tabla (<10M filas, <10GB).
Los rollups son un tipo de agregación de propósito general que hacemos para poder consultar todos los eventos de manera más eficiente, dado un gran conjunto de dimensiones.
Hay otros casos de uso en los que queremos vistas dedicadas de los datos. Cada uno de ellos puede implementar un conjunto de transformaciones específicas para terminar con una tabla especializada y optimizada.
Límites de un servicio gestionado.
BigQuery, tan poderoso como puede ser, tiene sus límites:
- BigQuery no permite consultas a varias tablas que tienen esquemas diferentes (incluso si la consulta no está utilizando los campos que difieren). Tenemos un script para actualizar cientos de tablas cuando necesitamos agregar un campo.
- BigQuery no admite la caída de columnas . No es un gran problema, pero no ayuda a pagar la deuda técnica.
- Consulta de varias horas: BigQuery admite comodines en el nombre de la tabla , pero el rendimiento es tan malo que tenemos que generar consultas que consulten explícitamente cada tabla con UNION ALL.
- Siempre debemos unir estos eventos con datos alojados en otras bases de datos (por ejemplo, para hidratar eventos con más información sobre una campaña publicitaria), pero BigQuery no lo admite (todavía). Actualmente tenemos que copiar regularmente tablas enteras a BigQuery para poder unir los datos en una sola consulta.
Alegrías de la transferencia de datos entre nubes
Con la infraestructura de publicación de anuncios en AWS y un clúster Kafka compartido con muchos otros componentes, no tenemos más remedio que transferir una gran cantidad de datos entre las nubes de AWS y GCP , lo que no es fácil y ciertamente no es barato.
Ubicamos nuestras instancias de flujo de datos (por lo tanto, el punto de entrada de GCP principal) lo más cerca posible de nuestra infraestructura de AWS y, afortunadamente, los enlaces existentes entre AWS y GCP son lo suficientemente buenos como para que podamos usar VPN administradas.
Aunque nos encontramos con cierta inestabilidad al ejecutar estas VPN, logramos solucionarlo usando un simple script que lo apaga y enciende nuevamente. Nunca hemos enfrentado problemas lo suficientemente grandes como para justificar el costo de un enlace dedicado.
Una vez más, el costo es algo que debe vigilar de cerca y, en lo que respecta a la salida, es difícil de evaluar antes de ver la factura . Elegir cuidadosamente cómo comprimir los datos es una de las mejores ventajas para reducir estos costos.
Solo a mitad de camino
Tener todos estos eventos en BigQuery no es suficiente . Para aportar valor a la empresa, los datos deben hidratarse con diferentes reglas y métricas. Además, BigQuery no está diseñado para casos de uso en tiempo real.
Debido a los límites de concurrencia y la latencia de consulta incompresible de 3 a 5 segundos (aceptable e inherente a su diseño), BigQuery debe combinarse con otras herramientas para servir aplicaciones (paneles de control, interfaces de usuario web, etc.).
Esta tarea es realizada por nuestro servicio de analytics, un componente de Scala que se conecta a BigQuery para generar informes a pedido (hojas de cálculo) y mercados de datos personalizados (actualizados diariamente o por hora).
Este servicio específico es necesario para manejar la lógica de negocios. Sería demasiado difícil mantener como SQL y generar mercados de datos utilizando la transformación de la tubería de lo contrario.
Elegimos AWS Redshift para almacenar y servir nuestros almacenes de datos . Aunque puede parecer que no es una opción obvia para servir aplicaciones orientadas al usuario, Redshift funciona para nosotros porque tenemos un número limitado de usuarios concurrentes.
Además, el uso de un almacén de clave / valor hubiera requerido más esfuerzo de desarrollo. Al mantener una base de datos relacional intermedia, se facilita el consumo de datos.
Hay mucho que decir sobre cómo construimos, mantenemos y consultamos esos almacenes de datos a escala. La segunda parte de esta serie de artículos trata sobre eso: Cómo usamos y abusamos de Redshift para entregar nuestros datos.
Si te gusto este artículo también puedes leer otros dos temas interesantes en nuestra sección de Analytics.
Gracias por leer y yo estoy esperando para escuchar sus preguntas:)
Esté atento y Feliz Analytics!.
PD Si quiere aprender más sobre el mundo de Analytics, también puede seguirnos en Instagram , encuéntreme en linkedin o en Facebook. Me encantaría saber que te sumes!.