-
[Kafka] - 15. EOS Transaction개발/Kafka 2022. 8. 10. 17:31
1. Transaction Concept
EOS를 처리하기 위해 하나의 Logic으로 처리되어 필요한 개념
1-1) Transaction Coordinator
- 각 Producer에게는 Transaction Coordinator가 할당되며, PID 할당 및 Transaction 관리의 모든 Logic을 수행한다.
1-2) Transaction log
- 새로운 Internal Kafka Topic으로, Consumer offset topic과 유사하게, 모든 Transaction이 영구적이고, 복제된 Record를 저장하는 Transaction Coordinator Status 저장소
1-3) TransactionalId
- Producer를 고유하게 식별하기 위해 사용한다.
- 동일한 TransactionalID를 가진 Producer의 다른 Instance들은 이전 Instance에 의해 만들어진 모든 Transaction을 재개 또는 중단 할 수 있다.
2. Transaction Parameter
2-1) Broker Config
일반적으로 Default 값을 기본으로 사용한다.
Parameter Description Default transactional.id.timeout.ms - Transaction Coordinator가 Producer TransactionalID로 부터 Transaction 상태 업데이트를 수신하지 않고 만료되기 전에 대기하는 시간(ms) 604800000(7days) max.transaction.timeout.ms - Transaction이 허용되는 최대 Timeout 시간
- Client가 요청한 Transaction 시간이 이 시간을 초과하면, Broker는 InitPidRequest에서 InvalidTransactionTimeout Error Response
- Producer가 TRansaction에 포함된 Topic에서 읽는 Consumer를 지연시킬 수 있는 너무 큰 시간 초과를 방지900000(15min) transaction.state.log.replcation.factor Transaction State Topic의 Replication Factor 3 transaction.state.log.num.partitions Transaction State Topic의 Partition 개수 50 transaction.state.log.min.isr Transaction State Topic의 min ISR 개수 2 transaction.state.log.segment.bytes Transaction State Topic의 Segment 크기 104867600 bytes 2-2) Producer Config
Parameter Description Default enable.idempotence - 비활성화 경우 Transaction 사용 불가
- 활성화(True) , Acks=all, retries=1
- max.inflight.requests.per.connection=1(권장) 을 같이 사용해야 함False transaction.timeout.ms Transaction Coordinator가 진행 중인 Transaction을 사전에 중단하기 전에 Producer의 Transaction 상태 업데이트를 기다리는 최대 시간(ms)
Broker의 max.transaction.timeout.ms 작게 설정 값보다 크면 'InvalidTransaction Timeout' Error 발생60000(60 sec) transactional.id - Transaction에 전달되는 TrantactionalID
- 이를 통해 Client는 새로운 Transaction을 시작하기 전에 동일한 TransactionalID를 사용하는 Transaction이 완료되었음을 보장.
- 값이 Setting 되어야 Transaction이 동작한다.
- 비어있다면, Idempotent Deilvery로 제한.
- 반드시 enable.idempotence를 활성화 해야 한다.x 2-3) Consumer Config
- Consumer는 중복해서 데이터가 들어가는 것에 대해서는 보장이 안됨으로, 로직을 별도로 작성해서 처리해야 한다.
Parameter Description Default isolation.level - read_uncommitted = offset 순서로 commit된 Message와 commit 되지 않은 message 둘 다 사용
- read_committed = Non-transaction Message, Commit된 Transaction Message만 offset 순서로 사용read_uncommitted enable.auto.commit False : Consumer Offset에 대해 Auto commit off True 3. Transaction Code
E.g) KIP-98
1. InitTransaction()
2. Poll로 Topic Record 가져온다.
3. Begintransaction()
4. Record Bussiness Login 수행 후 Record를 Send
5. Send Offsetstotransactions()
6. EndTransaction
#https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging#KIP98ExactlyOnceDeliveryandTransactionalMessaging-RejectedAlternatives public class KafkaTransactionsExample { public static void main(String args[]) { KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig); KafkaProducer<String, String> producer = new KafkaProducer<>(producerConfig); #1. producer.initTransactions(); while(true) { #2. ConsumerRecords<String, String> records = consumer.poll(CONSUMER_POLL_TIMEOUT); if (!records.isEmpty()) { #3. producer.beginTransaction(); #4. List<ProducerRecord<String, String>> outputRecords = processRecords(records); for (ProducerRecord<String, String> outputRecord : outputRecords) { #4. producer.send(outputRecord); } #5. sendOffsetsResult = producer.sendOffsetsToTransaction(getUncommittedOffsets()); #6. producer.endTransaction(); } } } }
1. InitTransaction
- Producer가 Inittransaction() call -> FindCoordinator Request -> Broker -> Response Transaction Coordinator
2. Get ProducerID
- Producer가 Transaction 하기 위해 Transaction Coordinator에게 Request InitPid(TransationalID) -> Response ProducerID(PID)
- PID에 대한 Mapping은 2.a 단계인 Transaction log에 기록
3. BeginTransaction()
- Producer가 BeginTransaction() Call하여 새로운 Transaction의 시작을 알린다.
- 첫번째 Record가 전송될 때 까지는 Transaction Coordinator 관점에서는 Transaction 되지 않았다.
4.1 Insert Data
- Producer는 새로운 Topic Partition이 처음 기록 될 때, 요청을 Transaction coordinator에게 보낸다.
- TopicPartition을 Transaction에 추가하며느 Transaction Coordinator가 4.1a 단계와 같이 기록
- 처음인 추가하는 경우에는 Transaction timer도 시작
4.2 Produce Send()
- Producer Send() = Producer의 Requests를 통해 User topic Partition에 Message를 Write
4.3 SendOffsetstoTransaction()
- Transaction Coordinator는 내부 __Consumer_offsets Topic에서 01 Consumer GRoup에 대해 Topic Parition을 추론
- 4.3a 단계에서 Transaction log에 Topic Partition의 추가를 기록
4.4 Txnoffsetstotransaction()
- Txnoffsetstotransaction Request -> Consumer Group Coordinator -> Consumer_offset에 Message Write
- 이 경우에는 Commit 전임으로, 외부에서 보이지 않는다.
5.1 EndTxnrequest
- Producer는 CommitTransaction() 혹은 AbortTransaction() 중 하나를 Call
- Commit한다고 가정한다면, App Transaction Coordinator에 Prepare Request
5.2 Write TxnmarkerRequest
- Transaction Coordinator가 User Topic Commit, Consumer offset Commit
5.3 Write Final Commit
- 최종적으로 Transaction Coordinator가 Commit을 기록
'개발 > Kafka' 카테고리의 다른 글
[Kafka] - 16. Kafka 설치 (0) 2022.08.18 [Kafka] - 17. Docker-compose Kafka 개발환경 구축 및 테스트 (0) 2022.08.11 [Kafka] - 14. EOS(Exactly Once Symantics) (0) 2022.08.10 [Kafka] - 13. Kafka Log File (0) 2022.08.10 [Kafka] - 12. Cooperative Sticky Assignor (0) 2022.08.09