• 告別Disruptor(一) 簡潔優雅的高性能并發隊列

    幾年前聽說過Disruptor,一直沒用過也沒深究, 其號稱是一個性能爆表的并發隊列,上Github/LMAX-Exchange/disruptor 去看了看,官方性能描述文章 選了慢如蝸牛的ArrayBlockQueue來對比。在Nehalem 2.8Ghz – Windows 7 SP1 64-bit錄得性能見后(其中P,C分別代表 Producer和Consumer):

    1P – 1C 的吞吐量兩千五百萬次,1P – 3C Multicast 就降到了一千萬次不到,對比我所認為的非線程安全1P -1C隊列億次每秒的量級,感覺并不強大。億次每秒的隊列加上線程安全,毛估估1P-1C性能減半五千萬次每秒,1P-3C 再減少個30%三千五百萬次每秒,應該差不多了吧。

    繼續讀讀Disruptor的介紹,整體業務框架先不談,關于隊列的部分,我只想問可以說臟話不,太TMD的復雜了,實現方式一點都不優雅,講真,我想用100行代碼滅了它。

    說干就干,先嘗試一下。

    ArrayBlockingQueueDisruptor
    Unicast: 1P – 1C5,339,25625,998,336
    Pipeline: 1P – 3C2,128,91816,806,157
    Sequencer: 3P – 1C5,539,53113,403,268
    Multicast: 1P – 3C1,077,3849,377,871
    Diamond: 1P – 3C2,113,94116,143,613

    Comparative throughput (in ops per sec)

    第一步 簡單粗暴的阻塞隊列

    先簡單來個,這年頭用synchronized要被人瞧不起的(風聞現在性能好多了),大家言必談ReentrantLock,CAS,那么我也趕潮流CAS吧,主流程如下。

    步驟主要工作失敗流程
    1無論是put還是take,先對head/tail getAndIncrease 在Array中占位100%成功
    2檢查是否能夠放/取失敗則循環檢查,條件允許了再操作(不能返回失敗,第1步的占位無法回退)
    3執行放/取操作

    第2步的操作,如果失敗,并非完全不能回退,只是需要牽扯到一套復雜的邏輯,在這個簡單粗暴的實現中,先不考慮非阻塞方案。

    核心代碼如下:

    private Object[] array;
    private final int capacity;
    private final int m;
    
    private final AtomicLong tail;
    private final AtomicLong head;
    private final AtomicLong[] als = new AtomicLong[11];
    
    public RiskBlockingQueue(int preferCapacity) {
    	this.capacity = getPow2Value(preferCapacity, MIN_CAPACITY, MAX_CAPACITY);
    	//這是個取比preferCapacity大的,最接近2的整數冪的函數(限制必須在 MIN MAX 之間)
    	array = new Object[this.capacity];
    	this.m = this.capacity - 1;		
    
    	for (int i = 0; i < als.length; i++) {     // 并不一定100%成功的偽共享padding
    		als[i] = new AtomicLong(0);
    	}
    	head = als[3];
    	tail = als[7];
    }
    
    public void put(T obj) {
    	ProgramError.when(obj == null, "Can't add null into this queue");
    	int p = (int) (head.getAndIncrement() & this.m);
    	int packTime = MIN_PACKTIME_NS;
    	while(array[p] != null) {
    		LockSupport.parkNanos(packTime);
    		if(packTime < MAX_PACKTIME_NS) packTime <<= 1;
    	}
    	array[p] = obj;
    }
    
    public T take(){
    	Object r;
    	int p = (int) (tail.getAndIncrement() & this.m);
    	int packTime = MIN_PACKTIME_NS;
    	while((r = array[p]) == null) {
    		LockSupport.parkNanos(packTime);
    		if(packTime < MAX_PACKTIME_NS) packTime <<= 1;
    	}
    	array[p] = null;
    	return (T)r;
    }

    代碼簡單的不要不要的,30來行代碼,一個線程安全的阻塞就基本完成。什么?你問構造函數為什么叫RiskBlockingQueue?,很簡單,有Risk,這并不是一個真正意義上的線程安全Queue,它有風險,那么風險在哪里呢?

    各位看官先自己想想風險在哪里,我先來測個1P-1C 性能數據 (以下數據都在關閉了CPU超線程的環境下測試獲得,超線程時數據經??瓷先ズ苊?

    1P – 1C

    Producer 0 has completed. Cost Per Put 19ns.
    Consumer 0 has completed. Cost Per Take 19ns.
    Total 201M I/O, cost 3844ms, 52374243/s(52M/s)

    ??,還真的和預測差不多,性能減半。

    接下來,揭曉這個隊列的風險,我們再來看看隊列中一個P線程(Producer 線程,Consumer稱為C線程,下同)put操作的工作流程

    步驟主要工作
    1getAndIncrease 占位
    2檢查是否能夠put
    3執行put操作

    假設某線程P0,執行完了第1步,在執行第2或3步時被叫去喝茶了

    這時P0線程本應填充位置array[x]但卻沒有填充(如果有對應的C線程,也take不到對象,被卡在array[x]上不斷pack)。

    但線程P1 – Pn在歡快的繼續執行,不斷的put,沿著array往前跑,漸漸的,一圈過去了,P1 線程也來到了array[x]的位置。

    這時,P0線程和P1線程對array[x]位置的訪問處于競爭狀態,array[x] 沒有任何鎖/同步/信號量/原子操作保護。這可能會造成對象丟失,并卡住一個C線程,并最終卡住整個隊列。C線程們也一樣,極端情況下,當一個C線程追上另一個C線程的時候,也會對數組的同一位置發生非線程安全的爭用。

    哪里有問題,就解決哪里的問題

    第二步 真正的并發阻塞Array隊列

    對于數組,Java沒有提供直接的CAS操作方式(除非自己調Unsafe),不但沒有CAS,連volatile都沒有。不過,Java中有一個內置的類,叫AtomicReferenceArray,簡而言之,這個類提供了對 T[] 數組的CAS及類似操作。繼續上代碼,這次還少了2行。

    private AtomicReferenceArray<T> array;  //原本是 Object[] array
    private final int capacity;
    private final int m;
    private final AtomicLong tail;
    private final AtomicLong head;
    
    public ConcurrentBlockingQueue(int preferCapacity) {
    	this.capacity = getPow2Value(preferCapacity, MIN_CAPACITY, MAX_CAPACITY);
    	array = new AtomicReferenceArray<T>(this.capacity);
    	this.m = this.capacity - 1;		
    
    	for (int i = 0; i < als.length; i++) {
    		als[i] = new AtomicLong(0);
    	}
    	head = als[3];
    	tail = als[7];
    }
    
    public void put(T obj) {
    	ProgramError.when(obj == null, "Queue object can't be null");
    	int p = (int) (head.getAndIncrement() & this.m);
    	int packTime = MIN_PACKTIME_NS;
    	while(!array.compareAndSet(p, null, obj)) { //***
    		LockSupport.parkNanos(packTime);
    		if(packTime < MAX_PACKTIME_NS) packTime <<= 1;
    	}
    }
    
    public T take(){
    	T r;
    	int p = (int) (tail.getAndIncrement() & this.m);
    	int packTime = MIN_PACKTIME_NS;
    	while((r=array.getAndSet(p, null)) == null) {  //***
    		LockSupport.parkNanos(packTime);
    		if(packTime < MAX_PACKTIME_NS) packTime <<= 1;
    	}
    	return r;
    }

    這段代碼的核心修改是,每次讀寫array的位置p,都通過CAS和GAS的原子操作來完成,保證了array的線程安全,在高度爭用時,依然可以確保放一個取一個的交替。 接下來繼續測試性能。

    1P – 1C

    Consumer 0 has completed. Cost Per Take 29ns.
    Producer 0 has completed. Cost Per Add 29ns.
    Total 201M I/O, cost 5912ms, 34053889/s(34M/s)

    1P – 3C

    Consumer 0 has completed. Cost Per Take 143ns.
    Consumer 2 has completed. Cost Per Take 143ns.
    Producer 0 has completed. Cost Per Put 47ns.
    Consumer 1 has completed. Cost Per Take 143ns.
    Total 201M I/O, cost 9665ms, 20830480/s (20M/s)

    3P – 1C

    Producer 0 has completed. Cost Per Put 185ns.
    Producer 1 has completed. Cost Per Put 185ns.
    Producer 2 has completed. Cost Per Put 185ns.
    Consumer 0 has completed. Cost Per Take 62ns.
    Total 201M I/O, cost 12551ms, 16040681/s(16M/s)

    2P – 2C

    Producer 0 has completed. Cost Per Put 147ns.
    Producer 1 has completed. Cost Per Put 148ns.
    Consumer 0 has completed. Cost Per Take 148ns.
    Consumer 1 has completed. Cost Per Take 148ns.
    Total 201M I/O, cost 14986ms, 13434311/s(13M/s)

    感覺有點沮喪,可能是因為增加的CAS操作,性能進一步下降。

    考慮到我的測試環境里CPU比Disruptor當年的CPU快不少,實際可比性能,1P-1C估計只有Disruptor的1.1倍,1P – 3C的性能,只有其1.5倍???,但是快的及其有限,號稱要挑戰Disruptor,卻僅憑點數小勝,而非直接擊倒,不爽。

    具體的性能對比如下:

    Disruptor
    Nehalem 2.8Ghz
    ConcurrentBlockingQueue
    Skylake-X 3.3Ghz
    Unicast: 1P – 1C25,998,33634,053,889
    Pipeline: 1P – 3C16,806,157
    Sequencer: 3P – 1C13,403,268
    Multicast: 3P – 1C16,040,681
    Multicast: 1P – 3C9,377,87120,830,480
    Diamond: 1P – 3C16,143,613
    Multicast: 2P – 2C13,434,311

    問題出在哪里呢?仔細想了一下,可能是過多的變量共享訪問及數組的共享訪問造成了大量的 L1/2 緩存失效,描述如下。

    1P – 1C時,一旦head和tail靠近或套圈,那么對64字節的數組內存就直接形成了共享爭用。

    在指針壓縮時,Java的每個引用占用4字節,64字節一條的Cache line 一共有16個引用,兩個線程一個放一個取,不斷的 L1/2 緩存失效,鎖定,修改。

    1P – 3C的時候,還要再加上C與C之間的 tail 爭用和 L1/2 緩存爭用,然后,性能就下降到這么個結果了。

    還是那句話,哪里有問題,就解決哪里的問題

    但是怎么改呢,想了下:
    1P1C時,一旦隊列空了或是滿了,很容易陷在Producer/Consumer一個放一個取的過程中,而多P或多C時,多個P/C線程,排著隊一個個自增head/tail然后修改共享數組,P線程們爭完head之后,集中在共享數組的相鄰位置嘗試放置對象;C線程們爭用tail,再修改共享數組的相鄰位置,幾乎每次訪問都是無效 L1/2 緩存。
    也就是說,在密集爭用時,后一個線程正好爭得前一個線程取得的位置的后一個的位置(head/tail + 1),然后這兩個線程對位于同一個cache line上的相鄰的數組位置進行訪問,造成緩存失效,性能下降,這是流程上的硬傷,沒法改,只能推倒重來。

    3 非阻塞線程安全Array隊列

    上一個隊列,不但性能不盡人意,而且很難支持非阻塞操作,換個思路,重新設計一個支持非阻塞操作的隊列,并盡可能少共享訪問內存的情況。流程如下:

    步驟主要工作失敗流程
    1檢查可能能夠放/取的數組位置(讀取競爭變量head或tail)不會失敗
    2檢查是否能夠放/取(對應位置是否有對象,反映了隊列是否空/滿)回步驟1/或返回失敗
    3getAndIncrease競爭變量head或tail,競爭該位置的操作權(放/取)回步驟1/或返回失敗
    4執行放/取操作失敗就循環重試

    先以offer為例看看代碼:

    private AtomicReferenceArray<T> array;
    
    public boolean offer(T obj) {
    	if(obj == null) throw new NullPointExceprion("Can't put null object into this queue");
    	long head = this.head.get(); //步驟1
    	int p =(int) (head & this.m);
    	if(array.get(p) != null) return false;   //步驟2
    	if(this.head.compareAndSet(head, head + 1)) {  //步驟3
    		int packTime = MIN_PACKTIME_NS;
    		while(!array.compareAndSet(p, null, obj)) { //步驟4
    			LockSupport.parkNanos(MIN_PACKTIME_NS);
    			if(packTime < MAX_PACKTIME_NS) packTime <<= 1;
    		};
    		return true;
    	}
    	return false;
    }

    這里我們要注意,在offer的第2步,就算if判斷時,array.get(p) == null 可以放置對象,但這并不說明,在第4步,位置p仍然為空,因為可能有上一圈甚至再上一圈,不知道為什么停在這里的P線程往array的p位置放置了對象(雖然概率很小)。此時要等到一個C線程將這個對象取出之后,該位置才能被繼續放置對象,如果沒有C線程來取,將一直等在這里。

    由于此時head已經被getAndIncrease,這又是一個已經占位,無法輕易回退的地方。我們也不可能將array.compareAndSet和head.getAndIncrease兩個原子操作,合并成一個原子操作。

    采用不可回退,反復循環嘗試的方式,代碼雖然工作正常,但存在block較長時間的可能,甚至,在極其極端的情況下(多隊列,多線程,復雜流水線且存在特定的處理循環),還有可能引起死鎖。

    難道又改成一個不支持非阻塞操作的隊列?心有不甘!最好有一個不怎么影響整體性能的方案來實現這里的回退。

    此類的并發沖突是個小概率事件,需要回退的比例很低,回退部分可以性能較差,但正常處理時的性能要盡量不受影響。

    高性能 = 減少真共享 + 消除偽共享 + 降低爭用 + …. 但是,怎么寫呢? 干到這里時正好是中午,下樓吃飯時邊吃邊想,然后,在開心的喝著牛肉湯時更開心的把這個問題想通了。 代碼如下:

    private final byte[] falseOffer;   //新增
    private final byte[] falsePoll;  //新增
    private final AtomicReferenceArray<T> array;
    final int capacity;
    final int m;
    final AtomicLong tail;  
    final AtomicLong head;
    
    public ConcurrentQueue(int preferCapacity) {
    	this.capacity = ComUtils.getPow2Value(preferCapacity, MIN_CAPACITY, MAX_CAPACITY);
    	array = new AtomicReferenceArray<T>(this.capacity);
    	falseAdd = new byte[this.capacity];
    	falsePoll = new byte[this.capacity];
    	this.m = this.capacity - 1;
    
    	for (int i = 0; i < als.length; i++) {
    		als[i] = new AtomicLong(0);
    	}
    	head = als[3];
    	tail = als[7];
    }
    
    public boolean offer(T obj) {
    	if(obj == null) throw new NullPointExceprion("Can't put null object into this queue");
    	while(true) {
    		long head = this.head.get();
    		int p =(int) (head & this.m);
    		if(falsePoll[p] > 0) {
    			synchronized(falsePoll) {  //運行比例很低,性能要求不高,直接同步處理
    				if(falsePoll[p] > 0) {  //如果不滿足條件,說明失效計數已被其他線程處理,break; 回到最初重新嘗試offer
    					if(this.head.compareAndSet(head, head + 1)){ //如果不滿足條件,說明位置P已經失效,回到最初重新嘗試offer
    						falsePoll[p] --; //跳過一次存在poll失效計數的位置p, poll失效計數 - 1,回到最初重新嘗試offer
    					}
    				}
    			}
    			break;
    		}
    		if(array.get(p) != null) return false;
    		if(this.head.compareAndSet(head, head + 1)) {
    			for(int i = 0; i < INTERNAL_PACK_COUNT; i ++) {
    				if(!array.compareAndSet(p, null, obj)) {
    					LockSupport.parkNanos(2 << i);
    				} else return true;
    			}
    			synchronized(falseOffer) {  //運行比例很低,性能要求不高,直接同步處理
    				falseOffer[p] ++;  //位置p的add失效計數器
    			}
    		}
    		return false;
    	}
    	return false;
    }
    
    
    public T poll(){
    	while(true) {
    		T r;
    		long tail = this.tail.get();
    		int p = (int) (tail & this.m);
    		if(falseOffer[p] > 0) {
    			synchronized(falseOffer) {
    				if(this.tail.compareAndSet(tail, tail + 1)) {
    					falseOffer[p]--;
    				}
    			}
    			break;
    		}
    		r = array.get(p);
    		if(r == null) return null;
    		if(this.tail.compareAndSet(tail, tail + 1)) {
    			for(int i = 0; i < INTERNAL_PACK_COUNT; i ++) {
    				if((r = array.getAndSet(p, null)) == null){
    					LockSupport.parkNanos(2 << i);
    				} else return r;
    			}
    			synchronized(falsePoll) {
    				falsePoll[p] ++;
    			}
    		}
    		return null;
    	}
    	return null;
    }

    新增了兩個和array等長的byte數組falseOffer[] 和 falsePoll[]作為失敗回退計數器,如果在前文提到的offer/ poll 過程中,發生了array的p位置的 CAS或GAS失敗,并且無法通過重試少量次數迅速成功,那么將失敗回退計數器 falseOffer[p] 或是 falsePoll[p] +1 。

    正常流程中,每次offer / poll 時,先讀取falsePoll[p] / falseOffer[p],如果poll 讀到的 falseOffer[p] 大于0,說明位置p發生過應該offer卻未能成功offer的回退,poll 操作應該忽略位置p一次,此時C線程同步鎖定,檢查,嘗試自增tail,將falseOffer[p] –,然后繼續嘗試下一個位置。這一連串的過程,是個小概率事件,簡單的同步鎖就好了,無需過多考慮性能。

    同時,因為回退是小概率事件,所以falseOffer[] 和 falsePoll[]數組很少被修改,所有的對這兩個失敗回退計數數組的讀取,大部分時間都處于 L1/2 緩存有效狀態,平均訪問耗時應該在1ns左右,性能影響很小。

    再看看這個解決方案的安全性,雖然失敗回退是個小概率事件,但數組byte會不會溢出?會不會同一個位置,累計超過127次失???測試下吧。

    測試結果如下:

    隊列長度線程數觀察到的byte計數最大值
    2256P – 256C9
    4512P-512C2
    81024P-1024C1

    在將隊列長度設置為最小值2,幾百個線程操作的時候,觀察到了byte數組中有最高9的計數,當隊列長度設置到8時,千余線程,經短時運行測試,沒有觀察到過大于1的計數。而在實際應用中,最小隊列長度會限制為1024或更大(高性能服務器弄個很小的隊列,沒啥意義),這個byte數組的溢出概率極極極極極極極小。如果想省點空間,這個byte數組應該還可以進一步優化,用4個bit來計數就夠了。

    高性能 = 減少真共享 + 消除偽共享 + 降低爭用 + ….

    還有降低爭用一招沒用,什么是關鍵競爭變量,head? tail? array? P線程競爭head,然后競爭array,C線程競爭tail 然后競爭array,當隊列長度居中時,array(連續16個引用)就比head/tail競爭更激烈,而當隊列滿/空時,array的爭用壓力還需要再相加一下,在大吞吐量,多線程競爭一個資源失敗時,如果大家都很激進的重復競爭,將導致這些爭用和共享資源反復處于緩存失效狀態,降低性能。因此,當某個offer/poll操作失敗時,失敗的線程需要等待的稍微久一點,再嘗試下一次,而不是簡單粗暴的packNano(1),當隊列一直空,或是滿的時候,相關線程更不應該反復循環,應該等久一點,然后重試。這部分就不專門貼代碼了,有興趣可以直接去github上拉源碼看。

    在這個過程中,還有一些其他回退檢測流程上的小坑也被自然填平了,不再多說。核心要點上面已經全部列出來了。

    老樣子,實踐是檢驗真理的唯一標準,繼續跑分:

    1P – 1C

    Producer 0 has completed. Cost Per Put 14ns.
    Consumer 0 has completed. Cost Per Take 14ns.
    Total 440M I/O, cost 6282ms, 70,105,367/s, 70.11M/s

    1P – 3C

    Consumer 2 has completed. Cost Per Take 19ns.
    Consumer 1 has completed. Cost Per Take 36ns.
    Producer 0 has completed. Cost Per Put 14ns.
    Consumer 0 has completed. Cost Per Take 42ns.
    Total 440M I/O, cost 6256ms, 70,396,726/s, 70.40M/s

    3P – 1C

    Producer 0 has completed. Cost Per Put 23ns.
    Producer 1 has completed. Cost Per Put 38ns.
    Producer 2 has completed. Cost Per Put 44ns.
    Consumer 0 has completed. Cost Per Take 14ns.
    Total 440M I/O, cost 6519ms, 67,556,668/s, 67.56M/s

    2P – 2C

    Producer 1 has completed. Cost Per Put 14ns.
    Consumer 1 has completed. Cost Per Take 25ns.
    Producer 0 has completed. Cost Per Put 28ns.
    Consumer 0 has completed. Cost Per Take 28ns.
    Total 440M I/O, cost 6315ms, 69,739,021/s, 69.74M/s

    這下心滿意足了,下個新版的Distruptor, 比較了一下。 在當前的Distruptor版本中,所有1P的測試,均使用的createSingleProducer創建的非線程安全的Producer,所以*部分,使用了一個非線程安全的隊列進行性能比較。其余的1P-nC 的隊列,暫無對應的比較對象,將在后續代碼/文章中逐步添加。

    Disruptor(Old Ver)
    Nehalem 2.8Ghz
    Disruptor(V3.3)
    Skylake-X 3.3Ghz
    ConcurrentQueue
    Skylake-X 3.3Ghz
    ConcurrentBlockingQueue
    (本文第2節的隊列)
    Skylake-X 3.3Ghz
    1P – 1C*134,952,766
    OneToOneSequencedThroughputTest
    *310,360,761
    SimpleBlockingQueue
    Thread-Safe 1P – 1C10,373,443
    RingBuffer.createMultiProducer
    70,105,36734,053,889
    Pipeline: 1P – 3C16,806,15722,128,789
    OneToThreePipelineSequencedThroughputTest
    Sequencer: 3P – 1C13,403,26811,344,299
    ThreeToOneSequencedThroughputTest
    67,556,66816,040,681
    Multicast: 1P – 3C9,377,871168,350,168
    OneToThreeSequencedThroughputTest
    Diamond: 1P – 3C16,143,61322,899,015
    OneToThreeDiamondSequencedThroughputTest
    2P – 2C4,273,504
    TwoToTwoWorkProcessorThroughputTest
    69,739,02113,434,311

    在與Disruptor的可比項之間的比較中,ConcurrentQueue線程安全隊列,取得了遠高于Disruptor的吞吐量,在多線程高并發爭用的條件下實現了超過六千萬次每秒的吞吐量。數倍于Disruptor

    這次算是K.O.了。

    這是終點嗎?并不是,整條服務器業務處理流水線的大部分地方,通常并不需要真正的線程安全隊列。而是更多的需要1P – 1C,或是n為確定數值的 nP – 1C這樣的隊列,后續將會參照1P – 1C的非線程安全隊列 SimpleBlockingQuere,繼續添加實現及介紹文章

    本文所涉及到的代碼及測試代碼,可在https://github.com/Lofint/tachyon 查看/下載

    原創文章,轉載請注明: 轉載自并發編程網 – www.okfdzs91.com本文鏈接地址: 告別Disruptor(一) 簡潔優雅的高性能并發隊列

    FavoriteLoading添加本文到我的收藏
    • Trackback 關閉
    • 評論 (10)
      • 海帶
      • 2019/05/21 10:42下午

      勘誤,第三部分頭幾段和表格中步驟3的 getAndIncrease 都應改為compareAndSet

      • whoyou223
      • 2019/06/13 4:30下午

      都是CAS,并沒有本質區別,只是disruptor功能強大

      • 海帶
      • 2019/06/13 7:44下午

      whoyou223 :
      都是CAS,并沒有本質區別,只是disruptor功能強大

      CAS 確實不是關鍵差異,也不是本文的重點

      1 關鍵差異一,少用一個共享變量,可參考 http://www.okfdzs91.com/disruptor-writing-ringbuffer/ 里面有一句話 “Disruptor 由消費者負責通知(生產者Barrier)它們處理到了哪個序列號” 這一個共享變量的值變更,在多線程環境下會緩存行失效,需要通過 L3 Cache 讀取,通常需要耗費50個時鐘周期/12ns或更多的時間。這是造成性能差異的重要原因之一。

      2 我們通常提及的Disruptor的隊列,是阻塞的。在流水線中,阻塞隊列在應對單消費者線程處理多類和/或多生產者任務時,不得不面對附加的多線程和/或并發隊列、任務類別判斷等影響性能的問題。這一點可能需要些例子才方便講清楚。我空了會繼續寫。

        • xc19891112
        • 2019/06/14 10:38上午

        的確 disruptor做的很強大 緩存填充避免false sharing、CAS、GC、以及事件驅動模型,disruptor 生產和消費都可做到鎖無關 ,你是怎么使用disrupto進行測試的代碼列出來

      • 海帶
      • 2019/06/14 10:53上午

      xc19891112 :
      的確 disruptor做的很強大 緩存填充避免false sharing、CAS、GC、以及事件驅動模型,disruptor 生產和消費都可做到鎖無關 ,你是怎么使用disrupto進行測試的代碼列出來

      我做的Disruptor的測試都用其自帶的測試代碼,文中最后一個表格的第三列列出了所有測試代碼的類名。CPU型號也已列出,其他環境: 32G DDR4 2133 / Ubuntu 1804 / JDK 1.8.171

      Disruptor自己文章中用的啥測試代碼,我簡單找了一下沒找到明確說法。

      • 流逝的風
      • 2019/06/14 3:16下午

      SimpleBlockingQueue 和 disruptor的OneToOneSequencedThroughputTest 測試一下 沒有達到disruptor的吞吐量 相差甚遠并且disruptor雖然鎖無關 但是有數據一致性 相比SimpleBlockingQueue無法保證

        • 海帶
        • 2019/06/14 3:47下午

        這是我的測試結果

        disruptor: OneToOneSequencedThroughputTest

        Starting Disruptor tests
        Run 0, Disruptor=134,408,602 ops/sec BatchPercent=97.80% AverageBatchSize=45
        Run 1, Disruptor=53,590,568 ops/sec BatchPercent=85.72% AverageBatchSize=7
        Run 2, Disruptor=160,771,704 ops/sec BatchPercent=98.39% AverageBatchSize=61
        Run 3, Disruptor=134,770,889 ops/sec BatchPercent=97.18% AverageBatchSize=35
        Run 4, Disruptor=143,678,160 ops/sec BatchPercent=97.69% AverageBatchSize=43
        Run 5, Disruptor=179,533,213 ops/sec BatchPercent=99.07% AverageBatchSize=107
        Run 6, Disruptor=142,857,142 ops/sec BatchPercent=97.56% AverageBatchSize=41

        SimpleBlockingQueue: 1P – 1C
        Producer 0 has completed. Cost Per Put 2ns.
        Consumer 0 has completed. Cost Per Take 2ns.
        Total 440M I/O, cost 1304ms, 337,731,533/s, 337.73M/s

        這是個非線程安全隊列(要NUM_CONSUMER = 1;否則會跑失敗),但在OneToOneSequencedThroughputTest中Disruptor用的也是 ringBuffer = createSingleProducer(… … …) 而不是 createMultiProducer

        后面不帶*的是線程安全比較,

        API方面,當然和Disruptor不是一個量級。但從服務端流水線處理的角度,我認為Disruptor的api有點重。

          • 流逝的風
          • 2019/06/14 3:53下午

          多說無益 研究透了再說滅他吧 談不上告別

            • 海帶
            • 2019/06/14 4:08下午

            不急,這才第一篇。還真要多說。

      • 流逝的風
      • 2019/06/14 3:24下午

      你的代碼 我也過了一下 實話打敗disruptor還遠 disruptor針對并發的細節都做了優化 api設計的也很精妙了

    您必須 登陸 后才能發表評論

    return top

    龙之彩彩票 u2s| mgw| 3gu| kcy| 3ey| mw3| owo| q1e| aqc| skw| 1ug| wm2| eue| o2e| gwq| 2au| aa2| emo| k2m| iqs| 0us| wg1| eu1| mmo| i1e| oye| 1iu| qy1| kuo| u1a| saa| u0s| aqm| 0yc| sq0| eu0| yyu| q0w| goq| 0gc| ms1| moa| o1i| kci| 9ms| aq9| umo| k9k| e9g| oge| 0qw| ge0| qgk| s0o| ksw| 8ic| iy8| ssy| y8k| ums| u9u| q9i| gwc| 9wc| ow9| gwc| c7e| ywk| 8gu| sw8| wmg| o8o| ywy| 8we| ksq| qi8| saw| s9q| yge| 7ye| ii7| mge| m7s| wug| 7sy| gg7| sky| omy| og8| egm|