Elasticsearch is an excellent search engine, but it has a few complications. One of them is updating documents/records in the index, because Elasticsearch does not support ACID transactions and treats all its documents as immutable.
So when you update an existing document, two things happen in one shot:
– A new document is created with an incremented _version
– The old document is deleted
But what if you work in a multithreaded environment and two or three threads try to update the same document simultaneously? In that situation only one thread succeeds; the others throw a version conflict exception and their updates are not applied. I doubt that is acceptable behavior for most businesses. There are several workarounds for this issue, but here I will talk about optimistic locking. With optimistic locking you take advantage of the _version number to ensure that conflicting changes made by your application do not result in data loss. Below is a small project I created to play with it.
Spin up an ELK stack locally using Docker:
sudo docker run -p 5601:5601 -p 9200:9200 -p 5044:5044 -it --name elk sebp/elk
Once the ELK stack is up and running, go to Kibana to create an index and insert a document:
Kibana commands
DELETE colors

#create index
PUT colors
{
  "mappings": {
    "my_colors": {
      "properties": {
        "color": {
          "type": "text"
        }
      }
    }
  }
}

#insert document and assign custom _id
POST colors/my_colors/123
{
  "color": "green"
}

#find all documents inside colors index
GET colors/_search
{
  "query": {
    "match_all": {}
  }
}
Our index is very simple and consists of only one field called “color”.
build.gradle
plugins {
    id 'java'
}

group 'io.karengryg'
version '1.0-SNAPSHOT'

sourceCompatibility = 1.8

repositories {
    mavenCentral()
}

dependencies {
    implementation('org.elasticsearch:elasticsearch:6.4.3')
    implementation('org.elasticsearch.client:elasticsearch-rest-high-level-client:6.4.3')
    testImplementation('junit:junit:4.12')
}
MainClass.java
package io.karengryg.elasticsearchoptimisticlock;

import org.apache.http.HttpHost;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import static java.util.concurrent.TimeUnit.SECONDS;

public class MainClass {

    private static final String HOSTNAME = "localhost";
    private static final String INDEX = "colors";
    private static final String TYPE = "my_colors";
    private static final String ID = "123";

    // creating elasticsearch client
    private static final RestHighLevelClient elasticSearchClient =
            new RestHighLevelClient(RestClient.builder(new HttpHost(HOSTNAME, 9200)));

    public static void main(String[] args) throws InterruptedException {
        //run Thread-1 and update document with color "yellow"
        new Thread(() -> {
            try {
                updateIndex("yellow");
            } catch (IOException e) {
                e.printStackTrace();
            }
        }).start();

        //run Thread-2 and update document with color "red"
        new Thread(() -> {
            try {
                updateIndex("red");
            } catch (IOException e) {
                e.printStackTrace();
            }
        }).start();

        SECONDS.sleep(1);
    }

    private static void updateIndex(String newColor) throws IOException {
        Map<String, Object> jsonMap = new HashMap<>();
        jsonMap.put("color", newColor);

        UpdateRequest updateRequest = new UpdateRequest();
        updateRequest.index(INDEX);
        updateRequest.type(TYPE);
        updateRequest.id(ID);
        updateRequest.doc(jsonMap);

        UpdateResponse updateResponse = elasticSearchClient.update(updateRequest, RequestOptions.DEFAULT);

        System.out.println(
                Thread.currentThread().getName() + ": " + updateResponse.getResult() + "\n" +
                "version: " + updateResponse.getVersion() + "\n" +
                "color: " + newColor + "\n"
        );
    }
}
When you run the main method, your console output will look roughly like this (please note that it might not throw an exception on the first try, because the threads may happen to perform their updates sequentially):
Thread-1: UPDATED
version: 5
color: yellow

Exception in thread "Thread-2" [colors/wNtFiLjkSESzxutCF912pg][[colors][0]] ElasticsearchStatusException[Elasticsearch exception [type=version_conflict_engine_exception, reason=[my_colors][123]: version conflict, current version [5] is different than the one provided [4]]]
    at org.elasticsearch.rest.BytesRestResponse.errorFromXContent(BytesRestResponse.java:177)
    at org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:1406)
    at org.elasticsearch.client.RestHighLevelClient.parseResponseException(RestHighLevelClient.java:1382)
    at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1269)
    at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1231)
    at org.elasticsearch.client.RestHighLevelClient.update(RestHighLevelClient.java:634)
    at io.karengryg.elasticsearchoptimisticlock.MainClass.updateIndex(MainClass.java:59)
    at io.karengryg.elasticsearchoptimisticlock.MainClass.lambda$main$1(MainClass.java:38)
    at java.lang.Thread.run(Thread.java:748)
    Suppressed: org.elasticsearch.client.ResponseException: method [POST], host [http://localhost:9200], URI [/colors/my_colors/123/_update?timeout=1m], status line [HTTP/1.1 409 Conflict]
As I explained earlier, that’s expected behavior. Before “Thread-2” updates the document, it fetches the document’s _version and then issues the update against that version, in our case [4]. But between the fetch and the update of “Thread-2”, the other thread incremented _version to [5], so the document with _version [4] no longer exists, which leads to the exception.
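The fetch-then-update race can be replayed deterministically without a cluster. The sketch below is only an in-memory model of the version check, not Elasticsearch code: the `VersionedDocument` class and its `updateIfVersionMatches` method are hypothetical names invented for illustration. Both "threads" read version 4, and the write that arrives second is rejected because the version has already moved on.

```java
// In-memory model of a versioned document (hypothetical; NOT the Elasticsearch client API).
class VersionedDocument {
    private long version;
    private String color;

    VersionedDocument(long version, String color) {
        this.version = version;
        this.color = color;
    }

    synchronized long getVersion() {
        return version;
    }

    synchronized String getColor() {
        return color;
    }

    // Applies the write only if the caller's version is still the current one.
    // Returns false on a version conflict, leaving the document unchanged.
    synchronized boolean updateIfVersionMatches(long expectedVersion, String newColor) {
        if (expectedVersion != version) {
            return false;
        }
        color = newColor;
        version++;
        return true;
    }
}

class VersionConflictDemo {
    public static void main(String[] args) {
        VersionedDocument doc = new VersionedDocument(4, "green");

        // Both writers fetch the version before either of them writes.
        long seenByThread1 = doc.getVersion();
        long seenByThread2 = doc.getVersion();

        boolean first = doc.updateIfVersionMatches(seenByThread1, "yellow");
        boolean second = doc.updateIfVersionMatches(seenByThread2, "red");

        System.out.println("Thread-1 updated: " + first + ", version: " + doc.getVersion());
        System.out.println("Thread-2 updated: " + second + ", color: " + doc.getColor());
    }
}
```

In this model the first write moves the version from 4 to 5, so the second write, still carrying version 4, is refused and the color stays "yellow". That mirrors the "current version [5] is different than the one provided [4]" message in the stack trace.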
Here is how it looks graphically:
To fix this issue, we need to add one line of code to the updateIndex method:
...
updateRequest.doc(jsonMap);
updateRequest.retryOnConflict(3); //new line of code
UpdateResponse updateResponse = elasticSearchClient.update(updateRequest, RequestOptions.DEFAULT);
...
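The retryOnConflict parameter controls how many times the update is retried after a version conflict before an exception is finally thrown. So if you set the color back to “green” in Kibana and run the main method again, both updates should go through: the thread that hits a conflict retries against the latest version of the document instead of failing.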
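To make the retry idea concrete, here is a minimal in-memory sketch of a retry-on-conflict loop. It is not how Elasticsearch implements the parameter internally. `RetryingColorStore`, `Snapshot`, and the `betweenReadAndWrite` hook are hypothetical names; the hook exists only so a competing write can be injected between the read and the write.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical in-memory stand-in for an index holding a single document.
class RetryingColorStore {

    // Immutable snapshot: every successful update replaces it with version + 1.
    static final class Snapshot {
        final long version;
        final String color;

        Snapshot(long version, String color) {
            this.version = version;
            this.color = color;
        }
    }

    private final AtomicReference<Snapshot> current;

    RetryingColorStore(long version, String color) {
        current = new AtomicReference<>(new Snapshot(version, color));
    }

    Snapshot get() {
        return current.get();
    }

    // Tries once, then up to retryOnConflict more times, re-reading the document
    // before each attempt. Returns the number of retries that were needed.
    // betweenReadAndWrite is a demo hook used to simulate a competing writer.
    int update(String newColor, int retryOnConflict, Runnable betweenReadAndWrite) {
        for (int attempt = 0; attempt <= retryOnConflict; attempt++) {
            Snapshot seen = current.get();
            betweenReadAndWrite.run();
            Snapshot next = new Snapshot(seen.version + 1, newColor);
            // Succeeds only if nobody replaced the snapshot we read.
            if (current.compareAndSet(seen, next)) {
                return attempt;
            }
        }
        throw new IllegalStateException("version conflict after " + retryOnConflict + " retries");
    }
}

class RetryDemo {
    public static void main(String[] args) {
        RetryingColorStore store = new RetryingColorStore(4, "green");
        boolean[] interfered = {false};

        // A competing "yellow" write sneaks in during the first attempt only.
        int retries = store.update("red", 3, () -> {
            if (!interfered[0]) {
                interfered[0] = true;
                store.update("yellow", 0, () -> { });
            }
        });

        System.out.println("retries: " + retries
                + ", version: " + store.get().version
                + ", color: " + store.get().color);
    }
}
```

In this sketch the first attempt loses to the competing "yellow" write, and the retry re-reads version 5 and succeeds, leaving the document at version 6 with color "red". With `retryOnConflict` set to 0, the same interference would end in an exception, which corresponds to the behavior seen before the fix.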