aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJakub Grajciar <jgrajcia@cisco.com>2020-04-02 10:02:17 +0200
committerDamjan Marion <dmarion@me.com>2020-04-28 21:18:37 +0000
commit07363a45fe4a7fe693acf438f0b56f927bdd3fbd (patch)
tree6d53728ac594de1b86e85c7d4ea1d9f8d145a993
parentc458c493667bde30c22760e3a1839f2cac6e6447 (diff)
gomemif: introduce gomemif
golang native memif driver Type: feature Signed-off-by: Jakub Grajciar <jgrajcia@cisco.com> Change-Id: I693156a44011c80025245d25134f5bf5db6eba82 Signed-off-by: Jakub Grajciar <jgrajcia@cisco.com>
-rw-r--r--.gitignore2
-rw-r--r--MAINTAINERS5
-rw-r--r--extras/gomemif/BUILD.bazel0
-rw-r--r--extras/gomemif/WORKSPACE35
-rw-r--r--extras/gomemif/examples/BUILD.bazel21
-rw-r--r--extras/gomemif/examples/bridge.go286
-rw-r--r--extras/gomemif/examples/responder.go227
-rw-r--r--extras/gomemif/memif/BUILD.bazel17
-rw-r--r--extras/gomemif/memif/control_channel.go965
-rw-r--r--extras/gomemif/memif/control_channel_unsafe.go60
-rw-r--r--extras/gomemif/memif/interface.go507
-rw-r--r--extras/gomemif/memif/interface_unsafe.go40
-rw-r--r--extras/gomemif/memif/memif.go345
-rw-r--r--extras/gomemif/memif/memif_unsafe.go55
-rw-r--r--extras/gomemif/memif/packet_reader.go91
-rw-r--r--extras/gomemif/memif/packet_writer.go95
16 files changed, 2751 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore
index b39d137d838..3e9a92e44ac 100644
--- a/.gitignore
+++ b/.gitignore
@@ -106,3 +106,5 @@ GTAGS
# extra scripts config
/extras/scripts/.config/
+# extras gomemif build files
+/extras/gomemif/bazel*
diff --git a/MAINTAINERS b/MAINTAINERS
index 83a2f61c9fd..bb26bc4207a 100644
--- a/MAINTAINERS
+++ b/MAINTAINERS
@@ -593,6 +593,11 @@ I: libmemif
M: Damjan Marion <damarion@cisco.com>
F: extras/libmemif
+gomemif
+I: gomemif
+M: Jakub Grajciar <jgrajcia@cisco.com>
+F: extras/gomemif
+
VPP Comms Library
I: vcl
Y: src/vnet/vcl/FEATURE.yaml
diff --git a/extras/gomemif/BUILD.bazel b/extras/gomemif/BUILD.bazel
new file mode 100644
index 00000000000..e69de29bb2d
--- /dev/null
+++ b/extras/gomemif/BUILD.bazel
diff --git a/extras/gomemif/WORKSPACE b/extras/gomemif/WORKSPACE
new file mode 100644
index 00000000000..10741f5b7c0
--- /dev/null
+++ b/extras/gomemif/WORKSPACE
@@ -0,0 +1,35 @@
+load("@bazel_tools//tools/build_defs/repo:http.bzl", "http_archive")
+
+http_archive(
+ name = "io_bazel_rules_go",
+ urls = [
+ "https://mirror.bazel.build/github.com/bazelbuild/rules_go/releases/download/v0.21.2/rules_go-v0.21.2.tar.gz",
+ "https://github.com/bazelbuild/rules_go/releases/download/v0.21.2/rules_go-v0.21.2.tar.gz",
+ ],
+ sha256 = "f99a9d76e972e0c8f935b2fe6d0d9d778f67c760c6d2400e23fc2e469016e2bd",
+)
+
+load("@io_bazel_rules_go//go:deps.bzl", "go_rules_dependencies", "go_register_toolchains")
+
+http_archive(
+ name = "bazel_gazelle",
+ sha256 = "86c6d481b3f7aedc1d60c1c211c6f76da282ae197c3b3160f54bd3a8f847896f",
+ urls = [
+ "https://storage.googleapis.com/bazel-mirror/github.com/bazelbuild/bazel-gazelle/releases/download/v0.19.1/bazel-gazelle-v0.19.1.tar.gz",
+ "https://github.com/bazelbuild/bazel-gazelle/releases/download/v0.19.1/bazel-gazelle-v0.19.1.tar.gz",
+ ],
+)
+
+go_rules_dependencies()
+
+go_register_toolchains()
+
+load("@bazel_gazelle//:deps.bzl", "gazelle_dependencies", "go_repository")
+
+gazelle_dependencies()
+
+go_repository(
+ name = "com_github_profile",
+ importpath = "github.com/pkg/profile",
+ commit = "acd64d450fd45fb2afa41f833f3788c8a7797219"
+)
diff --git a/extras/gomemif/examples/BUILD.bazel b/extras/gomemif/examples/BUILD.bazel
new file mode 100644
index 00000000000..a88b92e37f1
--- /dev/null
+++ b/extras/gomemif/examples/BUILD.bazel
@@ -0,0 +1,21 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_binary")
+
+go_binary(
+ name = "responder",
+ srcs = ["responder.go"],
+ visibility = ["//visibility:public",],
+ deps = [
+ "//memif:memif",
+ "@com_github_profile//:go_default_library",
+ ],
+)
+
+go_binary(
+ name = "bridge",
+ srcs = ["bridge.go"],
+ visibility = ["//visibility:public",],
+ deps = [
+ "//memif:memif",
+ "@com_github_profile//:go_default_library",
+ ],
+)
diff --git a/extras/gomemif/examples/bridge.go b/extras/gomemif/examples/bridge.go
new file mode 100644
index 00000000000..a192034e3be
--- /dev/null
+++ b/extras/gomemif/examples/bridge.go
@@ -0,0 +1,286 @@
+/*
+ *------------------------------------------------------------------
+ * Copyright (c) 2020 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *------------------------------------------------------------------
+ */
+
+package main
+
+import (
+ "bufio"
+ "flag"
+ "fmt"
+ "os"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/pkg/profile"
+ "memif"
+)
+
+func Disconnected(i *memif.Interface) error {
+ fmt.Println("Disconnected: ", i.GetName())
+
+ data, ok := i.GetPrivateData().(*interfaceData)
+ if !ok {
+ return fmt.Errorf("Invalid private data")
+ }
+ close(data.quitChan) // stop polling
+ close(data.errChan)
+ data.wg.Wait() // wait until polling stops, then continue disconnect
+
+ return nil
+}
+
+func Connected(i *memif.Interface) error {
+ fmt.Println("Connected: ", i.GetName())
+
+ data, ok := i.GetPrivateData().(*interfaceData)
+ if !ok {
+ return fmt.Errorf("Invalid private data")
+ }
+ data.errChan = make(chan error, 1)
+ data.quitChan = make(chan struct{}, 1)
+ data.wg.Add(1)
+
+ go func(errChan chan<- error, quitChan <-chan struct{}, wg *sync.WaitGroup) {
+ defer wg.Done()
+ // allocate packet buffer
+ pkt := make([]byte, 2048)
+ // get rx queue
+ rxq0, err := i.GetRxQueue(0)
+ if err != nil {
+ errChan <- err
+ return
+ }
+
+ // wait until both interfaces are connected
+ for !data.bri.IsConnected() {
+ time.Sleep(100 * time.Millisecond)
+ }
+
+ // get bridged interfaces tx queue
+ txq0, err := data.bri.GetTxQueue(0)
+ if err != nil {
+ errChan <- err
+ return
+ }
+ for {
+ select {
+ case <-quitChan: // channel closed
+ return
+ default:
+ // read packet from shared memory
+ pktLen, err := rxq0.ReadPacket(pkt)
+ if pktLen > 0 {
+ // FIXME: prevent packet write if interface is disconencted
+ // write packet to shared memory
+ txq0.WritePacket(pkt[:pktLen])
+ } else if err != nil {
+ errChan <- err
+ return
+ }
+ }
+ }
+ }(data.errChan, data.quitChan, &data.wg)
+
+ return nil
+}
+
+type interfaceData struct {
+ errChan chan error
+ quitChan chan struct{}
+ wg sync.WaitGroup
+ // bridged interface
+ bri *memif.Interface
+}
+
+func interractiveHelp() {
+ fmt.Println("help - print this help")
+ fmt.Println("start - start connecting loop")
+ fmt.Println("show - print interface details")
+ fmt.Println("exit - exit the application")
+}
+
+func newMemifInterface(socket *memif.Socket, id uint32, isMaster bool, name string) (*memif.Interface, *interfaceData, error) {
+ data := &interfaceData{}
+ args := &memif.Arguments{
+ Id: id,
+ IsMaster: isMaster,
+ ConnectedFunc: Connected,
+ DisconnectedFunc: Disconnected,
+ PrivateData: data,
+ Name: name,
+ }
+
+ i, err := socket.NewInterface(args)
+ if err != nil {
+ return nil, nil, fmt.Errorf("Failed to create interface on socket %s: %s", socket.GetFilename(), err)
+ }
+
+ // slave attempts to connect to control socket
+ // to handle control communication call socket.StartPolling()
+ if !i.IsMaster() {
+ fmt.Println(args.Name, ": Connecting to control socket...")
+ for !i.IsConnecting() {
+ err = i.RequestConnection()
+ if err != nil {
+ /* TODO: check for ECONNREFUSED errno
+ * if error is ECONNREFUSED it may simply mean that master
+ * interface is not up yet, use i.RequestConnection()
+ */
+ return nil, nil, fmt.Errorf("Faild to connect: ", err)
+ }
+ }
+ }
+
+ return i, data, nil
+}
+
+func printMemifInterfaceDetails(i *memif.Interface) {
+ fmt.Println(i.GetName(), ":")
+ fmt.Println("\trole: ", memif.RoleToString(i.IsMaster()))
+ fmt.Println("\tid: ", i.GetId())
+ link := "down"
+ if i.IsConnected() {
+ link = "up"
+ }
+ fmt.Println("\tlink: ", link)
+ fmt.Println("\tremote: ", i.GetRemoteName())
+ fmt.Println("\tpeer: ", i.GetPeerName())
+ if i.IsConnected() {
+ mc := i.GetMemoryConfig()
+ fmt.Println("queue pairs: ", mc.NumQueuePairs)
+ fmt.Println("ring size: ", (1 << mc.Log2RingSize))
+ fmt.Println("buffer size: ", mc.PacketBufferSize)
+ }
+}
+
+func main() {
+ memifErrChan := make(chan error)
+ exitChan := make(chan struct{})
+ var i0, i1 *memif.Interface
+ var d0, d1 *interfaceData
+
+ cpuprof := flag.String("cpuprof", "", "cpu profiling output file")
+ memprof := flag.String("memprof", "", "mem profiling output file")
+ role := flag.String("role", "slave", "interface role")
+ name := flag.String("name", "gomemif", "interface name")
+ socketName := flag.String("socket", "", "control socket filename")
+
+ flag.Parse()
+
+ // profiling options
+ if *cpuprof != "" {
+ defer profile.Start(profile.CPUProfile, profile.ProfilePath(*cpuprof)).Stop()
+ }
+ if *memprof != "" {
+ defer profile.Start(profile.MemProfile, profile.ProfilePath(*memprof)).Stop()
+ }
+
+ // memif options
+ var isMaster bool
+ switch *role {
+ case "slave":
+ isMaster = false
+ case "master":
+ isMaster = true
+ default:
+ fmt.Println("Invalid role")
+ return
+ }
+
+ // create memif socket
+ socket, err := memif.NewSocket("gomemif_example", *socketName)
+ if err != nil {
+ fmt.Println("Failed to create socket: ", err)
+ return
+ }
+
+ i0, d0, err = newMemifInterface(socket, 0, isMaster, *name)
+ if err != nil {
+ fmt.Println(err)
+ goto exit
+ }
+
+ // TODO: update name
+ i1, d1, err = newMemifInterface(socket, 1, isMaster, *name)
+ if err != nil {
+ fmt.Println(err)
+ goto exit
+ }
+
+ // set up bridge
+ d0.bri = i1
+ d1.bri = i0
+
+ // user input goroutine
+ go func(exitChan chan<- struct{}) {
+ reader := bufio.NewReader(os.Stdin)
+ fmt.Println("GoMemif: Responder")
+ fmt.Println("-----------------------")
+ for {
+ fmt.Print("gomemif# ")
+ text, _ := reader.ReadString('\n')
+ // convert CRLF to LF
+ text = strings.Replace(text, "\n", "", -1)
+ switch text {
+ case "help":
+ interractiveHelp()
+ case "start":
+ // start polling for events on this socket
+ socket.StartPolling(memifErrChan)
+ case "show":
+ printMemifInterfaceDetails(i0)
+ printMemifInterfaceDetails(i1)
+ case "exit":
+ err = socket.StopPolling()
+ if err != nil {
+ fmt.Println("Failed to stop polling: ", err)
+ }
+ close(exitChan)
+ return
+ default:
+ fmt.Println("Unknown input")
+ }
+ }
+ }(exitChan)
+
+ // main loop
+ for {
+ select {
+ case <-exitChan:
+ goto exit
+ case err, ok := <-memifErrChan:
+ if ok {
+ fmt.Println(err)
+ }
+ case err, ok := <-d0.errChan:
+ if ok {
+ fmt.Println(err)
+ }
+ case err, ok := <-d1.errChan:
+ if ok {
+ fmt.Println(err)
+ }
+ default:
+ continue
+ }
+ }
+
+exit:
+ socket.Delete()
+ close(memifErrChan)
+}
diff --git a/extras/gomemif/examples/responder.go b/extras/gomemif/examples/responder.go
new file mode 100644
index 00000000000..8fda6435bc5
--- /dev/null
+++ b/extras/gomemif/examples/responder.go
@@ -0,0 +1,227 @@
+/*
+ *------------------------------------------------------------------
+ * Copyright (c) 2020 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *------------------------------------------------------------------
+ */
+
+package main
+
+import (
+ "bufio"
+ "flag"
+ "fmt"
+ "os"
+ "strings"
+ "sync"
+
+ "github.com/pkg/profile"
+ "memif"
+)
+
+func Disconnected(i *memif.Interface) error {
+ fmt.Println("Disconnected: ", i.GetName())
+
+ data, ok := i.GetPrivateData().(*interfaceData)
+ if !ok {
+ return fmt.Errorf("Invalid private data")
+ }
+ close(data.quitChan) // stop polling
+ close(data.errChan)
+ data.wg.Wait() // wait until polling stops, then continue disconnect
+
+ return nil
+}
+
+func Connected(i *memif.Interface) error {
+ fmt.Println("Connected: ", i.GetName())
+
+ data, ok := i.GetPrivateData().(*interfaceData)
+ if !ok {
+ return fmt.Errorf("Invalid private data")
+ }
+ data.errChan = make(chan error, 1)
+ data.quitChan = make(chan struct{}, 1)
+ data.wg.Add(1)
+
+ go func(errChan chan<- error, quitChan <-chan struct{}, wg *sync.WaitGroup) {
+ defer wg.Done()
+ // allocate packet buffer
+ pkt := make([]byte, 2048)
+ // get rx queue
+ rxq0, err := i.GetRxQueue(0)
+ if err != nil {
+ errChan <- err
+ return
+ }
+ // get tx queue
+ txq0, err := i.GetTxQueue(0)
+ if err != nil {
+ errChan <- err
+ return
+ }
+ for {
+ select {
+ case <-quitChan: // channel closed
+ return
+ default:
+ // read packet from shared memory
+ pktLen, err := rxq0.ReadPacket(pkt)
+ if pktLen > 0 {
+ // write packet to shared memory
+ txq0.WritePacket(pkt[:pktLen])
+ } else if err != nil {
+ errChan <- err
+ return
+ }
+ }
+ }
+ }(data.errChan, data.quitChan, &data.wg)
+
+ return nil
+}
+
+type interfaceData struct {
+ errChan chan error
+ quitChan chan struct{}
+ wg sync.WaitGroup
+}
+
+func interractiveHelp() {
+ fmt.Println("help - print this help")
+ fmt.Println("start - start connecting loop")
+ fmt.Println("show - print interface details")
+ fmt.Println("exit - exit the application")
+}
+
+func main() {
+ cpuprof := flag.String("cpuprof", "", "cpu profiling output file")
+ memprof := flag.String("memprof", "", "mem profiling output file")
+ role := flag.String("role", "slave", "interface role")
+ name := flag.String("name", "gomemif", "interface name")
+ socketName := flag.String("socket", "", "control socket filename")
+
+ flag.Parse()
+
+ if *cpuprof != "" {
+ defer profile.Start(profile.CPUProfile, profile.ProfilePath(*cpuprof)).Stop()
+ }
+ if *memprof != "" {
+ defer profile.Start(profile.MemProfile, profile.ProfilePath(*memprof)).Stop()
+ }
+
+ memifErrChan := make(chan error)
+ exitChan := make(chan struct{})
+
+ var isMaster bool
+ switch *role {
+ case "slave":
+ isMaster = false
+ case "master":
+ isMaster = true
+ default:
+ fmt.Println("Invalid role")
+ return
+ }
+
+ fmt.Println("GoMemif: Responder")
+ fmt.Println("-----------------------")
+
+ socket, err := memif.NewSocket("gomemif_example", *socketName)
+ if err != nil {
+ fmt.Println("Failed to create socket: ", err)
+ return
+ }
+
+ data := interfaceData{}
+ args := &memif.Arguments{
+ IsMaster: isMaster,
+ ConnectedFunc: Connected,
+ DisconnectedFunc: Disconnected,
+ PrivateData: &data,
+ Name: *name,
+ }
+
+ i, err := socket.NewInterface(args)
+ if err != nil {
+ fmt.Println("Failed to create interface on socket %s: %s", socket.GetFilename(), err)
+ goto exit
+ }
+
+ // slave attempts to connect to control socket
+ // to handle control communication call socket.StartPolling()
+ if !i.IsMaster() {
+ fmt.Println(args.Name, ": Connecting to control socket...")
+ for !i.IsConnecting() {
+ err = i.RequestConnection()
+ if err != nil {
+ /* TODO: check for ECONNREFUSED errno
+ * if error is ECONNREFUSED it may simply mean that master
+ * interface is not up yet, use i.RequestConnection()
+ */
+ fmt.Println("Faild to connect: ", err)
+ goto exit
+ }
+ }
+ }
+
+ go func(exitChan chan<- struct{}) {
+ reader := bufio.NewReader(os.Stdin)
+ for {
+ fmt.Print("gomemif# ")
+ text, _ := reader.ReadString('\n')
+ // convert CRLF to LF
+ text = strings.Replace(text, "\n", "", -1)
+ switch text {
+ case "help":
+ interractiveHelp()
+ case "start":
+ // start polling for events on this socket
+ socket.StartPolling(memifErrChan)
+ case "show":
+ fmt.Println("remote: ", i.GetRemoteName())
+ fmt.Println("peer: ", i.GetPeerName())
+ case "exit":
+ err = socket.StopPolling()
+ if err != nil {
+ fmt.Println("Failed to stop polling: ", err)
+ }
+ close(exitChan)
+ return
+ default:
+ fmt.Println("Unknown input")
+ }
+ }
+ }(exitChan)
+
+ for {
+ select {
+ case <-exitChan:
+ goto exit
+ case err, ok := <-memifErrChan:
+ if ok {
+ fmt.Println(err)
+ }
+ case err, ok := <-data.errChan:
+ if ok {
+ fmt.Println(err)
+ }
+ default:
+ continue
+ }
+ }
+
+exit:
+ socket.Delete()
+ close(memifErrChan)
+}
diff --git a/extras/gomemif/memif/BUILD.bazel b/extras/gomemif/memif/BUILD.bazel
new file mode 100644
index 00000000000..e6539ff59bd
--- /dev/null
+++ b/extras/gomemif/memif/BUILD.bazel
@@ -0,0 +1,17 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library")
+
+go_library(
+ name = "memif",
+ srcs = [
+ "interface.go",
+ "interface_unsafe.go",
+ "control_channel.go",
+ "control_channel_unsafe.go",
+ "memif.go",
+ "memif_unsafe.go",
+ "packet_writer.go",
+ "packet_reader.go",
+ ],
+ importpath = "memif",
+ visibility = ["//visibility:public",],
+)
diff --git a/extras/gomemif/memif/control_channel.go b/extras/gomemif/memif/control_channel.go
new file mode 100644
index 00000000000..32e34933ab4
--- /dev/null
+++ b/extras/gomemif/memif/control_channel.go
@@ -0,0 +1,965 @@
+/*
+ *------------------------------------------------------------------
+ * Copyright (c) 2020 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *------------------------------------------------------------------
+ */
+
+package memif
+
+import (
+ "bytes"
+ "container/list"
+ "encoding/binary"
+ "fmt"
+ "os"
+ "sync"
+ "syscall"
+)
+
+const maxEpollEvents = 1
+const maxControlLen = 256
+
+const errorFdNotFound = "fd not found"
+
+// controlMsg represents a message used in communication between memif peers
+type controlMsg struct {
+ Buffer *bytes.Buffer
+ Fd int
+}
+
+// listener represents a listener functionality of UNIX domain socket
+type listener struct {
+ socket *Socket
+ event syscall.EpollEvent
+}
+
+// controlChannel represents a communication channel between memif peers
+// backed by UNIX domain socket
+type controlChannel struct {
+ listRef *list.Element
+ socket *Socket
+ i *Interface
+ event syscall.EpollEvent
+ data [msgSize]byte
+ control [maxControlLen]byte
+ controlLen int
+ msgQueue []controlMsg
+ isConnected bool
+}
+
+// Socket represents a UNIX domain socket used for communication
+// between memif peers
+type Socket struct {
+ appName string
+ filename string
+ listener *listener
+ interfaceList *list.List
+ ccList *list.List
+ epfd int
+ wakeEvent syscall.EpollEvent
+ stopPollChan chan struct{}
+ wg sync.WaitGroup
+}
+
+// StopPolling stops polling events on the socket
+func (socket *Socket) StopPolling() error {
+ if socket.stopPollChan != nil {
+ // stop polling msg
+ close(socket.stopPollChan)
+ // wake epoll
+ buf := make([]byte, 8)
+ binary.PutUvarint(buf, 1)
+ n, err := syscall.Write(int(socket.wakeEvent.Fd), buf[:])
+ if err != nil {
+ return err
+ }
+ if n != 8 {
+ return fmt.Errorf("Faild to write to eventfd")
+ }
+ // wait until polling is stopped
+ socket.wg.Wait()
+ }
+
+ return nil
+}
+
+// StartPolling starts polling and handling events on the socket,
+// enabling communication between memif peers
+func (socket *Socket) StartPolling(errChan chan<- error) {
+ socket.stopPollChan = make(chan struct{})
+ socket.wg.Add(1)
+ go func() {
+ var events [maxEpollEvents]syscall.EpollEvent
+ defer socket.wg.Done()
+
+ for {
+ select {
+ case <-socket.stopPollChan:
+ return
+ default:
+ num, err := syscall.EpollWait(socket.epfd, events[:], -1)
+ if err != nil {
+ errChan <- fmt.Errorf("EpollWait: ", err)
+ return
+ }
+
+ for ev := 0; ev < num; ev++ {
+ if events[0].Fd == socket.wakeEvent.Fd {
+ continue
+ }
+ err = socket.handleEvent(&events[0])
+ if err != nil {
+ errChan <- fmt.Errorf("handleEvent: ", err)
+ }
+ }
+ }
+ }
+ }()
+}
+
+// addEvent adds event to epoll instance associated with the socket
+func (socket *Socket) addEvent(event *syscall.EpollEvent) error {
+ err := syscall.EpollCtl(socket.epfd, syscall.EPOLL_CTL_ADD, int(event.Fd), event)
+ if err != nil {
+ return fmt.Errorf("EpollCtl: %s", err)
+ }
+ return nil
+}
+
+// addEvent deletes event to epoll instance associated with the socket
+func (socket *Socket) delEvent(event *syscall.EpollEvent) error {
+ err := syscall.EpollCtl(socket.epfd, syscall.EPOLL_CTL_DEL, int(event.Fd), event)
+ if err != nil {
+ return fmt.Errorf("EpollCtl: %s", err)
+ }
+ return nil
+}
+
+// Delete deletes the socket
+func (socket *Socket) Delete() (err error) {
+ for elt := socket.ccList.Front(); elt != nil; elt = elt.Next() {
+ cc, ok := elt.Value.(*controlChannel)
+ if ok {
+ err = cc.close(true, "Socket deleted")
+ if err != nil {
+ return nil
+ }
+ }
+ }
+ for elt := socket.interfaceList.Front(); elt != nil; elt = elt.Next() {
+ i, ok := elt.Value.(*Interface)
+ if ok {
+ err = i.Delete()
+ if err != nil {
+ return err
+ }
+ }
+ }
+
+ if socket.listener != nil {
+ err = socket.listener.close()
+ if err != nil {
+ return err
+ }
+ err = os.Remove(socket.filename)
+ if err != nil {
+ return nil
+ }
+ }
+
+ err = socket.delEvent(&socket.wakeEvent)
+ if err != nil {
+ return fmt.Errorf("Failed to delete event: ", err)
+ }
+
+ syscall.Close(socket.epfd)
+
+ return nil
+}
+
+// NewSocket returns a new Socket
+func NewSocket(appName string, filename string) (socket *Socket, err error) {
+ socket = &Socket{
+ appName: appName,
+ filename: filename,
+ interfaceList: list.New(),
+ ccList: list.New(),
+ }
+ if socket.filename == "" {
+ socket.filename = DefaultSocketFilename
+ }
+
+ socket.epfd, _ = syscall.EpollCreate1(0)
+
+ efd, err := eventFd()
+ socket.wakeEvent = syscall.EpollEvent{
+ Events: syscall.EPOLLIN | syscall.EPOLLERR | syscall.EPOLLHUP,
+ Fd: int32(efd),
+ }
+ err = socket.addEvent(&socket.wakeEvent)
+ if err != nil {
+ return nil, fmt.Errorf("Failed to add event: ", err)
+ }
+
+ return socket, nil
+}
+
+// handleEvent handles epoll event
+func (socket *Socket) handleEvent(event *syscall.EpollEvent) error {
+ if socket.listener != nil && socket.listener.event.Fd == event.Fd {
+ return socket.listener.handleEvent(event)
+ }
+
+ for elt := socket.ccList.Front(); elt != nil; elt = elt.Next() {
+ cc, ok := elt.Value.(*controlChannel)
+ if ok {
+ if cc.event.Fd == event.Fd {
+ return cc.handleEvent(event)
+ }
+ }
+ }
+
+ return fmt.Errorf(errorFdNotFound)
+}
+
+// handleEvent handles epoll event for listener
+func (l *listener) handleEvent(event *syscall.EpollEvent) error {
+ // hang up
+ if (event.Events & syscall.EPOLLHUP) == syscall.EPOLLHUP {
+ err := l.close()
+ if err != nil {
+ return fmt.Errorf("Failed to close listener after hang up event: ", err)
+ }
+ return fmt.Errorf("Hang up: ", l.socket.filename)
+ }
+
+ // error
+ if (event.Events & syscall.EPOLLERR) == syscall.EPOLLERR {
+ err := l.close()
+ if err != nil {
+ return fmt.Errorf("Failed to close listener after receiving an error event: ", err)
+ }
+ return fmt.Errorf("Received error event on listener ", l.socket.filename)
+ }
+
+ // read message
+ if (event.Events & syscall.EPOLLIN) == syscall.EPOLLIN {
+ newFd, _, err := syscall.Accept(int(l.event.Fd))
+ if err != nil {
+ return fmt.Errorf("Accept: %s", err)
+ }
+
+ cc, err := l.socket.addControlChannel(newFd, nil)
+ if err != nil {
+ return fmt.Errorf("Failed to add control channel: %s", err)
+ }
+
+ err = cc.msgEnqHello()
+ if err != nil {
+ return fmt.Errorf("msgEnqHello: %s", err)
+ }
+
+ err = cc.sendMsg()
+ if err != nil {
+ return err
+ }
+
+ return nil
+ }
+
+ return fmt.Errorf("Unexpected event: ", event.Events)
+}
+
+// handleEvent handles epoll event for control channel
+func (cc *controlChannel) handleEvent(event *syscall.EpollEvent) error {
+ var size int
+ var err error
+
+ // hang up
+ if (event.Events & syscall.EPOLLHUP) == syscall.EPOLLHUP {
+ // close cc, don't send msg
+ err := cc.close(false, "")
+ if err != nil {
+ return fmt.Errorf("Failed to close control channel after hang up event: ", err)
+ }
+ return fmt.Errorf("Hang up: ", cc.i.GetName())
+ }
+
+ if (event.Events & syscall.EPOLLERR) == syscall.EPOLLERR {
+ // close cc, don't send msg
+ err := cc.close(false, "")
+ if err != nil {
+ return fmt.Errorf("Failed to close control channel after receiving an error event: ", err)
+ }
+ return fmt.Errorf("Received error event on control channel ", cc.i.GetName())
+ }
+
+ if (event.Events & syscall.EPOLLIN) == syscall.EPOLLIN {
+ size, cc.controlLen, _, _, err = syscall.Recvmsg(int(cc.event.Fd), cc.data[:], cc.control[:], 0)
+ if err != nil {
+ return fmt.Errorf("recvmsg: %s", err)
+ }
+ if size != msgSize {
+ return fmt.Errorf("invalid message size %d", size)
+ }
+
+ err = cc.parseMsg()
+ if err != nil {
+ return err
+ }
+
+ err = cc.sendMsg()
+ if err != nil {
+ return err
+ }
+
+ return nil
+ }
+
+ return fmt.Errorf("Unexpected event: ", event.Events)
+}
+
+// close closes the listener
+func (l *listener) close() error {
+ err := l.socket.delEvent(&l.event)
+ if err != nil {
+ return fmt.Errorf("Failed to del event: ", err)
+ }
+ err = syscall.Close(int(l.event.Fd))
+ if err != nil {
+ return fmt.Errorf("Failed to close socket: ", err)
+ }
+ return nil
+}
+
+// AddListener adds a lisntener to the socket. The fd must describe a
+// UNIX domain socket already bound to a UNIX domain filename and
+// marked as listener
+func (socket *Socket) AddListener(fd int) (err error) {
+ l := &listener{
+ // we will need this to look up master interface by id
+ socket: socket,
+ }
+
+ l.event = syscall.EpollEvent{
+ Events: syscall.EPOLLIN | syscall.EPOLLERR | syscall.EPOLLHUP,
+ Fd: int32(fd),
+ }
+ err = socket.addEvent(&l.event)
+ if err != nil {
+ return fmt.Errorf("Failed to add event: ", err)
+ }
+
+ socket.listener = l
+
+ return nil
+}
+
+// addListener creates new UNIX domain socket, binds it to the address
+// and marks it as listener
+func (socket *Socket) addListener() (err error) {
+ // create socket
+ fd, err := syscall.Socket(syscall.AF_UNIX, syscall.SOCK_SEQPACKET, 0)
+ if err != nil {
+ return fmt.Errorf("Failed to create UNIX domain socket")
+ }
+ usa := &syscall.SockaddrUnix{Name: socket.filename}
+
+ // Bind to address and start listening
+ err = syscall.SetsockoptInt(fd, syscall.SOL_SOCKET, syscall.SO_PASSCRED, 1)
+ if err != nil {
+ return fmt.Errorf("Failed to set socket option %s : %v", socket.filename, err)
+ }
+ err = syscall.Bind(fd, usa)
+ if err != nil {
+ return fmt.Errorf("Failed to bind socket %s : %v", socket.filename, err)
+ }
+ err = syscall.Listen(fd, syscall.SOMAXCONN)
+ if err != nil {
+ return fmt.Errorf("Failed to listen on socket %s : %v", socket.filename, err)
+ }
+
+ return socket.AddListener(fd)
+}
+
+// close closes a control channel, if the control channel is assigned an
+// interface, the interface is disconnected
+func (cc *controlChannel) close(sendMsg bool, str string) (err error) {
+ if sendMsg == true {
+ // first clear message queue so that the disconnect
+ // message is the only message in queue
+ cc.msgQueue = []controlMsg{}
+ cc.msgEnqDisconnect(str)
+
+ err = cc.sendMsg()
+ if err != nil {
+ return err
+ }
+ }
+
+ err = cc.socket.delEvent(&cc.event)
+ if err != nil {
+ return fmt.Errorf("Failed to del event: ", err)
+ }
+
+ // remove referance form socket
+ cc.socket.ccList.Remove(cc.listRef)
+
+ if cc.i != nil {
+ err = cc.i.disconnect()
+ if err != nil {
+ return fmt.Errorf("Interface Disconnect: ", err)
+ }
+ }
+
+ return nil
+}
+
+//addControlChannel returns a new controlChannel and adds it to the socket
+func (socket *Socket) addControlChannel(fd int, i *Interface) (*controlChannel, error) {
+ cc := &controlChannel{
+ socket: socket,
+ i: i,
+ isConnected: false,
+ }
+
+ var err error
+
+ cc.event = syscall.EpollEvent{
+ Events: syscall.EPOLLIN | syscall.EPOLLERR | syscall.EPOLLHUP,
+ Fd: int32(fd),
+ }
+ err = socket.addEvent(&cc.event)
+ if err != nil {
+ return nil, fmt.Errorf("Failed to add event: ", err)
+ }
+
+ cc.listRef = socket.ccList.PushBack(cc)
+
+ return cc, nil
+}
+
+func (cc *controlChannel) msgEnqAck() (err error) {
+ buf := new(bytes.Buffer)
+ err = binary.Write(buf, binary.LittleEndian, msgTypeAck)
+
+ msg := controlMsg{
+ Buffer: buf,
+ Fd: -1,
+ }
+
+ cc.msgQueue = append(cc.msgQueue, msg)
+
+ return nil
+}
+
+func (cc *controlChannel) msgEnqHello() (err error) {
+ hello := MsgHello{
+ VersionMin: Version,
+ VersionMax: Version,
+ MaxRegion: 255,
+ MaxRingM2S: 255,
+ MaxRingS2M: 255,
+ MaxLog2RingSize: 14,
+ }
+
+ copy(hello.Name[:], []byte(cc.socket.appName))
+
+ buf := new(bytes.Buffer)
+ err = binary.Write(buf, binary.LittleEndian, msgTypeHello)
+ err = binary.Write(buf, binary.LittleEndian, hello)
+
+ msg := controlMsg{
+ Buffer: buf,
+ Fd: -1,
+ }
+
+ cc.msgQueue = append(cc.msgQueue, msg)
+
+ return nil
+}
+
+func (cc *controlChannel) parseHello() (err error) {
+ var hello MsgHello
+
+ buf := bytes.NewReader(cc.data[msgTypeSize:])
+ err = binary.Read(buf, binary.LittleEndian, &hello)
+ if err != nil {
+ return
+ }
+
+ if hello.VersionMin > Version || hello.VersionMax < Version {
+ return fmt.Errorf("Incompatible memif version")
+ }
+
+ cc.i.run = cc.i.args.MemoryConfig
+
+ cc.i.run.NumQueuePairs = min16(cc.i.args.MemoryConfig.NumQueuePairs, hello.MaxRingS2M)
+ cc.i.run.NumQueuePairs = min16(cc.i.args.MemoryConfig.NumQueuePairs, hello.MaxRingM2S)
+ cc.i.run.Log2RingSize = min8(cc.i.args.MemoryConfig.Log2RingSize, hello.MaxLog2RingSize)
+
+ cc.i.remoteName = string(hello.Name[:])
+
+ return nil
+}
+
+func (cc *controlChannel) msgEnqInit() (err error) {
+ init := MsgInit{
+ Version: Version,
+ Id: cc.i.args.Id,
+ Mode: interfaceModeEthernet,
+ }
+
+ copy(init.Name[:], []byte(cc.socket.appName))
+
+ buf := new(bytes.Buffer)
+ err = binary.Write(buf, binary.LittleEndian, msgTypeInit)
+ err = binary.Write(buf, binary.LittleEndian, init)
+
+ msg := controlMsg{
+ Buffer: buf,
+ Fd: -1,
+ }
+
+ cc.msgQueue = append(cc.msgQueue, msg)
+
+ return nil
+}
+
+func (cc *controlChannel) parseInit() (err error) {
+ var init MsgInit
+
+ buf := bytes.NewReader(cc.data[msgTypeSize:])
+ err = binary.Read(buf, binary.LittleEndian, &init)
+ if err != nil {
+ return
+ }
+
+ if init.Version != Version {
+ return fmt.Errorf("Incompatible memif driver version")
+ }
+
+ // find peer interface
+ for elt := cc.socket.interfaceList.Front(); elt != nil; elt = elt.Next() {
+ i, ok := elt.Value.(*Interface)
+ if ok {
+ if i.args.Id == init.Id && i.args.IsMaster && i.cc == nil {
+ // verify secret
+ if i.args.Secret != init.Secret {
+ return fmt.Errorf("Invalid secret")
+ }
+ // interface is assigned to control channel
+ i.cc = cc
+ cc.i = i
+ cc.i.run = cc.i.args.MemoryConfig
+ cc.i.remoteName = string(init.Name[:])
+
+ return nil
+ }
+ }
+ }
+
+ return fmt.Errorf("Invalid interface id")
+}
+
+func (cc *controlChannel) msgEnqAddRegion(regionIndex uint16) (err error) {
+ if len(cc.i.regions) <= int(regionIndex) {
+ return fmt.Errorf("Invalid region index")
+ }
+
+ addRegion := MsgAddRegion{
+ Index: regionIndex,
+ Size: cc.i.regions[regionIndex].size,
+ }
+
+ buf := new(bytes.Buffer)
+ err = binary.Write(buf, binary.LittleEndian, msgTypeAddRegion)
+ err = binary.Write(buf, binary.LittleEndian, addRegion)
+
+ msg := controlMsg{
+ Buffer: buf,
+ Fd: cc.i.regions[regionIndex].fd,
+ }
+
+ cc.msgQueue = append(cc.msgQueue, msg)
+
+ return nil
+}
+
+func (cc *controlChannel) parseAddRegion() (err error) {
+ var addRegion MsgAddRegion
+
+ buf := bytes.NewReader(cc.data[msgTypeSize:])
+ err = binary.Read(buf, binary.LittleEndian, &addRegion)
+ if err != nil {
+ return
+ }
+
+ fd, err := cc.parseControlMsg()
+ if err != nil {
+ return fmt.Errorf("parseControlMsg: %s", err)
+ }
+
+ if addRegion.Index > 255 {
+ return fmt.Errorf("Invalid memory region index")
+ }
+
+ region := memoryRegion{
+ size: addRegion.Size,
+ fd: fd,
+ }
+
+ cc.i.regions = append(cc.i.regions, region)
+
+ return nil
+}
+
+func (cc *controlChannel) msgEnqAddRing(ringType ringType, ringIndex uint16) (err error) {
+ var q Queue
+ var flags uint16 = 0
+
+ if ringType == ringTypeS2M {
+ q = cc.i.txQueues[ringIndex]
+ flags = msgAddRingFlagS2M
+ } else {
+ q = cc.i.rxQueues[ringIndex]
+ }
+
+ addRing := MsgAddRing{
+ Index: ringIndex,
+ Offset: uint32(q.ring.offset),
+ Region: uint16(q.ring.region),
+ RingSizeLog2: uint8(q.ring.log2Size),
+ Flags: flags,
+ PrivateHdrSize: 0,
+ }
+
+ buf := new(bytes.Buffer)
+ err = binary.Write(buf, binary.LittleEndian, msgTypeAddRing)
+ err = binary.Write(buf, binary.LittleEndian, addRing)
+
+ msg := controlMsg{
+ Buffer: buf,
+ Fd: q.interruptFd,
+ }
+
+ cc.msgQueue = append(cc.msgQueue, msg)
+
+ return nil
+}
+
+func (cc *controlChannel) parseAddRing() (err error) {
+ var addRing MsgAddRing
+
+ buf := bytes.NewReader(cc.data[msgTypeSize:])
+ err = binary.Read(buf, binary.LittleEndian, &addRing)
+ if err != nil {
+ return
+ }
+
+ fd, err := cc.parseControlMsg()
+ if err != nil {
+ return err
+ }
+
+ if addRing.Index >= cc.i.run.NumQueuePairs {
+ return fmt.Errorf("invalid ring index")
+ }
+
+ q := Queue{
+ i: cc.i,
+ interruptFd: fd,
+ }
+
+ if (addRing.Flags & msgAddRingFlagS2M) == msgAddRingFlagS2M {
+ q.ring = newRing(int(addRing.Region), ringTypeS2M, int(addRing.Offset), int(addRing.RingSizeLog2))
+ cc.i.rxQueues = append(cc.i.rxQueues, q)
+ } else {
+ q.ring = newRing(int(addRing.Region), ringTypeM2S, int(addRing.Offset), int(addRing.RingSizeLog2))
+ cc.i.txQueues = append(cc.i.txQueues, q)
+ }
+
+ return nil
+}
+
+func (cc *controlChannel) msgEnqConnect() (err error) {
+ var connect MsgConnect
+ copy(connect.Name[:], []byte(cc.i.args.Name))
+
+ buf := new(bytes.Buffer)
+ err = binary.Write(buf, binary.LittleEndian, msgTypeConnect)
+ err = binary.Write(buf, binary.LittleEndian, connect)
+
+ msg := controlMsg{
+ Buffer: buf,
+ Fd: -1,
+ }
+
+ cc.msgQueue = append(cc.msgQueue, msg)
+
+ return nil
+}
+
+func (cc *controlChannel) parseConnect() (err error) {
+ var connect MsgConnect
+
+ buf := bytes.NewReader(cc.data[msgTypeSize:])
+ err = binary.Read(buf, binary.LittleEndian, &connect)
+ if err != nil {
+ return
+ }
+
+ cc.i.peerName = string(connect.Name[:])
+
+ err = cc.i.connect()
+ if err != nil {
+ return err
+ }
+
+ cc.isConnected = true
+
+ return nil
+}
+
+func (cc *controlChannel) msgEnqConnected() (err error) {
+ var connected MsgConnected
+ copy(connected.Name[:], []byte(cc.i.args.Name))
+
+ buf := new(bytes.Buffer)
+ err = binary.Write(buf, binary.LittleEndian, msgTypeConnected)
+ err = binary.Write(buf, binary.LittleEndian, connected)
+
+ msg := controlMsg{
+ Buffer: buf,
+ Fd: -1,
+ }
+
+ cc.msgQueue = append(cc.msgQueue, msg)
+
+ return nil
+}
+
+func (cc *controlChannel) parseConnected() (err error) {
+ var conn MsgConnected
+
+ buf := bytes.NewReader(cc.data[msgTypeSize:])
+ err = binary.Read(buf, binary.LittleEndian, &conn)
+ if err != nil {
+ return
+ }
+
+ cc.i.peerName = string(conn.Name[:])
+
+ err = cc.i.connect()
+ if err != nil {
+ return err
+ }
+
+ cc.isConnected = true
+
+ return nil
+}
+
+func (cc *controlChannel) msgEnqDisconnect(str string) (err error) {
+ dc := MsgDisconnect{
+ // not implemented
+ Code: 0,
+ }
+ copy(dc.String[:], str)
+
+ buf := new(bytes.Buffer)
+ err = binary.Write(buf, binary.LittleEndian, msgTypeDisconnect)
+ err = binary.Write(buf, binary.LittleEndian, dc)
+
+ msg := controlMsg{
+ Buffer: buf,
+ Fd: -1,
+ }
+
+ cc.msgQueue = append(cc.msgQueue, msg)
+
+ return nil
+}
+
+func (cc *controlChannel) parseDisconnect() (err error) {
+ var dc MsgDisconnect
+
+ buf := bytes.NewReader(cc.data[msgTypeSize:])
+ err = binary.Read(buf, binary.LittleEndian, &dc)
+ if err != nil {
+ return
+ }
+
+ err = cc.close(false, string(dc.String[:]))
+ if err != nil {
+ return fmt.Errorf("Failed to disconnect control channel: ", err)
+ }
+
+ return nil
+}
+
+func (cc *controlChannel) parseMsg() error {
+ var msgType msgType
+ var err error
+
+ buf := bytes.NewReader(cc.data[:])
+ err = binary.Read(buf, binary.LittleEndian, &msgType)
+
+ if msgType == msgTypeAck {
+ return nil
+ } else if msgType == msgTypeHello {
+ // Configure
+ err = cc.parseHello()
+ if err != nil {
+ goto error
+ }
+ // Initialize slave memif
+ err = cc.i.initializeRegions()
+ if err != nil {
+ goto error
+ }
+ err = cc.i.initializeQueues()
+ if err != nil {
+ goto error
+ }
+ // Enqueue messages
+ err = cc.msgEnqInit()
+ if err != nil {
+ goto error
+ }
+ for i := 0; i < len(cc.i.regions); i++ {
+ err = cc.msgEnqAddRegion(uint16(i))
+ if err != nil {
+ goto error
+ }
+ }
+ for i := 0; uint16(i) < cc.i.run.NumQueuePairs; i++ {
+ err = cc.msgEnqAddRing(ringTypeS2M, uint16(i))
+ if err != nil {
+ goto error
+ }
+ }
+ for i := 0; uint16(i) < cc.i.run.NumQueuePairs; i++ {
+ err = cc.msgEnqAddRing(ringTypeM2S, uint16(i))
+ if err != nil {
+ goto error
+ }
+ }
+ err = cc.msgEnqConnect()
+ if err != nil {
+ goto error
+ }
+ } else if msgType == msgTypeInit {
+ err = cc.parseInit()
+ if err != nil {
+ goto error
+ }
+
+ err = cc.msgEnqAck()
+ if err != nil {
+ goto error
+ }
+ } else if msgType == msgTypeAddRegion {
+ err = cc.parseAddRegion()
+ if err != nil {
+ goto error
+ }
+
+ err = cc.msgEnqAck()
+ if err != nil {
+ goto error
+ }
+ } else if msgType == msgTypeAddRing {
+ err = cc.parseAddRing()
+ if err != nil {
+ goto error
+ }
+
+ err = cc.msgEnqAck()
+ if err != nil {
+ goto error
+ }
+ } else if msgType == msgTypeConnect {
+ err = cc.parseConnect()
+ if err != nil {
+ goto error
+ }
+
+ err = cc.msgEnqConnected()
+ if err != nil {
+ goto error
+ }
+ } else if msgType == msgTypeConnected {
+ err = cc.parseConnected()
+ if err != nil {
+ goto error
+ }
+ } else if msgType == msgTypeDisconnect {
+ err = cc.parseDisconnect()
+ if err != nil {
+ goto error
+ }
+ } else {
+ err = fmt.Errorf("unknown message %d", msgType)
+ goto error
+ }
+
+ return nil
+
+error:
+ err1 := cc.close(true, err.Error())
+ if err1 != nil {
+ return fmt.Errorf(err.Error(), ": Failed to close control channel: ", err1)
+ }
+
+ return err
+}
+
+// parseControlMsg parses control message and returns file descriptor
+// if any
+func (cc *controlChannel) parseControlMsg() (fd int, err error) {
+ // Assert only called when we require FD
+ fd = -1
+
+ controlMsgs, err := syscall.ParseSocketControlMessage(cc.control[:cc.controlLen])
+ if err != nil {
+ return -1, fmt.Errorf("syscall.ParseSocketControlMessage: %s", err)
+ }
+
+ if len(controlMsgs) == 0 {
+ return -1, fmt.Errorf("Missing control message")
+ }
+
+ for _, cmsg := range controlMsgs {
+ if cmsg.Header.Level == syscall.SOL_SOCKET {
+ if cmsg.Header.Type == syscall.SCM_RIGHTS {
+ FDs, err := syscall.ParseUnixRights(&cmsg)
+ if err != nil {
+ return -1, fmt.Errorf("syscall.ParseUnixRights: %s", err)
+ }
+ if len(FDs) == 0 {
+ continue
+ }
+ // Only expect single FD
+ fd = FDs[0]
+ }
+ }
+ }
+
+ if fd == -1 {
+ return -1, fmt.Errorf("Missing file descriptor")
+ }
+
+ return fd, nil
+}
diff --git a/extras/gomemif/memif/control_channel_unsafe.go b/extras/gomemif/memif/control_channel_unsafe.go
new file mode 100644
index 00000000000..9e91297b160
--- /dev/null
+++ b/extras/gomemif/memif/control_channel_unsafe.go
@@ -0,0 +1,60 @@
+/*
+ *------------------------------------------------------------------
+ * Copyright (c) 2020 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *------------------------------------------------------------------
+ */
+
+package memif
+
+import (
+ "fmt"
+ "os"
+ "syscall"
+ "unsafe"
+)
+
+// sendMsg sends a control message from contorl channels message queue
+func (cc *controlChannel) sendMsg() (err error) {
+ if len(cc.msgQueue) < 1 {
+ return nil
+ }
+ // Get message buffer
+ msg := cc.msgQueue[0]
+ // Dequeue
+ cc.msgQueue = cc.msgQueue[1:]
+
+ iov := &syscall.Iovec{
+ Base: &msg.Buffer.Bytes()[0],
+ Len: msgSize,
+ }
+
+ msgh := syscall.Msghdr{
+ Iov: iov,
+ Iovlen: 1,
+ }
+
+ if msg.Fd > 0 {
+ oob := syscall.UnixRights(msg.Fd)
+ msgh.Control = &oob[0]
+ msgh.Controllen = uint64(syscall.CmsgSpace(4))
+ }
+
+ _, _, errno := syscall.Syscall(syscall.SYS_SENDMSG, uintptr(cc.event.Fd), uintptr(unsafe.Pointer(&msgh)), uintptr(0))
+ if errno != 0 {
+ err = os.NewSyscallError("sendmsg", errno)
+ return fmt.Errorf("SYS_SENDMSG: %s", errno)
+ }
+
+ return nil
+}
diff --git a/extras/gomemif/memif/interface.go b/extras/gomemif/memif/interface.go
new file mode 100644
index 00000000000..a571deb43c9
--- /dev/null
+++ b/extras/gomemif/memif/interface.go
@@ -0,0 +1,507 @@
+/*
+ *------------------------------------------------------------------
+ * Copyright (c) 2020 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *------------------------------------------------------------------
+ */
+
+// Package memif provides the implementation of shared memory interface (memif).
+//
+// Memif network interfaces communicate using UNIX domain socket. This socket
+// must be first created using NewSocket(). Then interfaces can be added
+// to this socket using NewInterface(). To start communication on each socket
+// socket.StartPolling() must be called. socket.StopPolling() will stop
+// the communication. When the interface changes link status Connected and
+// Disconencted callbacks set in Arguments for each interface are called
+// respectively. Once the interface is connected rx and tx queues can be
+// aquired using interface.GetRxQueue() and interface.GetTxQueue().
+// Packets can be transmitted by calling queue.ReadPacket() on rx queues and
+// queue.WritePacket() on tx queues. If the interface is disconnected
+// queue.ReadPacket() and queue.WritePacket() MUST not be called.
+//
+// Data transmission is backed by shared memory. The driver works in
+// promiscuous mode only.
+package memif
+
+import (
+ "container/list"
+ "fmt"
+ "os"
+ "syscall"
+)
+
+const (
+ DefaultSocketFilename = "/run/vpp/memif.sock"
+ DefaultNumQueuePairs = 1
+ DefaultLog2RingSize = 10
+ DefaultPacketBufferSize = 2048
+)
+
+const mfd_allow_sealing = 2
+const sys_memfd_create = 319
+const f_add_seals = 1033
+const f_seal_shrink = 0x0002
+
+const efd_nonblock = 04000
+
+// ConnectedFunc is a callback called when an interface is connected
+type ConnectedFunc func(i *Interface) error
+
+// DisconnectedFunc is a callback called when an interface is disconnected
+type DisconnectedFunc func(i *Interface) error
+
+// MemoryConfig represents shared memory configuration
+type MemoryConfig struct {
+ NumQueuePairs uint16 // number of queue pairs
+ Log2RingSize uint8 // ring size as log2
+ PacketBufferSize uint32 // size of single packet buffer
+}
+
+// Arguments represent interface configuration
+type Arguments struct {
+ Id uint32 // Interface identifier unique across socket. Used to identify peer interface when connecting
+ IsMaster bool // Interface role master/slave
+ Name string
+ Secret [24]byte // optional parameter, secrets of the interfaces must match if they are to connect
+ MemoryConfig MemoryConfig
+ ConnectedFunc ConnectedFunc // callback called when interface changes status to connected
+ DisconnectedFunc DisconnectedFunc // callback called when interface changes status to disconnected
+ PrivateData interface{} // private data used by client program
+}
+
+// memoryRegion represents a shared memory mapped file
+type memoryRegion struct {
+ data []byte
+ size uint64
+ fd int
+ packetBufferOffset uint32
+}
+
+// Queue represents rx or tx queue
+type Queue struct {
+ ring *ring
+ i *Interface
+ lastHead uint16
+ lastTail uint16
+ interruptFd int
+}
+
+// Interface represents memif network interface
+type Interface struct {
+ args Arguments
+ run MemoryConfig
+ privateData interface{}
+ listRef *list.Element
+ socket *Socket
+ cc *controlChannel
+ remoteName string
+ peerName string
+ regions []memoryRegion
+ txQueues []Queue
+ rxQueues []Queue
+}
+
+// IsMaster returns true if the interfaces role is master, else returns false
+func (i *Interface) IsMaster() bool {
+ return i.args.IsMaster
+}
+
+// GetRemoteName returns the name of the application on which the peer
+// interface exists
+func (i *Interface) GetRemoteName() string {
+ return i.remoteName
+}
+
+// GetPeerName returns peer interfaces name
+func (i *Interface) GetPeerName() string {
+ return i.peerName
+}
+
+// GetName returens interfaces name
+func (i *Interface) GetName() string {
+ return i.args.Name
+}
+
+// GetMemoryConfig returns interfaces active memory config.
+// If interface is not connected the config is invalid.
+func (i *Interface) GetMemoryConfig() MemoryConfig {
+ return i.run
+}
+
+// GetRxQueue returns an rx queue specified by queue index
+func (i *Interface) GetRxQueue(qid int) (*Queue, error) {
+ if qid >= len(i.rxQueues) {
+ return nil, fmt.Errorf("Invalid Queue index")
+ }
+ return &i.rxQueues[qid], nil
+}
+
+// GetRxQueue returns a tx queue specified by queue index
+func (i *Interface) GetTxQueue(qid int) (*Queue, error) {
+ if qid >= len(i.txQueues) {
+ return nil, fmt.Errorf("Invalid Queue index")
+ }
+ return &i.txQueues[qid], nil
+}
+
+// GetEventFd returns queues interrupt event fd
+func (q *Queue) GetEventFd() (int, error) {
+ return q.interruptFd, nil
+}
+
+// GetFilename returns sockets filename
+func (socket *Socket) GetFilename() string {
+ return socket.filename
+}
+
+// close closes the queue
+func (q *Queue) close() {
+ syscall.Close(q.interruptFd)
+}
+
+// IsConnecting returns true if the interface is connecting
+func (i *Interface) IsConnecting() bool {
+ if i.cc != nil {
+ return true
+ }
+ return false
+}
+
+// IsConnected returns true if the interface is connected
+func (i *Interface) IsConnected() bool {
+ if i.cc != nil && i.cc.isConnected {
+ return true
+ }
+ return false
+}
+
+// Disconnect disconnects the interface
+func (i *Interface) Disconnect() (err error) {
+ if i.cc != nil {
+ // close control and disconenct interface
+ return i.cc.close(true, "Interface disconnected")
+ }
+ return nil
+}
+
+// disconnect finalizes interface disconnection
+func (i *Interface) disconnect() (err error) {
+ if i.cc == nil { // disconnected
+ return nil
+ }
+
+ err = i.args.DisconnectedFunc(i)
+ if err != nil {
+ return fmt.Errorf("DisconnectedFunc: ", err)
+ }
+
+ for _, q := range i.txQueues {
+ q.close()
+ }
+ i.txQueues = []Queue{}
+
+ for _, q := range i.rxQueues {
+ q.close()
+ }
+ i.rxQueues = []Queue{}
+
+ // unmap regions
+ for _, r := range i.regions {
+ err = syscall.Munmap(r.data)
+ if err != nil {
+ return err
+ }
+ err = syscall.Close(r.fd)
+ if err != nil {
+ return err
+ }
+ }
+ i.regions = nil
+ i.cc = nil
+
+ i.peerName = ""
+ i.remoteName = ""
+
+ return nil
+}
+
+// Delete deletes the interface
+func (i *Interface) Delete() (err error) {
+ i.Disconnect()
+
+ // remove referance on socket
+ i.socket.interfaceList.Remove(i.listRef)
+ i = nil
+
+ return nil
+}
+
+// GetSocket returns the socket the interface belongs to
+func (i *Interface) GetSocket() *Socket {
+ return i.socket
+}
+
+// GetPrivateDate returns interfaces private data
+func (i *Interface) GetPrivateData() interface{} {
+ return i.args.PrivateData
+}
+
+// GetId returns interfaces id
+func (i *Interface) GetId() uint32 {
+ return i.args.Id
+}
+
+// RoleToString returns 'Master' if isMaster os true, else returns 'Slave'
+func RoleToString(isMaster bool) string {
+ if isMaster {
+ return "Master"
+ }
+ return "Slave"
+}
+
+// RequestConnection is used by slave interface to connect to a socket and
+// create a control channel
+func (i *Interface) RequestConnection() error {
+ if i.IsMaster() {
+ return fmt.Errorf("Only slave can request connection")
+ }
+ // create socket
+ fd, err := syscall.Socket(syscall.AF_UNIX, syscall.SOCK_SEQPACKET, 0)
+ if err != nil {
+ return fmt.Errorf("Failed to create UNIX domain socket: %v", err)
+ }
+ usa := &syscall.SockaddrUnix{Name: i.socket.filename}
+
+ // Connect to listener socket
+ err = syscall.Connect(fd, usa)
+ if err != nil {
+ return fmt.Errorf("Failed to connect socket %s : %v", i.socket.filename, err)
+ }
+
+ // Create control channel
+ i.cc, err = i.socket.addControlChannel(fd, i)
+ if err != nil {
+ return fmt.Errorf("Failed to create control channel: %v", err)
+ }
+
+ return nil
+}
+
+// NewInterface returns a new memif network interface. When creating an interface
+// it's id must be unique across socket with the exception of loopback interface
+// in which case the id is the same but role differs
+func (socket *Socket) NewInterface(args *Arguments) (*Interface, error) {
+ var err error
+ // make sure the ID is unique on this socket
+ for elt := socket.interfaceList.Front(); elt != nil; elt = elt.Next() {
+ i, ok := elt.Value.(*Interface)
+ if ok {
+ if i.args.Id == args.Id && i.args.IsMaster == args.IsMaster {
+ return nil, fmt.Errorf("Interface with id %u role %s already exists on this socket", args.Id, RoleToString(args.IsMaster))
+ }
+ }
+ }
+
+ // copy interface configuration
+ i := Interface{
+ args: *args,
+ }
+ // set default values
+ if i.args.MemoryConfig.NumQueuePairs == 0 {
+ i.args.MemoryConfig.NumQueuePairs = DefaultNumQueuePairs
+ }
+ if i.args.MemoryConfig.Log2RingSize == 0 {
+ i.args.MemoryConfig.Log2RingSize = DefaultLog2RingSize
+ }
+ if i.args.MemoryConfig.PacketBufferSize == 0 {
+ i.args.MemoryConfig.PacketBufferSize = DefaultPacketBufferSize
+ }
+
+ i.socket = socket
+
+ // append interface to the list
+ i.listRef = socket.interfaceList.PushBack(&i)
+
+ if i.args.IsMaster {
+ if socket.listener == nil {
+ err = socket.addListener()
+ if err != nil {
+ return nil, fmt.Errorf("Failed to create listener channel: %s", err)
+ }
+ }
+ }
+
+ return &i, nil
+}
+
+// eventFd returns an eventfd (SYS_EVENTFD2)
+func eventFd() (efd int, err error) {
+ u_efd, _, errno := syscall.Syscall(syscall.SYS_EVENTFD2, uintptr(0), uintptr(efd_nonblock), 0)
+ if errno != 0 {
+ return -1, os.NewSyscallError("eventfd", errno)
+ }
+ return int(u_efd), nil
+}
+
+// addRegions creates and adds a new memory region to the interface (slave only)
+func (i *Interface) addRegion(hasPacketBuffers bool, hasRings bool) (err error) {
+ var r memoryRegion
+
+ if hasRings {
+ r.packetBufferOffset = uint32((i.run.NumQueuePairs + i.run.NumQueuePairs) * (ringSize + descSize*(1<<i.run.Log2RingSize)))
+ } else {
+ r.packetBufferOffset = 0
+ }
+
+ if hasPacketBuffers {
+ r.size = uint64(r.packetBufferOffset + i.run.PacketBufferSize*uint32(1<<i.run.Log2RingSize)*uint32(i.run.NumQueuePairs+i.run.NumQueuePairs))
+ } else {
+ r.size = uint64(r.packetBufferOffset)
+ }
+
+ r.fd, err = memfdCreate()
+ if err != nil {
+ return err
+ }
+
+ _, _, errno := syscall.Syscall(syscall.SYS_FCNTL, uintptr(r.fd), uintptr(f_add_seals), uintptr(f_seal_shrink))
+ if errno != 0 {
+ syscall.Close(r.fd)
+ return fmt.Errorf("memfdCreate: %s", os.NewSyscallError("fcntl", errno))
+ }
+
+ err = syscall.Ftruncate(r.fd, int64(r.size))
+ if err != nil {
+ syscall.Close(r.fd)
+ r.fd = -1
+ return fmt.Errorf("memfdCreate: %s", err)
+ }
+
+ r.data, err = syscall.Mmap(r.fd, 0, int(r.size), syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_SHARED)
+ if err != nil {
+ return fmt.Errorf("addRegion: %s", err)
+ }
+
+ i.regions = append(i.regions, r)
+
+ return nil
+}
+
+// initializeRegions initializes interfaces regions (slave only)
+func (i *Interface) initializeRegions() (err error) {
+
+ err = i.addRegion(true, true)
+ if err != nil {
+ return fmt.Errorf("initializeRegions: %s", err)
+ }
+
+ return nil
+}
+
+// initializeQueues initializes interfaces queues (slave only)
+func (i *Interface) initializeQueues() (err error) {
+ var q *Queue
+ var desc descBuf
+ var slot int
+
+ desc = newDescBuf()
+ desc.setFlags(0)
+ desc.setRegion(0)
+ desc.setLength(int(i.run.PacketBufferSize))
+
+ for qid := 0; qid < int(i.run.NumQueuePairs); qid++ {
+ /* TX */
+ q = &Queue{
+ ring: i.newRing(0, ringTypeS2M, qid),
+ lastHead: 0,
+ lastTail: 0,
+ i: i,
+ }
+ q.ring.setCookie(cookie)
+ q.ring.setFlags(1)
+ q.interruptFd, err = eventFd()
+ if err != nil {
+ return err
+ }
+ q.putRing()
+ i.txQueues = append(i.txQueues, *q)
+
+ for j := 0; j < q.ring.size; j++ {
+ slot = qid*q.ring.size + j
+ desc.setOffset(int(i.regions[0].packetBufferOffset + uint32(slot)*i.run.PacketBufferSize))
+ q.putDescBuf(slot, desc)
+ }
+ }
+ for qid := 0; qid < int(i.run.NumQueuePairs); qid++ {
+ /* RX */
+ q = &Queue{
+ ring: i.newRing(0, ringTypeM2S, qid),
+ lastHead: 0,
+ lastTail: 0,
+ i: i,
+ }
+ q.ring.setCookie(cookie)
+ q.ring.setFlags(1)
+ q.interruptFd, err = eventFd()
+ if err != nil {
+ return err
+ }
+ q.putRing()
+ i.rxQueues = append(i.rxQueues, *q)
+
+ for j := 0; j < q.ring.size; j++ {
+ slot = qid*q.ring.size + j
+ desc.setOffset(int(i.regions[0].packetBufferOffset + uint32(slot)*i.run.PacketBufferSize))
+ q.putDescBuf(slot, desc)
+ }
+ }
+
+ return nil
+}
+
+// connect finalizes interface connection
+func (i *Interface) connect() (err error) {
+ for rid, _ := range i.regions {
+ r := &i.regions[rid]
+ if r.data == nil {
+ r.data, err = syscall.Mmap(r.fd, 0, int(r.size), syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_SHARED)
+ if err != nil {
+ return fmt.Errorf("Mmap: %s", err)
+ }
+ }
+ }
+
+ for _, q := range i.txQueues {
+ q.updateRing()
+
+ if q.ring.getCookie() != cookie {
+ return fmt.Errorf("Wrong cookie")
+ }
+
+ q.lastHead = 0
+ q.lastTail = 0
+ }
+
+ for _, q := range i.rxQueues {
+ q.updateRing()
+
+ if q.ring.getCookie() != cookie {
+ return fmt.Errorf("Wrong cookie")
+ }
+
+ q.lastHead = 0
+ q.lastTail = 0
+ }
+
+ return i.args.ConnectedFunc(i)
+}
diff --git a/extras/gomemif/memif/interface_unsafe.go b/extras/gomemif/memif/interface_unsafe.go
new file mode 100644
index 00000000000..f5cbc2ed207
--- /dev/null
+++ b/extras/gomemif/memif/interface_unsafe.go
@@ -0,0 +1,40 @@
+/*
+ *------------------------------------------------------------------
+ * Copyright (c) 2020 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *------------------------------------------------------------------
+ */
+
+package memif
+
+import (
+ "fmt"
+ "os"
+ "syscall"
+ "unsafe"
+)
+
+// memfdCreate returns memory file file descriptor (memif.sys_memfd_create)
+func memfdCreate() (mfd int, err error) {
+ p0, err := syscall.BytePtrFromString("memif_region_0")
+ if err != nil {
+ return -1, fmt.Errorf("memfdCreate: %s", err)
+ }
+
+ u_mfd, _, errno := syscall.Syscall(sys_memfd_create, uintptr(unsafe.Pointer(p0)), uintptr(mfd_allow_sealing), uintptr(0))
+ if errno != 0 {
+ return -1, fmt.Errorf("memfdCreate: %s", os.NewSyscallError("memfd_create", errno))
+ }
+
+ return int(u_mfd), nil
+}
diff --git a/extras/gomemif/memif/memif.go b/extras/gomemif/memif/memif.go
new file mode 100644
index 00000000000..1a7857de03e
--- /dev/null
+++ b/extras/gomemif/memif/memif.go
@@ -0,0 +1,345 @@
+/*
+ *------------------------------------------------------------------
+ * Copyright (c) 2020 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *------------------------------------------------------------------
+ */
+
+package memif
+
+import (
+ "encoding/binary"
+ "fmt"
+ "syscall"
+)
+
+const cookie = 0x3E31F20
+
+// VersionMajor is memif protocols major version
+const VersionMajor = 2
+
+// VersionMinor is memif protocols minor version
+const VersionMinor = 0
+
+// Version is memif protocols version as uint16
+// (M-Major m-minor: MMMMMMMMmmmmmmmm)
+const Version = ((VersionMajor << 8) | VersionMinor)
+
+type msgType uint16
+
+const (
+ msgTypeNone msgType = iota
+ msgTypeAck
+ msgTypeHello
+ msgTypeInit
+ msgTypeAddRegion
+ msgTypeAddRing
+ msgTypeConnect
+ msgTypeConnected
+ msgTypeDisconnect
+)
+
+type interfaceMode uint8
+
+const (
+ interfaceModeEthernet interfaceMode = iota
+ interfaceModeIp
+ interfaceModePuntInject
+)
+
+const msgSize = 128
+const msgTypeSize = 2
+
+const msgAddRingFlagS2M = (1 << 0)
+
+// Descriptor flags
+//
+// next buffer present
+const descFlagNext = (1 << 0)
+
+// Ring flags
+//
+// Interrupt
+const ringFlagInterrupt = 1
+
+func min16(a uint16, b uint16) uint16 {
+ if a < b {
+ return a
+ }
+ return b
+}
+
+func min8(a uint8, b uint8) uint8 {
+ if a < b {
+ return a
+ }
+ return b
+}
+
+type MsgHello struct {
+ // app name
+ Name [32]byte
+ VersionMin uint16
+ VersionMax uint16
+ MaxRegion uint16
+ MaxRingM2S uint16
+ MaxRingS2M uint16
+ MaxLog2RingSize uint8
+}
+
+type MsgInit struct {
+ Version uint16
+ Id uint32
+ Mode interfaceMode
+ Secret [24]byte
+ // app name
+ Name [32]byte
+}
+
+type MsgAddRegion struct {
+ Index uint16
+ Size uint64
+}
+
+type MsgAddRing struct {
+ Flags uint16
+ Index uint16
+ Region uint16
+ Offset uint32
+ RingSizeLog2 uint8
+ PrivateHdrSize uint16
+}
+
+type MsgConnect struct {
+ // interface name
+ Name [32]byte
+}
+
+type MsgConnected struct {
+ // interface name
+ Name [32]byte
+}
+
+type MsgDisconnect struct {
+ Code uint32
+ String [96]byte
+}
+
+/* DESCRIPTOR BEGIN */
+
+const descSize = 16
+
+// desc field offsets
+const descFlagsOffset = 0
+const descRegionOffset = 2
+const descLengthOffset = 4
+const descOffsetOffset = 8
+const descMetadataOffset = 12
+
+// descBuf represents a memif descriptor as array of bytes
+type descBuf []byte
+
+// newDescBuf returns new descriptor buffer
+func newDescBuf() descBuf {
+ return make(descBuf, descSize)
+}
+
+// getDescBuff copies descriptor from shared memory to descBuf
+func (q *Queue) getDescBuf(slot int, db descBuf) {
+ copy(db, q.i.regions[q.ring.region].data[q.ring.offset+ringSize+slot*descSize:])
+}
+
+// putDescBuf copies contents of descriptor buffer into shared memory
+func (q *Queue) putDescBuf(slot int, db descBuf) {
+ copy(q.i.regions[q.ring.region].data[q.ring.offset+ringSize+slot*descSize:], db)
+}
+
+func (db descBuf) getFlags() int {
+ return (int)(binary.LittleEndian.Uint16((db)[descFlagsOffset:]))
+}
+
+func (db descBuf) getRegion() int {
+ return (int)(binary.LittleEndian.Uint16((db)[descRegionOffset:]))
+}
+
+func (db descBuf) getLength() int {
+ return (int)(binary.LittleEndian.Uint32((db)[descLengthOffset:]))
+}
+
+func (db descBuf) getOffset() int {
+ return (int)(binary.LittleEndian.Uint32((db)[descOffsetOffset:]))
+}
+
+func (db descBuf) getMetadata() int {
+ return (int)(binary.LittleEndian.Uint32((db)[descMetadataOffset:]))
+}
+
+func (db descBuf) setFlags(val int) {
+ binary.LittleEndian.PutUint16((db)[descFlagsOffset:], uint16(val))
+}
+
+func (db descBuf) setRegion(val int) {
+ binary.LittleEndian.PutUint16((db)[descRegionOffset:], uint16(val))
+}
+
+func (db descBuf) setLength(val int) {
+ binary.LittleEndian.PutUint32((db)[descLengthOffset:], uint32(val))
+}
+
+func (db descBuf) setOffset(val int) {
+ binary.LittleEndian.PutUint32((db)[descOffsetOffset:], uint32(val))
+}
+
+func (db descBuf) setMetadata(val int) {
+ binary.LittleEndian.PutUint32((db)[descMetadataOffset:], uint32(val))
+}
+
+/* DESCRIPTOR END */
+
+/* RING BEGIN */
+
+type ringType uint8
+
+const (
+ ringTypeS2M ringType = iota
+ ringTypeM2S
+)
+
+const ringSize = 128
+
+// ring field offsets
+const ringCookieOffset = 0
+const ringFlagsOffset = 4
+const ringHeadOffset = 6
+const ringTailOffset = 64
+
+// ringBuf represents a memif ring as array of bytes
+type ringBuf []byte
+
+type ring struct {
+ ringType ringType
+ size int
+ log2Size int
+ region int
+ rb ringBuf
+ offset int
+}
+
+// newRing returns new memif ring based on data received in msgAddRing (master only)
+func newRing(regionIndex int, ringType ringType, ringOffset int, log2RingSize int) *ring {
+ r := &ring{
+ ringType: ringType,
+ size: (1 << log2RingSize),
+ log2Size: log2RingSize,
+ rb: make(ringBuf, ringSize),
+ offset: ringOffset,
+ }
+
+ return r
+}
+
+// newRing returns a new memif ring
+func (i *Interface) newRing(regionIndex int, ringType ringType, ringIndex int) *ring {
+ r := &ring{
+ ringType: ringType,
+ size: (1 << i.run.Log2RingSize),
+ log2Size: int(i.run.Log2RingSize),
+ rb: make(ringBuf, ringSize),
+ }
+
+ rSize := ringSize + descSize*r.size
+ if r.ringType == ringTypeS2M {
+ r.offset = 0
+ } else {
+ r.offset = int(i.run.NumQueuePairs) * rSize
+ }
+ r.offset += ringIndex * rSize
+
+ return r
+}
+
+// putRing put the ring to the shared memory
+func (q *Queue) putRing() {
+ copy(q.i.regions[q.ring.region].data[q.ring.offset:], q.ring.rb)
+}
+
+// updateRing updates ring with data from shared memory
+func (q *Queue) updateRing() {
+ copy(q.ring.rb, q.i.regions[q.ring.region].data[q.ring.offset:])
+}
+
+func (r *ring) getCookie() int {
+ return (int)(binary.LittleEndian.Uint32((r.rb)[ringCookieOffset:]))
+}
+
+// getFlags returns the flags value from ring buffer
+// Use Queue.getFlags in fast-path to avoid updating the whole ring.
+func (r *ring) getFlags() int {
+ return (int)(binary.LittleEndian.Uint16((r.rb)[ringFlagsOffset:]))
+}
+
+// getHead returns the head pointer value from ring buffer.
+// Use readHead in fast-path to avoid updating the whole ring.
+func (r *ring) getHead() int {
+ return (int)(binary.LittleEndian.Uint16((r.rb)[ringHeadOffset:]))
+}
+
+// getTail returns the tail pointer value from ring buffer.
+// Use readTail in fast-path to avoid updating the whole ring.
+func (r *ring) getTail() int {
+ return (int)(binary.LittleEndian.Uint16((r.rb)[ringTailOffset:]))
+}
+
+func (r *ring) setCookie(val int) {
+ binary.LittleEndian.PutUint32((r.rb)[ringCookieOffset:], uint32(val))
+}
+
+func (r *ring) setFlags(val int) {
+ binary.LittleEndian.PutUint16((r.rb)[ringFlagsOffset:], uint16(val))
+}
+
+// setHead set the head pointer value int the ring buffer.
+// Use writeHead in fast-path to avoid putting the whole ring into shared memory.
+func (r *ring) setHead(val int) {
+ binary.LittleEndian.PutUint16((r.rb)[ringHeadOffset:], uint16(val))
+}
+
+// setTail set the tail pointer value int the ring buffer.
+// Use writeTail in fast-path to avoid putting the whole ring into shared memory.
+func (r *ring) setTail(val int) {
+ binary.LittleEndian.PutUint16((r.rb)[ringTailOffset:], uint16(val))
+}
+
+/* RING END */
+
+// isInterrupt returns true if the queue is in interrupt mode
+func (q *Queue) isInterrupt() bool {
+ return (q.getFlags() & ringFlagInterrupt) == 0
+}
+
+// interrupt performs an interrupt if the queue is in interrupt mode
+func (q *Queue) interrupt() error {
+ if q.isInterrupt() {
+ buf := make([]byte, 8)
+ binary.PutUvarint(buf, 1)
+ n, err := syscall.Write(q.interruptFd, buf[:])
+ if err != nil {
+ return err
+ }
+ if n != 8 {
+ return fmt.Errorf("Faild to write to eventfd")
+ }
+ }
+
+ return nil
+}
diff --git a/extras/gomemif/memif/memif_unsafe.go b/extras/gomemif/memif/memif_unsafe.go
new file mode 100644
index 00000000000..4469d26e982
--- /dev/null
+++ b/extras/gomemif/memif/memif_unsafe.go
@@ -0,0 +1,55 @@
+/*
+ *------------------------------------------------------------------
+ * Copyright (c) 2020 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *------------------------------------------------------------------
+ */
+
+package memif
+
+import (
+ "unsafe"
+)
+
+// readHead reads ring head directly form the shared memory
+func (q *Queue) readHead() (head int) {
+ return (int)(*(*uint16)(unsafe.Pointer(&q.i.regions[q.ring.region].data[q.ring.offset+ringHeadOffset])))
+ // return atomicload16(&q.i.regions[q.region].data[q.offset + descHeadOffset])
+}
+
+// readTail reads ring tail directly form the shared memory
+func (q *Queue) readTail() (tail int) {
+ return (int)(*(*uint16)(unsafe.Pointer(&q.i.regions[q.ring.region].data[q.ring.offset+ringTailOffset])))
+ // return atomicload16(&q.i.regions[q.region].data[q.offset + descTailOffset])
+}
+
+// writeHead writes ring head directly to the shared memory
+func (q *Queue) writeHead(value int) {
+ *(*uint16)(unsafe.Pointer(&q.i.regions[q.ring.region].data[q.ring.offset+ringHeadOffset])) = *(*uint16)(unsafe.Pointer(&value))
+ //atomicstore16(&q.i.regions[q.region].data[q.offset + descHeadOffset], value)
+}
+
+// writeTail writes ring tail directly to the shared memory
+func (q *Queue) writeTail(value int) {
+ *(*uint16)(unsafe.Pointer(&q.i.regions[q.ring.region].data[q.ring.offset+ringTailOffset])) = *(*uint16)(unsafe.Pointer(&value))
+ //atomicstore16(&q.i.regions[q.region].data[q.offset + descTailOffset], value)
+}
+
+func (q *Queue) setDescLength(slot int, length int) {
+ *(*uint16)(unsafe.Pointer(&q.i.regions[q.ring.region].data[q.ring.offset+ringSize+slot*descSize+descLengthOffset])) = *(*uint16)(unsafe.Pointer(&length))
+}
+
+// getFlags reads ring flags directly from the shared memory
+func (q *Queue) getFlags() int {
+ return (int)(*(*uint16)(unsafe.Pointer(&q.i.regions[q.ring.region].data[q.ring.offset+ringFlagsOffset])))
+}
diff --git a/extras/gomemif/memif/packet_reader.go b/extras/gomemif/memif/packet_reader.go
new file mode 100644
index 00000000000..58338f6f2ab
--- /dev/null
+++ b/extras/gomemif/memif/packet_reader.go
@@ -0,0 +1,91 @@
+/*
+ *------------------------------------------------------------------
+ * Copyright (c) 2020 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *------------------------------------------------------------------
+ */
+
+package memif
+
+import "fmt"
+
+// ReadPacket reads one packet form the shared memory and
+// returns the number of bytes read
+func (q *Queue) ReadPacket(pkt []byte) (int, error) {
+ var mask int = q.ring.size - 1
+ var slot int
+ var lastSlot int
+ var length int
+ var offset int
+ var pktOffset int = 0
+ var nSlots uint16
+ var desc descBuf = newDescBuf()
+
+ if q.i.args.IsMaster {
+ slot = int(q.lastHead)
+ lastSlot = q.readHead()
+ } else {
+ slot = int(q.lastTail)
+ lastSlot = q.readTail()
+ }
+
+ nSlots = uint16(lastSlot - slot)
+ if nSlots == 0 {
+ goto refill
+ }
+
+ // copy descriptor from shm
+ q.getDescBuf(slot&mask, desc)
+ length = desc.getLength()
+ offset = desc.getOffset()
+
+ copy(pkt[:], q.i.regions[desc.getRegion()].data[offset:offset+length])
+ pktOffset += length
+
+ slot++
+ nSlots--
+
+ for (desc.getFlags() & descFlagNext) == descFlagNext {
+ if nSlots == 0 {
+ return 0, fmt.Errorf("Incomplete chained buffer, may suggest peer error.")
+ }
+
+ q.getDescBuf(slot&mask, desc)
+ length = desc.getLength()
+ offset = desc.getOffset()
+
+ copy(pkt[pktOffset:], q.i.regions[desc.getRegion()].data[offset:offset+length])
+ pktOffset += length
+
+ slot++
+ nSlots--
+ }
+
+refill:
+ if q.i.args.IsMaster {
+ q.lastHead = uint16(slot)
+ q.writeTail(slot)
+ } else {
+ q.lastTail = uint16(slot)
+
+ head := q.readHead()
+
+ for nSlots := uint16(q.ring.size - head + int(q.lastTail)); nSlots > 0; nSlots-- {
+ q.setDescLength(head&mask, int(q.i.run.PacketBufferSize))
+ head++
+ }
+ q.writeHead(head)
+ }
+
+ return pktOffset, nil
+}
diff --git a/extras/gomemif/memif/packet_writer.go b/extras/gomemif/memif/packet_writer.go
new file mode 100644
index 00000000000..702044f4b49
--- /dev/null
+++ b/extras/gomemif/memif/packet_writer.go
@@ -0,0 +1,95 @@
+/*
+ *------------------------------------------------------------------
+ * Copyright (c) 2020 Cisco and/or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *------------------------------------------------------------------
+ */
+
+package memif
+
+// WritePacket writes one packet to the shared memory and
+// returns the number of bytes written
+func (q *Queue) WritePacket(pkt []byte) int {
+ var mask int = q.ring.size - 1
+ var slot int
+ var nFree uint16
+ var packetBufferSize int = int(q.i.run.PacketBufferSize)
+
+ if q.i.args.IsMaster {
+ slot = q.readTail()
+ nFree = uint16(q.readHead() - slot)
+ } else {
+ slot = q.readHead()
+ nFree = uint16(q.ring.size - slot + q.readTail())
+ }
+
+ if nFree == 0 {
+ q.interrupt()
+ return 0
+ }
+
+ // copy descriptor from shm
+ desc := newDescBuf()
+ q.getDescBuf(slot&mask, desc)
+ // reset flags
+ desc.setFlags(0)
+ // reset length
+ if q.i.args.IsMaster {
+ packetBufferSize = desc.getLength()
+ }
+ desc.setLength(0)
+ offset := desc.getOffset()
+
+ // write packet into memif buffer
+ n := copy(q.i.regions[desc.getRegion()].data[offset:offset+packetBufferSize], pkt[:])
+ desc.setLength(n)
+ for n < len(pkt) {
+ nFree--
+ if nFree == 0 {
+ q.interrupt()
+ return 0
+ }
+ desc.setFlags(descFlagNext)
+ q.putDescBuf(slot&mask, desc)
+ slot++
+
+ // copy descriptor from shm
+ q.getDescBuf(slot&mask, desc)
+ // reset flags
+ desc.setFlags(0)
+ // reset length
+ if q.i.args.IsMaster {
+ packetBufferSize = desc.getLength()
+ }
+ desc.setLength(0)
+ offset := desc.getOffset()
+
+ tmp := copy(q.i.regions[desc.getRegion()].data[offset:offset+packetBufferSize], pkt[:])
+ desc.setLength(tmp)
+ n += tmp
+ }
+
+ // copy descriptor to shm
+ q.putDescBuf(slot&mask, desc)
+ slot++
+
+ if q.i.args.IsMaster {
+ q.writeTail(slot)
+ } else {
+ q.writeHead(slot)
+ }
+
+ q.interrupt()
+
+ return n
+}