martes, 31 de enero de 2023

MicroBatching en consumidores de Kafka

Algunas veces montamos sistemas más complejo de lo necesario porque desconocemos cómo funciona la tecnología que usamos.

Por ejemplo en la oficina un proyecto estaba proponiendo en un escenario con una etl en tiempo real que mueve datos de una base de datos a otro sistema usando kafka streams y kafka connect.

No saber como funciona la el consumidor de kafka, puede hacer que si el comportamiento por defecto no encaja, se aumente innecesariamente la complejidad del sistema que se implementa.

En este caso, el problema viene cuando en la base de datos de origen se realizan batches por la noche y en ese caso necesitan hacer una llamada por batch en vez de uno en uno para escribir los datos en el sistema origen.

El api que tiene que usar el proyecto para enviar los datos al otro sistema no tiene un buen rendimiento y han habilitado un método que permite el envío por lotes. Así que en vez de hacer una llamada por cada mensaje los puedes agrupar y mandar un lote.

Así que el equipo realiza una aplicación de kafka streams que dependiendo a qué hora se procesa el mensaje la salida se escribe en un mensaje u otro. Como podemos ver en el diagrama, cuando los mensajes se procesan por la noche, la información la mandan a un bulk topic que serán procesados por un conector de kafka connect manda por lotes los mensajes. Mientras que por el día el flujo de datos con menos intensidad continua por otro topic y otro sink

📝 Esto no es una buena idea

En esta solución hay varias cosas que pintan mal. Primero dos topics iguales, dos conectores… así que tenemos un sistema más complejo de lo esperado, con dos conectores en vez de uno que mantener y además dos topics con la misma estructura y posiblemente los mismos datos que generan un montón de dudas, como imposible asegurar un orden o coordinar la carga, imaginaos que el conector del bulk topic se atasca y deja de procesar (por el motivo qué sea) pasa ¿Qué pasa cuando acaba la noche y el conector de normal funciona?

Primero, no creo que sea necesario dos topics, si todos los mensajes representan lo mismo y tienen el mismo formato de datos… ¿para qué dos topics? Por otro lado, en vez de tener dos conectores, se puede tener uno y en función de cuantos mensajes haya leído el conector se puede mandar un batch o de uno en uno. Pero… ¿cómo controlo cuantos mensajes puede leer el consumidor de kafka? En la documentación podemos leer como se comporta el consumidor. Dónde nos explican que el consumidor sigue una estrategia de polling

The consumer provides two configuration settings to control the behavior of the poll loop:

  • max.poll.interval.ms: By increasing the interval between expected polls, you can give the consumer more time to handle a batch of records returned from poll(long). The drawback is that increasing this value may delay a group rebalance since the consumer will only join the rebalance inside the call to poll. You can use this setting to bound the time to finish a rebalance, but you risk slower progress if the consumer cannot actually call poll often enough.
  • max.poll.records: Use this setting to limit the total records returned from a single call to poll. This can make it easier to predict the maximum that must be handled within each poll interval. By tuning this value, you may be able to reduce the poll interval, which will reduce the impact of group rebalancing.

Así que si sabemos cuantos mensajes como máximo podemos enviar al api cuando procesamos por lotes podemos fijarlo en max.poll.records y según la calidad del servicio y la latencia que queramos tener podemos decir cuanto tiempo espera para leer los mensajes de kafka el consumidor con max.poll.interval.ms

Solo hay un truco, para modificar la configuración de los conectores en un cluster de kafka connect hay que habilitar la posibilidad de que el conector sobrescriba las configuraciones del cliente de kafka y poner en la configuración del conector los las propiedades anteriormente citadas con el prefijo consumer.override. Y aun se podría optimizar más y podemos hacer un sink más inteligente, pero con estas sencillas configuraciones se puede simplificar el sistema quedando algo como esto:


La configuración de los consumidores de kafka es algo que puede parecer fácil al principio pero puede llegar a ser muy complejo, un deep dive del consumidor (o de un sink connector de Kafka Connect) queda fuera del alcance de este post.

El objetivo de este post es explicar que siempre hay que profundizar un poco en la tecnología que se está usando y por otro lado siempre es buena idea leer la documentación oficial de los proyectos, posiblemente encuentres la explicación que necesitas

viernes, 29 de julio de 2022

Seguridad y burócratas

Esto es una dramatización de hechos reales, como un telefilm de sobremesa de A3

<Modo Faemino y Cansado>
Buena noshe

Buenah

¿Tu sabes con quién estás hablando?

No, ¿con quién?

Hay piltrafa!! soy el técnico de seguridá

ahí va, el de seguridad!!!! 

Sí, el mismo

¿Y usted que quiere? ¿Qué le puedo ofrecer?

Seguridá

Claro, claro, aquí todo es muy seguro

No sé yo, no sé yo.
Aquí hay mucho listo, pero el que dice que algo es seguro soy yo

Claro, por supuesto, ¿Quiere ver nuestro código? quiere que le explique como se despliega y qué hace nuestro servicio??

¿Tu me vas a explicar a mi lo que hace tu servicio? tu a mi??
¿Pero tu no sabes que yo soy el de seguridad?

Sí, sí

¿Te piensas que me interesa saber lo que hace tu aplicación? ¿o que me expliques el código?
Ni loco!!!! no voy a caer en tu ataque de ingenieria social... que yo sé quien es kevin mini y tu no... piltrafa

Vale, vale, perdone usted... si es que usted es el de seguridá

yo lo que voy a poner son unos scanners de dependencias a ver lo que tenemos aquí
¡¡¡¡¡OSTIÁ!!!!!

¿¿¿Capasao???

Una vulnerabilidá

¿Una vulnerabilidad?

Una vulnerabilidá CRÍTICA, C-R-Í-T-I-C-A

¡¡¡¡OSTIÁ!!!!! ¿Estás seguro?

Pero quien te crees que soy???, por supuesto!!!! lo dice el scanner

Pero si la descripción del CVE dice que esto solo ocurre en Windows y el software rueda en Linux 

Pero con qué lo has desarrollado?

Con un Mac pero eso... ¿Qué tiene que ver?

Tu sabes que Bill Gates sigue siendo accionista de Apple??

¡¡¡¡Qué cabrón!!!! Pero eso... 

 Es que no las piensas!!!! 

¡Hace años que el software está rondando en producción... y sin problemas

Eso no te lo crees ni tu chaval!! Joío Bill Gates y sus vulnerabilidades
Ahora te voy a analizar el código

Sabes que el proyecto está en Scala y no en Java...

Escala es lo de la miniaturas, yo creo que ya eres mayor para dejar de jugar con figuritas.
A ver que dice el análisis....
¡¡¡OSTIÁ!!!

¿¿¿Capasao???

Posible Null Pointer exception

Pero ahí hay un pattern matching y se trata el caso del null en la línea anterior

POSIBLE NULL POINTER EXCEPTION

Pero que está tratado

OTRO posible Null Pointer exception

Pero si eso es una clase autogenerada basada en un contrato avro y ese campo no es nullable

POSIBLE NULL POINTER EXCEPTION
¿Qué parte no entiendes?

Pero que eso no...

No digas más, lo que me faltaba

¿Qué?

Una línea con un largo de 81

pero que tengo una pantalla panorámica para programar

programar dice el tío... tú qué vas a saber programar si tienes una línea de 81 caracteres

QUE VA, QUE VA, QUE VA

yo leo a Kierkegaard
yo leo a Kierkegaard

</Modo Faemino y Cansado>

Tenía una entrada larguísima explicando la importancia de los perfiles técnicos y lo malo que son los burócratas que siguen ciegamente reglas o herramientas que no comprenden con el único objetivo pasar una lista de checks. La diferencia entre una vulnerabilidad y la capacidad de explotarla, entender el contexto, etc

Faemino y Cansado ¡Grandes!


lunes, 28 de febrero de 2022

Como reaccionar cuando te apagan la aplicaión

El otro día le comentaba a Pedro que para no perder datos en una aplicación de java si te matan el proceso lo único que hacía era controlar la InterrupException y a correr....

Pero Pedro me dijo que estaba equivocado y... tenía razón. Si quieres ejecutar código cuando a tu proceso cuando el sistema operativo manda un SIGTERM (por ejemplo un liveness probe en kubernetes, un kill, ...) lo correcto en java es registrar un ShootdownHook

Es decir algo tal que así

public class App 
{
    public static void main( String[] args )
    {
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {    
             System.out.println("Inside Add Shutdown Hook : " + Thread.currentThread().getName()) ;
            }
           }); 
        System.out.println( "Starting " + Thread.currentThread().getName() );

        try {
            Thread.sleep(100000);
        } catch (InterruptedException e) {
            System.out.println( "INTERRRUPTED" );
        }
        System.out.println( "END" );
    }
}

Hay que pensar que este hook se va a ejecutar siempre. Se pueden crear tantos hooks como quieras.

En .NET tienes la variable de Environment.HasShutdownStarted, pero también puedes un código parecido al de java suscribiéndote al evento ProcessExit del AppDomain por ejemplo:

// See https://aka.ms/new-console-template for more information
Console.WriteLine($"Start {Thread.CurrentThread.ManagedThreadId}");

var thread = Thread.CurrentThread;
AppDomain.CurrentDomain.ProcessExit += (s, e) => Console.WriteLine(
    $@"Process exiting {Thread.CurrentThread.ManagedThreadId} 
    Main Thread state {thread.ThreadState.ToString()}");

try
{
    Environment.HasShutdownStarted
    Thread.Sleep(10000);    
}
catch (System.Exception)
{
    
    Console.WriteLine("Interrupted");
}

Console.WriteLine("END");


domingo, 17 de octubre de 2021

Back to the console... oh my posh mola!

Hace poco que he migrado de windows 10 a windows 11.
Em windows 10 tenía ya instalado WSL y el nuevo terminal de windows.
Además tenia el promt de powershell personalizado con oh my posh y el bash con powerline.

Cuando arranco el terminal de windows 11 algunos caracteres parecen extraños. como si me faltasen glifos en la funente u otra cosa.
Originalmente estaba usando la fuente Cascadia Code y la habia parcheado con los iconos que faltaba en Ned Fonts. Así que posiblemente al instalar Windows 11, sobreescibrió mi fuente parcheada por una nueva versión, ya que esta es la fuente que viene con windows terminal.

Así que comencé a mirar y lo primero que descubrí es que había una nueva versión de oh my posh y que ahora se puede usar en todos los sistemas operativos.
Por otro lado, resulta que en Ned Fonts hay una versión parcheada de Cascadia Code que se llama Caskaydia Cove. Así que ya no me van a sobreescribir la fuenta con actualizaciones.

Aunque no soy muy amigo de las personalizaciones, estoy muy contento con el poco esfuerzo necesario lo bien que quedan las lineas de comando personalizadas, así lucen mi poswershell y mi bash

Y si alguno tiene curiosidad estos son los temas que uso

domingo, 8 de noviembre de 2020

Proyectos mixtos Java\Scala con Maven


En algunas ocasiones puede resultar interesante hacer proyectos mixtos Java\Scala. 
Es algo sencillo de hacer y aunque la propia documentación de scala  te lleva al scala-maven-plugin no es fácil de encontrar.
Para empezar tienes que saltar varios enlaces y luego te hace sospechar que no estas en el sitio correcto cuando ves que el plugin no es de org.scala-lang. 

Al lío...
Para compilar en un mismo proyecto código java y scala, pondremos los fuentes de java bajo el path: src/main/java
y el código sacla bajo el path: src/main/scala

En el pom debemos de añadir la dependencia con scala y modificar el build para que use dos compiladores, el de por defecto de maven y el de scala, siguiendo la documentación acabaríamos con un pom como este:
 
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0" xsi:schemalocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
	<modelversion>4.0.0</modelversion>

	<groupid>sandbox</groupid>
	<artifactid>testJavaAndScala</artifactid>
	<version>1.0-SNAPSHOT</version>
	<name>Test for Java + Scala compilation</name>
	<description>Test for Java + Scala compilation</description>

	<dependencies>
		<dependency>
			<groupid>org.scala-lang</groupid>
			<artifactid>scala-library</artifactid>
			<version>2.7.2</version>
		</dependency>
	</dependencies>

	<build>
		<pluginmanagement>
			<plugins>
				<plugin>
					<groupid>net.alchim31.maven</groupid>
					<artifactid>scala-maven-plugin</artifactid>
					<version>4.4.0</version>
				</plugin>
				<plugin>
					<groupid>org.apache.maven.plugins</groupid>
					<artifactid>maven-compiler-plugin</artifactid>
					<version>2.0.2</version>
				</plugin>
			</plugins>
		</pluginmanagement>
		<plugins>
			<plugin>
				<groupid>net.alchim31.maven</groupid>
				<artifactid>scala-maven-plugin</artifactid>
				<executions>
					<execution>
						<id>scala-compile-first</id>
						<phase>process-resources</phase>
						<goals>
							<goal>add-source</goal>
							<goal>compile</goal>
						</goals>
					</execution>
					<execution>
						<id>scala-test-compile</id>
						<phase>process-test-resources</phase>
						<goals>
							<goal>testCompile</goal>
						</goals>
					</execution>
				</executions>
			</plugin>
			<plugin>
				<groupid>org.apache.maven.plugins</groupid>
				<artifactid>maven-compiler-plugin</artifactid>
				<executions>
					<execution>
						<phase>compile</phase>
						<goals>
							<goal>compile</goal>
						</goals>
					</execution>
				</executions>
			</plugin>
		</plugins>
	</build>
</project>

Con esto en vuestro proyecto compila java y scala a la vez.
Si lo que reis ver en acción, hemos hecho un video dónde usamos los dos lenguajes en una aplicación springboot.


sábado, 17 de octubre de 2020

¿Qué es WebAssembly?

WebAssembly es un lenguaje de bajo nivel similar a ensamblador. Un conjunto de instrucciones que se almacenan en formato binario y se ejecuta en los principales navegadores. Los responsables del proyecto afirman que el rendimiento es cercano a ejecutar código nativo.

Un poco de historia...
Con el crecimiento de internet, y a pesar que las primeras versiones de javascript no gustaban a la mayoría de los desarrollos, el uso de JavaScript fue aumentando. Técnicas como la compactación de javascript para hacerlo más pequeño y fuese más rápido de cargar se generalizó.

Distintas voces comenzaron a hablar sobre la importancia de JS en el desarrollo web, por ejemplo en 2011 Scott Hanselman escribió el (polémico en su día) post Javascript es el ensamblador de la web

En el 2013 aparece asm.js un subconjunto de operaciones de javascript que permitía compilar a otros lenguajes como C

En el 2019 se cierra la primera versión del standard WebAssembly y es soportado por los 4 navegadores más populares en internet del momento: Chrome, Mozilla, Safari y Explorer.

Al igual que asm.js, si se compila un programa a WebAssembly se podrá ejecutar código en el navegador.

¿Qué significa esta tecnología? 
Esto abre muchas posibilidades, por ejemplo si quieres usar un sqlite en el lado del cliente de una aplicación web puedes hacerlo, pues alguien ha compilado sqlite a wasm. Puedes encontrar las releases aqui

Pero... ¿es cierto que WebAssembly ofrece un rendimiento similar al código nativo? Han portado la demo del Doom3. 

 

miércoles, 15 de abril de 2020

back to the console: listar consumidores de kafka y sus topics

Kafka tiene un montón de herramientas de líneas de comando, puedes consumir topics, producir mensajes y administrar topics, etc

Lo que no hay es una herramienta que liste los consumer groups y los topics que están usando.
Pero desde la consola podemos sacar esa información:

Con kafka-consumer-groups.sh --list se listan los consumer groups que hay en el cluster.
Con kafka-consumer-groups.sh --describe --topic lista el offset del consumer group para cada topic y partición.

Así que si hacemos un for y recorremos todos los consumer groups y con awk extraemos de la salida el nombre del topic demos sacar la relación de topics y consumer groups.
Tan solo nos queda un paso que es eliminar los duplicados, eso lo hacemos con sort -u que nos devolverá las lineas ordenadas sin duplicados.

Por ejemplo:
~$ for t in $(/opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list 2>/dev/null); do /opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group $t 2>/dev/null | awk -v var="$t," '{if (NR>2) print var$1}' ; done | sort -u
console-consumer-48665,test
console-consumer-58591,test

viernes, 3 de abril de 2020

Apache Kafka. una pequeña introducción

Seguro que en los últimos años has oído hablar de Apache Kafka. En esta serie de post intentaremos explicar qué es Kafka y como funciona.

Un poco de historia
Kafka fue creado en Linkedin por un equipo lideardo por Jay Kreps y entre los que destacan Neha Narkhende and Jun Rao.
Este equipo desarrolló Kafka para solucionar un problema de integración entre distintos sistemas dentro de LinkedIn.

Jay explica como y porqué diseñó Kafka en este post
Muy, muy, pero que muy resumido, en vez de hacer una integración de sistemas uno a uno, como sería esto:
 

Asi que en vez de todas estas sincronizaciones personalizadas, se propone un único punto de integración que acepte  propuso un único punto de integración, donde todos los sistemas publicasen la información en ese punto y todos los sistemas que consumían información serían capaces de suscribirse y consumir la información que ellos necesitasen:
Esa caja central (Unifield Log) acabaría siendo lo que es hoy Kafka.
El invento funcionó tan bien que Jay, Neha y Jay fundaron Confluent una empresa que se dedica a vender servicios y soporte profesional de Kafka

¿Qué es Apache Kafka?
En la pagina del proyecto apache, kafka se define como una plataforma de streaming distribuida con estas tres características:
  • Flujos de mensajes basado en productores y subscriptores, parecidos a una cola de mensajería o un sistema de mensajes empresarial
  • Almacena flujos de registros de manera tolerante a fallos y duradera
  • Procesa  flujos de registros a medida que se generan.

Mientras que Jay en el post que vimos anteriormente define kafka como un log distribuido y escalable.

Mensajes, topics, productores y consumidores

La unidad básica de kafka es el mensaje. Un mensaje en kafka es un array de bytes, no tiene formato específico. Los mensajes se almacenan en topics. Un topic estará dividido en una o más particiones. Pero para simplificar las explicaciones, por ahora hablaremos de topics con una única partición.


Un mensaje de kafka tiene un timestamp y mensaje y opcionalmente puede llevar una cabecera con metadatos y una clave.
Un mensaje en kafka es inmutable, una vez escrito no se puede modificar.
Cuando un productor escribe un mensaje lo pone en la última posición del fichero, de la misma manera que cuando escribes en un log de una aplicación cada nuevo mensaje va al final del fichero.


Los nodos de un cluster de kafka se llaman brokers. En los brokers se almacenan los topics. Cuando un productor se le manda un mensaje a un broker, este coloca el nuevo mensaje siempre al final del topic.
Al igual que los mensajes de log de las aplicaciones, los productores añaden los mensajes al final de cada partición y no pueden ser modificados.



Los consumidores de Kafka leen los mensajes desde en el mismo sentido desde el más antiguo hasta al el último publicado.


Kafka no es una cola de mensajería
Antes de seguir me gustaría dejar claro una cosa, Kafka no es una cola de mensajería. Es algo bastante común, mucha gente cuando empieza a hablar o trabajar con kafka piensa que es una cola de mensajería, pero no lo es.

  • Una cola de mensajería siempre garantiza que los mensajes de entregan al consumidor en orden. Kafka no garantiza siempre el orden de los mensajes (veremos como funciona el orden en kafka más adelante)
  • Una cola de mensajería sirve un mensaje una única vez. Cuando un consumidor leer un mensaje, este se desencola, así que desaparece. En kafka, un mensaje puede ser leído varias veces. 
Escalabilidad, Particiones y Orden
Kafka permite dividir el topic en distintas particiones.
Cuando tenemos un topic dividido en particiones, el productor decide en qué partición poner el mensaje. Si los mensajes llevan clave, el productor hará un hash de la clave módulo del numero de particiones. por ejemplo para un topic de 3 particiones los restos de la división son 0, 1 o 2.
Mientras que si el mensaje no tiene clave, aplicará una estrategia de round robin de manera que el reparto de mensajes es uniforme entre particiones.

Kafka también permite agrupar los consumidores en consumergroups. Los consumidores del mismo grupo no comparten particiones de kafka. Esto significa, por ejemplo que puedes tener dos procesos en paralelo procesando mensajes de kafka de un mismo topic y te garantiza que no van a procesar el mismo mensaje.
En resumidas cuentas, las particiones te indican el numero máximo de consumidores por consumer group. Es decir si quieres escalar una aplicación que consume de kafka lo podrás hacer hasta que tengas el mismo numero de consumidores que particiones del topic.
En el siguiente gráfico podemos ver como el consumergroup App1 tiene tres consumidores leyendo de un topic. Ese topic tiene dos particiones, el consumidor 2 está asignado a la partición 0 y el consumidor 1 está asignado a la partición 1. Como el topic tiene solo dos particiones, el consumidor 3 no está consumiendo mensajes.


De aquí sacamos dos conclusiones:
  • El numero de particiones de un tópic indica el número máximo de consumidres dentro de un consumer group. Así que la escalabilidad de las aplicaciones consumidoras depende del numero de particiones de un tópic.
  • No se garantiza el orden. Únicamente kafka garantiza el orden de lectura dentro de una partición.

Recordad que el productor por defecto utiliza la clave del mensaje para decidir en qué partición está. Así que si necesitas orden puedes tenerlo en cierta medida. Por ejemplo si estás haciendo un sistema logístico y quieres saber tener todos los eventos un pedido en orden, deberías publicar todos los eventos en kafka usando como clave el id unico del pedido.
O si estas haciendo streaming de una tabla de base de datos deberías usar como clave la PK de la tabla para que todos los cambios del registro lleguen en orden, etc..

Vida de los mensajes, retención y compactación
En una cola de mensajería los mensajes desaparecen cuando son leídos. En kafka no. Una de las propiedades de un topic de kafka es la cleanup.policy. Esta propiedad tiene tres posibles valores:
  • delete: Es el valor por defecto, Pasado un periodo de tiempo definido en la propiedad retention.ms (no es exactamente así, pero no vamos a entrar en más detalles por ahora)
  • compact: Se garantiza que se mantendrá el último valor de los mensajes con una misma clave. el resto de mensajes se eliminarán. si el último mensaje es un tombstone (clave con mensaje nulo) el mensaje también se eliminará 
  • both: Ejecuta las dos estrategias, así que tienes una compactación por clave y además pasado cierto tiempo se descartan los mensajes.
Siguiendo con el símil del log de una aplicación, es muy posible que si guardabas el log en ficheros usases alguna estrategia de rolling, creando un dichero por día, mes o semana, o partiendolo por tamaño del fichero.
En las particiones de kafka se hace lo mismo. Las particiones se reparten en segmentos. El segmento activo es el último, donde se escriben los nuevos mensajes. dependiendo de la configruracion del topìc, pasado un tiempo o un tamaño determinado el segmento se cierra y se abre otro.

Es sobre los segmentos cerrados donde se aplican las operaciones de compact y delete. En concreto delete no borra mensajes, borra segmentos de la partición.


Aun faltan muchas cosas que contar para esta introducción a kafka, pero creo que el post está quedando muy largo así que una vez revisado partes básicas como mensajes, topics, particiones, consumidores y productores... es un buen momento para parar.

Próximo post... las Réplicas.