Elasticsearch: Bulk Insert using Java High Level Rest Client

Image by Hans Braxmeier from Pixabay

In our previous post, we saw how to insert data into Elasticsearch using the curl command. In this post, we will use the Java High Level REST Client to do the same.

Introduction

For any use case, data is essential. If we already have some data in Elasticsearch, we can use it. Otherwise, we have to insert some first. In this post, we will see how to bulk insert data using the Java High Level REST Client. The details about instantiating and configuring the client can be found here.

Data Preparation

We will use the accounts data present here. The following represents a single document in the accounts index in Elasticsearch:

{"account_number":1,"balance":39225,"firstname":"Amber","lastname":"Duke","age":32,"gender":"M","address":"880 Holmes Lane","employer":"Pyrami","email":"amberduke@pyrami.com","city":"Brogan","state":"IL"}

Writing Data 

Let's create an Account class that represents a single document as shown above in the index. 

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map;

public class Account {

  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

  @JsonProperty("account_number")
  private long accountNumber;
  @JsonProperty("balance")
  private long balance;
  @JsonProperty("firstname")
  private String firstname;
  @JsonProperty("lastname")
  private String lastname;
  @JsonProperty("age")
  private int age;
  @JsonProperty("gender")
  private String gender;
  @JsonProperty("address")
  private String address;
  @JsonProperty("employer")
  private String employer;
  @JsonProperty("email")
  private String email;
  @JsonProperty("city")
  private String city;
  @JsonProperty("state")
  private String state;

  // Getters and setters removed for brevity

  // Converts an Account instance to a map keyed by the JSON property names.
  // This is done because IndexRequest accepts a map as a source.
  public static Map<String, Object> getAsMap(final Account account) {
    return OBJECT_MAPPER.convertValue(account, new TypeReference<Map<String, Object>>() {
    });
  }
}

We have another class, AccountManager, which reads the data from a file and writes it into the Elasticsearch index:

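import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;

/**
 * Reads account data from a file and writes it into an Elasticsearch index.
 */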
public class AccountManager {

  private final RestHighLevelClient restHighLevelClient;

  /**
   * Instantiates a new Account manager.
   *
   * @param restHighLevelClient the rest high level client
   */
  public AccountManager(final RestHighLevelClient restHighLevelClient) {
    this.restHighLevelClient = restHighLevelClient;
  }

  /**
   * Reads accounts from a file containing a JSON array and returns them as a {@code List<Account>}.
   *
   * @param fileName the file name
   * @return the list
   * @throws IOException the io exception
   */
  public List<Account> readAccounts(String fileName) throws IOException {
    final File file = new File(fileName);
    final Account[] accounts = new ObjectMapper().readValue(file, Account[].class);
    return Arrays.asList(accounts);
  }

  /**
   * Writes the accounts into the given Elasticsearch index using a single bulk request.
   *
   * @param accounts  the accounts
   * @param indexName the index name
   * @return the bulk response
   * @throws IOException the io exception
   */
  public BulkResponse writeAccounts(List<Account> accounts, String indexName) throws IOException {
    final var bulkRequest = new BulkRequest();
    accounts.forEach(account -> {
      final var indexRequest = new IndexRequest(indexName);
      indexRequest.source(Account.getAsMap(account));
      bulkRequest.add(indexRequest);
    });
    return restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
  }
}

The important thing to notice here is how the BulkRequest is built in the writeAccounts method. For each Account in the list, we create a new IndexRequest for the given index name, set the account data (converted to a map) as its source, and add it to the bulk request. Because no document ID is set on the IndexRequest, Elasticsearch generates one for each document.
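
To make the structure of a bulk request more concrete, the sketch below builds the newline-delimited JSON body that the Elasticsearch _bulk REST endpoint accepts: one action line followed by one source line per document. It uses only the JDK. The class BulkBodySketch and its method are hypothetical names chosen for illustration, and the documents are assumed to be already serialized JSON strings. The high-level client performs this serialization itself, so this is a picture of the idea rather than the client's actual implementation.

```java
import java.util.List;

// Hypothetical helper for illustration only; not part of the Elasticsearch client.
final class BulkBodySketch {

  private BulkBodySketch() {
  }

  /**
   * Builds an NDJSON bulk body: for every document, an "index" action line
   * followed by the document source, each terminated by a newline.
   * Assumes the index name needs no JSON escaping.
   */
  static String buildBulkBody(String indexName, List<String> jsonDocuments) {
    final StringBuilder body = new StringBuilder();
    for (String document : jsonDocuments) {
      // Action line: tells Elasticsearch to index the next line into indexName.
      body.append("{\"index\":{\"_index\":\"").append(indexName).append("\"}}\n");
      // Source line: the document itself, on a single line.
      body.append(document).append('\n');
    }
    return body.toString();
  }
}
```

Each IndexRequest added to the BulkRequest corresponds to one such action/source pair.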

Once the bulk request is prepared, we call restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT) to insert the data into Elasticsearch. The call is synchronous and returns a BulkResponse.
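
writeAccounts sends every account in one request, which is fine for the 1000 small documents used here. For much larger lists, a common approach is to split the data into fixed-size batches and send one bulk request per batch. The following is a minimal sketch of the splitting step only, using plain JDK types. The class name Batches, the method partition, and any batch size are hypothetical and not part of the Elasticsearch client.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not part of the Elasticsearch client.
final class Batches {

  private Batches() {
  }

  /** Splits items into consecutive batches of at most batchSize elements. */
  static <T> List<List<T>> partition(List<T> items, int batchSize) {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be positive");
    }
    final List<List<T>> batches = new ArrayList<>();
    for (int start = 0; start < items.size(); start += batchSize) {
      final int end = Math.min(start + batchSize, items.size());
      // Copy the sub-list so each batch is independent of the original list.
      batches.add(new ArrayList<>(items.subList(start, end)));
    }
    return batches;
  }
}
```

Each batch could then be passed to writeAccounts in turn, for example by looping over Batches.partition(accounts, 500) and calling accountManager.writeAccounts(batch, indexName) for every batch. A suitable batch size depends on document size and cluster capacity, so the value 500 is only a placeholder.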

Testing 

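The following test reads accounts.json from the classpath, writes the accounts into the accounts_test index, and checks that all 1000 items were indexed without failures. It assumes accountManager is an AccountManager field backed by a client connected to a running Elasticsearch instance: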

  @Test
  public void writeAccountsValidAccountAndIndexNameShouldReturnBulkResponse() throws IOException {
    final URL resource = AccountManager.class.getClassLoader()
        .getResource("accounts.json");
    Assertions.assertThat(resource).isNotNull();
    final List<Account> accounts = accountManager.readAccounts(resource.getFile());
    final var bulkItemResponses = accountManager.writeAccounts(accounts, "accounts_test");
    Assertions.assertThat(bulkItemResponses.hasFailures()).isFalse();
    Assertions.assertThat(bulkItemResponses).isNotEmpty().hasSize(1000);
  }

References

  • https://pixabay.com/photos/dance-schools-mosquitoes-mass-1837658/?download
  • https://github.com/elastic/elasticsearch/blob/master/docs/src/test/resources/accounts.json
  • https://www.elastic.co/guide/en/elasticsearch/client/java-rest/master/java-rest-high.html
