지난번 포스팅에서 포멧에대해 알아봤다면

이번에는 pgoutput 플러그인을 사용하기 위한 설정 방법과

데이터를 파싱하는 방법에 대해서 다루고자 한다

 

- 이전 포스팅 -

https://vuddus526.tistory.com/508

 

[DataBase] postgresql 논리적 복제 plugin(pgoutput) 포멧 조사

postgresql 에서 논리적 복제를 통해 CDC 하는 과정에서 여러가지 plugin이 존재한다 그 중 pgoutput으로 변화된 데이터를 읽고 파싱하기 위해 데이터의 포멧을 알아보았다 기존에 test_decoding plugin을 사

vuddus526.tistory.com

 

1. 설정 방법

pgoutput플러그인을 사용하는 가장 큰 이유는

특정 테이블만 논리적 복제를 하기 위함이다

이를 위해 publication이라는것을 만들어 주어야한다

 

1) publication 생성

CREATE PUBLICATION publication이름 FOR table 지정할 테이블, 지정할 테이블, ...;

 

위와 같이 publication을 생성하여 특정 테이블만 보게할 수 있다

 

2) test_decoding 에서 사용한 아래 옵션은 사용 불가능 (에러발생)

트랜잭션 ID 정보를 포함할지 여부

.withSlotOption("include-xids", false)

 

빈 트랜잭션을 무시할지 여부

.withSlotOption("skip-empty-xacts", true)

 

rewrite된 쿼리를 복제할지 여부

.withSlotOption("include-rewrites", true)

 

쿼리 실행 시간 정보 포함 여부

.withSlotOption("include-timestamp", "on")

 

3) 버전 설정

버전을 설정

.withSlotOption("proto_version", 1)

 

4) publication이름 지정

publication으로 지정한 테이블만 가져온다

.withSlotOption("publication_names", this.publicationName)

 

2. 파싱 방법

pgoutput 플러그인을 사용해서 테이블의 변화가 생기면

byte[] 로 바이트 형태의 값을 받게 된다

예시

{82, 0, 0, 121, -36, 112, 117, 98, 108, 105, 99, 0, 114, 97, 119, 95, 109, 111, 110, 95, 109, 108, 115, 110, 0, 105, 0, 12, 1, 99, 108, 99, 116, 95, 100, 116, 0, 0, 0, 4, -96, -1, -1, -1, -1, 0, 114}

 

 

이를 16진수로 변환하고 2자리씩 잘라서 10진수 또는 문자값으로 바꿔 처리를 해준다

예시

73746E675F76616C0000000413000007D400737474735F636400000004130000001800667273745F7265675F647400000004A0FFFFFFFF00667273745F72677

 

★ 위 두 예시는 아무 값을 넣은거라 일치 하지 않음 ★

 

 

이전 포스팅에서 포멧을 다루었는데
이를 토대로 파싱을 진행해 주었다

 

먼저 처음에 테이블에 변화가 일어나면

R 값이 들어오고 R 에서는 oid와 컬럼명을 알 수 있다

 

그뒤에 insert, update, delete 에 해당하는

I, U, D 값이 들어오게 되는데

I, U, D 에서는 oid와 컬럼값들을 알 수 있다

 

한번 R 값이 들어오고나면 그뒤에 실행되는

DML 문들은 R 없이 I, U, D 값만 받게된다

 

그래서 처음 들어온 R을 oid 값을 기준으로

객체로 가지고있다가 뒤에 들어오는 값의 oid와 비교하여

컬럼명과 컬럼값을 매칭해주어 반환하는 형태이다

 

3. 파싱 코드

실제 코드를 작성하기엔 보안상 문제가 생길 수 있어

샘플로 가상의 변수와 메서드로 나타내 일부 로직만 표현 하였다

 

먼저 글로 설명하자면

입력되는 byte[] 배열의 전체 사이즈를 구해서

시작지점, 읽은사이즈, 전체사이즈 이렇게 3개를 이용해서

byte[] 를 원하는 크기와 포멧에 맞춰 잘라내는 것이다

 

입력 값 : byte[] data

ArrayList<Object> columnValueList = new ArrayList<Object>();

// 접두사 (B, R, I, U, D, C)
char charPrefix = 접두사추출메서드(data);

// 테이블 객체 식별값
String oid = getOid(data);

// ByteReader 객체 생성
ByteReader byteReader = new ByteReader(data);

if (charPrefix == 'I') {

    // 1byte 자르기
    byteReader.읽기메서드(1);

    // 2byte 자르기
    byteReader.읽기메서드(2);

    // 아래부터는 반복이 된다 hasRemains 이 true 인동안 계속 돌기

    while (byteReader.hasRemains()) {
        Object stringColumnValue = "";

        // 1byte 자르기
        byte[] byteTupleType = byteReader.읽기메서드(1);
        // 16진수를 문자열로 변환
        String stringTupleType = CommonUtil.bytesToStr(byteTupleType);
        // 16진수 문자열을 정수로 변환
        int intTupleType = Integer.parseInt(stringTupleType, 16);
        // 정수를 문자로 변환
        char charTupleType = (char) intTupleType;
        // null 값 식별
        if (charTupleType == 'n' ) {
            stringColumnValue = null;
            columnValueList.add(stringColumnValue);
            continue;
        }

        // 3byte 자르기
        byteReader.읽기메서드(3);

        // strLen 튜플 텍스트 길이 (10진수로)
        byte[] byteStrLen = byteReader.읽기메서드(1);
        int intStrLen = byteStrLen[0] & 0xFF;

        // strLen 만큼 뽑기
        if(intStrLen > 0) {
            byte[] byteColumnValue = byteReader.읽기메서드(intStrLen);
            stringColumnValue = new String(byteColumnValue);
        }
        columnValueList.add(stringColumnValue);
    }
}