Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 20 additions & 21 deletions pkg/agent/multicast/mcast_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,20 +229,25 @@ func (s *IGMPSnooper) HandlePacketIn(pktIn *ofctrl.PacketIn) error {
klog.V(2).InfoS("Received PacketIn for IGMP packet", "in_port", iface.OFPort)
podName := "unknown"
var srcNode net.IP

pktData := new(protocol.Ethernet)
if err := pktData.UnmarshalBinary(pktIn.Data.(*util.Buffer).Bytes()); err != nil {
return fmt.Errorf("failed to parse Ethernet packet from packet-in message: %v", err)
}
ipPacket, err := parseIPv4Packet(pktData)
if err != nil {
return fmt.Errorf("failed to parse IPv4 packet from packet-in message: %v", err)
}

if iface.Type == interfacestore.ContainerInterface {
podName = iface.PodName
} else if iface.Type == interfacestore.TunnelInterface {
var err error
srcNode, err = s.parseSrcNode(pktIn)
if err != nil {
return err
}
// If an IGMP report arrives via a tunnel, extract the Node IP from its source IP.
// This works because for remote IGMP reports (sent via packet-out), the source IP
// is set to the Node's transport IP (see pkg/agent/openflow/client.go SendIGMPRemoteReportPacketOut).
srcNode = ipPacket.NWSrc
}
pktData := new(protocol.Ethernet)
if err := pktData.UnmarshalBinary(pktIn.Data.(*util.Buffer).Bytes()); err != nil {
return fmt.Errorf("failed to parse ethernet packet from packet-in message: %v", err)
}
igmp, err := parseIGMPPacket(*pktData)
igmp, err := parseIGMPPacket(ipPacket)
if err != nil {
return err
}
Expand Down Expand Up @@ -292,16 +297,6 @@ func (s *IGMPSnooper) HandlePacketIn(pktIn *ofctrl.PacketIn) error {
return nil
}

func (s *IGMPSnooper) parseSrcNode(pktIn *ofctrl.PacketIn) (net.IP, error) {
matches := pktIn.GetMatches()
tunSrcField := matches.GetMatchByName(binding.NxmFieldTunIPv4Src)
if tunSrcField == nil {
return nil, errors.New("in_port field not found")
}
tunSrc := tunSrcField.GetValue().(net.IP)
return tunSrc, nil
}

func generateIGMPQueryPacket(group net.IP, version uint8, queryInterval time.Duration) (util.Message, error) {
// The max response time field in IGMP protocol uses a value in units of 1/10 second.
// See https://datatracker.ietf.org/doc/html/rfc2236 and https://datatracker.ietf.org/doc/html/rfc3376
Expand Down Expand Up @@ -335,14 +330,18 @@ func generateIGMPQueryPacket(group net.IP, version uint8, queryInterval time.Dur
return nil, fmt.Errorf("unsupported IGMP version %d", version)
}

func parseIGMPPacket(pkt protocol.Ethernet) (protocol.IGMPMessage, error) {
func parseIPv4Packet(pkt *protocol.Ethernet) (*protocol.IPv4, error) {
if pkt.Ethertype != protocol.IPv4_MSG {
return nil, errors.New("not IPv4 packet")
}
ipPacket, ok := pkt.Data.(*protocol.IPv4)
if !ok {
return nil, errors.New("failed to parse IPv4 packet")
}
return ipPacket, nil
}

func parseIGMPPacket(ipPacket *protocol.IPv4) (protocol.IGMPMessage, error) {
if ipPacket.Protocol != IGMPProtocolNumber {
return nil, errors.New("not IGMP packet")
}
Expand Down
16 changes: 9 additions & 7 deletions pkg/agent/multicast/mcast_discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,14 @@ func TestParseIGMPPacket(t *testing.T) {
},
} {
t.Run(tc.name, func(t *testing.T) {
igmpMsg, err := parseIGMPPacket(tc.packet)
assert.Equal(t, tc.igmpMsg, igmpMsg)
assert.Equal(t, tc.err, err)
ipPacket, err := parseIPv4Packet(&tc.packet)
if err != nil {
assert.Equal(t, tc.err, err)
} else {
igmpMsg, err := parseIGMPPacket(ipPacket)
assert.Equal(t, tc.igmpMsg, igmpMsg)
assert.Equal(t, tc.err, err)
}
})
}
}
Expand Down Expand Up @@ -239,16 +244,13 @@ func generatePacketWithMatches(m util.Message, ofport uint32, srcNodeIP net.IP,
for i := range matches {
pkt.Match.AddField(matches[i])
}
if srcNodeIP != nil {
matchTunSrc := openflow15.NewTunnelIpv4SrcField(srcNodeIP, nil)
pkt.Match.AddField(*matchTunSrc)
}
ipPacket := &protocol.IPv4{
Version: 0x4,
IHL: 5,
Protocol: IGMPProtocolNumber,
Length: 20 + m.Len(),
Data: m,
NWSrc: srcNodeIP,
}
ethernetPkt := protocol.NewEthernet()
ethernetPkt.HWDst = pktInDstMAC
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/multicast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,7 @@ func testMulticastForwardToMultipleInterfaces(t *testing.T, data *TestData, send

func runTestMulticastBetweenPods(t *testing.T, data *TestData, mc multicastTestcase, nodeMulticastInterfaces map[int][]string, testNamespace string, transportInterface string, checkReceiverRoute bool, checkSenderRoute bool) {
currentEncapMode, _ := data.GetEncapMode()
if requiresExternalHostSupport(mc) && currentEncapMode == config.TrafficEncapModeEncap {
if requiresExternalHostSupport(mc) && currentEncapMode.SupportsEncap() {
t.Skipf("Multicast does not support using hostNetwork Pod to simulate the external host with encap mode, skip the case")
}
mcjoinWaitTimeout := defaultTimeout / time.Second
Expand Down
Loading