@@ -18,6 +18,17 @@ const { parseOrigin } = require('./core/util')
1818const kFactory = Symbol ( 'factory' )
1919
2020const kOptions = Symbol ( 'options' )
21+ const kGreatestCommonDivisor = Symbol ( 'kGreatestCommonDivisor' )
22+ const kCurrentWeight = Symbol ( 'kCurrentWeight' )
23+ const kIndex = Symbol ( 'kIndex' )
24+ const kWeight = Symbol ( 'kWeight' )
25+ const kMaxWeightPerServer = Symbol ( 'kMaxWeightPerServer' )
26+ const kErrorPenalty = Symbol ( 'kErrorPenalty' )
27+
28+ function getGreatestCommonDivisor ( a , b ) {
29+ if ( b === 0 ) return a
30+ return getGreatestCommonDivisor ( b , a % b )
31+ }
2132
2233function defaultFactory ( origin , opts ) {
2334 return new Pool ( origin , opts )
@@ -28,6 +39,11 @@ class BalancedPool extends PoolBase {
2839 super ( )
2940
3041 this [ kOptions ] = opts
42+ this [ kIndex ] = - 1
43+ this [ kCurrentWeight ] = 0
44+
45+ this [ kMaxWeightPerServer ] = this [ kOptions ] . maxWeightPerServer || 100
46+ this [ kErrorPenalty ] = this [ kOptions ] . errorPenalty || 15
3147
3248 if ( ! Array . isArray ( upstreams ) ) {
3349 upstreams = [ upstreams ]
@@ -42,6 +58,7 @@ class BalancedPool extends PoolBase {
4258 for ( const upstream of upstreams ) {
4359 this . addUpstream ( upstream )
4460 }
61+ this . _updateBalancedPoolStats ( )
4562 }
4663
4764 addUpstream ( upstream ) {
@@ -54,12 +71,40 @@ class BalancedPool extends PoolBase {
5471 ) ) ) {
5572 return this
5673 }
74+ const pool = this [ kFactory ] ( upstreamOrigin , Object . assign ( { } , this [ kOptions ] ) )
75+
76+ this [ kAddClient ] ( pool )
77+ pool . on ( 'connect' , ( ) => {
78+ pool [ kWeight ] = Math . min ( this [ kMaxWeightPerServer ] , pool [ kWeight ] + this [ kErrorPenalty ] )
79+ } )
80+
81+ pool . on ( 'connectionError' , ( ) => {
82+ pool [ kWeight ] = Math . max ( 1 , pool [ kWeight ] - this [ kErrorPenalty ] )
83+ this . _updateBalancedPoolStats ( )
84+ } )
85+
86+ pool . on ( 'disconnect' , ( ...args ) => {
87+ const err = args [ 2 ]
88+ if ( err && err . code === 'UND_ERR_SOCKET' ) {
89+ // decrease the weight of the pool.
90+ pool [ kWeight ] = Math . max ( 1 , pool [ kWeight ] - this [ kErrorPenalty ] )
91+ this . _updateBalancedPoolStats ( )
92+ }
93+ } )
94+
95+ for ( const client of this [ kClients ] ) {
96+ client [ kWeight ] = this [ kMaxWeightPerServer ]
97+ }
5798
58- this [ kAddClient ] ( this [ kFactory ] ( upstreamOrigin , Object . assign ( { } , this [ kOptions ] ) ) )
99+ this . _updateBalancedPoolStats ( )
59100
60101 return this
61102 }
62103
104+ _updateBalancedPoolStats ( ) {
105+ this [ kGreatestCommonDivisor ] = this [ kClients ] . map ( p => p [ kWeight ] ) . reduce ( getGreatestCommonDivisor , 0 )
106+ }
107+
63108 removeUpstream ( upstream ) {
64109 const upstreamOrigin = parseOrigin ( upstream ) . origin
65110
@@ -100,10 +145,42 @@ class BalancedPool extends PoolBase {
100145 return
101146 }
102147
103- this [ kClients ] . splice ( this [ kClients ] . indexOf ( dispatcher ) , 1 )
104- this [ kClients ] . push ( dispatcher )
148+ const allClientsBusy = this [ kClients ] . map ( pool => pool [ kNeedDrain ] ) . reduce ( ( a , b ) => a && b , true )
149+
150+ if ( allClientsBusy ) {
151+ return
152+ }
153+
154+ let counter = 0
155+
156+ let maxWeightIndex = this [ kClients ] . findIndex ( pool => ! pool [ kNeedDrain ] )
157+
158+ while ( counter ++ < this [ kClients ] . length ) {
159+ this [ kIndex ] = ( this [ kIndex ] + 1 ) % this [ kClients ] . length
160+ const pool = this [ kClients ] [ this [ kIndex ] ]
161+
162+ // find pool index with the largest weight
163+ if ( pool [ kWeight ] > this [ kClients ] [ maxWeightIndex ] [ kWeight ] && ! pool [ kNeedDrain ] ) {
164+ maxWeightIndex = this [ kIndex ]
165+ }
166+
167+ // decrease the current weight every `this[kClients].length`.
168+ if ( this [ kIndex ] === 0 ) {
169+ // Set the current weight to the next lower weight.
170+ this [ kCurrentWeight ] = this [ kCurrentWeight ] - this [ kGreatestCommonDivisor ]
171+
172+ if ( this [ kCurrentWeight ] <= 0 ) {
173+ this [ kCurrentWeight ] = this [ kMaxWeightPerServer ]
174+ }
175+ }
176+ if ( pool [ kWeight ] >= this [ kCurrentWeight ] && ( ! pool [ kNeedDrain ] ) ) {
177+ return pool
178+ }
179+ }
105180
106- return dispatcher
181+ this [ kCurrentWeight ] = this [ kClients ] [ maxWeightIndex ] [ kWeight ]
182+ this [ kIndex ] = maxWeightIndex
183+ return this [ kClients ] [ maxWeightIndex ]
107184 }
108185}
109186
0 commit comments