Skip to content

Commit 1204a83

Browse files
authored
Add consumer group integration tests (#33)
* Add consumer group integration tests * Fix credo
1 parent 4eabcf1 commit 1204a83

File tree

4 files changed

+276
-1
lines changed

4 files changed

+276
-1
lines changed

lib/kayrock/broker_connection.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ defmodule Kayrock.BrokerConnection do
3131

3232
def send(conn, data), do: Connection.call(conn, {:send, data})
3333

34-
def recv(conn, timeout \\ 3000) do
34+
def recv(conn, timeout \\ 5000) do
3535
Connection.call(conn, {:recv, timeout})
3636
end
3737

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
defmodule Kayrock.Integration.ConsumerGroupTest do
2+
use Kayrock.IntegrationCase
3+
use ExUnit.Case, async: true
4+
5+
import Kayrock.TestSupport
6+
import Kayrock.RequestFactory
7+
8+
container(:kafka, KafkaContainer.new(), shared: true)
9+
10+
describe "Consumer Group API" do
11+
for api_version <- [0, 1, 2] do
12+
test "v#{api_version} - allows to manage consumer groups", %{kafka: kafka} do
13+
api_version = unquote(api_version)
14+
group_name = unique_string()
15+
16+
{:ok, client_pid} = build_client(kafka)
17+
topic_name = create_topic(client_pid, api_version)
18+
19+
# [WHEN] No consumer groups exist
20+
# [THEN] List consumer groups returns empty list
21+
list_request = list_consumer_groups_request(api_version)
22+
{:ok, list_response} = Kayrock.client_call(client_pid, list_request, :controller)
23+
24+
matching_groups = Enum.filter(list_response.groups, &(&1.group_id == group_name))
25+
assert list_response.error_code == 0
26+
assert matching_groups == []
27+
28+
# [WHEN] We try to find a coordinator for a consumer group
29+
coordinator_request = find_coordinator_request(group_name, api_version)
30+
31+
{:ok, coordinator_response} =
32+
with_retry(fn ->
33+
Kayrock.client_call(client_pid, coordinator_request, 1)
34+
end)
35+
36+
# [THEN] We get a valid coordinator node
37+
assert coordinator_response.error_code == 0
38+
assert coordinator_response.coordinator.node_id > 0
39+
node_id = coordinator_response.coordinator.node_id
40+
41+
# [WHEN] We join a group
42+
member_data = %{group_id: group_name, topics: [topic_name]}
43+
join_request = join_group_request(member_data, api_version)
44+
45+
{:ok, join_response} =
46+
with_retry(fn ->
47+
Kayrock.client_call(client_pid, join_request, node_id)
48+
end)
49+
50+
assert join_response.error_code == 0
51+
assert join_response.members != []
52+
member_ids = Enum.map(join_response.members, & &1.member_id)
53+
assert Enum.member?(member_ids, join_response.member_id)
54+
55+
# [THEN] We can list the consumer group
56+
list_request = list_consumer_groups_request(api_version)
57+
{:ok, list_response} = Kayrock.client_call(client_pid, list_request, node_id)
58+
59+
matching_groups = Enum.filter(list_response.groups, &(&1.group_id == group_name))
60+
assert list_response.error_code == 0
61+
assert matching_groups == [%{group_id: group_name, protocol_type: "consumer"}]
62+
63+
# [WHEN] We sync the group
64+
assignments = [
65+
%{
66+
member_id: join_response.member_id,
67+
topic: topic_name,
68+
partitions: [0, 1, 2]
69+
}
70+
]
71+
72+
sync_request =
73+
sync_group_request(group_name, join_response.member_id, assignments, api_version)
74+
75+
{:ok, sync_response} = Kayrock.client_call(client_pid, sync_request, node_id)
76+
assert sync_response.error_code == 0
77+
78+
# [THEN] We can describe the consumer group
79+
describe_request = describe_groups_request([group_name], api_version)
80+
{:ok, describe_response} = Kayrock.client_call(client_pid, describe_request, node_id)
81+
82+
[group_info] = describe_response.groups
83+
assert group_info.error_code == 0
84+
assert group_info.group_id == group_name
85+
assert group_info.protocol_type == "consumer"
86+
[member] = group_info.members
87+
assert member.member_id == join_response.member_id
88+
89+
# [WHEN] We leave the group
90+
leave_group_request =
91+
leave_group_request(group_name, join_response.member_id, api_version)
92+
93+
{:ok, leave_group_response} =
94+
Kayrock.client_call(client_pid, leave_group_request, node_id)
95+
96+
assert leave_group_response.error_code == 0
97+
98+
# [THEN] We can don't find member in the group
99+
describe_request = describe_groups_request([group_name], api_version)
100+
{:ok, describe_response} = Kayrock.client_call(client_pid, describe_request, node_id)
101+
102+
[group_info] = describe_response.groups
103+
assert group_info.error_code == 0
104+
assert group_info.group_id == group_name
105+
assert group_info.protocol_type == "consumer"
106+
assert group_info.members == []
107+
108+
# [WHEN] We delete consumer group
109+
delete_request = delete_groups_request([group_name], api_version)
110+
{:ok, delete_response} = Kayrock.client_call(client_pid, delete_request, node_id)
111+
112+
assert delete_response.group_error_codes == [%{group_id: group_name, error_code: 0}]
113+
114+
# [THEN] We can't find the group
115+
list_request = list_consumer_groups_request(api_version)
116+
{:ok, list_response} = Kayrock.client_call(client_pid, list_request, node_id)
117+
118+
matching_groups = Enum.filter(list_response.groups, &(&1.group_id == group_name))
119+
assert matching_groups == []
120+
end
121+
end
122+
end
123+
124+
defp build_client(kafka) do
125+
uris = [{"localhost", Container.mapped_port(kafka, 9092)}]
126+
Kayrock.Client.start_link(uris)
127+
end
128+
129+
defp create_topic(client_pid, api_version) do
130+
topic_name = unique_string()
131+
create_request = create_topic_request(topic_name, api_version)
132+
{:ok, _} = Kayrock.client_call(client_pid, create_request, :controller)
133+
topic_name
134+
end
135+
end

test/support/request_factory.ex

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,4 +81,126 @@ defmodule Kayrock.RequestFactory do
8181

8282
%{Map.merge(request, request_date) | replica_id: -1}
8383
end
84+
85+
@doc """
86+
Creates a request to join a consumer group
87+
Uses min of api_version and max supported version
88+
"""
89+
def list_consumer_groups_request(api_version) do
90+
api_version = min(Kayrock.ListGroups.max_vsn(), api_version)
91+
Kayrock.ListGroups.get_request_struct(api_version)
92+
end
93+
94+
@doc """
95+
Create a request to find coordinator for a consumer group
96+
Uses min of api_version and max supported version
97+
"""
98+
def find_coordinator_request(group_id, api_version) do
99+
api_version = min(Kayrock.FindCoordinator.max_vsn(), api_version)
100+
request = Kayrock.FindCoordinator.get_request_struct(api_version)
101+
coordinator_key(request, api_version, group_id)
102+
end
103+
104+
defp coordinator_key(request, 0, group_id), do: %{request | group_id: group_id}
105+
106+
defp coordinator_key(request, _, group_id) do
107+
%{request | coordinator_key: group_id, coordinator_type: 0}
108+
end
109+
110+
@doc """
111+
Creates a request to join a consumer group
112+
Uses min of api_version and max supported version
113+
"""
114+
def join_group_request(member_data, api_version) do
115+
api_version = min(Kayrock.JoinGroup.max_vsn(), api_version)
116+
request = Kayrock.JoinGroup.get_request_struct(api_version)
117+
topics = Map.fetch!(member_data, :topics)
118+
119+
%{
120+
request
121+
| group_id: Map.fetch!(member_data, :group_id),
122+
session_timeout: Map.get(member_data, :session_timeout, 10_000),
123+
member_id: Map.get(member_data, :member_id, ""),
124+
protocol_type: "consumer",
125+
group_protocols: [
126+
%{
127+
protocol_metadata: %Kayrock.GroupProtocolMetadata{topics: topics},
128+
protocol_name: Map.get(member_data, :protocol_name, "assign")
129+
}
130+
]
131+
}
132+
|> add_rebalance_timeout(api_version, member_data)
133+
end
134+
135+
defp add_rebalance_timeout(request, 0, _), do: request
136+
137+
defp add_rebalance_timeout(request, _, member_data) do
138+
%{
139+
request
140+
| rebalance_timeout: Map.get(member_data, :rebalance_timeout, 30_000)
141+
}
142+
end
143+
144+
@doc """
145+
Creates a request to sync a consumer group
146+
Uses min of api_version and max supported version
147+
"""
148+
def sync_group_request(group_id, member_id, assignments, api_version) do
149+
api_version = min(Kayrock.SyncGroup.max_vsn(), api_version)
150+
request = Kayrock.SyncGroup.get_request_struct(api_version)
151+
152+
%{
153+
request
154+
| group_id: group_id,
155+
member_id: member_id,
156+
generation_id: 1,
157+
group_assignment: build_assignments(assignments)
158+
}
159+
end
160+
161+
defp build_assignments(assignments) do
162+
Enum.map(assignments, fn assignment ->
163+
%{
164+
member_id: Map.fetch!(assignment, :member_id),
165+
member_assignment: %Kayrock.MemberAssignment{
166+
partition_assignments: [
167+
%Kayrock.MemberAssignment.PartitionAssignment{
168+
topic: Map.fetch!(assignment, :topic),
169+
partitions: Map.fetch!(assignment, :partitions)
170+
}
171+
]
172+
}
173+
}
174+
end)
175+
end
176+
177+
@doc """
178+
Creates a request to describe a consumer groups
179+
Uses min of api_version and max supported version
180+
"""
181+
def describe_groups_request(group_ids, api_version) do
182+
api_version = min(Kayrock.DescribeGroups.max_vsn(), api_version)
183+
request = Kayrock.DescribeGroups.get_request_struct(api_version)
184+
%{request | group_ids: group_ids}
185+
end
186+
187+
@doc """
188+
Creates a request to leave a consumer group
189+
Uses min of api_version and max supported version
190+
"""
191+
def leave_group_request(group_id, member_id, api_version) do
192+
api_version = min(Kayrock.LeaveGroup.max_vsn(), api_version)
193+
request = Kayrock.LeaveGroup.get_request_struct(api_version)
194+
%{request | group_id: group_id, member_id: member_id}
195+
end
196+
197+
@doc """
198+
Creates a request to delete a group
199+
Uses min of api_version and max supported version
200+
"""
201+
def delete_groups_request(group_ids, api_version) do
202+
api_version = min(Kayrock.DeleteGroups.max_vsn(), api_version)
203+
request = Kayrock.DeleteGroups.get_request_struct(api_version)
204+
%{request | groups: group_ids}
205+
end
84206
end

test/support/test_support.ex

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,4 +65,22 @@ defmodule Kayrock.TestSupport do
6565
defp pad_list(l, n, pad_with) do
6666
l ++ List.duplicate(pad_with, n - length(l))
6767
end
68+
69+
@doc """
70+
Calls the given function up to 3 times, sleeping 1 second between each call.
71+
"""
72+
def with_retry(fun), do: do_with_retry(3, fun, nil)
73+
74+
defp do_with_retry(0, _fun, result), do: result
75+
76+
defp do_with_retry(n, fun, _result) do
77+
case fun.() do
78+
{:ok, response = %{error_code: 0}} ->
79+
{:ok, response}
80+
81+
result ->
82+
:timer.sleep(1000)
83+
do_with_retry(n - 1, fun, result)
84+
end
85+
end
6886
end

0 commit comments

Comments
 (0)