Este post muestra un ejemplo simple de uso de Spring Batch. Spring Batch es un componente más de la suite de Spring que en este caso nos permite implementar procesamientos batch de forma rápida y sencilla.
Llamaremos procesamiento batch al mecanismo que nos permite ejecutar un conjunto de operaciones similares pero de contenido diferente por lotes, es decir, como un todo, considerando el conjunto de operaciones como una única operación.
Para ello, Spring Batch hace uso de una metodología común a múltiples implementaciones de ejecuciones batch o por lotes. Esta metodología hace uso de los siguientes conceptos para definir y desarrollar los lotes a ejecutar:
- Job. Es el mecanismo principal de ejecución Sería la tarea encargada de ejecutar el conjunto de lotes programado. Una ventaja del job es que se puede programar para ejecutarse de forma recurrente, aunque este mecanismo no esta diseñado para sustituir a otros mecanismos o tecnologías de ejecución recurrente y programación como Quartz o similar, mucho más potentes y funcionales que el mecanismo por defecto de estos jobs. Un job estaría formado por steps o pasos de ejecución. Cada uno de esos pasos sería un lote. El job definiría el orden de ejecución de esos pasos o lotes.
- Step. Es la representación de un lote. Formaría parte de un job que se encargaria de ejecutarlo, bien en solitario o como parte de una ejecución múltiple de varios lotes diferentes. El step definiría una entrada, en forma de un Reader que leería los datos a procesar, un Processor, que procesaría los datos, es decir, implementaría la transformación del dato en sí, y un Writer que persistiría el procesamiento.
- Readers, processors y writers. Como se indica en el anterior punto, serían parte de cada paso y constituirían el flujo de datos de cada lote. El reader leería los datos uno a uno o por bloques, le pasaría los datos al processor que los transformaría, y estos irían despues al writer que, por bloques, persistiría los cambios. Todo el bloque procesado sería considerado una sola transacción.
- Otros elementos a tener en cuenta serían el lanzador de jobs, el respositorio donde se almacena la configuración y el estado de cada job, etc. pero en este ejemplo nos centraremos en los tres primero elementos antes mencionados.
En este ejemplo vamos a desarrollar una aplicación enterprise que al arrancar ejecuta automaticamente un job con un solo paso que lee de base de datos tres registros, los modifica, y guarda los cambios. Despues, desde una página web, podemos verificar que se hicieron los cambios, y también desde los ficheros de log.
1. Spring Boot y dependencias Maven
Partimos del ejemplo con Spring Data de este otro post. Añadimos la siguiente dependencia al pom.xml:
1 2 3 4 5 |
<!-- Add typical dependencies for a spring batch --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-batch</artifactId> </dependency> |
Esta dependencia incluye toda la parafernalia necesaria para usar Spring Batch.
2. Configuración del batch
Configuraremos el batch usando una clase java. Creamos la siguiente clase:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 |
/** * @author lagarcia * */ @Configuration @EnableBatchProcessing public class BatchConfiguration { /** Entity manager factory */ @Autowired private EntityManagerFactory emFactory; /** * Superhero item reader. * * @return The superhero item */ @Bean public ItemReader<Superhero> reader() { JpaPagingItemReader<Superhero> ireader = new JpaPagingItemReader<>(); ireader.setEntityManagerFactory(this.getEmFactory()); ireader.setQueryString("select entity from Superhero entity order by entity.name desc"); ireader.setPageSize(5); return ireader; } /** * Superhero item processor. * * @return The superhero processor */ @Bean public ItemProcessor<Superhero, Superhero> processor() { return new SuperheroProcessor(); } /** * Superhero item writer. * * @param dataSource * - The datasource * @return The superhero item writer */ @Bean public ItemWriter<Superhero> writer(DataSource dataSource) { JpaItemWriter<Superhero> iwriter = new JpaItemWriter<>(); iwriter.setEntityManagerFactory(this.getEmFactory()); return iwriter; } /** * Spring batch job. * * @param jobs * @param s1 * @param listener * @return */ @Bean public Job superHeroJob(JobBuilderFactory jobs, Step s1, JobExecutionListener listener) { return jobs.get("superHeroJob").incrementer(new RunIdIncrementer()) .listener(listener).flow(s1).end().build(); } /** * Spring batch superhero job - Step 01 * * @param stepBuilderFactory * @param reader * @param writer * @param processor * @return */ @Bean public Step step1(StepBuilderFactory stepBuilderFactory, ItemReader<Superhero> reader, ItemWriter<Superhero> writer, ItemProcessor<Superhero, Superhero> processor) { return stepBuilderFactory.get("step01") .<Superhero, Superhero> chunk(10).reader(reader) .processor(processor).writer(writer).build(); } /** * @return the emFactory */ public EntityManagerFactory getEmFactory() { return emFactory; } /** * @param emFactoryArg * the emFactory to set */ public void setEmFactory(EntityManagerFactory emFactoryArg) { emFactory = emFactoryArg; } } |
Esto es todo lo que se necesita para configurar el batch. Vamos paso a paso:
- Usamos @Configuration para indicar que es una clase de configuración. Después añadimos @EnableBatchProcessing para indicar que se configura Spring Batch.
- Enlazamos con el EntityManagerFactory que usaremos para leer y escribir de la base de datos embebida. Spring Boot ha configurado ya por defecto el datasource y lo ha enlazado con el EntityManagerFactory.
1 2 3 |
/** Entity manager factory */ @Autowired private EntityManagerFactory emFactory; |
- Definimos el job. En este caso el job tiene un solo paso (s1). Usamos el RunIdIncrementer() para asignarle un ID incremental a la ejecución del job, ya que Spring Batch almacena el estado de cada job en base de datos. También se le asocia un listener que estará escuchando a los eventos del job. Hablaremos de él más adelante.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
/** * Spring batch job. * * @param jobs * @param s1 * @param listener * @return */ @Bean public Job superHeroJob(JobBuilderFactory jobs, Step s1, JobExecutionListener listener) { return jobs.get("superHeroJob").incrementer(new RunIdIncrementer()) .listener(listener).flow(s1).end().build(); } |
- Definimos el lote, en este caso el paso que se ejecutará dentro del job. Especificamos que se escribirán los datos de diez en diez con el método chunk(). El lote o paso tendra un reader, un processor y un writer, como es habitual.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
/** * Spring batch superhero job - Step 01 * * @param stepBuilderFactory * @param reader * @param writer * @param processor * @return */ @Bean public Step step1(StepBuilderFactory stepBuilderFactory, ItemReader<Superhero> reader, ItemWriter<Superhero> writer, ItemProcessor<Superhero, Superhero> processor) { return stepBuilderFactory.get("step01") .<Superhero, Superhero> chunk(10).reader(reader) .processor(processor).writer(writer).build(); } |
- Definimos el reader. En este caso hacemos uso de un reader para JPA, ya que leemos de una base de datos y tenemos el modelo de datos definido con JPA. Usamos una query en JPQL y especificamos que se pagine lo leido de cinco en cinco elementos (setPageSize), que iran pasando al procesador.
1 2 3 4 5 6 7 8 9 10 11 12 13 |
/** * Superhero item reader. * * @return The superhero item */ @Bean public ItemReader<Superhero> reader() { JpaPagingItemReader<Superhero> ireader = new JpaPagingItemReader<>(); ireader.setEntityManagerFactory(this.getEmFactory()); ireader.setQueryString("select entity from Superhero entity order by entity.name desc"); ireader.setPageSize(5); return ireader; } |
- Definimos el processor. Creamos una nueva clase (SuperheroProcessor.java) para ello. El processor recibe un objeto Superhero y devuelve también un Superhero (podría devolver otro tipo de dato). En este caso el procesador modifica un campo del POJO en función de su ID (name).
1 2 3 4 5 6 7 8 9 |
/** * Superhero item processor. * * @return The superhero processor */ @Bean public ItemProcessor<Superhero, Superhero> processor() { return new SuperheroProcessor(); } |
- Definimos el writer. Simplemente escribira de diez en diez en base de datos y hara commit sobre ese bloque de datos.
1 2 3 4 5 6 7 8 9 10 11 12 13 |
/** * Superhero item writer. * * @param dataSource * - The datasource * @return The superhero item writer */ @Bean public ItemWriter<Superhero> writer(DataSource dataSource) { JpaItemWriter<Superhero> iwriter = new JpaItemWriter<>(); iwriter.setEntityManagerFactory(this.getEmFactory()); return iwriter; } |
3. Procesamiento de los datos
La clase que implementa el procesador seria tal que así:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
/** * @author lagarcia * */ public class SuperheroProcessor implements ItemProcessor<Superhero, Superhero> { /** A logger reference */ private static Logger logger = Logger.getLogger(SuperheroProcessor.class); /** * Process superhero. */ @Override public Superhero process(Superhero superheroArg) throws Exception { if (superheroArg != null) { logger.info("Procesando a " + superheroArg.getName()); switch (superheroArg.getName()) { case "Superman": { superheroArg.setWeakness("Kryptonite"); break; } case "Flash": { superheroArg.setWeakness("Speed of light"); break; } case "Dr. Magneto": { superheroArg.setWeakness("Plastic"); break; } } return superheroArg; } return null; } } |
Como se ha mencionado antes, recibe un objeto de tipo Superhero, lo modifica, y devuelve el mismo objeto.
4. Listener del job
Se define un listener para el job que escucha los eventos del mismo. En este caso se implementa el que afecta a la finalización del job para que muestre un mensaje en el log:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
/** * @author lagarcia * */ @Component public class BatchListener extends JobExecutionListenerSupport { /** A logger reference */ private static Logger logger = Logger.getLogger(BatchListener.class); /** * After job execution. */ @Override public void afterJob(JobExecution jobExecution) { if (jobExecution.getStatus() == BatchStatus.COMPLETED) { logger.info("¡¡Job " + jobExecution.getJobId() + " finalizado!!"); } } } |
5. Ejecución
Al lanzar la aplicación podemos ver en los logs que el job se ejecutó correctamente (y tb al acceder a los datos desde la página web).
Se puede descargar el proyecto completo desde aquí. Y también en Github.