Kafka科普系列 | Kafka中的事務是什么樣子的?


事務,對於大家來說可能並不陌生,比如數據庫事務、分布式事務,那么Kafka中的事務是什么樣子的呢?

在說Kafka的事務之前,先要說一下Kafka中冪等的實現。冪等和事務是Kafka 0.11.0.0版本引入的兩個特性,以此來實現EOS(exactly once semantics,精確一次處理語義)。

冪等,簡單地說就是對接口的多次調用所產生的結果和調用一次是一致的。生產者在進行重試的時候有可能會重復寫入消息,而使用Kafka的冪等性功能之后就可以避免這種情況。

開啟冪等性功能的方式很簡單,只需要顯式地將生產者客戶端參數enable.idempotence設置為true即可(這個參數的默認值為false)。

Kafka是如何具體實現冪等的呢?Kafka為此引入了producer id(以下簡稱PID)和序列號(sequence number)這兩個概念。每個新的生產者實例在初始化的時候都會被分配一個PID,這個PID對用戶而言是完全透明的。

對於每個PID,消息發送到的每一個分區都有對應的序列號,這些序列號從0開始單調遞增。生產者每發送一條消息就會將對應的序列號的值加1。

broker端會在內存中為每一對維護一個序列號。對於收到的每一條消息,只有當它的序列號的值(SN_new)比broker端中維護的對應的序列號的值(SN_old)大1(即SN_new = SN_old + 1)時,broker才會接收它。

如果SN_new< SN_old + 1,那么說明消息被重復寫入,broker可以直接將其丟棄。如果SN_new> SN_old + 1,那么說明中間有數據尚未寫入,出現了亂序,暗示可能有消息丟失,這個異常是一個嚴重的異常。

引入序列號來實現冪等也只是針對每一對而言的,也就是說,Kafka的冪等只能保證單個生產者會話(session)中單分區的冪等。冪等性不能跨多個分區運作,而事務可以彌補這個缺陷。

事務可以保證對多個分區寫入操作的原子性。操作的原子性是指多個操作要么全部成功,要么全部失敗,不存在部分成功、部分失敗的可能。

為了使用事務,應用程序必須提供唯一的transactionalId,這個transactionalId通過客戶端參數transactional.id來顯式設置。事務要求生產者開啟冪等特性,因此通過將transactional.id參數設置為非空從而開啟事務特性的同時需要將enable.idempotence設置為true(如果未顯式設置,則KafkaProducer默認會將它的值設置為true),如果用戶顯式地將enable.idempotence設置為false,則會報出ConfigException的異常。

transactionalId與PID一一對應,兩者之間所不同的是transactionalId由用戶顯式設置,而PID是由Kafka內部分配的。

另外,為了保證新的生產者啟動后具有相同transactionalId的舊生產者能夠立即失效,每個生產者通過transactionalId獲取PID的同時,還會獲取一個單調遞增的producer epoch。如果使用同一個transactionalId開啟兩個生產者,那么前一個開啟的生產者會報錯。

從生產者的角度分析,通過事務,Kafka可以保證跨生產者會話的消息冪等發送,以及跨生產者會話的事務恢復。

前者表示具有相同transactionalId的新生產者實例被創建且工作的時候,舊的且擁有相同transactionalId的生產者實例將不再工作。

后者指當某個生產者實例宕機后,新的生產者實例可以保證任何未完成的舊事務要么被提交(Commit),要么被中止(Abort),如此可以使新的生產者實例從一個正常的狀態開始工作。

KafkaProducer提供了5個與事務相關的方法,詳細如下:

void initTransactions();
void beginTransaction() throws ProducerFencedException;
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
                              String consumerGroupId)
        throws ProducerFencedException;
void commitTransaction() throws ProducerFencedException;
void abortTransaction() throws ProducerFencedException;

 

initTransactions()方法用來初始化事務;beginTransaction()方法用來開啟事務;sendOffsetsToTransaction()方法為消費者提供在事務內的位移提交的操作;commitTransaction()方法用來提交事務;abortTransaction()方法用來中止事務,類似於事務回滾。

在消費端有一個參數isolation.level,與事務有着莫大的關聯,這個參數的默認值為“read_uncommitted”,意思是說消費端應用可以看到(消費到)未提交的事務,當然對於已提交的事務也是可見的。

這個參數還可以設置為“read_committed”,表示消費端應用不可以看到尚未提交的事務內的消息。

舉個例子,如果生產者開啟事務並向某個分區值發送3條消息msg1、msg2和msg3,在執行commitTransaction()或abortTransaction()方法前,設置為“read_committed”的消費端應用是消費不到這些消息的,不過在KafkaConsumer內部會緩存這些消息,直到生產者執行commitTransaction()方法之后它才能將這些消息推送給消費端應用。反之,如果生產者執行了abortTransaction()方法,那么KafkaConsumer會將這些緩存的消息丟棄而不推送給消費端應用。

 

日志文件中除了普通的消息,還有一種消息專門用來標志一個事務的結束,它就是控制消息(ControlBatch)。控制消息一共有兩種類型:COMMIT和ABORT,分別用來表征事務已經成功提交或已經被成功中止。

RecordBatch中attributes字段的第6位用來標識當前消息是否是控制消息。如果是控制消息,那么這一位會置為1,否則會置為0,如上圖所示。

attributes字段中的第5位用來標識當前消息是否處於事務中,如果是事務中的消息,那么這一位置為1,否則置為0。由於控制消息也處於事務中,所以attributes字段的第5位和第6位都被置為1。


KafkaConsumer可以通過這個控制消息來判斷對應的事務是被提交了還是被中止了,然后結合參數isolation.level配置的隔離級別來決定是否將相應的消息返回給消費端應用,如上圖所示。注意ControlBatch對消費端應用不可見。

本文內容轉載來自朱小廝的博客

 


注意!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系我们删除。



 
粤ICP备14056181号  © 2014-2020 ITdaan.com