Skip to content

Commit cb14c3c

Browse files
committed
topotests: new test for multicast route maps
Implement tests for the new multicast route-map filtering. Signed-off-by: Rafael Zalamena <[email protected]>
1 parent 3282094 commit cb14c3c

File tree

2 files changed

+131
-0
lines changed

2 files changed

+131
-0
lines changed
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
log commands
2+
!
3+
interface r1-eth0
4+
ip address 192.168.100.1/24
5+
ip igmp
6+
exit
7+
!
8+
router pim
9+
exit
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
#!/usr/bin/env python
2+
# SPDX-License-Identifier: ISC
3+
4+
#
5+
# test_multicast_gmp_route_map_topo1.py
6+
# Part of NetDEF Topology Tests
7+
#
8+
# Copyright (c) 2025 by
9+
# Network Device Education Foundation, Inc. ("NetDEF")
10+
#
11+
12+
"""
13+
test_multicast_gmp_route_map_topo1.py: Test the FRR PIM multicast route map.
14+
"""
15+
16+
import os
17+
import sys
18+
import json
19+
from functools import partial
20+
import re
21+
import pytest
22+
23+
# Save the Current Working Directory to find configuration files.
24+
CWD = os.path.dirname(os.path.realpath(__file__))
25+
sys.path.append(os.path.join(CWD, "../"))
26+
27+
# pylint: disable=C0413
28+
# Import topogen and topotest helpers
29+
from lib import topotest
30+
31+
# Required to instantiate the topology builder class.
32+
from lib.topogen import Topogen, TopoRouter, get_topogen
33+
from lib.topolog import logger
34+
35+
from lib.pim import McastTesterHelper
36+
37+
pytestmark = [pytest.mark.bgpd, pytest.mark.pimd]
38+
39+
app_helper = McastTesterHelper()
40+
41+
42+
def build_topo(tgen):
43+
"""
44+
+----+ +----+
45+
| h1 | <-> | r1 |
46+
+----+ +----+
47+
^
48+
+----+ |
49+
| h2 | <-----+
50+
+----+
51+
"""
52+
53+
tgen.add_router(f"r1")
54+
tgen.add_host("h1", "192.168.100.100/24", "via 192.168.100.1")
55+
tgen.add_host("h2", "192.168.101.100/24", "via 192.168.101.1")
56+
57+
switch = tgen.add_switch("s1")
58+
switch.add_link(tgen.gears["r1"])
59+
switch.add_link(tgen.gears["h1"])
60+
61+
switch = tgen.add_switch("s2")
62+
switch.add_link(tgen.gears["r1"])
63+
switch.add_link(tgen.gears["h2"])
64+
65+
66+
def setup_module(mod):
67+
"Sets up the pytest environment"
68+
tgen = Topogen(build_topo, mod.__name__)
69+
tgen.start_topology()
70+
71+
tgen.gears["r1"].load_frr_config(os.path.join(CWD, "r1/frr.conf"))
72+
tgen.start_router()
73+
74+
app_helper.init(tgen)
75+
76+
77+
def teardown_module():
78+
"Teardown the pytest environment"
79+
tgen = get_topogen()
80+
app_helper.cleanup()
81+
tgen.stop_topology()
82+
83+
84+
def test_igmp_route_map():
85+
"Test IGMP route map filtering"
86+
87+
tgen = get_topogen()
88+
if tgen.routers_have_failure():
89+
pytest.skip(tgen.errors)
90+
91+
def expect_loopback_route(router, iptype, route, proto):
92+
"Wait until route is present on RIB for protocol."
93+
logger.info("waiting route {} in {}".format(route, router))
94+
test_func = partial(
95+
topotest.router_json_cmp,
96+
tgen.gears[router],
97+
"show {} route json".format(iptype),
98+
{route: [{"protocol": proto}]},
99+
)
100+
_, result = topotest.run_and_expect(test_func, None, count=130, wait=1)
101+
assertmsg = '"{}" convergence failure'.format(router)
102+
assert result is None, assertmsg
103+
104+
# Wait for R1
105+
expect_loopback_route("r1", "ip", "10.254.254.2/32", "bgp")
106+
107+
# Wait for R2
108+
expect_loopback_route("r2", "ip", "10.254.254.1/32", "bgp")
109+
110+
111+
def test_memory_leak():
112+
"Run the memory leak test and report results."
113+
tgen = get_topogen()
114+
if not tgen.is_memleak_enabled():
115+
pytest.skip("Memory leak test/report is disabled")
116+
117+
tgen.report_memory_leaks()
118+
119+
120+
if __name__ == "__main__":
121+
args = ["-s"] + sys.argv[1:]
122+
sys.exit(pytest.main(args))

0 commit comments

Comments
 (0)