ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Kafka] - 15. EOS Transaction
    개발/Kafka 2022. 8. 10. 17:31

    1. Transaction Concept


    EOS를 처리하기 위해 하나의 Logic으로 처리되어 필요한 개념

    1-1) Transaction Coordinator

    • 각 Producer에게는 Transaction Coordinator가 할당되며, PID 할당 및 Transaction 관리의 모든 Logic을 수행한다.

    1-2) Transaction log

    • 새로운 Internal Kafka Topic으로, Consumer offset topic과 유사하게, 모든 Transaction이 영구적이고, 복제된 Record를 저장하는 Transaction Coordinator Status 저장소

    1-3) TransactionalId

    • Producer를 고유하게 식별하기 위해 사용한다.
    • 동일한 TransactionalID를 가진 Producer의 다른 Instance들은 이전 Instance에 의해 만들어진 모든 Transaction을 재개 또는 중단 할 수 있다.

     

    2. Transaction Parameter


    2-1) Broker Config

    일반적으로 Default 값을 기본으로 사용한다.

    Parameter Description Default
    transactional.id.timeout.ms - Transaction Coordinator가 Producer TransactionalID로 부터 Transaction 상태 업데이트를 수신하지 않고 만료되기 전에 대기하는 시간(ms) 604800000(7days)
    max.transaction.timeout.ms - Transaction이 허용되는 최대 Timeout 시간
    - Client가 요청한 Transaction 시간이 이 시간을 초과하면, Broker는 InitPidRequest에서 InvalidTransactionTimeout Error Response
    - Producer가 TRansaction에 포함된 Topic에서 읽는 Consumer를 지연시킬 수 있는 너무 큰 시간 초과를 방지
    900000(15min)
    transaction.state.log.replcation.factor Transaction State Topic의 Replication Factor 3
    transaction.state.log.num.partitions Transaction State Topic의 Partition 개수 50
    transaction.state.log.min.isr Transaction State Topic의 min ISR 개수 2
    transaction.state.log.segment.bytes Transaction State Topic의 Segment 크기 104867600 bytes

    2-2) Producer Config

    Parameter Description Default
    enable.idempotence - 비활성화 경우 Transaction 사용 불가
    - 활성화(True) , Acks=all, retries=1
    - max.inflight.requests.per.connection=1(권장) 을 같이 사용해야 함
    False
    transaction.timeout.ms Transaction Coordinator가 진행 중인 Transaction을 사전에 중단하기 전에 Producer의 Transaction 상태 업데이트를 기다리는 최대 시간(ms) 
    Broker의 max.transaction.timeout.ms 작게 설정 값보다 크면 'InvalidTransaction Timeout' Error 발생
    60000(60 sec)
    transactional.id - Transaction에 전달되는 TrantactionalID
    - 이를 통해 Client는 새로운 Transaction을 시작하기 전에 동일한 TransactionalID를 사용하는 Transaction이 완료되었음을 보장.
    - 값이 Setting 되어야 Transaction이 동작한다.
    - 비어있다면, Idempotent Deilvery로 제한.
    - 반드시 enable.idempotence를 활성화 해야 한다.
    x

    2-3) Consumer Config

    • Consumer는 중복해서 데이터가 들어가는 것에 대해서는 보장이 안됨으로, 로직을 별도로 작성해서 처리해야 한다.
    Parameter Description Default
    isolation.level - read_uncommitted = offset 순서로 commit된 Message와 commit 되지 않은 message 둘 다 사용
    - read_committed = Non-transaction Message, Commit된 Transaction Message만 offset 순서로 사용
    read_uncommitted
    enable.auto.commit False : Consumer Offset에 대해 Auto commit off True

     

    3. Transaction Code

    E.g) KIP-98

    1. InitTransaction()

    2. Poll로 Topic Record 가져온다.

    3. Begintransaction()

    4. Record Bussiness Login 수행 후 Record를 Send

    5. Send Offsetstotransactions()

    6. EndTransaction

    #https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging#KIP98ExactlyOnceDeliveryandTransactionalMessaging-RejectedAlternatives
    
    public class KafkaTransactionsExample {
      
      public static void main(String args[]) {
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig);
     
        KafkaProducer<String, String> producer = new KafkaProducer<>(producerConfig);
     
        #1.
        producer.initTransactions();
         
        while(true) {
          #2.
          ConsumerRecords<String, String> records = consumer.poll(CONSUMER_POLL_TIMEOUT);
          if (!records.isEmpty()) {
            #3.
            producer.beginTransaction();
             
            #4.
            List<ProducerRecord<String, String>> outputRecords = processRecords(records);
            for (ProducerRecord<String, String> outputRecord : outputRecords) {
              #4.
              producer.send(outputRecord);
            }
             
            #5.
            sendOffsetsResult = producer.sendOffsetsToTransaction(getUncommittedOffsets());
             
            #6.
            producer.endTransaction();
          }
        }
      }
    }

     

    1. InitTransaction

    • Producer가 Inittransaction() call -> FindCoordinator Request -> Broker -> Response Transaction Coordinator 

     

    2. Get ProducerID

    • Producer가 Transaction 하기 위해 Transaction Coordinator에게 Request InitPid(TransationalID) -> Response ProducerID(PID)
    • PID에 대한 Mapping은 2.a 단계인 Transaction log에 기록

    3. BeginTransaction()

    • Producer가 BeginTransaction() Call하여 새로운 Transaction의 시작을 알린다.
    • 첫번째 Record가 전송될 때 까지는 Transaction Coordinator 관점에서는 Transaction 되지 않았다.

     

    4.1 Insert Data

    • Producer는 새로운 Topic Partition이 처음 기록 될 때, 요청을 Transaction coordinator에게 보낸다.
    • TopicPartition을 Transaction에 추가하며느 Transaction Coordinator가 4.1a 단계와 같이 기록
    • 처음인 추가하는 경우에는 Transaction timer도 시작

     

    4.2 Produce Send()

    • Producer Send() = Producer의 Requests를 통해 User topic Partition에 Message를 Write

     

    4.3 SendOffsetstoTransaction()

    • Transaction Coordinator는 내부 __Consumer_offsets Topic에서 01 Consumer GRoup에 대해 Topic Parition을 추론
    • 4.3a 단계에서 Transaction log에 Topic Partition의 추가를 기록

     

    4.4 Txnoffsetstotransaction()

    • Txnoffsetstotransaction Request -> Consumer Group Coordinator -> Consumer_offset에 Message Write
    • 이 경우에는 Commit 전임으로, 외부에서 보이지 않는다.

     

    5.1 EndTxnrequest

    • Producer는 CommitTransaction() 혹은 AbortTransaction() 중 하나를 Call
    • Commit한다고 가정한다면, App Transaction Coordinator에 Prepare Request

     

    5.2 Write TxnmarkerRequest

    • Transaction Coordinator가 User Topic Commit, Consumer offset Commit

     

    5.3 Write Final Commit

    • 최종적으로 Transaction Coordinator가 Commit을 기록

    댓글

Designed by Tistory.