From 9ea1f778fb1458ce6b2265941885eab0b34b33d7 Mon Sep 17 00:00:00 2001 From: Vladimir Lavor Date: Mon, 12 Oct 2020 14:21:05 +0200 Subject: Stream API options * Stream API uses the same default values as the Channel API * request size, reply size and reply timeout settable using functional options * Added stream client example to show the stream API usage Change-Id: Id599134a7f520fc19f7d770ed5e3de74a7936829 Signed-off-by: Vladimir Lavor --- .gitignore | 1 + CHANGELOG.md | 1 + api/api.go | 8 +- core/stream.go | 58 ++++-- examples/simple-client/simple_client.go | 110 +++--------- examples/stream-client/stream_client.go | 302 ++++++++++++++++++++++++++++++++ 6 files changed, 382 insertions(+), 98 deletions(-) create mode 100644 examples/stream-client/stream_client.go diff --git a/.gitignore b/.gitignore index 8a782d5..8e61e14 100644 --- a/.gitignore +++ b/.gitignore @@ -17,4 +17,5 @@ examples/perf-bench/perf-bench examples/rpc-service/rpc-service examples/simple-client/simple-client examples/stats-client/stats-client +examples/stream-client/stream-client examples/union-example/union-example diff --git a/CHANGELOG.md b/CHANGELOG.md index e35ad0f..3bd9357 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -63,6 +63,7 @@ This file lists changes for the GoVPP releases. - improved [simple client](examples/simple-client) example to work properly even with multiple runs - added [multi-vpp](examples/multi-vpp) example displaying management of two VPP instances from single application +- added [stream-client](examples/stream-client) example showing usage of the new stream API #### Dependencies - updated `github.com/sirupsen/logrus` dep to `v1.6.0` diff --git a/api/api.go b/api/api.go index 977b02e..93f2b42 100644 --- a/api/api.go +++ b/api/api.go @@ -25,7 +25,7 @@ import ( type Connection interface { // NewStream creates a new stream for sending and receiving messages. // Context can be used to close the stream using cancel or timeout. - NewStream(ctx context.Context) (Stream, error) + NewStream(ctx context.Context, options ...StreamOption) (Stream, error) // Invoke can be used for a simple request-reply RPC. // It creates stream and calls SendMsg with req and RecvMsg with reply. @@ -57,6 +57,12 @@ type Stream interface { Close() error } +// StreamOption allows customizing a Stream. Available options are: +// - WithRequestSize +// - WithReplySize +// - WithReplyTimeout +type StreamOption func(Stream) + // ChannelProvider provides the communication channel with govpp core. type ChannelProvider interface { // NewAPIChannel returns a new channel for communication with VPP via govpp core. diff --git a/core/stream.go b/core/stream.go index 61a9965..abe9d55 100644 --- a/core/stream.go +++ b/core/stream.go @@ -20,6 +20,7 @@ import ( "fmt" "reflect" "sync/atomic" + "time" "git.fd.io/govpp.git/api" ) @@ -29,36 +30,43 @@ type Stream struct { conn *Connection ctx context.Context channel *Channel + // available options + requestSize int + replySize int + replyTimeout time.Duration } -func (c *Connection) NewStream(ctx context.Context) (api.Stream, error) { +func (c *Connection) NewStream(ctx context.Context, options ...api.StreamOption) (api.Stream, error) { if c == nil { return nil, errors.New("nil connection passed in") } - // TODO: add stream options as variadic parameters for customizing: - // - request/reply channel size - // - reply timeout - // - retries - // - ??? + s := &Stream{ + conn: c, + ctx: ctx, + // default options + requestSize: RequestChanBufSize, + replySize: ReplyChanBufSize, + replyTimeout: DefaultReplyTimeout, + } - // create new channel - chID := uint16(atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff) - channel := newChannel(chID, c, c.codec, c, 10, 10) + // parse custom options + for _, option := range options { + option(s) + } + // create and store a new channel + s.id = atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff + s.channel = newChannel(uint16(s.id), c, c.codec, c, s.requestSize, s.replySize) + s.channel.SetReplyTimeout(s.replyTimeout) // store API channel within the client c.channelsLock.Lock() - c.channels[chID] = channel + c.channels[uint16(s.id)] = s.channel c.channelsLock.Unlock() // Channel.watchRequests are not started here intentionally, because // requests are sent directly by SendMsg. - return &Stream{ - id: uint32(chID), - conn: c, - ctx: ctx, - channel: channel, - }, nil + return s, nil } func (c *Connection) Invoke(ctx context.Context, req api.Message, reply api.Message) error { @@ -123,6 +131,24 @@ func (s *Stream) RecvMsg() (api.Message, error) { return msg, nil } +func WithRequestSize(size int) api.StreamOption { + return func(stream api.Stream) { + stream.(*Stream).requestSize = size + } +} + +func WithReplySize(size int) api.StreamOption { + return func(stream api.Stream) { + stream.(*Stream).replySize = size + } +} + +func WithReplyTimeout(timeout time.Duration) api.StreamOption { + return func(stream api.Stream) { + stream.(*Stream).replyTimeout = timeout + } +} + func (s *Stream) recvReply() (*vppReply, error) { if s.conn == nil { return nil, errors.New("stream closed") diff --git a/examples/simple-client/simple_client.go b/examples/simple-client/simple_client.go index d823273..0898c0a 100644 --- a/examples/simple-client/simple_client.go +++ b/examples/simple-client/simple_client.go @@ -17,7 +17,6 @@ package main import ( - "context" "encoding/json" "flag" "fmt" @@ -31,7 +30,6 @@ import ( "git.fd.io/govpp.git/binapi/interface_types" "git.fd.io/govpp.git/binapi/ip" "git.fd.io/govpp.git/binapi/ip_types" - "git.fd.io/govpp.git/binapi/mactime" "git.fd.io/govpp.git/binapi/vpe" "git.fd.io/govpp.git/core" ) @@ -44,9 +42,10 @@ func main() { flag.Parse() fmt.Println("Starting simple client example") + fmt.Println() // connect to VPP asynchronously - conn, conev, err := govpp.AsyncConnect(*sockAddr, core.DefaultMaxReconnectAttempts, core.DefaultReconnectInterval) + conn, connEv, err := govpp.AsyncConnect(*sockAddr, core.DefaultMaxReconnectAttempts, core.DefaultReconnectInterval) if err != nil { log.Fatalln("ERROR:", err) } @@ -54,56 +53,46 @@ func main() { // wait for Connected event select { - case e := <-conev: + case e := <-connEv: if e.State != core.Connected { log.Fatalln("ERROR: connecting to VPP failed:", e.Error) } } - // create an API channel that will be used in the examples + // check compatibility of used messages ch, err := conn.NewAPIChannel() if err != nil { log.Fatalln("ERROR: creating channel failed:", err) } defer ch.Close() - if err := ch.CheckCompatiblity(vpe.AllMessages()...); err != nil { log.Fatal(err) } - - vppVersion(ch) - if err := ch.CheckCompatiblity(interfaces.AllMessages()...); err != nil { log.Fatal(err) } + // process errors encountered during the example + defer func() { + if len(Errors) > 0 { + fmt.Printf("finished with %d errors\n", len(Errors)) + os.Exit(1) + } else { + fmt.Println("finished successfully") + } + }() + + // use request/reply (channel API) + getVppVersion(ch) idx := createLoopback(ch) interfaceDump(ch) - addIPAddress(ch, idx) ipAddressDump(ch, idx) interfaceNotifications(ch, idx) - - mactimeDump(conn) - - if len(Errors) > 0 { - fmt.Printf("finished with %d errors\n", len(Errors)) - os.Exit(1) - } else { - fmt.Println("finished successfully") - } } -var Errors []error - -func logError(err error, msg string) { - fmt.Printf("ERROR: %s: %v\n", msg, err) - Errors = append(Errors, err) -} - -// vppVersion is the simplest API example - it retrieves VPP version. -func vppVersion(ch api.Channel) { - fmt.Println("Retrieving version") +func getVppVersion(ch api.Channel) { + fmt.Println("Retrieving version..") req := &vpe.ShowVersion{} reply := &vpe.ShowVersionReply{} @@ -112,16 +101,14 @@ func vppVersion(ch api.Channel) { logError(err, "retrieving version") return } - fmt.Printf("reply: %+v\n", reply) fmt.Printf("VPP version: %q\n", reply.Version) fmt.Println("OK") fmt.Println() } -// createLoopback sends request to create loopback interface. func createLoopback(ch api.Channel) interface_types.InterfaceIndex { - fmt.Println("Creating loopback interface") + fmt.Println("Creating loopback interface..") req := &interfaces.CreateLoopback{} reply := &interfaces.CreateLoopbackReply{} @@ -130,7 +117,6 @@ func createLoopback(ch api.Channel) interface_types.InterfaceIndex { logError(err, "creating loopback interface") return 0 } - fmt.Printf("reply: %+v\n", reply) fmt.Printf("interface index: %v\n", reply.SwIfIndex) fmt.Println("OK") @@ -139,9 +125,8 @@ func createLoopback(ch api.Channel) interface_types.InterfaceIndex { return reply.SwIfIndex } -// interfaceDump shows an example of multipart request (multiple replies are expected). func interfaceDump(ch api.Channel) { - fmt.Println("Dumping interfaces") + fmt.Println("Dumping interfaces..") n := 0 reqCtx := ch.SendMultiRequest(&interfaces.SwInterfaceDump{ @@ -166,9 +151,8 @@ func interfaceDump(ch api.Channel) { fmt.Println() } -// addIPAddress sends request to add IP address to interface. func addIPAddress(ch api.Channel, index interface_types.InterfaceIndex) { - fmt.Printf("Adding IP address to interface to interface index %d\n", index) + fmt.Printf("Adding IP address to interface index %d\n", index) req := &interfaces.SwInterfaceAddDelAddress{ SwIfIndex: index, @@ -188,14 +172,13 @@ func addIPAddress(ch api.Channel, index interface_types.InterfaceIndex) { logError(err, "adding IP address to interface") return } - fmt.Printf("reply: %+v\n", reply) fmt.Println("OK") fmt.Println() } func ipAddressDump(ch api.Channel, index interface_types.InterfaceIndex) { - fmt.Printf("Dumping IP addresses for interface index %d\n", index) + fmt.Printf("Dumping IP addresses for interface index %d..\n", index) req := &ip.IPAddressDump{ SwIfIndex: index, @@ -293,48 +276,6 @@ func interfaceNotifications(ch api.Channel, index interface_types.InterfaceIndex fmt.Println() } -func mactimeDump(conn api.Connection) { - fmt.Println("Sending mactime dump") - - ctx := context.Background() - - stream, err := conn.NewStream(ctx) - if err != nil { - panic(err) - } - defer stream.Close() - - if err := stream.SendMsg(&mactime.MactimeDump{}); err != nil { - logError(err, "sending mactime dump") - return - } - -Loop: - for { - msg, err := stream.RecvMsg() - if err != nil { - logError(err, "dumping mactime") - return - } - - switch msg.(type) { - case *mactime.MactimeDetails: - fmt.Printf(" - MactimeDetails: %+v\n", msg) - - case *mactime.MactimeDumpReply: - fmt.Printf(" - MactimeDumpReply: %+v\n", msg) - break Loop - - default: - logError(err, "unexpected message") - return - } - } - - fmt.Println("OK") - fmt.Println() -} - func marshal(v interface{}) { fmt.Printf("GO: %#v\n", v) b, err := json.MarshalIndent(v, "", " ") @@ -343,3 +284,10 @@ func marshal(v interface{}) { } fmt.Printf("JSON: %s\n", b) } + +var Errors []error + +func logError(err error, msg string) { + fmt.Printf("ERROR: %s: %v\n", msg, err) + Errors = append(Errors, err) +} diff --git a/examples/stream-client/stream_client.go b/examples/stream-client/stream_client.go new file mode 100644 index 0000000..fadfe23 --- /dev/null +++ b/examples/stream-client/stream_client.go @@ -0,0 +1,302 @@ +// Copyright (c) 2017 Cisco and/or its affiliates. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// stream-client is an example VPP management application that exercises the +// govpp API on real-world use-cases. +package main + +import ( + "context" + "flag" + "fmt" + "log" + "os" + "time" + + "git.fd.io/govpp.git" + "git.fd.io/govpp.git/adapter/socketclient" + "git.fd.io/govpp.git/api" + interfaces "git.fd.io/govpp.git/binapi/interface" + "git.fd.io/govpp.git/binapi/interface_types" + "git.fd.io/govpp.git/binapi/ip" + "git.fd.io/govpp.git/binapi/ip_types" + "git.fd.io/govpp.git/binapi/mactime" + "git.fd.io/govpp.git/binapi/vpe" + "git.fd.io/govpp.git/core" +) + +var ( + sockAddr = flag.String("sock", socketclient.DefaultSocketName, "Path to VPP binary API socket file") +) + +func main() { + flag.Parse() + + fmt.Println("Starting stream client example") + fmt.Println() + + // connect to VPP asynchronously + conn, connEv, err := govpp.AsyncConnect(*sockAddr, core.DefaultMaxReconnectAttempts, core.DefaultReconnectInterval) + if err != nil { + log.Fatalln("ERROR:", err) + } + defer conn.Disconnect() + + // wait for Connected event + select { + case e := <-connEv: + if e.State != core.Connected { + log.Fatalln("ERROR: connecting to VPP failed:", e.Error) + } + } + + // check compatibility of used messages + ch, err := conn.NewAPIChannel() + if err != nil { + log.Fatalln("ERROR: creating channel failed:", err) + } + defer ch.Close() + if err := ch.CheckCompatiblity(vpe.AllMessages()...); err != nil { + log.Fatal(err) + } + if err := ch.CheckCompatiblity(interfaces.AllMessages()...); err != nil { + log.Fatal(err) + } + + // process errors encountered during the example + defer func() { + if len(Errors) > 0 { + fmt.Printf("finished with %d errors\n", len(Errors)) + os.Exit(1) + } else { + fmt.Println("finished successfully") + } + }() + + // send and receive messages using stream (low-low level API) + stream, err := conn.NewStream(context.Background(), + core.WithRequestSize(50), + core.WithReplySize(50), + core.WithReplyTimeout(2*time.Second)) + if err != nil { + panic(err) + } + defer func() { + if err := stream.Close(); err != nil { + logError(err, "closing the stream") + } + }() + getVppVersionStream(stream) + idx := createLoopbackStream(stream) + interfaceDumpStream(stream) + addIPAddressStream(stream, idx) + ipAddressDumpStream(stream, idx) + mactimeDump(stream) + return +} + +func getVppVersionStream(stream api.Stream) { + fmt.Println("Retrieving version..") + + req := &vpe.ShowVersion{} + if err := stream.SendMsg(req); err != nil { + logError(err, "ShowVersion sending message") + return + } + recv, err := stream.RecvMsg() + if err != nil { + logError(err, "ShowVersion receive message") + return + } + recvMsg := recv.(*vpe.ShowVersionReply) + + fmt.Printf("Retrieved VPP version: %q\n", recvMsg.Version) + fmt.Println("OK") + fmt.Println() +} + +func createLoopbackStream(stream api.Stream) (ifIdx interface_types.InterfaceIndex) { + fmt.Println("Creating the loopback interface..") + + req := &interfaces.CreateLoopback{} + if err := stream.SendMsg(req); err != nil { + logError(err, "CreateLoopback sending message") + return + } + recv, err := stream.RecvMsg() + if err != nil { + logError(err, "CreateLoopback receive message") + return + } + recvMsg := recv.(*interfaces.CreateLoopbackReply) + + fmt.Printf("Loopback interface index: %v\n", recvMsg.SwIfIndex) + fmt.Println("OK") + fmt.Println() + + return recvMsg.SwIfIndex +} + +func interfaceDumpStream(stream api.Stream) { + fmt.Println("Dumping interfaces..") + + if err := stream.SendMsg(&interfaces.SwInterfaceDump{ + SwIfIndex: ^interface_types.InterfaceIndex(0), + }); err != nil { + logError(err, "SwInterfaceDump sending message") + return + } + if err := stream.SendMsg(&vpe.ControlPing{}); err != nil { + logError(err, "ControlPing sending message") + return + } + +Loop: + for { + msg, err := stream.RecvMsg() + if err != nil { + logError(err, "SwInterfaceDump receiving message ") + return + } + + switch msg.(type) { + case *interfaces.SwInterfaceDetails: + fmt.Printf(" - SwInterfaceDetails: %+v\n", msg) + + case *vpe.ControlPingReply: + fmt.Printf(" - ControlPingReply: %+v\n", msg) + break Loop + + default: + logError(err, "unexpected message") + return + } + } + + fmt.Println("OK") + fmt.Println() +} + +func addIPAddressStream(stream api.Stream, index interface_types.InterfaceIndex) { + fmt.Printf("Adding IP address to the interface index %d..\n", index) + + if err := stream.SendMsg(&interfaces.SwInterfaceAddDelAddress{ + SwIfIndex: index, + IsAdd: true, + Prefix: ip_types.AddressWithPrefix{ + Address: ip_types.Address{ + Af: ip_types.ADDRESS_IP4, + Un: ip_types.AddressUnionIP4(ip_types.IP4Address{10, 10, 0, uint8(index)}), + }, + Len: 32, + }, + }); err != nil { + logError(err, "SwInterfaceAddDelAddress sending message") + return + } + + recv, err := stream.RecvMsg() + if err != nil { + logError(err, "SwInterfaceAddDelAddressReply receiving message") + return + } + recvMsg := recv.(*interfaces.SwInterfaceAddDelAddressReply) + + fmt.Printf("Added IP address to interface: %v (return value: %d)\n", index, recvMsg.Retval) + fmt.Println("OK") + fmt.Println() +} + +func ipAddressDumpStream(stream api.Stream, index interface_types.InterfaceIndex) { + fmt.Printf("Dumping IP addresses for interface index %d..\n", index) + + if err := stream.SendMsg(&ip.IPAddressDump{ + SwIfIndex: index, + }); err != nil { + logError(err, "IPAddressDump sending message") + return + } + if err := stream.SendMsg(&vpe.ControlPing{}); err != nil { + logError(err, "ControlPing sending sending message") + return + } + +Loop: + for { + msg, err := stream.RecvMsg() + if err != nil { + logError(err, "IPAddressDump receiving message ") + return + } + + switch msg.(type) { + case *ip.IPAddressDetails: + fmt.Printf(" - IPAddressDetails: %+v\n", msg) + + case *vpe.ControlPingReply: + fmt.Printf(" - ControlPingReply: %+v\n", msg) + break Loop + + default: + logError(err, "unexpected message") + return + } + } + + fmt.Println("OK") + fmt.Println() +} + +// Mactime dump uses MactimeDumpReply message as an end of the stream +// notification instead of the control ping. +func mactimeDump(stream api.Stream, ) { + fmt.Println("Sending mactime dump..") + + if err := stream.SendMsg(&mactime.MactimeDump{}); err != nil { + logError(err, "sending mactime dump") + return + } + +Loop: + for { + msg, err := stream.RecvMsg() + if err != nil { + logError(err, "MactimeDump receiving message") + return + } + + switch msg.(type) { + case *mactime.MactimeDetails: + fmt.Printf(" - MactimeDetails: %+v\n", msg) + + case *mactime.MactimeDumpReply: + fmt.Printf(" - MactimeDumpReply: %+v\n", msg) + break Loop + + default: + logError(err, "unexpected message") + return + } + } + + fmt.Println("OK") + fmt.Println() +} + +var Errors []error + +func logError(err error, msg string) { + fmt.Printf("ERROR: %s: %v\n", msg, err) + Errors = append(Errors, err) +} -- cgit 1.2.3-korg