4
4
require 'fixtures/integration_worker'
5
5
6
6
require "rabbitmq/http/client"
7
+ require 'timeout'
7
8
8
9
9
10
describe "integration" do
@@ -17,23 +18,30 @@ def integration_log(msg)
17
18
puts msg if ENV [ 'INTEGRATION_LOG' ]
18
19
end
19
20
21
+ def rmq_addr
22
+ @rmq_addr ||= compose_or_localhost ( "rabbitmq" )
23
+ end
24
+
25
+ def admin
26
+ @admin ||=
27
+ begin
28
+ puts "RABBITMQ is at #{ rmq_addr } "
29
+ RabbitMQ ::HTTP ::Client . new ( "http://#{ rmq_addr } :15672/" , username : "guest" , password : "guest" )
30
+ rescue
31
+ fail "Rabbitmq admin seems to not exist? you better be running this on Travis or Docker. proceeding.\n #{ $!} "
32
+ end
33
+ end
34
+
20
35
def prepare
21
36
# clean up all integration queues; admin interface must be installed
22
37
# in integration env
23
- rmq_addr = compose_or_localhost ( "rabbitmq" )
24
- puts "RABBITMQ is at #{ rmq_addr } "
25
- begin
26
- admin = RabbitMQ ::HTTP ::Client . new ( "http://#{ rmq_addr } :15672/" , username : "guest" , password : "guest" )
27
- qs = admin . list_queues
28
- qs . each do |q |
29
- name = q . name
30
- if name . start_with? 'integration_'
31
- admin . delete_queue ( '/' , name )
32
- integration_log "cleaning up #{ name } ."
33
- end
38
+ qs = admin . list_queues
39
+ qs . each do |q |
40
+ name = q . name
41
+ if name . start_with? 'integration_'
42
+ admin . delete_queue ( '/' , name )
43
+ integration_log "cleaning up #{ name } ."
34
44
end
35
- rescue
36
- puts "Rabbitmq admin seems to not exist? you better be running this on Travis or Docker. proceeding.\n #{ $!} "
37
45
end
38
46
39
47
Sneakers . clear!
@@ -85,22 +93,27 @@ def start_worker(w)
85
93
pid
86
94
end
87
95
88
- def any_consumers
89
- rmq_addr = compose_or_localhost ( "rabbitmq" )
90
- result = false
91
- begin
92
- admin = RabbitMQ ::HTTP ::Client . new ( "http://#{ rmq_addr } :15672/" , username : "guest" , password : "guest" )
93
- qs = admin . list_queues
94
- qs . each do |q |
95
- if q . name . start_with? 'integration_'
96
- puts "We see #{ q . consumers } consumers on #{ q . name } "
97
- return true if q . consumers > 0
98
- end
96
+ def consumers_count
97
+ qs = admin . list_queues
98
+ qs . each do |q |
99
+ if q . name . start_with? 'integration_'
100
+ return [ q . consumers , q . name ]
99
101
end
100
- return false
101
- rescue
102
- puts "Rabbitmq admin seems to not exist? you better be running this on Travis or Docker. proceeding.\n #{ $!} "
103
102
end
103
+ return [ 0 , nil ]
104
+ end
105
+
106
+ def assert_any_consumers ( consumers_should_be_there , maximum_wait_time = 15 )
107
+ Timeout ::timeout ( maximum_wait_time ) do
108
+ loop do
109
+ consumers , queue = consumers_count
110
+ fail 'no queues so no consumers' if consumers_should_be_there && !queue
111
+ puts "We see #{ consumers } consumers on #{ queue } "
112
+ ( consumers_should_be_there == consumers . zero? ) ? sleep ( 1 ) : return
113
+ end
114
+ end
115
+ rescue Timeout ::Error
116
+ fail "Consumers should #{ 'not' unless consumers_should_be_there } be here but #{ consumers } consumers were after #{ maximum_wait_time } s waiting."
104
117
end
105
118
106
119
it 'should be possible to terminate when queue is full' do
@@ -116,11 +129,10 @@ def any_consumers
116
129
end
117
130
118
131
pid = start_worker ( IntegrationWorker )
119
- any_consumers . must_equal true
132
+ assert_any_consumers true
120
133
integration_log "Killing #{ pid } now!"
121
134
Process . kill ( "TERM" , pid )
122
- sleep ( 2 )
123
- any_consumers . must_equal false
135
+ assert_any_consumers false
124
136
end
125
137
126
138
it 'should pull down 100 jobs from a real queue' do
@@ -144,5 +156,3 @@ def any_consumers
144
156
145
157
end
146
158
end
147
-
148
-
0 commit comments