기존에 테스트 형식으로 데이터 1개를 가지고 구현할때는

bulk를 쓸 필요가 없었지만 실제 데이터를 다루게 되니

bulk의 필요성을 느끼게 되었다

 

데이터가 짧은 시간에 몇백개씩 들어오니

값을 받는데 딜레이 되어 5분이상 지체되는 모습을 보였다

 

이를 해결하고자 bulk를 사용했고

기존에 100개가 들어오면 100번의 호출이 일어나는 것을

100개의 데이터를 1번의 호출로 처리하는 방식이라고 보면 된다

 

1. 코드

// 벌크 요청 생성
HttpEntity entity = new NStringEntity(document, ContentType.APPLICATION_JSON);
Request bulkRequest = new Request("POST", "/_bulk");
bulkRequest.setEntity(entity);

// 벌크 요청 실행
Response bulkResponse = restClient.performRequest(bulkRequest);

// 응답 결과 확인
String responseBody = EntityUtils.toString(bulkResponse.getEntity());

// status 값 추출
JSONObject jsonObject = new JSONObject(responseBody);
String status = jsonObject.getJSONArray("items").getJSONObject(0).getJSONObject("index").getString("status");

// 2xx 대 상태 코드 구분
if (status.matches("2[0-9]{2}") == true) {
    log.info("Document insertion was successful.");
    log.info("Response content: " + responseBody);
} else {
    throw new Exception();
}

 

2. 주의 사항

엘라스틱서치 같은 경우 정해진 포맷이 있는데

이 포맷과 조금이라도 달라도 데이터가 들어가지 않는다

그래서 해당 포맷을 잘 유의해서 값을 변환해서 넣어줘야한다

 

/* POST _bulk의 JSON 포맷은 아래와 같은 포맷과 일치해야 함
 * { "index" : { "_index" : "index_name", "_id" : "1" } }
 * { "field1" : "value1","field1" : "value2" }
 *
 * 이 코드에서는 id를 임의로 할당하여 사용할 것이므로 아래와 같이 포맷을 변경함(_id 필드 삭제)
 * { "index" : { "_index" : "index_name" } }
 * { "field1" : "value1","field1" : "value2" }
 */
String document = "{ \"index\" : { \"_index\" : \"" + this.indexName + "\" } }\n"
    + "{ \"timestamp\" : " + Long.parseLong(logFieldData[LogFieldName.TIMESTAMP.ordinal()]) + ","
    + "\"machine_id\" : \"" + logFieldData[LogFieldName.MACHINE_ID.ordinal()] + "\","
    + "\"application_id\" : \"" + logFieldData[LogFieldName.APP_ID.ordinal()] + "\","
    + "\"tx_id\" : \"" + logFieldData[LogFieldName.TX_ID.ordinal()] + "\","
    + "\"api_type\" : \"" + logFieldData[LogFieldName.API_TYPE.ordinal()] + "\","
    + "\"elapsed_time\" : " + Long.parseLong(logFieldData[LogFieldName.ELASPSED_TIME.ordinal()]) + ","
    + "\"stack_trace\" :\"" + logFieldData[LogFieldName.STACK_TRACE.ordinal()] + "\","
    + "\"apm_log\" : \"" + updatedString+ "\""
    + " }\n";

 

3. 마무리

엘라스틱서치 를 구현하면서 예상치 못한 문제가 많이 있었다

공식문서에는 highlevel 로 자세한 내용은 나와있으나

실제 코드는 deprecate 되어있어 highlevel 을 사용하지 못하고

lowlevel을 사용해야하는데 관련해서 자료가 너무 없다보니

highlevel 을 참고해서 코드를 작성하느라 시간이 오래 걸렸고

 

데이터가 제대로 들어가지 않는 문제도 포맷의 문제란걸 알고

그에 맞춰서 string 값으로 한땀한땀 작성하다보니 힘이 들었다

 

그래도 이런 과정을 거치며 문제를 해결하는 요령이 늘어나는거 같아서

좋은 경험이 되는것 같다