From 82ec908acbab63af64b1b912babcab9a16d9f0e6 Mon Sep 17 00:00:00 2001 From: Daniel Béreš Date: Wed, 27 Jul 2022 12:22:39 +0000 Subject: gomemif: update to libmemif version 4.0 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Type: improvement This patch provides: 1. interrupt mode support, 2. abstract socket support, 3. overriding responder example and divides it to two examples: -icmp_responder_cb -icmp_responder_poll Signed-off-by: Daniel Béreš Change-Id: I99c86d053521760c457541fc596ed554f4077608 --- extras/gomemif/README.rst | 50 ++++ extras/gomemif/WORKSPACE | 5 + extras/gomemif/examples/BUILD.bazel | 19 +- extras/gomemif/examples/icmp_responder_cb.go | 326 +++++++++++++++++++++++++ extras/gomemif/examples/icmp_responder_poll.go | 317 ++++++++++++++++++++++++ extras/gomemif/examples/responder.go | 227 ----------------- extras/gomemif/memif/control_channel.go | 41 +++- extras/gomemif/memif/interface.go | 26 +- 8 files changed, 777 insertions(+), 234 deletions(-) create mode 100644 extras/gomemif/README.rst create mode 100644 extras/gomemif/examples/icmp_responder_cb.go create mode 100644 extras/gomemif/examples/icmp_responder_poll.go delete mode 100644 extras/gomemif/examples/responder.go diff --git a/extras/gomemif/README.rst b/extras/gomemif/README.rst new file mode 100644 index 00000000000..18f382893c9 --- /dev/null +++ b/extras/gomemif/README.rst @@ -0,0 +1,50 @@ +.. _gomemif_doc: + +Gomemif library +======================= + +Memif library implemented in Go. The package contains 3 examples: Bridge and ICMP responder in interrupt and polling mode. + +setup and run +------------- +To Build all examples + +:: + + bazel build //... + +To Run ICMP responder in interrupt mode: + +:: + + DBGvpp# create interface memif id 0 master no-zero-copy + DBGvpp# set int ip addr memif0/0 192.168.1.2/24 + DBGvpp# set int state memif0/0 up + + bazel-bin/examples/linux_amd64_stripped/icmp_responder_cb + gomemif# start + + DBGvpp# ping 192.168.1.1 + +To Run ICMP responder in polling mode: + +:: + + DBGvpp# create interface memif id 0 master no-zero-copy + DBGvpp# set int ip addr memif0/0 192.168.1.2/24 + DBGvpp# set int state memif0/0 up + + bazel-bin/examples/linux_amd64_stripped/icmp_responder_poll + gomemif# start + + DBGvpp# ping 192.168.1.1 + +To Run Bridge: + +:: + + bazel-bin/examples/linux_amd64_stripped/bridge + gomemif# start + + + diff --git a/extras/gomemif/WORKSPACE b/extras/gomemif/WORKSPACE index 10741f5b7c0..d75bfa5a524 100644 --- a/extras/gomemif/WORKSPACE +++ b/extras/gomemif/WORKSPACE @@ -33,3 +33,8 @@ go_repository( importpath = "github.com/pkg/profile", commit = "acd64d450fd45fb2afa41f833f3788c8a7797219" ) +go_repository( + name = "com_github_gopacket", + importpath = "github.com/google/gopacket", + commit = "3eaba08943250fd212520e5cff00ed808b8fc60a" +) diff --git a/extras/gomemif/examples/BUILD.bazel b/extras/gomemif/examples/BUILD.bazel index a88b92e37f1..eb10422027f 100644 --- a/extras/gomemif/examples/BUILD.bazel +++ b/extras/gomemif/examples/BUILD.bazel @@ -1,12 +1,14 @@ load("@io_bazel_rules_go//go:def.bzl", "go_binary") go_binary( - name = "responder", - srcs = ["responder.go"], + name = "icmp_responder_poll", + srcs = ["icmp_responder_poll.go"], visibility = ["//visibility:public",], deps = [ "//memif:memif", "@com_github_profile//:go_default_library", + "@com_github_gopacket//layers:go_default_library", + "@com_github_gopacket//:go_default_library" ], ) @@ -19,3 +21,16 @@ go_binary( "@com_github_profile//:go_default_library", ], ) + +go_binary( + name = "icmp_responder_cb", + srcs = ["icmp_responder_cb.go"], + visibility = ["//visibility:public",], + deps = [ + "//memif:memif", + "@com_github_profile//:go_default_library", + "@com_github_gopacket//layers:go_default_library", + "@com_github_gopacket//:go_default_library" + ], +) + diff --git a/extras/gomemif/examples/icmp_responder_cb.go b/extras/gomemif/examples/icmp_responder_cb.go new file mode 100644 index 00000000000..8d8adcfc892 --- /dev/null +++ b/extras/gomemif/examples/icmp_responder_cb.go @@ -0,0 +1,326 @@ +/* + *------------------------------------------------------------------ + * Copyright (c) 2020 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *------------------------------------------------------------------ + */ + +package main + +import ( + "bufio" + "flag" + "fmt" + "net" + "os" + "strings" + "sync" + + "memif" + + "github.com/google/gopacket" + "github.com/google/gopacket/layers" + "github.com/pkg/profile" +) + +func Disconnected(i *memif.Interface) error { + fmt.Println("Disconnected: ", i.GetName()) + + data, ok := i.GetPrivateData().(*interfaceData) + if !ok { + return fmt.Errorf("Invalid private data") + } + close(data.quitChan) // stop polling + close(data.errChan) + data.wg.Wait() // wait until polling stops, then continue disconnect + + return nil +} + +func Responder(i *memif.Interface) error { + data, ok := i.GetPrivateData().(*interfaceData) + if !ok { + return fmt.Errorf("Invalid private data") + } + data.errChan = make(chan error, 1) + data.quitChan = make(chan struct{}, 1) + data.wg.Add(1) + + // allocate packet buffer + pkt := make([]byte, 2048) + // get rx queue + rxq0, err := i.GetRxQueue(0) + if err != nil { + return err + } + // get tx queue + txq0, err := i.GetTxQueue(0) + if err != nil { + return err + } + for { + + // read packet from shared memory + pktLen, err := rxq0.ReadPacket(pkt) + _ = err + if pktLen > 0 { + fmt.Printf("pktLen: %d\n", pktLen) + gopkt := gopacket.NewPacket(pkt[:pktLen], layers.LayerTypeEthernet, gopacket.NoCopy) + etherLayer := gopkt.Layer(layers.LayerTypeEthernet) + if etherLayer.(*layers.Ethernet).EthernetType == layers.EthernetTypeARP { + rEth := layers.Ethernet{ + SrcMAC: net.HardwareAddr{0xaa, 0xaa, 0xaa, 0xaa, 0xaa, 0xaa}, + DstMAC: net.HardwareAddr{0xff, 0xff, 0xff, 0xff, 0xff, 0xff}, + + EthernetType: layers.EthernetTypeARP, + } + rArp := layers.ARP{ + AddrType: layers.LinkTypeEthernet, + Protocol: layers.EthernetTypeIPv4, + HwAddressSize: 6, + ProtAddressSize: 4, + Operation: layers.ARPReply, + SourceHwAddress: []byte(net.HardwareAddr{0xaa, 0xaa, 0xaa, 0xaa, 0xaa, 0xaa}), + SourceProtAddress: []byte("\xc0\xa8\x01\x01"), + DstHwAddress: []byte(net.HardwareAddr{0x02, 0xfe, 0x08, 0x88, 0x45, 0x7f}), + DstProtAddress: []byte("\xc0\xa8\x01\x02"), + } + buf := gopacket.NewSerializeBuffer() + opts := gopacket.SerializeOptions{ + FixLengths: true, + ComputeChecksums: true, + } + gopacket.SerializeLayers(buf, opts, &rEth, &rArp) + // write packet to shared memory + txq0.WritePacket(buf.Bytes()) + } + + if etherLayer.(*layers.Ethernet).EthernetType == layers.EthernetTypeIPv4 { + ipLayer := gopkt.Layer(layers.LayerTypeIPv4) + if ipLayer == nil { + fmt.Println("Missing IPv4 layer.") + + } + ipv4, _ := ipLayer.(*layers.IPv4) + if ipv4.Protocol != layers.IPProtocolICMPv4 { + fmt.Println("Not ICMPv4 protocol.") + } + icmpLayer := gopkt.Layer(layers.LayerTypeICMPv4) + if icmpLayer == nil { + fmt.Println("Missing ICMPv4 layer.") + } + icmp, _ := icmpLayer.(*layers.ICMPv4) + if icmp.TypeCode.Type() != layers.ICMPv4TypeEchoRequest { + fmt.Println("Not ICMPv4 echo request.") + } + fmt.Println("Received an ICMPv4 echo request.") + + // Build packet layers. + ethResp := layers.Ethernet{ + DstMAC: net.HardwareAddr{0xaa, 0xaa, 0xaa, 0xaa, 0xaa, 0xaa}, + //DstMAC: net.HardwareAddr{0x02, 0xfe, 0xa8, 0x77, 0xaf, 0x20}, + SrcMAC: []byte(net.HardwareAddr{0x02, 0xfe, 0x08, 0x88, 0x45, 0x7f}), + + EthernetType: layers.EthernetTypeIPv4, + } + ipv4Resp := layers.IPv4{ + Version: 4, + IHL: 5, + TOS: 0, + Id: 0, + Flags: 0, + FragOffset: 0, + TTL: 255, + Protocol: layers.IPProtocolICMPv4, + SrcIP: []byte("\xc0\xa8\x01\x01"), + DstIP: []byte("\xc0\xa8\x01\x02"), + } + icmpResp := layers.ICMPv4{ + TypeCode: layers.CreateICMPv4TypeCode(layers.ICMPv4TypeEchoReply, 0), + Id: icmp.Id, + Seq: icmp.Seq, + } + + // Set up buffer and options for serialization. + buf := gopacket.NewSerializeBuffer() + opts := gopacket.SerializeOptions{ + FixLengths: true, + ComputeChecksums: true, + } + gopacket.SerializeLayers(buf, opts, ðResp, &ipv4Resp, &icmpResp, + gopacket.Payload(icmp.Payload)) + // write packet to shared memory + txq0.WritePacket(buf.Bytes()) + } + + } + return nil + + } + +} +func Connected(i *memif.Interface) error { + data, ok := i.GetPrivateData().(*interfaceData) + if !ok { + return fmt.Errorf("Invalid private data") + } + _ = data + + // allocate packet buffer + pkt := make([]byte, 2048) + // get rx queue + rxq0, err := i.GetRxQueue(0) + _ = err + + // read packet from shared memory + pktLen, err := rxq0.ReadPacket(pkt) + _, _ = err, pktLen + + return nil +} + +type interfaceData struct { + errChan chan error + quitChan chan struct{} + wg sync.WaitGroup +} + +func interractiveHelp() { + fmt.Println("help - print this help") + fmt.Println("start - start connecting loop") + fmt.Println("show - print interface details") + fmt.Println("exit - exit the application") +} + +func main() { + cpuprof := flag.String("cpuprof", "", "cpu profiling output file") + memprof := flag.String("memprof", "", "mem profiling output file") + role := flag.String("role", "slave", "interface role") + name := flag.String("name", "gomemif", "interface name") + socketName := flag.String("socket", "/run/vpp/memif.sock", "control socket filename") + + flag.Parse() + + if *cpuprof != "" { + defer profile.Start(profile.CPUProfile, profile.ProfilePath(*cpuprof)).Stop() + } + if *memprof != "" { + defer profile.Start(profile.MemProfile, profile.ProfilePath(*memprof)).Stop() + } + + memifErrChan := make(chan error) + exitChan := make(chan struct{}) + + var isMaster bool + switch *role { + case "slave": + isMaster = false + case "master": + isMaster = true + default: + fmt.Println("Invalid role") + return + } + + fmt.Println("GoMemif: Responder") + fmt.Println("-----------------------") + + socket, err := memif.NewSocket("gomemif_example", *socketName) + if err != nil { + fmt.Println("Failed to create socket: ", err) + return + } + + data := interfaceData{} + args := &memif.Arguments{ + IsMaster: isMaster, + ConnectedFunc: Connected, + DisconnectedFunc: Disconnected, + PrivateData: &data, + Name: *name, + InterruptFunc: Responder, + } + + i, err := socket.NewInterface(args) + if err != nil { + fmt.Println("Failed to create interface on socket %s: %s", socket.GetFilename(), err) + goto exit + } + + // slave attempts to connect to control socket + // to handle control communication call socket.StartPolling() + if !i.IsMaster() { + fmt.Println(args.Name, ": Connecting to control socket...") + for !i.IsConnecting() { + err = i.RequestConnection() + if err != nil { + /* TODO: check for ECONNREFUSED errno + * if error is ECONNREFUSED it may simply mean that master + * interface is not up yet, use i.RequestConnection() + */ + fmt.Println("Failed to connect: ", err) + goto exit + } + } + } + + go func(exitChan chan<- struct{}) { + reader := bufio.NewReader(os.Stdin) + for { + fmt.Print("gomemif# ") + text, _ := reader.ReadString('\n') + // convert CRLF to LF + text = strings.Replace(text, "\n", "", -1) + switch text { + case "help": + interractiveHelp() + case "start": + // start polling for events on this socket + socket.StartPolling(memifErrChan) + case "show": + fmt.Println("remote: ", i.GetRemoteName()) + fmt.Println("peer: ", i.GetPeerName()) + case "exit": + err = socket.StopPolling() + if err != nil { + fmt.Println("Failed to stop polling: ", err) + } + close(exitChan) + return + default: + fmt.Println("Unknown input") + } + } + }(exitChan) + + for { + select { + case <-exitChan: + goto exit + case err, ok := <-memifErrChan: + if ok { + fmt.Println(err) + } + case err, ok := <-data.errChan: + if ok { + fmt.Println(err) + } + default: + continue + } + } + +exit: + socket.Delete() + close(memifErrChan) +} diff --git a/extras/gomemif/examples/icmp_responder_poll.go b/extras/gomemif/examples/icmp_responder_poll.go new file mode 100644 index 00000000000..f9f1c3e8208 --- /dev/null +++ b/extras/gomemif/examples/icmp_responder_poll.go @@ -0,0 +1,317 @@ +/* + *------------------------------------------------------------------ + * Copyright (c) 2020 Cisco and/or its affiliates. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *------------------------------------------------------------------ + */ + +package main + +import ( + "bufio" + "flag" + "fmt" + "net" + "os" + "strings" + "sync" + + "memif" + + "github.com/google/gopacket" + "github.com/google/gopacket/layers" + "github.com/pkg/profile" +) + +func Disconnected(i *memif.Interface) error { + fmt.Println("Disconnected: ", i.GetName()) + + data, ok := i.GetPrivateData().(*interfaceData) + if !ok { + return fmt.Errorf("Invalid private data") + } + close(data.quitChan) // stop polling + close(data.errChan) + data.wg.Wait() // wait until polling stops, then continue disconnect + + return nil +} + +func Connected(i *memif.Interface) error { + fmt.Println("Connected: ", i.GetName()) + + data, ok := i.GetPrivateData().(*interfaceData) + if !ok { + return fmt.Errorf("Invalid private data") + } + data.errChan = make(chan error, 1) + data.quitChan = make(chan struct{}, 1) + data.wg.Add(1) + + go func(errChan chan<- error, quitChan <-chan struct{}, wg *sync.WaitGroup) { + defer wg.Done() + // allocate packet buffer + pkt := make([]byte, 2048) + // get rx queue + rxq0, err := i.GetRxQueue(0) + if err != nil { + errChan <- err + return + } + // get tx queue + txq0, err := i.GetTxQueue(0) + if err != nil { + errChan <- err + return + } + for { + select { + case <-quitChan: // channel closed + return + default: + // read packet from shared memory + pktLen, err := rxq0.ReadPacket(pkt) + if pktLen > 0 { + fmt.Printf("pktLen: %d\n", pktLen) + gopkt := gopacket.NewPacket(pkt[:pktLen], layers.LayerTypeEthernet, gopacket.NoCopy) + etherLayer := gopkt.Layer(layers.LayerTypeEthernet) + if etherLayer.(*layers.Ethernet).EthernetType == layers.EthernetTypeARP { + + rEth := layers.Ethernet{ + SrcMAC: net.HardwareAddr{0xaa, 0xaa, 0xaa, 0xaa, 0xaa, 0xaa}, + DstMAC: net.HardwareAddr{0xff, 0xff, 0xff, 0xff, 0xff, 0xff}, + EthernetType: layers.EthernetTypeARP, + } + rArp := layers.ARP{ + AddrType: layers.LinkTypeEthernet, + Protocol: layers.EthernetTypeIPv4, + HwAddressSize: 6, + ProtAddressSize: 4, + Operation: layers.ARPReply, + SourceHwAddress: []byte(net.HardwareAddr{0xaa, 0xaa, 0xaa, 0xaa, 0xaa, 0xaa}), + SourceProtAddress: []byte("\xc0\xa8\x01\x01"), + DstHwAddress: []byte(net.HardwareAddr{0x02, 0xfe, 0x08, 0x88, 0x45, 0x7f}), + DstProtAddress: []byte("\xc0\xa8\x01\x02"), + } + buf := gopacket.NewSerializeBuffer() + opts := gopacket.SerializeOptions{ + FixLengths: true, + ComputeChecksums: true, + } + gopacket.SerializeLayers(buf, opts, &rEth, &rArp) + // write packet to shared memory + txq0.WritePacket(buf.Bytes()) + } + + if etherLayer.(*layers.Ethernet).EthernetType == layers.EthernetTypeIPv4 { + ipLayer := gopkt.Layer(layers.LayerTypeIPv4) + if ipLayer == nil { + fmt.Println("Missing IPv4 layer.") + + } + ipv4, _ := ipLayer.(*layers.IPv4) + if ipv4.Protocol != layers.IPProtocolICMPv4 { + fmt.Println("Not ICMPv4 protocol.") + } + icmpLayer := gopkt.Layer(layers.LayerTypeICMPv4) + if icmpLayer == nil { + fmt.Println("Missing ICMPv4 layer.") + } + icmp, _ := icmpLayer.(*layers.ICMPv4) + if icmp.TypeCode.Type() != layers.ICMPv4TypeEchoRequest { + fmt.Println("Not ICMPv4 echo request.") + } + fmt.Println("Received an ICMPv4 echo request.") + + // Build packet layers. + ethResp := layers.Ethernet{ + DstMAC: net.HardwareAddr{0xaa, 0xaa, 0xaa, 0xaa, 0xaa, 0xaa}, + //DstMAC: net.HardwareAddr{0x02, 0xfe, 0xa8, 0x77, 0xaf, 0x20}, + SrcMAC: []byte(net.HardwareAddr{0x02, 0xfe, 0x08, 0x88, 0x45, 0x7f}), + + EthernetType: layers.EthernetTypeIPv4, + } + ipv4Resp := layers.IPv4{ + Version: 4, + IHL: 5, + TOS: 0, + Id: 0, + Flags: 0, + FragOffset: 0, + TTL: 255, + Protocol: layers.IPProtocolICMPv4, + SrcIP: []byte("\xc0\xa8\x01\x01"), + DstIP: []byte("\xc0\xa8\x01\x02"), + } + icmpResp := layers.ICMPv4{ + TypeCode: layers.CreateICMPv4TypeCode(layers.ICMPv4TypeEchoReply, 0), + Id: icmp.Id, + Seq: icmp.Seq, + } + + // Set up buffer and options for serialization. + buf := gopacket.NewSerializeBuffer() + opts := gopacket.SerializeOptions{ + FixLengths: true, + ComputeChecksums: true, + } + gopacket.SerializeLayers(buf, opts, ðResp, &ipv4Resp, &icmpResp, + gopacket.Payload(icmp.Payload)) + // write packet to shared memory + txq0.WritePacket(buf.Bytes()) + } + } else if err != nil { + errChan <- err + return + } + } + } + }(data.errChan, data.quitChan, &data.wg) + + return nil +} + +type interfaceData struct { + errChan chan error + quitChan chan struct{} + wg sync.WaitGroup +} + +func interractiveHelp() { + fmt.Println("help - print this help") + fmt.Println("start - start connecting loop") + fmt.Println("show - print interface details") + fmt.Println("exit - exit the application") +} + +func main() { + cpuprof := flag.String("cpuprof", "", "cpu profiling output file") + memprof := flag.String("memprof", "", "mem profiling output file") + role := flag.String("role", "slave", "interface role") + name := flag.String("name", "gomemif", "interface name") + socketName := flag.String("socket", "", "control socket filename") + + flag.Parse() + + if *cpuprof != "" { + defer profile.Start(profile.CPUProfile, profile.ProfilePath(*cpuprof)).Stop() + } + if *memprof != "" { + defer profile.Start(profile.MemProfile, profile.ProfilePath(*memprof)).Stop() + } + + memifErrChan := make(chan error) + exitChan := make(chan struct{}) + + var isMaster bool + switch *role { + case "slave": + isMaster = false + case "master": + isMaster = true + default: + fmt.Println("Invalid role") + return + } + + fmt.Println("GoMemif: Responder") + fmt.Println("-----------------------") + + socket, err := memif.NewSocket("gomemif_example", *socketName) + if err != nil { + fmt.Println("Failed to create socket: ", err) + return + } + + data := interfaceData{} + args := &memif.Arguments{ + IsMaster: isMaster, + ConnectedFunc: Connected, + DisconnectedFunc: Disconnected, + PrivateData: &data, + Name: *name, + } + + i, err := socket.NewInterface(args) + if err != nil { + fmt.Println("Failed to create interface on socket %s: %s", socket.GetFilename(), err) + goto exit + } + + // slave attempts to connect to control socket + // to handle control communication call socket.StartPolling() + if !i.IsMaster() { + fmt.Println(args.Name, ": Connecting to control socket...") + for !i.IsConnecting() { + err = i.RequestConnection() + if err != nil { + /* TODO: check for ECONNREFUSED errno + * if error is ECONNREFUSED it may simply mean that master + * interface is not up yet, use i.RequestConnection() + */ + fmt.Println("Faild to connect: ", err) + goto exit + } + } + } + + go func(exitChan chan<- struct{}) { + reader := bufio.NewReader(os.Stdin) + for { + fmt.Print("gomemif# ") + text, _ := reader.ReadString('\n') + // convert CRLF to LF + text = strings.Replace(text, "\n", "", -1) + switch text { + case "help": + interractiveHelp() + case "start": + // start polling for events on this socket + socket.StartPolling(memifErrChan) + case "show": + fmt.Println("remote: ", i.GetRemoteName()) + fmt.Println("peer: ", i.GetPeerName()) + case "exit": + err = socket.StopPolling() + if err != nil { + fmt.Println("Failed to stop polling: ", err) + } + close(exitChan) + return + default: + fmt.Println("Unknown input") + } + } + }(exitChan) + + for { + select { + case <-exitChan: + goto exit + case err, ok := <-memifErrChan: + if ok { + fmt.Println(err) + } + case err, ok := <-data.errChan: + if ok { + fmt.Println(err) + } + default: + continue + } + } + +exit: + socket.Delete() + close(memifErrChan) +} diff --git a/extras/gomemif/examples/responder.go b/extras/gomemif/examples/responder.go deleted file mode 100644 index 8fda6435bc5..00000000000 --- a/extras/gomemif/examples/responder.go +++ /dev/null @@ -1,227 +0,0 @@ -/* - *------------------------------------------------------------------ - * Copyright (c) 2020 Cisco and/or its affiliates. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - *------------------------------------------------------------------ - */ - -package main - -import ( - "bufio" - "flag" - "fmt" - "os" - "strings" - "sync" - - "github.com/pkg/profile" - "memif" -) - -func Disconnected(i *memif.Interface) error { - fmt.Println("Disconnected: ", i.GetName()) - - data, ok := i.GetPrivateData().(*interfaceData) - if !ok { - return fmt.Errorf("Invalid private data") - } - close(data.quitChan) // stop polling - close(data.errChan) - data.wg.Wait() // wait until polling stops, then continue disconnect - - return nil -} - -func Connected(i *memif.Interface) error { - fmt.Println("Connected: ", i.GetName()) - - data, ok := i.GetPrivateData().(*interfaceData) - if !ok { - return fmt.Errorf("Invalid private data") - } - data.errChan = make(chan error, 1) - data.quitChan = make(chan struct{}, 1) - data.wg.Add(1) - - go func(errChan chan<- error, quitChan <-chan struct{}, wg *sync.WaitGroup) { - defer wg.Done() - // allocate packet buffer - pkt := make([]byte, 2048) - // get rx queue - rxq0, err := i.GetRxQueue(0) - if err != nil { - errChan <- err - return - } - // get tx queue - txq0, err := i.GetTxQueue(0) - if err != nil { - errChan <- err - return - } - for { - select { - case <-quitChan: // channel closed - return - default: - // read packet from shared memory - pktLen, err := rxq0.ReadPacket(pkt) - if pktLen > 0 { - // write packet to shared memory - txq0.WritePacket(pkt[:pktLen]) - } else if err != nil { - errChan <- err - return - } - } - } - }(data.errChan, data.quitChan, &data.wg) - - return nil -} - -type interfaceData struct { - errChan chan error - quitChan chan struct{} - wg sync.WaitGroup -} - -func interractiveHelp() { - fmt.Println("help - print this help") - fmt.Println("start - start connecting loop") - fmt.Println("show - print interface details") - fmt.Println("exit - exit the application") -} - -func main() { - cpuprof := flag.String("cpuprof", "", "cpu profiling output file") - memprof := flag.String("memprof", "", "mem profiling output file") - role := flag.String("role", "slave", "interface role") - name := flag.String("name", "gomemif", "interface name") - socketName := flag.String("socket", "", "control socket filename") - - flag.Parse() - - if *cpuprof != "" { - defer profile.Start(profile.CPUProfile, profile.ProfilePath(*cpuprof)).Stop() - } - if *memprof != "" { - defer profile.Start(profile.MemProfile, profile.ProfilePath(*memprof)).Stop() - } - - memifErrChan := make(chan error) - exitChan := make(chan struct{}) - - var isMaster bool - switch *role { - case "slave": - isMaster = false - case "master": - isMaster = true - default: - fmt.Println("Invalid role") - return - } - - fmt.Println("GoMemif: Responder") - fmt.Println("-----------------------") - - socket, err := memif.NewSocket("gomemif_example", *socketName) - if err != nil { - fmt.Println("Failed to create socket: ", err) - return - } - - data := interfaceData{} - args := &memif.Arguments{ - IsMaster: isMaster, - ConnectedFunc: Connected, - DisconnectedFunc: Disconnected, - PrivateData: &data, - Name: *name, - } - - i, err := socket.NewInterface(args) - if err != nil { - fmt.Println("Failed to create interface on socket %s: %s", socket.GetFilename(), err) - goto exit - } - - // slave attempts to connect to control socket - // to handle control communication call socket.StartPolling() - if !i.IsMaster() { - fmt.Println(args.Name, ": Connecting to control socket...") - for !i.IsConnecting() { - err = i.RequestConnection() - if err != nil { - /* TODO: check for ECONNREFUSED errno - * if error is ECONNREFUSED it may simply mean that master - * interface is not up yet, use i.RequestConnection() - */ - fmt.Println("Faild to connect: ", err) - goto exit - } - } - } - - go func(exitChan chan<- struct{}) { - reader := bufio.NewReader(os.Stdin) - for { - fmt.Print("gomemif# ") - text, _ := reader.ReadString('\n') - // convert CRLF to LF - text = strings.Replace(text, "\n", "", -1) - switch text { - case "help": - interractiveHelp() - case "start": - // start polling for events on this socket - socket.StartPolling(memifErrChan) - case "show": - fmt.Println("remote: ", i.GetRemoteName()) - fmt.Println("peer: ", i.GetPeerName()) - case "exit": - err = socket.StopPolling() - if err != nil { - fmt.Println("Failed to stop polling: ", err) - } - close(exitChan) - return - default: - fmt.Println("Unknown input") - } - } - }(exitChan) - - for { - select { - case <-exitChan: - goto exit - case err, ok := <-memifErrChan: - if ok { - fmt.Println(err) - } - case err, ok := <-data.errChan: - if ok { - fmt.Println(err) - } - default: - continue - } - } - -exit: - socket.Delete() - close(memifErrChan) -} diff --git a/extras/gomemif/memif/control_channel.go b/extras/gomemif/memif/control_channel.go index 12672e6e2f8..4788fcb5ea5 100644 --- a/extras/gomemif/memif/control_channel.go +++ b/extras/gomemif/memif/control_channel.go @@ -67,11 +67,22 @@ type Socket struct { interfaceList *list.List ccList *list.List epfd int + interruptfd int wakeEvent syscall.EpollEvent stopPollChan chan struct{} wg sync.WaitGroup } +type interrupt struct { + socket *Socket + event syscall.EpollEvent +} + +type memifInterrupt struct { + connection *Socket + qid uint16 +} + // StopPolling stops polling events on the socket func (socket *Socket) StopPolling() error { if socket.stopPollChan != nil { @@ -220,6 +231,15 @@ func (socket *Socket) handleEvent(event *syscall.EpollEvent) error { if socket.listener != nil && socket.listener.event.Fd == event.Fd { return socket.listener.handleEvent(event) } + intf := socket.interfaceList.Back().Value.(*Interface) + if intf.args.InterruptFunc != nil { + if int(event.Fd) == int(intf.args.InterruptFd) { + b := make([]byte, 8) + syscall.Read(int(event.Fd), b) + intf.onInterrupt(intf) + return nil + } + } for elt := socket.ccList.Front(); elt != nil; elt = elt.Next() { cc, ok := elt.Value.(*controlChannel) @@ -233,6 +253,25 @@ func (socket *Socket) handleEvent(event *syscall.EpollEvent) error { return fmt.Errorf(errorFdNotFound) } +func (socket *Socket) addInterrupt(fd int) (err error) { + l := &interrupt{ + // we will need this to look up master interface by id + socket: socket, + } + + l.event = syscall.EpollEvent{ + Events: syscall.EPOLLIN, + Fd: int32(fd), + } + err = socket.addEvent(&l.event) + if err != nil { + return fmt.Errorf("Failed to add event: ", err) + } + + return nil + +} + // handleEvent handles epoll event for listener func (l *listener) handleEvent(event *syscall.EpollEvent) error { // hang up @@ -725,7 +764,6 @@ func (cc *controlChannel) parseConnect() (err error) { if err != nil { return err } - cc.isConnected = true return nil @@ -764,7 +802,6 @@ func (cc *controlChannel) parseConnected() (err error) { if err != nil { return err } - cc.isConnected = true return nil diff --git a/extras/gomemif/memif/interface.go b/extras/gomemif/memif/interface.go index 15a8e87453c..4a45075ea4f 100644 --- a/extras/gomemif/memif/interface.go +++ b/extras/gomemif/memif/interface.go @@ -60,6 +60,8 @@ type ConnectedFunc func(i *Interface) error // DisconnectedFunc is a callback called when an interface is disconnected type DisconnectedFunc func(i *Interface) error +type InterruptFunc func(i *Interface) error + // MemoryConfig represents shared memory configuration type MemoryConfig struct { NumQueuePairs uint16 // number of queue pairs @@ -77,7 +79,9 @@ type Arguments struct { MemoryConfig MemoryConfig ConnectedFunc ConnectedFunc // callback called when interface changes status to connected DisconnectedFunc DisconnectedFunc // callback called when interface changes status to disconnected - PrivateData interface{} // private data used by client program + InterruptFunc InterruptFunc + PrivateData interface{} // private data used by client program + InterruptFd uint16 } // memoryRegion represents a shared memory mapped file @@ -110,6 +114,7 @@ type Interface struct { regions []memoryRegion txQueues []Queue rxQueues []Queue + onInterrupt InterruptFunc } // IsMaster returns true if the interfaces role is master, else returns false @@ -270,6 +275,10 @@ func RoleToString(isMaster bool) string { return "Slave" } +func memifPathIsAbstract(filename string) bool { + return (filename[0] == '@') +} + // RequestConnection is used by slave interface to connect to a socket and // create a control channel func (i *Interface) RequestConnection() error { @@ -283,6 +292,9 @@ func (i *Interface) RequestConnection() error { } usa := &syscall.SockaddrUnix{Name: i.socket.filename} + if memifPathIsAbstract(i.socket.GetFilename()) { + usa.Name = "\000" + usa.Name[1:] + } // Connect to listener socket err = syscall.Connect(fd, usa) if err != nil { @@ -315,7 +327,8 @@ func (socket *Socket) NewInterface(args *Arguments) (*Interface, error) { // copy interface configuration i := Interface{ - args: *args, + args: *args, + onInterrupt: args.InterruptFunc, } // set default values if i.args.MemoryConfig.NumQueuePairs == 0 { @@ -434,6 +447,7 @@ func (i *Interface) initializeQueues() (err error) { if err != nil { return err } + i.socket.addInterrupt(q.interruptFd) q.putRing() i.txQueues = append(i.txQueues, *q) @@ -452,11 +466,17 @@ func (i *Interface) initializeQueues() (err error) { i: i, } q.ring.setCookie(cookie) - q.ring.setFlags(1) + if i.args.InterruptFunc == nil { + q.ring.setFlags(1) + } else { + q.ring.setFlags(0) + } q.interruptFd, err = eventFd() if err != nil { return err } + i.args.InterruptFd = uint16(q.interruptFd) + i.socket.addInterrupt(q.interruptFd) q.putRing() i.rxQueues = append(i.rxQueues, *q) -- cgit 1.2.3-korg