지난번 포스팅에서 포멧에대해 알아봤다면
이번에는 pgoutput 플러그인을 사용하기 위한 설정 방법과
데이터를 파싱하는 방법에 대해서 다루고자 한다
- 이전 포스팅 -
https://vuddus526.tistory.com/508
[DataBase] postgresql 논리적 복제 plugin(pgoutput) 포멧 조사
postgresql 에서 논리적 복제를 통해 CDC 하는 과정에서 여러가지 plugin이 존재한다 그 중 pgoutput으로 변화된 데이터를 읽고 파싱하기 위해 데이터의 포멧을 알아보았다 기존에 test_decoding plugin을 사
vuddus526.tistory.com
1. 설정 방법
pgoutput플러그인을 사용하는 가장 큰 이유는
특정 테이블만 논리적 복제를 하기 위함이다
이를 위해 publication이라는것을 만들어 주어야한다
1) publication 생성
CREATE PUBLICATION publication이름 FOR table 지정할 테이블, 지정할 테이블, ...;
위와 같이 publication을 생성하여 특정 테이블만 보게할 수 있다
2) test_decoding 에서 사용한 아래 옵션은 사용 불가능 (에러발생)
트랜잭션 ID 정보를 포함할지 여부
.withSlotOption("include-xids", false)
빈 트랜잭션을 무시할지 여부
.withSlotOption("skip-empty-xacts", true)
rewrite된 쿼리를 복제할지 여부
.withSlotOption("include-rewrites", true)
쿼리 실행 시간 정보 포함 여부
.withSlotOption("include-timestamp", "on")
3) 버전 설정
버전을 설정
.withSlotOption("proto_version", 1)
4) publication이름 지정
publication으로 지정한 테이블만 가져온다
.withSlotOption("publication_names", this.publicationName)
2. 파싱 방법
pgoutput 플러그인을 사용해서 테이블의 변화가 생기면
byte[] 로 바이트 형태의 값을 받게 된다
예시
{82, 0, 0, 121, -36, 112, 117, 98, 108, 105, 99, 0, 114, 97, 119, 95, 109, 111, 110, 95, 109, 108, 115, 110, 0, 105, 0, 12, 1, 99, 108, 99, 116, 95, 100, 116, 0, 0, 0, 4, -96, -1, -1, -1, -1, 0, 114}
이를 16진수로 변환하고 2자리씩 잘라서 10진수 또는 문자값으로 바꿔 처리를 해준다
예시
73746E675F76616C0000000413000007D400737474735F636400000004130000001800667273745F7265675F647400000004A0FFFFFFFF00667273745F72677
★ 위 두 예시는 아무 값을 넣은거라 일치 하지 않음 ★
이전 포스팅에서 포멧을 다루었는데
이를 토대로 파싱을 진행해 주었다
먼저 처음에 테이블에 변화가 일어나면
R 값이 들어오고 R 에서는 oid와 컬럼명을 알 수 있다
그뒤에 insert, update, delete 에 해당하는
I, U, D 값이 들어오게 되는데
I, U, D 에서는 oid와 컬럼값들을 알 수 있다
한번 R 값이 들어오고나면 그뒤에 실행되는
DML 문들은 R 없이 I, U, D 값만 받게된다
그래서 처음 들어온 R을 oid 값을 기준으로
객체로 가지고있다가 뒤에 들어오는 값의 oid와 비교하여
컬럼명과 컬럼값을 매칭해주어 반환하는 형태이다
3. 파싱 코드
실제 코드를 작성하기엔 보안상 문제가 생길 수 있어
샘플로 가상의 변수와 메서드로 나타내 일부 로직만 표현 하였다
먼저 글로 설명하자면
입력되는 byte[] 배열의 전체 사이즈를 구해서
시작지점, 읽은사이즈, 전체사이즈 이렇게 3개를 이용해서
byte[] 를 원하는 크기와 포멧에 맞춰 잘라내는 것이다
입력 값 : byte[] data
ArrayList<Object> columnValueList = new ArrayList<Object>();
// 접두사 (B, R, I, U, D, C)
char charPrefix = 접두사추출메서드(data);
// 테이블 객체 식별값
String oid = getOid(data);
// ByteReader 객체 생성
ByteReader byteReader = new ByteReader(data);
if (charPrefix == 'I') {
// 1byte 자르기
byteReader.읽기메서드(1);
// 2byte 자르기
byteReader.읽기메서드(2);
// 아래부터는 반복이 된다 hasRemains 이 true 인동안 계속 돌기
while (byteReader.hasRemains()) {
Object stringColumnValue = "";
// 1byte 자르기
byte[] byteTupleType = byteReader.읽기메서드(1);
// 16진수를 문자열로 변환
String stringTupleType = CommonUtil.bytesToStr(byteTupleType);
// 16진수 문자열을 정수로 변환
int intTupleType = Integer.parseInt(stringTupleType, 16);
// 정수를 문자로 변환
char charTupleType = (char) intTupleType;
// null 값 식별
if (charTupleType == 'n' ) {
stringColumnValue = null;
columnValueList.add(stringColumnValue);
continue;
}
// 3byte 자르기
byteReader.읽기메서드(3);
// strLen 튜플 텍스트 길이 (10진수로)
byte[] byteStrLen = byteReader.읽기메서드(1);
int intStrLen = byteStrLen[0] & 0xFF;
// strLen 만큼 뽑기
if(intStrLen > 0) {
byte[] byteColumnValue = byteReader.읽기메서드(intStrLen);
stringColumnValue = new String(byteColumnValue);
}
columnValueList.add(stringColumnValue);
}
}'DataBase' 카테고리의 다른 글
| [DataBase] postgresql 논리적 복제 plugin(pgoutput) 포멧 조사 (1) | 2024.04.18 |
|---|---|
| [DataBase] WITH절 알아보기 (0) | 2023.09.26 |
| [DataBase] 식별 관계와 비식별 관계 (0) | 2023.09.18 |
| [DataBase] TimescaleDB 특징, 설치 방법 및 테스트 (0) | 2023.08.04 |
| [DataBase] JPA @Transactional 없이 수동으로 처리 해보기 (0) | 2023.07.26 |