Optimistic Concurrency with Elasticsearch and NEST

In my last post I gave an example of how to use the bulk API to index many documents at a time into Elasticsearch. Once the documents have been indexed, it is very likely that we'll want to update individual documents within the index. In this post, we'll look at how to do this, and at some of the features available for managing multiple concurrent update requests to the same document.

Updating complete documents

Using the Suburb type defined in the previous post, we can update a given document with the index API as follows:

// The suburb type from the previous post
public class Suburb
{
    public IGeometry Geometry { get; set; }

    public int Id { get; set; }

    public string Name { get; set; }

    public AustralianState State { get; set; }
}

// id of the suburb to update (we may instead have been given a Suburb instance)
var suburbId = 17231;

// get the current document from the primary shards
var response = await _client.GetAsync<Suburb>(desc =>
    desc.Preference("_primary")
        .Id(suburbId)
    ).ConfigureAwait(false);

if (response.Found)
{
    var existingSuburb = response.Source;

    // apply our updates
    existingSuburb.Name = "New suburb name for Galong";

    // index the updated suburb, overwriting the existing document
    var indexResponse = await _client.IndexAsync(existingSuburb, desc =>
        desc.Id(existingSuburb.Id)
        ).ConfigureAwait(false);

    if (!indexResponse.IsValid)
    {
        throw indexResponse.ConnectionStatus.OriginalException;
    }
}
else
{
    // Suburb was not found.
    // Maybe we want to index a suburb, if we have been passed one
}

This will overwrite the existing document with the updated document. As per the indexing documentation, the _primary preference is specified when getting the existing document so that the operation will be executed on the primary shards which, under normal conditions, are guaranteed to have the latest version of the document.

When several requests to update the same document are received, the last request to be processed wins and any previous state of the document is overwritten. The following diagram illustrates what happens:

[Diagram: concurrent updates without optimistic concurrency control]

Process 1 gets the suburb document with id 17231, updates its name to Galongish and sends the updated document to be indexed; the document is indexed and its version number is incremented to 2. In between Process 1 getting the document and sending its update, Process 2 also gets the suburb document with id 17231, updates its name to Galonger and sends the updated document to be indexed. This document is indexed too, and the version number is incremented to 3, because Process 1's update landed in between Process 2's get and index requests. Process 2 never saw the name Galongish, so Process 1's change is silently overwritten.
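The sequence above can be replayed in a few lines. This is a minimal sketch, assuming a .NET 6+ console program with top-level statements; the Dictionary stands in for the index and the IndexUnchecked helper is hypothetical, modelling an index request sent without a version number, not a NEST call.

```csharp
using System;
using System.Collections.Generic;

// In-memory stand-in for the index: id -> (name, version). A simulation only.
var index = new Dictionary<int, (string Name, long Version)>
{
    [17231] = ("Galong", 1)
};

// Hypothetical helper: index without a version check, so the write always
// succeeds, overwrites the stored document and bumps its version.
long IndexUnchecked(int id, string name)
{
    var newVersion = index[id].Version + 1;
    index[id] = (name, newVersion);
    return newVersion;
}

// Both processes get the document while it is still at version 1.
var doc1 = index[17231];
var doc2 = index[17231];

// Each applies its change to its own copy...
doc1.Name = "Galongish";
doc2.Name = "Galonger";

// ...and indexes it. Process 2 writes last, so its copy wins.
IndexUnchecked(17231, doc1.Name); // version 2
IndexUnchecked(17231, doc2.Name); // version 3

Console.WriteLine($"{index[17231].Name} v{index[17231].Version}"); // Galonger v3
```

Nothing in the second write notices that the copy it was based on is out of date, which is exactly the "last write wins" behaviour.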

Having a “Last Write Wins” scenario may be sufficient for many systems, but not for all; for some systems, ensuring that changes are applied to the latest version of a document could be central to keeping the data in documents synchronised with another data source. Fortunately, Elasticsearch already provides features for managing this; we just need to use them!

Versioning

By default, a version number is assigned to a document when it is first indexed, and the version number is incremented each time the document is updated (or deleted). If the documents being indexed come from another system that already implements a versioning strategy, Elasticsearch can be configured to use this external versioning strategy instead of the default incrementing version number. For the purposes of this post, though, the default versioning will be used.
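The difference between the two strategies can be sketched in memory. This is an illustrative model, assuming a .NET 6+ console program; IndexInternal and IndexExternal are hypothetical names, deletes are ignored, and the external rule shown (accept only a strictly higher supplied version) is Elasticsearch's `external` version type.

```csharp
using System;
using System.Collections.Generic;

// Stored version per document id (a stand-in for the index).
var versions = new Dictionary<int, long>();

// Internal (default) versioning: 1 on first index, +1 on every later write.
long IndexInternal(int id)
{
    versions[id] = versions.TryGetValue(id, out var current) ? current + 1 : 1;
    return versions[id];
}

// External versioning: the caller supplies the version. The write is accepted
// only if there is no document yet or the supplied version is strictly greater
// than the stored one; the supplied version is then stored as-is.
bool IndexExternal(int id, long suppliedVersion)
{
    if (versions.TryGetValue(id, out var current) && suppliedVersion <= current)
        return false; // Elasticsearch would respond with 409 Conflict
    versions[id] = suppliedVersion;
    return true;
}

Console.WriteLine(IndexInternal(1));     // 1
Console.WriteLine(IndexInternal(1));     // 2
Console.WriteLine(IndexExternal(2, 42)); // True
Console.WriteLine(IndexExternal(2, 40)); // False: a stale version from the source system
```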

Optimistic Concurrency for Updates

When a version number is passed with an index request, Elasticsearch applies an Optimistic Concurrency Control mechanism: it compares the version number passed with the request to the version number held against the document in the index. If the two are the same, the index request succeeds; if they differ, a response with HTTP status code 409 (Conflict) is returned, indicating that the document has changed in between the get and index requests. The following diagram illustrates how optimistic concurrency works in practice:

[Diagram: concurrent updates with optimistic concurrency control]

Process 1 gets the suburb document with id 17231, updates its name to Galongish and sends the updated document to be indexed along with version number 1, the version number returned from its get request. The updated document is indexed successfully because the version number passed matches the version number in Elasticsearch, which is then incremented to 2. In between Process 1 getting the document and sending its update, Process 2 also gets the suburb document with id 17231, updates its name to Galonger and sends the updated document to be indexed along with version number 1, the version number returned from its own get request. This index request fails and a version conflict error is returned to Process 2, because the version number passed (1) no longer matches the version number in Elasticsearch (2).
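The version check can be modelled the same way. This is a minimal sketch, assuming a .NET 6+ console program; the Dictionary stands in for the index, IndexWithVersion is a hypothetical helper, and its integer return values only mimic the HTTP status codes Elasticsearch would send.

```csharp
using System;
using System.Collections.Generic;

// In-memory stand-in for the index: id -> (name, version). A simulation only.
var index = new Dictionary<int, (string Name, long Version)>
{
    [17231] = ("Galong", 1)
};

// Index with a version check: the write succeeds only if the caller's version
// matches the stored one; otherwise nothing is written and 409 is reported.
int IndexWithVersion(int id, string name, long expectedVersion)
{
    var stored = index[id];
    if (stored.Version != expectedVersion)
        return 409; // Conflict: the document changed since it was read
    index[id] = (name, stored.Version + 1);
    return 200;
}

// Both processes read version 1 before either writes.
var versionSeenBy1 = index[17231].Version;
var versionSeenBy2 = index[17231].Version;

var status1 = IndexWithVersion(17231, "Galongish", versionSeenBy1);
var status2 = IndexWithVersion(17231, "Galonger", versionSeenBy2);

Console.WriteLine($"{status1} {status2}");                         // 200 409
Console.WriteLine($"{index[17231].Name} v{index[17231].Version}"); // Galongish v2
```

Unlike the unchecked version, Process 2's stale write is rejected instead of silently replacing Process 1's change.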

Adapting the code example above to use version numbers is straightforward:

// id of the suburb to update (we may instead have been given a Suburb instance)
var suburbId = 17231;

// get the current document, and its version, from the primary shards
var response = await _client.GetAsync<Suburb>(desc =>
    desc.Preference("_primary").Id(suburbId)).ConfigureAwait(false);

if (response.Found)
{
    var existingSuburb = response.Source;

    // apply our updates
    existingSuburb.Name = "New suburb name for Galong";

    // index the updated suburb, passing the version number from the get response
    var indexResponse = await _client.IndexAsync(existingSuburb, desc =>
        desc.Id(existingSuburb.Id)
            .Version(long.Parse(response.Version))
        ).ConfigureAwait(false);

    if (!indexResponse.IsValid)
    {
        throw indexResponse.ConnectionStatus.OriginalException;
    }
}
else
{
    // Suburb was not found.
    // Maybe we want to index a suburb, if we have been passed one
}

If the IndexAsync() call results in a version conflict, NEST will return a response with an .IsValid property set to false and .ConnectionStatus.OriginalException set to an ElasticsearchServerException with a Status property value of 409. The code above simply throws the exception when the response is not valid, but we will look at how we might want to handle this shortly.

Optimistic Concurrency for Inserts

Optimistic Concurrency Control can also be used when indexing a document for the first time, before a version number exists for it. In this case, instead of passing a version number with the index request, we specify an operation type of create, as follows:

// index a new suburb; this fails if a document with the same id already exists
var indexResponse = await _client.IndexAsync(suburb,
    desc => desc.Id(suburb.Id).OpType(OpType.Create)).ConfigureAwait(false);

As with updating a document using a version number, if the IndexAsync() call results in a conflict (i.e. a document with the given id already exists), NEST will return a response with an .IsValid property set to false and .ConnectionStatus.OriginalException set to an ElasticsearchServerException with a Status property value of 409.
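The create rule amounts to "write only if the id is free". This is a minimal in-memory sketch, assuming a .NET 6+ console program; the Dictionary stands in for the index and the Create helper is hypothetical, with its return values mimicking the 201 Created and 409 Conflict statuses.

```csharp
using System;
using System.Collections.Generic;

// In-memory stand-in for the index: id -> suburb name. A simulation only.
var index = new Dictionary<int, string>();

// op_type=create semantics: add the document only if the id is not taken.
int Create(int id, string name) => index.TryAdd(id, name) ? 201 : 409;

Console.WriteLine(Create(17231, "Galong"));   // 201
Console.WriteLine(Create(17231, "Galonger")); // 409
Console.WriteLine(index[17231]);              // Galong
```

The second create is rejected and the first document is left untouched, so two processes racing to insert the same suburb cannot overwrite each other.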

Conflict Resolution

What should we do when a version conflict happens? That probably depends on how the update was initiated in the first place. If it was initiated directly by a user, we might tell the user that the document was updated while they were making changes to it, let them view the updated document and let them decide whether they would still like to edit it. If it was initiated by a background process, however, we might log that the conflict happened and continue, or we might retry applying the update a given number of times. Let’s look at the latter.

Polly wanna retry

Polly is an excellent little library that allows developers to express transient exception handling policies such as Retry, Retry Forever, Wait and Retry or Circuit Breaker in a fluent manner. Functionally, it’s similar to the Exception Handling Application Block within Microsoft’s Enterprise Library suite except it is simpler to understand, configure and use.

For handling version conflicts that happen in a background process, we might want to retry applying the update a given number of times. With Polly, this is simple:

// set up a policy to retry up to 3 times when a version conflict (409) happens
var policy = Policy
    .Handle<ElasticsearchServerException>(exception => exception.Status == 409)
    .RetryAsync(3);

await policy.ExecuteAsync(async () =>
{
    // id of the suburb to update (we may instead have been given a Suburb instance)
    var suburbId = 17231;

    // get the latest document, and its version, from the primary shards
    var response = await _client.GetAsync<Suburb>(desc =>
        desc.Preference("_primary")
            .Id(suburbId)
        ).ConfigureAwait(false);

    if (response.Found)
    {
        var existingSuburb = response.Source;

        // apply our updates
        existingSuburb.Name = "New suburb name for Galong";

        // index the updated suburb, passing the version number from the get response
        var indexResponse = await _client.IndexAsync(existingSuburb, desc =>
            desc.Id(existingSuburb.Id)
                .Version(long.Parse(response.Version))
            ).ConfigureAwait(false);

        // throwing the 409 exception here is what triggers the policy's retry
        if (!indexResponse.IsValid)
        {
            throw indexResponse.ConnectionStatus.OriginalException;
        }
    }
}).ConfigureAwait(false);

If a version conflict happens when updating the suburb, the whole get, update and index sequence is retried up to three more times. On each attempt, the suburb is requested from the primary shards to ensure that we’re working with the suburb as it now exists in the index, the updates are applied to it, and an index request including the new version number is made. If the final retry also results in a conflict, Polly rethrows the exception to the caller.
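For readers who want to see what such a policy does without taking a dependency on Polly, here is a hand-rolled approximation. It is a sketch under stated assumptions, not Polly's implementation: it assumes a .NET 6+ console program, RetryAsync is a hypothetical local helper, HttpRequestException with StatusCode Conflict stands in for NEST's ElasticsearchServerException with Status 409, and a competing writer is simulated so that the first two attempts conflict.

```csharp
using System;
using System.Net;
using System.Net.Http;
using System.Threading.Tasks;

// Hypothetical helper approximating Policy.Handle<T>(predicate).RetryAsync(n):
// run the action; if it throws an exception matching the predicate, run it
// again, up to `retries` extra times. Any other exception, or a matching one
// on the final attempt, propagates to the caller.
async Task RetryAsync(Func<Task> action, int retries, Func<Exception, bool> shouldRetry)
{
    for (var attempt = 0; ; attempt++)
    {
        try
        {
            await action();
            return;
        }
        catch (Exception ex) when (attempt < retries && shouldRetry(ex))
        {
            // swallow the conflict and loop round for another attempt
        }
    }
}

// Simulated document version, plus a count of attempts made.
long storedVersion = 1;
var attempts = 0;

await RetryAsync(
    async () =>
    {
        attempts++;
        var versionFromGet = storedVersion;  // "get" the document and its version
        if (attempts <= 2) storedVersion++;  // another writer sneaks in between
        await Task.Yield();
        if (storedVersion != versionFromGet) // "index" with the version check
            throw new HttpRequestException("version conflict", null, HttpStatusCode.Conflict);
        storedVersion++;                     // successful write bumps the version
    },
    retries: 3,
    shouldRetry: ex => ex is HttpRequestException { StatusCode: HttpStatusCode.Conflict });

Console.WriteLine($"attempts: {attempts}, version: {storedVersion}"); // attempts: 3, version: 4
```

The exception filter (`when`) keeps the retry decision in one place: conflicts are retried while attempts remain, and everything else surfaces immediately, which mirrors the behaviour described for the Polly policy.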

Partial updates to documents

Applying partial updates to a document is easy with the update API. The example in this post is a good candidate for using the update API, since only the name of the suburb changes:

// id of the suburb to update
var suburbId = 17231;

// send only the changed field; Elasticsearch merges it into the stored document
var updateResponse = await _client.UpdateAsync<Suburb, object>(desc =>
    desc.Id(suburbId)
        .Doc(new { Name = "New suburb name for Galong" })
        .RetryOnConflict(3)
    ).ConfigureAwait(false);

if (!updateResponse.IsValid)
{
    throw updateResponse.ConnectionStatus.OriginalException;
}

An instance of an anonymous type is used as the document containing the updates to apply to the Suburb document in the index. The Name property on the anonymous type matches the name and type of the Name property on the Suburb type, so when it is serialized to JSON and sent in the update request, it partially matches the structure of a serialized Suburb instance.
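To see what "partially matches" means, it helps to look at the JSON body of the update request. This sketch assumes camelCase field names (NEST's default naming) and uses System.Text.Json as a stand-in for the serializer NEST actually uses; the `doc` wrapper is the update API's field for a partial document, and the RetryOnConflict(3) call is sent separately as the `retry_on_conflict` parameter rather than in the body.

```csharp
using System;
using System.Text.Json;

// camelCase naming, assumed here to match NEST's default field naming.
var options = new JsonSerializerOptions { PropertyNamingPolicy = JsonNamingPolicy.CamelCase };

// The partial document, wrapped in the update API's "doc" field.
var body = JsonSerializer.Serialize(new { doc = new { Name = "New suburb name for Galong" } }, options);

Console.WriteLine(body); // {"doc":{"name":"New suburb name for Galong"}}
```

The `name` field lines up with the `name` field of a full serialized Suburb, while every other field is simply absent.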

Under the hood, an update in Elasticsearch gets the existing document, creates a new document by copying the values passed in the request over the existing values (in the case of a partial update), indexes the new document and marks the old version as deleted. The .RetryOnConflict(3) method call retries the update up to three times in the case of a version conflict, which could happen if the document is changed in between getting the old document and indexing the new one. The end result is similar to our earlier implementation that updates the whole document using an exception handling policy, with the benefit that retrying is a built-in feature and the get, merge and index cycle happens inside Elasticsearch rather than over several round trips from the client.
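The get, merge, reindex and retry cycle can be modelled in memory to make the steps concrete. This is a hypothetical sketch, not Elasticsearch's code: it assumes a .NET 6+ console program, the document source is a Dictionary, the merge is shallow (Elasticsearch also merges nested objects), and one concurrent write is simulated to exercise the retry.

```csharp
using System;
using System.Collections.Generic;

// Simulated stored document (field -> value) and its version.
var source = new Dictionary<string, object> { ["id"] = 17231, ["name"] = "Galong", ["state"] = "NSW" };
long version = 1;
var concurrentWritesRemaining = 1; // force one conflict to exercise the retry

// Hypothetical model of a partial update with retry_on_conflict.
bool Update(Dictionary<string, object> partialDoc, int retryOnConflict)
{
    for (var attempt = 0; attempt <= retryOnConflict; attempt++)
    {
        // 1. get the existing document and its version
        var merged = new Dictionary<string, object>(source);
        var versionAtGet = version;

        // 2. copy the partial document's fields over the existing values
        foreach (var (field, value) in partialDoc)
            merged[field] = value;

        // simulate another writer bumping the version before the reindex
        if (concurrentWritesRemaining-- > 0) version++;

        // 3. reindex only if the document has not changed since step 1
        if (version == versionAtGet)
        {
            source = merged;
            version++;
            return true;
        }
    }
    return false; // conflicted on every attempt: Elasticsearch would return 409
}

var ok = Update(new Dictionary<string, object> { ["name"] = "New suburb name for Galong" }, retryOnConflict: 3);
Console.WriteLine($"{ok} {source["name"]} {source["state"]} v{version}");
// True New suburb name for Galong NSW v3
```

Fields absent from the partial document (here `state`) survive the update, and the conflicted first attempt is retried transparently.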

In summary, Optimistic Concurrency Control is straightforward to implement with Elasticsearch using document versioning and, when combined with exception handling policies and built-in retry features, it provides a great deal of flexibility for dealing with conflicts when they do arise.
