aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVladimir Lavor <vlavor@cisco.com>2020-10-12 14:21:05 +0200
committerOndrej Fabry <ofabry@cisco.com>2020-10-15 08:30:21 +0000
commit9ea1f778fb1458ce6b2265941885eab0b34b33d7 (patch)
treec1e8c18f9a626ced15c6ea6cf72c053931f6170a
parent8d4ee12e94e634b38f1dc55c830f8e222822215f (diff)
Stream API options
* Stream API uses the same default values as the Channel API * request size, reply size and reply timeout settable using functional options * Added stream client example to show the stream API usage Change-Id: Id599134a7f520fc19f7d770ed5e3de74a7936829 Signed-off-by: Vladimir Lavor <vlavor@cisco.com>
-rw-r--r--.gitignore1
-rw-r--r--CHANGELOG.md1
-rw-r--r--api/api.go8
-rw-r--r--core/stream.go58
-rw-r--r--examples/simple-client/simple_client.go110
-rw-r--r--examples/stream-client/stream_client.go302
6 files changed, 382 insertions, 98 deletions
diff --git a/.gitignore b/.gitignore
index 8a782d5..8e61e14 100644
--- a/.gitignore
+++ b/.gitignore
@@ -17,4 +17,5 @@ examples/perf-bench/perf-bench
examples/rpc-service/rpc-service
examples/simple-client/simple-client
examples/stats-client/stats-client
+examples/stream-client/stream-client
examples/union-example/union-example
diff --git a/CHANGELOG.md b/CHANGELOG.md
index e35ad0f..3bd9357 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -63,6 +63,7 @@ This file lists changes for the GoVPP releases.
- improved [simple client](examples/simple-client) example to work properly even with multiple runs
- added [multi-vpp](examples/multi-vpp) example displaying management of two VPP instances from single
application
+- added [stream-client](examples/stream-client) example showing usage of the new stream API
#### Dependencies
- updated `github.com/sirupsen/logrus` dep to `v1.6.0`
diff --git a/api/api.go b/api/api.go
index 977b02e..93f2b42 100644
--- a/api/api.go
+++ b/api/api.go
@@ -25,7 +25,7 @@ import (
type Connection interface {
// NewStream creates a new stream for sending and receiving messages.
// Context can be used to close the stream using cancel or timeout.
- NewStream(ctx context.Context) (Stream, error)
+ NewStream(ctx context.Context, options ...StreamOption) (Stream, error)
// Invoke can be used for a simple request-reply RPC.
// It creates stream and calls SendMsg with req and RecvMsg with reply.
@@ -57,6 +57,12 @@ type Stream interface {
Close() error
}
+// StreamOption allows customizing a Stream. Available options are:
+// - WithRequestSize
+// - WithReplySize
+// - WithReplyTimeout
+type StreamOption func(Stream)
+
// ChannelProvider provides the communication channel with govpp core.
type ChannelProvider interface {
// NewAPIChannel returns a new channel for communication with VPP via govpp core.
diff --git a/core/stream.go b/core/stream.go
index 61a9965..abe9d55 100644
--- a/core/stream.go
+++ b/core/stream.go
@@ -20,6 +20,7 @@ import (
"fmt"
"reflect"
"sync/atomic"
+ "time"
"git.fd.io/govpp.git/api"
)
@@ -29,36 +30,43 @@ type Stream struct {
conn *Connection
ctx context.Context
channel *Channel
+ // available options
+ requestSize int
+ replySize int
+ replyTimeout time.Duration
}
-func (c *Connection) NewStream(ctx context.Context) (api.Stream, error) {
+func (c *Connection) NewStream(ctx context.Context, options ...api.StreamOption) (api.Stream, error) {
if c == nil {
return nil, errors.New("nil connection passed in")
}
- // TODO: add stream options as variadic parameters for customizing:
- // - request/reply channel size
- // - reply timeout
- // - retries
- // - ???
+ s := &Stream{
+ conn: c,
+ ctx: ctx,
+ // default options
+ requestSize: RequestChanBufSize,
+ replySize: ReplyChanBufSize,
+ replyTimeout: DefaultReplyTimeout,
+ }
- // create new channel
- chID := uint16(atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff)
- channel := newChannel(chID, c, c.codec, c, 10, 10)
+ // parse custom options
+ for _, option := range options {
+ option(s)
+ }
+ // create and store a new channel
+ s.id = atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff
+ s.channel = newChannel(uint16(s.id), c, c.codec, c, s.requestSize, s.replySize)
+ s.channel.SetReplyTimeout(s.replyTimeout)
// store API channel within the client
c.channelsLock.Lock()
- c.channels[chID] = channel
+ c.channels[uint16(s.id)] = s.channel
c.channelsLock.Unlock()
// Channel.watchRequests are not started here intentionally, because
// requests are sent directly by SendMsg.
- return &Stream{
- id: uint32(chID),
- conn: c,
- ctx: ctx,
- channel: channel,
- }, nil
+ return s, nil
}
func (c *Connection) Invoke(ctx context.Context, req api.Message, reply api.Message) error {
@@ -123,6 +131,24 @@ func (s *Stream) RecvMsg() (api.Message, error) {
return msg, nil
}
+func WithRequestSize(size int) api.StreamOption {
+ return func(stream api.Stream) {
+ stream.(*Stream).requestSize = size
+ }
+}
+
+func WithReplySize(size int) api.StreamOption {
+ return func(stream api.Stream) {
+ stream.(*Stream).replySize = size
+ }
+}
+
+func WithReplyTimeout(timeout time.Duration) api.StreamOption {
+ return func(stream api.Stream) {
+ stream.(*Stream).replyTimeout = timeout
+ }
+}
+
func (s *Stream) recvReply() (*vppReply, error) {
if s.conn == nil {
return nil, errors.New("stream closed")
diff --git a/examples/simple-client/simple_client.go b/examples/simple-client/simple_client.go
index d823273..0898c0a 100644
--- a/examples/simple-client/simple_client.go
+++ b/examples/simple-client/simple_client.go
@@ -17,7 +17,6 @@
package main
import (
- "context"
"encoding/json"
"flag"
"fmt"
@@ -31,7 +30,6 @@ import (
"git.fd.io/govpp.git/binapi/interface_types"
"git.fd.io/govpp.git/binapi/ip"
"git.fd.io/govpp.git/binapi/ip_types"
- "git.fd.io/govpp.git/binapi/mactime"
"git.fd.io/govpp.git/binapi/vpe"
"git.fd.io/govpp.git/core"
)
@@ -44,9 +42,10 @@ func main() {
flag.Parse()
fmt.Println("Starting simple client example")
+ fmt.Println()
// connect to VPP asynchronously
- conn, conev, err := govpp.AsyncConnect(*sockAddr, core.DefaultMaxReconnectAttempts, core.DefaultReconnectInterval)
+ conn, connEv, err := govpp.AsyncConnect(*sockAddr, core.DefaultMaxReconnectAttempts, core.DefaultReconnectInterval)
if err != nil {
log.Fatalln("ERROR:", err)
}
@@ -54,56 +53,46 @@ func main() {
// wait for Connected event
select {
- case e := <-conev:
+ case e := <-connEv:
if e.State != core.Connected {
log.Fatalln("ERROR: connecting to VPP failed:", e.Error)
}
}
- // create an API channel that will be used in the examples
+ // check compatibility of used messages
ch, err := conn.NewAPIChannel()
if err != nil {
log.Fatalln("ERROR: creating channel failed:", err)
}
defer ch.Close()
-
if err := ch.CheckCompatiblity(vpe.AllMessages()...); err != nil {
log.Fatal(err)
}
-
- vppVersion(ch)
-
if err := ch.CheckCompatiblity(interfaces.AllMessages()...); err != nil {
log.Fatal(err)
}
+ // process errors encountered during the example
+ defer func() {
+ if len(Errors) > 0 {
+ fmt.Printf("finished with %d errors\n", len(Errors))
+ os.Exit(1)
+ } else {
+ fmt.Println("finished successfully")
+ }
+ }()
+
+ // use request/reply (channel API)
+ getVppVersion(ch)
idx := createLoopback(ch)
interfaceDump(ch)
-
addIPAddress(ch, idx)
ipAddressDump(ch, idx)
interfaceNotifications(ch, idx)
-
- mactimeDump(conn)
-
- if len(Errors) > 0 {
- fmt.Printf("finished with %d errors\n", len(Errors))
- os.Exit(1)
- } else {
- fmt.Println("finished successfully")
- }
}
-var Errors []error
-
-func logError(err error, msg string) {
- fmt.Printf("ERROR: %s: %v\n", msg, err)
- Errors = append(Errors, err)
-}
-
-// vppVersion is the simplest API example - it retrieves VPP version.
-func vppVersion(ch api.Channel) {
- fmt.Println("Retrieving version")
+func getVppVersion(ch api.Channel) {
+ fmt.Println("Retrieving version..")
req := &vpe.ShowVersion{}
reply := &vpe.ShowVersionReply{}
@@ -112,16 +101,14 @@ func vppVersion(ch api.Channel) {
logError(err, "retrieving version")
return
}
- fmt.Printf("reply: %+v\n", reply)
fmt.Printf("VPP version: %q\n", reply.Version)
fmt.Println("OK")
fmt.Println()
}
-// createLoopback sends request to create loopback interface.
func createLoopback(ch api.Channel) interface_types.InterfaceIndex {
- fmt.Println("Creating loopback interface")
+ fmt.Println("Creating loopback interface..")
req := &interfaces.CreateLoopback{}
reply := &interfaces.CreateLoopbackReply{}
@@ -130,7 +117,6 @@ func createLoopback(ch api.Channel) interface_types.InterfaceIndex {
logError(err, "creating loopback interface")
return 0
}
- fmt.Printf("reply: %+v\n", reply)
fmt.Printf("interface index: %v\n", reply.SwIfIndex)
fmt.Println("OK")
@@ -139,9 +125,8 @@ func createLoopback(ch api.Channel) interface_types.InterfaceIndex {
return reply.SwIfIndex
}
-// interfaceDump shows an example of multipart request (multiple replies are expected).
func interfaceDump(ch api.Channel) {
- fmt.Println("Dumping interfaces")
+ fmt.Println("Dumping interfaces..")
n := 0
reqCtx := ch.SendMultiRequest(&interfaces.SwInterfaceDump{
@@ -166,9 +151,8 @@ func interfaceDump(ch api.Channel) {
fmt.Println()
}
-// addIPAddress sends request to add IP address to interface.
func addIPAddress(ch api.Channel, index interface_types.InterfaceIndex) {
- fmt.Printf("Adding IP address to interface to interface index %d\n", index)
+ fmt.Printf("Adding IP address to interface index %d\n", index)
req := &interfaces.SwInterfaceAddDelAddress{
SwIfIndex: index,
@@ -188,14 +172,13 @@ func addIPAddress(ch api.Channel, index interface_types.InterfaceIndex) {
logError(err, "adding IP address to interface")
return
}
- fmt.Printf("reply: %+v\n", reply)
fmt.Println("OK")
fmt.Println()
}
func ipAddressDump(ch api.Channel, index interface_types.InterfaceIndex) {
- fmt.Printf("Dumping IP addresses for interface index %d\n", index)
+ fmt.Printf("Dumping IP addresses for interface index %d..\n", index)
req := &ip.IPAddressDump{
SwIfIndex: index,
@@ -293,48 +276,6 @@ func interfaceNotifications(ch api.Channel, index interface_types.InterfaceIndex
fmt.Println()
}
-func mactimeDump(conn api.Connection) {
- fmt.Println("Sending mactime dump")
-
- ctx := context.Background()
-
- stream, err := conn.NewStream(ctx)
- if err != nil {
- panic(err)
- }
- defer stream.Close()
-
- if err := stream.SendMsg(&mactime.MactimeDump{}); err != nil {
- logError(err, "sending mactime dump")
- return
- }
-
-Loop:
- for {
- msg, err := stream.RecvMsg()
- if err != nil {
- logError(err, "dumping mactime")
- return
- }
-
- switch msg.(type) {
- case *mactime.MactimeDetails:
- fmt.Printf(" - MactimeDetails: %+v\n", msg)
-
- case *mactime.MactimeDumpReply:
- fmt.Printf(" - MactimeDumpReply: %+v\n", msg)
- break Loop
-
- default:
- logError(err, "unexpected message")
- return
- }
- }
-
- fmt.Println("OK")
- fmt.Println()
-}
-
func marshal(v interface{}) {
fmt.Printf("GO: %#v\n", v)
b, err := json.MarshalIndent(v, "", " ")
@@ -343,3 +284,10 @@ func marshal(v interface{}) {
}
fmt.Printf("JSON: %s\n", b)
}
+
+var Errors []error
+
+func logError(err error, msg string) {
+ fmt.Printf("ERROR: %s: %v\n", msg, err)
+ Errors = append(Errors, err)
+}
diff --git a/examples/stream-client/stream_client.go b/examples/stream-client/stream_client.go
new file mode 100644
index 0000000..fadfe23
--- /dev/null
+++ b/examples/stream-client/stream_client.go
@@ -0,0 +1,302 @@
+// Copyright (c) 2017 Cisco and/or its affiliates.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at:
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// stream-client is an example VPP management application that exercises the
+// govpp API on real-world use-cases.
+package main
+
+import (
+ "context"
+ "flag"
+ "fmt"
+ "log"
+ "os"
+ "time"
+
+ "git.fd.io/govpp.git"
+ "git.fd.io/govpp.git/adapter/socketclient"
+ "git.fd.io/govpp.git/api"
+ interfaces "git.fd.io/govpp.git/binapi/interface"
+ "git.fd.io/govpp.git/binapi/interface_types"
+ "git.fd.io/govpp.git/binapi/ip"
+ "git.fd.io/govpp.git/binapi/ip_types"
+ "git.fd.io/govpp.git/binapi/mactime"
+ "git.fd.io/govpp.git/binapi/vpe"
+ "git.fd.io/govpp.git/core"
+)
+
+var (
+ sockAddr = flag.String("sock", socketclient.DefaultSocketName, "Path to VPP binary API socket file")
+)
+
+func main() {
+ flag.Parse()
+
+ fmt.Println("Starting stream client example")
+ fmt.Println()
+
+ // connect to VPP asynchronously
+ conn, connEv, err := govpp.AsyncConnect(*sockAddr, core.DefaultMaxReconnectAttempts, core.DefaultReconnectInterval)
+ if err != nil {
+ log.Fatalln("ERROR:", err)
+ }
+ defer conn.Disconnect()
+
+ // wait for Connected event
+ select {
+ case e := <-connEv:
+ if e.State != core.Connected {
+ log.Fatalln("ERROR: connecting to VPP failed:", e.Error)
+ }
+ }
+
+ // check compatibility of used messages
+ ch, err := conn.NewAPIChannel()
+ if err != nil {
+ log.Fatalln("ERROR: creating channel failed:", err)
+ }
+ defer ch.Close()
+ if err := ch.CheckCompatiblity(vpe.AllMessages()...); err != nil {
+ log.Fatal(err)
+ }
+ if err := ch.CheckCompatiblity(interfaces.AllMessages()...); err != nil {
+ log.Fatal(err)
+ }
+
+ // process errors encountered during the example
+ defer func() {
+ if len(Errors) > 0 {
+ fmt.Printf("finished with %d errors\n", len(Errors))
+ os.Exit(1)
+ } else {
+ fmt.Println("finished successfully")
+ }
+ }()
+
+ // send and receive messages using stream (low-low level API)
+ stream, err := conn.NewStream(context.Background(),
+ core.WithRequestSize(50),
+ core.WithReplySize(50),
+ core.WithReplyTimeout(2*time.Second))
+ if err != nil {
+ panic(err)
+ }
+ defer func() {
+ if err := stream.Close(); err != nil {
+ logError(err, "closing the stream")
+ }
+ }()
+ getVppVersionStream(stream)
+ idx := createLoopbackStream(stream)
+ interfaceDumpStream(stream)
+ addIPAddressStream(stream, idx)
+ ipAddressDumpStream(stream, idx)
+ mactimeDump(stream)
+ return
+}
+
+func getVppVersionStream(stream api.Stream) {
+ fmt.Println("Retrieving version..")
+
+ req := &vpe.ShowVersion{}
+ if err := stream.SendMsg(req); err != nil {
+ logError(err, "ShowVersion sending message")
+ return
+ }
+ recv, err := stream.RecvMsg()
+ if err != nil {
+ logError(err, "ShowVersion receive message")
+ return
+ }
+ recvMsg := recv.(*vpe.ShowVersionReply)
+
+ fmt.Printf("Retrieved VPP version: %q\n", recvMsg.Version)
+ fmt.Println("OK")
+ fmt.Println()
+}
+
+func createLoopbackStream(stream api.Stream) (ifIdx interface_types.InterfaceIndex) {
+ fmt.Println("Creating the loopback interface..")
+
+ req := &interfaces.CreateLoopback{}
+ if err := stream.SendMsg(req); err != nil {
+ logError(err, "CreateLoopback sending message")
+ return
+ }
+ recv, err := stream.RecvMsg()
+ if err != nil {
+ logError(err, "CreateLoopback receive message")
+ return
+ }
+ recvMsg := recv.(*interfaces.CreateLoopbackReply)
+
+ fmt.Printf("Loopback interface index: %v\n", recvMsg.SwIfIndex)
+ fmt.Println("OK")
+ fmt.Println()
+
+ return recvMsg.SwIfIndex
+}
+
+func interfaceDumpStream(stream api.Stream) {
+ fmt.Println("Dumping interfaces..")
+
+ if err := stream.SendMsg(&interfaces.SwInterfaceDump{
+ SwIfIndex: ^interface_types.InterfaceIndex(0),
+ }); err != nil {
+ logError(err, "SwInterfaceDump sending message")
+ return
+ }
+ if err := stream.SendMsg(&vpe.ControlPing{}); err != nil {
+ logError(err, "ControlPing sending message")
+ return
+ }
+
+Loop:
+ for {
+ msg, err := stream.RecvMsg()
+ if err != nil {
+ logError(err, "SwInterfaceDump receiving message ")
+ return
+ }
+
+ switch msg.(type) {
+ case *interfaces.SwInterfaceDetails:
+ fmt.Printf(" - SwInterfaceDetails: %+v\n", msg)
+
+ case *vpe.ControlPingReply:
+ fmt.Printf(" - ControlPingReply: %+v\n", msg)
+ break Loop
+
+ default:
+ logError(err, "unexpected message")
+ return
+ }
+ }
+
+ fmt.Println("OK")
+ fmt.Println()
+}
+
+func addIPAddressStream(stream api.Stream, index interface_types.InterfaceIndex) {
+ fmt.Printf("Adding IP address to the interface index %d..\n", index)
+
+ if err := stream.SendMsg(&interfaces.SwInterfaceAddDelAddress{
+ SwIfIndex: index,
+ IsAdd: true,
+ Prefix: ip_types.AddressWithPrefix{
+ Address: ip_types.Address{
+ Af: ip_types.ADDRESS_IP4,
+ Un: ip_types.AddressUnionIP4(ip_types.IP4Address{10, 10, 0, uint8(index)}),
+ },
+ Len: 32,
+ },
+ }); err != nil {
+ logError(err, "SwInterfaceAddDelAddress sending message")
+ return
+ }
+
+ recv, err := stream.RecvMsg()
+ if err != nil {
+ logError(err, "SwInterfaceAddDelAddressReply receiving message")
+ return
+ }
+ recvMsg := recv.(*interfaces.SwInterfaceAddDelAddressReply)
+
+ fmt.Printf("Added IP address to interface: %v (return value: %d)\n", index, recvMsg.Retval)
+ fmt.Println("OK")
+ fmt.Println()
+}
+
+func ipAddressDumpStream(stream api.Stream, index interface_types.InterfaceIndex) {
+ fmt.Printf("Dumping IP addresses for interface index %d..\n", index)
+
+ if err := stream.SendMsg(&ip.IPAddressDump{
+ SwIfIndex: index,
+ }); err != nil {
+ logError(err, "IPAddressDump sending message")
+ return
+ }
+ if err := stream.SendMsg(&vpe.ControlPing{}); err != nil {
+ logError(err, "ControlPing sending sending message")
+ return
+ }
+
+Loop:
+ for {
+ msg, err := stream.RecvMsg()
+ if err != nil {
+ logError(err, "IPAddressDump receiving message ")
+ return
+ }
+
+ switch msg.(type) {
+ case *ip.IPAddressDetails:
+ fmt.Printf(" - IPAddressDetails: %+v\n", msg)
+
+ case *vpe.ControlPingReply:
+ fmt.Printf(" - ControlPingReply: %+v\n", msg)
+ break Loop
+
+ default:
+ logError(err, "unexpected message")
+ return
+ }
+ }
+
+ fmt.Println("OK")
+ fmt.Println()
+}
+
+// Mactime dump uses MactimeDumpReply message as an end of the stream
+// notification instead of the control ping.
+func mactimeDump(stream api.Stream, ) {
+ fmt.Println("Sending mactime dump..")
+
+ if err := stream.SendMsg(&mactime.MactimeDump{}); err != nil {
+ logError(err, "sending mactime dump")
+ return
+ }
+
+Loop:
+ for {
+ msg, err := stream.RecvMsg()
+ if err != nil {
+ logError(err, "MactimeDump receiving message")
+ return
+ }
+
+ switch msg.(type) {
+ case *mactime.MactimeDetails:
+ fmt.Printf(" - MactimeDetails: %+v\n", msg)
+
+ case *mactime.MactimeDumpReply:
+ fmt.Printf(" - MactimeDumpReply: %+v\n", msg)
+ break Loop
+
+ default:
+ logError(err, "unexpected message")
+ return
+ }
+ }
+
+ fmt.Println("OK")
+ fmt.Println()
+}
+
+var Errors []error
+
+func logError(err error, msg string) {
+ fmt.Printf("ERROR: %s: %v\n", msg, err)
+ Errors = append(Errors, err)
+}