Commit 1e62ca2

author
iusaspb
committed
Init commit
0 parents  commit 1e62ca2

28 files changed

+1480
-0
lines changed

.gitigrone

Lines changed: 9 additions & 0 deletions
@@ -0,0 +1,9 @@
# Intellij
.idea
*.iws
*.iml
*.ipr

# maven
/target
/log

README.adoc

Lines changed: 184 additions & 0 deletions
@@ -0,0 +1,184 @@
= CDC Synchronization of PostgreSQL and Elasticsearch with Spring Boot

*Motivation*

Some colleagues of mine use a database together with _Elasticsearch_ (_ELK_), where the database is the main data source and _ELK_ is the search facade: the data is first stored in the database, and then the changes are written to the index.

Here is a typical example of such code that uses _Spring Data_:

 @Override
 @Transactional
 public ProductDto update(ProductDto dto) {
     // First, store the entity in the database.
     var jpaEntity = jpaRepository.save(dto2jpa(Objects.requireNonNull(dto)));
     // Then, write the saved entity to the index.
     var elkEntity = elkRepository.save(jpa2elk(jpaEntity));
     return elk2dto(elkEntity);
 }

Unfortunately, this approach does not guarantee that the database and the index are consistent, neither immediately nor after a delay. That is, there may be a moment at which the state of the database does not correspond to the state of the index at any moment, and vice versa. In particular, some changes in the database may never reach the index.

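One way such a divergence can arise is sketched below. This is only an illustration under stated assumptions: the class `DualWriteRace` and its method are hypothetical, plain maps stand in for the database and the index, and the interleaving of two concurrent `update()` calls is replayed by hand rather than produced by real threads.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical simulation: in-memory maps stand in for the database and the index.
class DualWriteRace {
    static final Map<Long, String> database = new HashMap<>();
    static final Map<Long, String> index = new HashMap<>();

    // Replays one possible interleaving of two update() calls for product 1,
    // where the index writes happen in the opposite order to the database writes.
    static void replayInterleaving() {
        database.put(1L, "prodA"); // request A's change reaches the database first
        database.put(1L, "prodB"); // request B's change reaches it second: database ends with prodB
        index.put(1L, "prodB");    // request B reaches the index first
        index.put(1L, "prodA");    // request A reaches the index last and overwrites B
    }

    public static void main(String[] args) {
        replayInterleaving();
        System.out.println("database=" + database.get(1L) + ", index=" + index.get(1L));
    }
}
```

After this interleaving the two stores disagree, and nothing in the dual-write code will ever reconcile them.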
Since I've seen code similar to the above in several projects, I thought a small project showing how synchronization could be done would be helpful.

This is an example of such a _Java_ project using _PostgreSQL_ _CDC_ (Change Data Capture), _Elasticsearch_, _Spring Boot_ and _Spring Data_ (_JPA_, _Elasticsearch_). The project does not use any external components such as _Kafka_; only the ones listed. There are ten classes in total.

Below are brief comments on the aspects of the project that the code itself does not make obvious.

*Brief digression*

We will not touch on what _CDC_ (Change Data Capture) is in general, or on its implementation in PostgreSQL in particular. Details can be found at the link

https://www.postgresql.org/docs/current/wal-intro.html[]

Let's just say that this technology allows you to work with _WAL_ (Write-Ahead Logging), which the database maintains to ensure data integrity and to support replication. This log records data about changes in the database, grouped by transactions.

The main property of this log that can be used to synchronize the database and the index is the following.
Data in _WAL_ is stored exactly in the order in which it is written to the database. This allows you to write changes to the index exactly in the order in which they were written to the database, and not in the order in which control is transferred from the _JPA_ component to the _ELK_ one.

Also note that the _CDC_ implementation differs between database vendors. The project shown uses PostgreSQL.


*Configuration*


To see how the project works, you must first set up the database.

Open a client (_psql_, _DBeaver_, etc.) and run the command

`SHOW wal_level;`

If the value is not "_logical_", execute the command

`ALTER SYSTEM SET wal_level = logical;`

Then restart the database server, or the container it runs in (at least for _PostgreSQL 14_).

After that, open the client and look at the available slots

`SELECT * FROM pg_replication_slots;`

and create a slot with a unique name

`SELECT * FROM pg_create_logical_replication_slot('elk_slot', 'test_decoding', false, true);`

You can choose the unique name of the slot yourself (in the example, '_elk_slot_').

Let's leave the plugin name ('_test_decoding_') as it is for now.

The rest of the settings can be left unchanged.

To return to the previous configuration, delete the created slot

`SELECT * FROM pg_drop_replication_slot('elk_slot');`

restore the previous value of _wal_level_

`ALTER SYSTEM RESET wal_level;`

and restart the server or the container again.


*Project description*


To keep the project compact but still fully functional, I limited myself to one entity, _Product_, which corresponds to the table of the same name in the database, the index of the same name, and three Java classes: _ProductDto_, the _JPA_ entity _ProductDB_ and the _ELK_ document _ProductELK_.

Two repositories are used to work with the entity:

`ProductJPARepository extends JpaRepository<ProductDB, Long>`

`ProductELKRepository extends ElasticsearchRepository<ProductELK, Long>`

The _ProductVanillaService_ service works with these repositories. This service does not care about database and index synchronization. The code shown in the Motivation section is taken from this service.

The controller that works with the service is _ProductController_, which, for brevity, has only _CRUD_ endpoints.

The methods of _ProductVanillaService_, as you can see from the example, are transactional so that changes in the database can be rolled back if saving changes to the index fails.

If the application is launched with the "_vanilla_" profile, it works without synchronization.

To enable synchronization, run the application with the "_sync_" profile.

To keep the database and the index in sync, the _ProductCDCService_ service is used instead of _ProductVanillaService_. The sync version uses the same repositories as the vanilla one.

Here is an example of processing data changes from _ProductCDCService_, similar to the example given at the beginning of the article.

 @Override
 @Transactional(propagation = Propagation.NEVER)
 public ProductDto update(ProductDto dto) {
     var id = Objects.requireNonNull(Objects.requireNonNull(dto).getId());
     // The repository call opens and commits its own database transaction.
     jpaRepository.save(dto2jpa(dto));
     // Scan WAL and upload the committed changes into the index.
     processNextCDCChunk();
     // Read the product back from the index.
     return elkRepository.findById(id).map(ProductMapper::elk2dto).orElse(null);
 }

Both services work with the database in the same way, except that the methods of the first service are transactional, so saving data to the index happens inside the same transaction as saving data to the database.

The methods of the _ProductCDCService_ service are not transactional.

Also, the _ProductCDCService_ methods, unlike those of _ProductVanillaService_, do not store data in the index themselves.
Instead, they call the _processNextCDCChunk()_ method, which starts scanning completed transactions from _WAL_.

The _update()_ method starts its work outside a transaction. The transaction is opened in

`jpaRepository.save(dto2jpa(dto))`

and closed on exit from it.

Therefore, by the time _processNextCDCChunk()_ is called, the data saved in the previous step is already in _WAL_ and available for scanning by this method. As a result, by the time _processNextCDCChunk()_ ends, this data will already be in the index.

With the next call to _elkRepository.findById(id)_, we get this data from the index and return it to the controller.

Here the next question arises: does the state of the returned object match the state the object had when _update(dto)_ was called? For example, if user A changed the product name to "_prodA_", will the name stay that way in the output?
The answer to this question is no.
The correct statement is the following.
We return the state of the product at SOME point in time after the database transaction was committed.

Let's take an example.
Suppose user A renamed the product to "_prodA_", and then user B renamed it to "_prodB_".
If the transaction with user B's change has been committed by the time user A's _processNextCDCChunk()_ call completes its _WAL_ scan, then the result of this commit will be included in user A's scan, and user A's _elkRepository.findById(id)_ will return the product with the name "_prodB_".

There is nothing strange in this, since the state of the database is saved in the index with some delay. The main thing is that all database changes reach the index.

In particular, after successfully saving changes to the database and to the index, we may return nothing to the user. This can happen if, after the changes that we process in user A's transaction, the object is deleted in user B's transaction. If the deletion is already committed by the time user A scans _WAL_, then user A's _processNextCDCChunk()_ will remove the object from the index. In this case, upon return from _processNextCDCChunk()_, the object will no longer be in the index. Unless, of course, some third user C re-created the deleted product and this product gets into the index by the time user A's _elkRepository.findById(id)_ is called. In this case, we will return user C's product to user A, not user A's product with the name "_prodA_".



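The scenario with users A and B can be replayed with a small in-memory model. This is only a sketch under stated assumptions: the class `WalScanExample` and its methods are hypothetical, a list stands in for _WAL_, a map stands in for the index, and a `null` name models a deletion.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical in-memory model: a list stands in for WAL, a map for the index.
class WalScanExample {
    record Change(long id, String name) {} // name == null models a deletion

    private final List<Change> wal = new ArrayList<>();
    private final Map<Long, String> index = new HashMap<>();
    private int nextUnprocessed = 0;

    // Models a committed database transaction that changes one product.
    void commit(long id, String name) {
        wal.add(new Change(id, name));
    }

    // Applies every committed but unprocessed change, in commit order.
    void processNextChunk() {
        while (nextUnprocessed < wal.size()) {
            Change change = wal.get(nextUnprocessed++);
            if (change.name() == null) {
                index.remove(change.id());
            } else {
                index.put(change.id(), change.name());
            }
        }
    }

    Optional<String> findById(long id) {
        return Optional.ofNullable(index.get(id));
    }

    public static void main(String[] args) {
        var example = new WalScanExample();
        example.commit(1L, "prodA"); // user A's transaction commits
        example.commit(1L, "prodB"); // user B's transaction commits before A's scan runs
        example.processNextChunk();  // user A's scan picks up both changes
        System.out.println(example.findById(1L).orElse("<absent>"));
    }
}
```

In this model user A reads back whatever the index holds after the scan, which is the latest committed state rather than the state user A wrote.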
The _processNextCDCChunk()_ method is simple. It calls the _TestDecodingCDCService.processNextCDCChunk()_ method and handles the exception. It is important to note two points here.

The first is that the method is called asynchronously.

The second is that the executor that runs the method uses a single thread. This ensures that _WAL_ is processed sequentially, and therefore data changes are loaded into the index in exactly the same order as those changes were written to the database. Therefore, the state of the index at each moment corresponds to the state of the database at some moment in the past.


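The ordering guarantee of a single worker thread can be checked in isolation. The sketch below is an illustration only: it uses a plain `Executors.newSingleThreadExecutor()` from the JDK as a stand-in for the project's Spring executor bean, and the class name `SingleThreadOrdering` is hypothetical.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Sketch: a plain single-thread executor stands in for the project's Spring executor bean.
class SingleThreadOrdering {
    // Submits n tasks and returns the order in which they actually ran.
    static List<Integer> runInOrder(int n) {
        List<Integer> executed = new CopyOnWriteArrayList<>();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        for (int i = 0; i < n; i++) {
            final int task = i;
            executor.execute(() -> executed.add(task));
        }
        executor.shutdown();
        try {
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for tasks", e);
        }
        return executed;
    }

    public static void main(String[] args) {
        System.out.println(runInOrder(5));
    }
}
```

With one worker thread, tasks run strictly in submission order; with two or more threads this would no longer hold.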
_TestDecodingCDCService.processNextCDCChunk()_ is the central method. It takes all committed unprocessed transactions, parses them, and uploads the changes from the scanned transactions into the index.

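To give an idea of what the parsing step deals with, here is a minimal sketch that parses the header of a row line in the text format emitted by the _test_decoding_ plugin, for example `table public.product: UPDATE: id[bigint]:1 name[character varying]:'prodA'`. It is not the project's parser: the class `TestDecodingLine` and the record `Op` are hypothetical, quoted identifiers are not handled, and the column list is kept as an unparsed string.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Sketch of parsing the header of a test_decoding row line; not the project's parser.
class TestDecodingLine {
    record Op(String schema, String table, String kind, String columns) {}

    // Matches "table <schema>.<table>: <INSERT|UPDATE|DELETE>: <columns>".
    private static final Pattern ROW =
            Pattern.compile("^table (\\S+)\\.(\\S+): (INSERT|UPDATE|DELETE): ?(.*)$");

    // Returns the parsed operation, or empty for BEGIN/COMMIT and other lines.
    static Optional<Op> parse(String line) {
        Matcher m = ROW.matcher(line);
        if (!m.matches()) {
            return Optional.empty();
        }
        return Optional.of(new Op(m.group(1), m.group(2), m.group(3), m.group(4)));
    }

    public static void main(String[] args) {
        String line = "table public.product: UPDATE: id[bigint]:1 name[character varying]:'prodA'";
        System.out.println(parse(line));
        System.out.println(parse("BEGIN 693"));
    }
}
```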
Uploading changes to the index (_TransactionOperationProcessor.processOp()_) consists of the following steps:

1. Based on the table name (_TransactionOperation.tableName_) from the transaction operation, determine the _JPA_ entity class that is persisted in this table. If the class is not defined, then the operation is skipped.

2. Based on the _JPA_ class, find the _ELK_ service that processes the entities of this class. This service wraps _ProductELKRepository_. _ProductVanillaService_ uses this particular repository to upload _JPA_ entity into the index. If no such service is found, then the operation is skipped.

3. Using _JPA_, restore the _JPA_ entity by _TransactionOperation.restoreSQLStatement_. Since the operation is associated with a specific table (_TransactionOperation.tableName_), only that part of the entity that is persisted in this table is restored. _JPA_ properties annotated with _@OneToOne_, _@OneToMany_, etc. are not initialized.

4. Upload the restored _JPA_ entity into the index with the service found in step 2.


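The lookup-and-skip logic of steps 1, 2 and 4 can be sketched with two registries. This is an illustration under stated assumptions, not the project's code: the class `OperationDispatch`, the record `ProductRow` and the `register` method are hypothetical, and step 3 (restoring the entity through _JPA_) is assumed to have been done by the caller.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical registries; the project's own lookup may be implemented differently.
class OperationDispatch {
    record ProductRow(long id, String name) {}

    // Step 1: table name -> entity class.
    private final Map<String, Class<?>> entityByTable = new HashMap<>();
    // Step 2: entity class -> handler that uploads the entity into the index.
    private final Map<Class<?>, Consumer<Object>> indexerByEntity = new HashMap<>();

    <T> void register(String table, Class<T> type, Consumer<T> indexer) {
        entityByTable.put(table, type);
        indexerByEntity.put(type, entity -> indexer.accept(type.cast(entity)));
    }

    // Returns true if the operation was uploaded, false if it was skipped.
    boolean process(String tableName, Object restoredEntity) {
        Class<?> type = entityByTable.get(tableName);
        if (type == null) {
            return false; // no entity class for this table: skip
        }
        Consumer<Object> indexer = indexerByEntity.get(type);
        if (indexer == null) {
            return false; // no index service for this class: skip
        }
        indexer.accept(restoredEntity); // step 4: upload into the index
        return true;
    }

    public static void main(String[] args) {
        Map<Long, String> index = new HashMap<>();
        var dispatch = new OperationDispatch();
        dispatch.register("product", ProductRow.class, p -> index.put(p.id(), p.name()));
        System.out.println(dispatch.process("product", new ProductRow(1L, "prodA")));
        System.out.println(dispatch.process("audit_log", new Object()));
    }
}
```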
If there are any problems during the processing of operations, you need to fix the problems and re-run _TestDecodingCDCService.processNextCDCChunk()_.
This can be done because _ProductELKRepository.save_ and _ProductELKRepository.delete_ are idempotent.

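Why idempotency makes a re-run safe can be seen on a small model. The sketch below is hypothetical (`IdempotentReplay` and `Op` are not part of the project): saves and deletes keyed by id are applied to a map, and replaying the same operations a second time leaves the result unchanged.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model: a map stands in for the index, keyed by document id.
class IdempotentReplay {
    record Op(long id, String name) {} // name == null models a delete

    // Applies save/delete operations in order and returns the same map.
    static Map<Long, String> apply(Map<Long, String> index, List<Op> ops) {
        for (Op op : ops) {
            if (op.name() == null) {
                index.remove(op.id());
            } else {
                index.put(op.id(), op.name());
            }
        }
        return index;
    }

    public static void main(String[] args) {
        List<Op> ops = List.of(new Op(1L, "prodA"), new Op(2L, "prodX"), new Op(2L, null));
        Map<Long, String> once = apply(new HashMap<>(), ops);
        Map<Long, String> twice = apply(apply(new HashMap<>(), ops), ops);
        System.out.println(once.equals(twice));
    }
}
```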
If all scanned transactions are successfully processed, then the corresponding records are removed from _WAL_.

I hope the provided information is helpful.

Please contact us if you have any comments, suggestions or questions.

Stay in sync,

Sergey

pom.xml

Lines changed: 91 additions & 0 deletions
@@ -0,0 +1,91 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.rent.app</groupId>
    <artifactId>jpacdc</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <java.version>17</java.version>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <org.projectlombok.version>1.18.24</org.projectlombok.version>
    </properties>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>2.7.0</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
        </dependency>
        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>17</source>
                    <target>17</target>
                    <annotationProcessorPaths>
                        <path>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                            <version>${org.projectlombok.version}</version>
                        </path>
                    </annotationProcessorPaths>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
Lines changed: 15 additions & 0 deletions
@@ -0,0 +1,15 @@
package org.rent.app;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.data.jpa.repository.config.EnableJpaAuditing;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;

@EnableJpaAuditing
@EnableJpaRepositories
@SpringBootApplication
public class JPAELKApplication {
    public static void main(String[] args) {
        SpringApplication.run(JPAELKApplication.class, args);
    }
}
Lines changed: 22 additions & 0 deletions
@@ -0,0 +1,22 @@
package org.rent.app.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;

@EnableAsync
@Configuration
public class AsyncConfig {
    // Executor for CDC processing; a single thread keeps WAL processing sequential.
    @Bean(name = "cdcServiceTaskThreadPoolTaskExecutor")
    public Executor cdcServiceTaskThreadPoolTaskExecutor() {
        var executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(1);
        executor.setMaxPoolSize(1); // Only one thread!!
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setThreadNamePrefix("cdcTaskManager-");
        return executor;
    }
}
Lines changed: 30 additions & 0 deletions
@@ -0,0 +1,30 @@
package org.rent.app.config;

import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.elasticsearch.client.ClientConfiguration;
import org.springframework.data.elasticsearch.client.RestClients;
import org.springframework.data.elasticsearch.config.AbstractElasticsearchConfiguration;
import org.springframework.data.elasticsearch.config.EnableElasticsearchAuditing;
import org.springframework.data.elasticsearch.repository.config.EnableElasticsearchRepositories;
import org.springframework.http.HttpHeaders;

@Configuration
@EnableElasticsearchAuditing
@EnableElasticsearchRepositories
public class ElasticsearchConfig extends AbstractElasticsearchConfiguration {

    @Bean
    @Override
    public RestHighLevelClient elasticsearchClient() {
        // Request the 7.x-compatible REST API format from the server.
        HttpHeaders compatibilityHeaders = new HttpHeaders();
        compatibilityHeaders.add("Accept", "application/vnd.elasticsearch+json;compatible-with=7");
        compatibilityHeaders.add("Content-Type", "application/vnd.elasticsearch+json;compatible-with=7");
        ClientConfiguration clientConfiguration = ClientConfiguration.builder()
                .connectedToLocalhost()
                .withDefaultHeaders(compatibilityHeaders)
                .build();
        return RestClients.create(clientConfiguration).rest();
    }
}
Lines changed: 35 additions & 0 deletions
@@ -0,0 +1,35 @@
package org.rent.app.controller;

import org.rent.app.dto.ProductDto;
import org.rent.app.service.EntityService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("product")
public class ProductController {

    @Autowired
    private EntityService<ProductDto> service;

    @PostMapping
    public ProductDto create(@RequestBody ProductDto product) {
        return service.create(product);
    }

    @PutMapping
    public ProductDto update(@RequestBody ProductDto product) {
        return service.update(product);
    }

    @DeleteMapping
    public void delete(@RequestBody ProductDto product) {
        service.delete(product);
    }

}
