aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorOndrej Fabry <ofabry@cisco.com>2019-10-08 14:49:16 +0200
committerOndrej Fabry <ofabry@cisco.com>2019-10-10 11:30:49 +0200
commit454aff1907d87c79ffb11e151e25953241c7f1be (patch)
tree07b2287a252f5b981de4fd0b53a7b207fb24a9a0
parent809b69ea4a90455445c34bbad7d8e5fea5cf3462 (diff)
Introduce proxy for VPP
- proxy server defines RPC service for proxying binapi/stats to VPP - use cmd/vpp-proxy for proxy server/client Change-Id: I6e698e166ecf6db7109ae5adf8a93f308d3f3f2a Signed-off-by: Ondrej Fabry <ofabry@cisco.com>
-rw-r--r--cmd/vpp-proxy/main.go131
-rw-r--r--proxy/client.go178
-rw-r--r--proxy/proxy.go102
-rw-r--r--proxy/server.go109
4 files changed, 520 insertions, 0 deletions
diff --git a/cmd/vpp-proxy/main.go b/cmd/vpp-proxy/main.go
new file mode 100644
index 0000000..7aea885
--- /dev/null
+++ b/cmd/vpp-proxy/main.go
@@ -0,0 +1,131 @@
+// Copyright (c) 2019 Cisco and/or its affiliates.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at:
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+ "context"
+ "encoding/gob"
+ "flag"
+ "io"
+ "log"
+
+ "git.fd.io/govpp.git/adapter/socketclient"
+ "git.fd.io/govpp.git/adapter/statsclient"
+ "git.fd.io/govpp.git/api"
+ "git.fd.io/govpp.git/examples/binapi/interfaces"
+ "git.fd.io/govpp.git/examples/binapi/vpe"
+ "git.fd.io/govpp.git/proxy"
+)
+
+var (
+ binapiSocket = flag.String("binapi-socket", socketclient.DefaultSocketName, "Path to VPP binapi socket")
+ statsSocket = flag.String("stats-socket", statsclient.DefaultSocketName, "Path to VPP stats socket")
+ proxyAddr = flag.String("addr", ":7878", "Address on which proxy serves RPC.")
+)
+
+func init() {
+ for _, msg := range api.GetRegisteredMessages() {
+ gob.Register(msg)
+ }
+}
+
+func main() {
+ flag.Parse()
+
+ switch cmd := flag.Arg(0); cmd {
+ case "server":
+ runServer()
+ case "client":
+ runClient()
+ default:
+ log.Printf("invalid command: %q, (available commands: client, server)", cmd)
+ }
+}
+
+func runClient() {
+ // connect to proxy server
+ client, err := proxy.Connect(*proxyAddr)
+ if err != nil {
+ log.Fatalln("connecting to proxy failed:", err)
+ }
+
+ // proxy stats
+ statsProvider, err := client.NewStatsClient()
+ if err != nil {
+ log.Fatalln(err)
+ }
+
+ var sysStats api.SystemStats
+ if err := statsProvider.GetSystemStats(&sysStats); err != nil {
+ log.Fatalln("getting stats failed:", err)
+ }
+ log.Printf("SystemStats: %+v", sysStats)
+
+ var ifaceStats api.InterfaceStats
+ if err := statsProvider.GetInterfaceStats(&ifaceStats); err != nil {
+ log.Fatalln("getting stats failed:", err)
+ }
+ log.Printf("InterfaceStats: %+v", ifaceStats)
+
+ // proxy binapi
+ binapiChannel, err := client.NewBinapiClient()
+ if err != nil {
+ log.Fatalln(err)
+ }
+
+ // - using binapi message directly
+ req := &vpe.CliInband{Cmd: "show version"}
+ reply := new(vpe.CliInbandReply)
+ if err := binapiChannel.SendRequest(req).ReceiveReply(reply); err != nil {
+ log.Fatalln("binapi request failed:", err)
+ }
+ log.Printf("VPP version: %+v", reply.Reply)
+
+ // - or using generated rpc service
+ svc := interfaces.NewServiceClient(binapiChannel)
+ stream, err := svc.DumpSwInterface(context.Background(), &interfaces.SwInterfaceDump{})
+ if err != nil {
+ log.Fatalln("binapi request failed:", err)
+ }
+ for {
+ iface, err := stream.Recv()
+ if err == io.EOF {
+ break
+ }
+ if err != nil {
+ log.Fatalln(err)
+ }
+ log.Printf("- interface: %+v", iface)
+ }
+}
+
+func runServer() {
+ p := proxy.NewServer()
+
+ statsAdapter := statsclient.NewStatsClient(*statsSocket)
+ binapiAdapter := socketclient.NewVppClient(*binapiSocket)
+
+ if err := p.ConnectStats(statsAdapter); err != nil {
+ log.Fatalln("connecting to stats failed:", err)
+ }
+ defer p.DisconnectStats()
+
+ if err := p.ConnectBinapi(binapiAdapter); err != nil {
+ log.Fatalln("connecting to binapi failed:", err)
+ }
+ defer p.DisconnectBinapi()
+
+ p.ListenAndServe(*proxyAddr)
+}
diff --git a/proxy/client.go b/proxy/client.go
new file mode 100644
index 0000000..72ee9e9
--- /dev/null
+++ b/proxy/client.go
@@ -0,0 +1,178 @@
+package proxy
+
+import (
+ "fmt"
+ "log"
+ "net/rpc"
+ "reflect"
+ "time"
+
+ "git.fd.io/govpp.git/api"
+)
+
+type Client struct {
+ serverAddr string
+ rpc *rpc.Client
+}
+
+// Connect dials remote proxy server on given address and
+// returns new client if successful.
+func Connect(addr string) (*Client, error) {
+ client, err := rpc.DialHTTP("tcp", addr)
+ if err != nil {
+ log.Fatal("Connection error: ", err)
+ }
+ c := &Client{
+ serverAddr: addr,
+ rpc: client,
+ }
+ return c, nil
+}
+
+// NewStatsClient returns new StatsClient which implements api.StatsProvider.
+func (c *Client) NewStatsClient() (*StatsClient, error) {
+ stats := &StatsClient{
+ rpc: c.rpc,
+ }
+ return stats, nil
+}
+
+// NewBinapiClient returns new BinapiClient which implements api.Channel.
+func (c *Client) NewBinapiClient() (*BinapiClient, error) {
+ binapi := &BinapiClient{
+ rpc: c.rpc,
+ }
+ return binapi, nil
+}
+
+type StatsClient struct {
+ rpc *rpc.Client
+}
+
+func (s *StatsClient) GetSystemStats(sysStats *api.SystemStats) error {
+ req := StatsRequest{StatsType: "system"}
+ resp := StatsResponse{SysStats: sysStats}
+ return s.rpc.Call("StatsRPC.GetStats", req, &resp)
+}
+
+func (s *StatsClient) GetNodeStats(nodeStats *api.NodeStats) error {
+ req := StatsRequest{StatsType: "node"}
+ resp := StatsResponse{NodeStats: nodeStats}
+ return s.rpc.Call("StatsRPC.GetStats", req, &resp)
+}
+
+func (s *StatsClient) GetInterfaceStats(ifaceStats *api.InterfaceStats) error {
+ req := StatsRequest{StatsType: "interface"}
+ resp := StatsResponse{IfaceStats: ifaceStats}
+ return s.rpc.Call("StatsRPC.GetStats", req, &resp)
+}
+
+func (s *StatsClient) GetErrorStats(errStats *api.ErrorStats) error {
+ req := StatsRequest{StatsType: "error"}
+ resp := StatsResponse{ErrStats: errStats}
+ return s.rpc.Call("StatsRPC.GetStats", req, &resp)
+}
+
+func (s *StatsClient) GetBufferStats(bufStats *api.BufferStats) error {
+ req := StatsRequest{StatsType: "buffer"}
+ resp := StatsResponse{BufStats: bufStats}
+ return s.rpc.Call("StatsRPC.GetStats", req, &resp)
+}
+
+type BinapiClient struct {
+ rpc *rpc.Client
+}
+
+func (b *BinapiClient) SendRequest(msg api.Message) api.RequestCtx {
+ req := &requestCtx{
+ rpc: b.rpc,
+ req: msg,
+ }
+ log.Printf("SendRequest: %T %+v", msg, msg)
+ return req
+}
+
+type requestCtx struct {
+ rpc *rpc.Client
+ req api.Message
+}
+
+func (r *requestCtx) ReceiveReply(msg api.Message) error {
+ req := BinapiRequest{
+ Msg: r.req,
+ ReplyMsg: msg,
+ }
+ resp := BinapiResponse{}
+
+ err := r.rpc.Call("BinapiRPC.Invoke", req, &resp)
+ if err != nil {
+ return fmt.Errorf("RPC call failed: %v", err)
+ }
+
+ // we set the value of msg to the value from response
+ reflect.ValueOf(msg).Elem().Set(reflect.ValueOf(resp.Msg).Elem())
+
+ return nil
+}
+
+func (b *BinapiClient) SendMultiRequest(msg api.Message) api.MultiRequestCtx {
+ req := &multiRequestCtx{
+ rpc: b.rpc,
+ req: msg,
+ }
+ log.Printf("SendMultiRequest: %T %+v", msg, msg)
+ return req
+}
+
+type multiRequestCtx struct {
+ rpc *rpc.Client
+ req api.Message
+
+ index int
+ replies []api.Message
+}
+
+func (r *multiRequestCtx) ReceiveReply(msg api.Message) (stop bool, err error) {
+ // we call Invoke only on first ReceiveReply
+ if r.index == 0 {
+ req := BinapiRequest{
+ Msg: r.req,
+ ReplyMsg: msg,
+ IsMulti: true,
+ }
+ resp := BinapiResponse{}
+
+ err := r.rpc.Call("BinapiRPC.Invoke", req, &resp)
+ if err != nil {
+ return false, fmt.Errorf("RPC call failed: %v", err)
+ }
+
+ r.replies = resp.Msgs
+ }
+
+ if r.index >= len(r.replies) {
+ return true, nil
+ }
+
+ // we set the value of msg to the value from response
+ reflect.ValueOf(msg).Elem().Set(reflect.ValueOf(r.replies[r.index]).Elem())
+ r.index++
+
+ return false, nil
+}
+
+func (b *BinapiClient) SubscribeNotification(notifChan chan api.Message, event api.Message) (api.SubscriptionCtx, error) {
+ panic("implement me")
+}
+
+func (b *BinapiClient) SetReplyTimeout(timeout time.Duration) {
+ panic("implement me")
+}
+
+func (b *BinapiClient) CheckCompatiblity(msgs ...api.Message) error {
+ return nil // TODO: proxy this
+}
+
+func (b *BinapiClient) Close() {
+ b.rpc.Close()
+}
diff --git a/proxy/proxy.go b/proxy/proxy.go
new file mode 100644
index 0000000..1f8f824
--- /dev/null
+++ b/proxy/proxy.go
@@ -0,0 +1,102 @@
+// Copyright (c) 2019 Cisco and/or its affiliates.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at:
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package proxy
+
+import (
+ "log"
+ "net"
+ "net/http"
+ "net/rpc"
+
+ "git.fd.io/govpp.git/adapter"
+ "git.fd.io/govpp.git/core"
+)
+
+// Server defines a proxy server that serves client requests to stats and binapi.
+type Server struct {
+ rpc *rpc.Server
+
+ statsConn *core.StatsConnection
+ binapiConn *core.Connection
+}
+
+func NewServer() *Server {
+ return &Server{
+ rpc: rpc.NewServer(),
+ }
+}
+
+func (p *Server) ConnectStats(stats adapter.StatsAPI) error {
+ var err error
+ p.statsConn, err = core.ConnectStats(stats)
+ if err != nil {
+ return err
+ }
+ return nil
+}
+
+func (p *Server) DisconnectStats() {
+ if p.statsConn != nil {
+ p.statsConn.Disconnect()
+ }
+}
+
+func (p *Server) ConnectBinapi(binapi adapter.VppAPI) error {
+ var err error
+ p.binapiConn, err = core.Connect(binapi)
+ if err != nil {
+ return err
+ }
+ return nil
+}
+
+func (p *Server) DisconnectBinapi() {
+ if p.binapiConn != nil {
+ p.binapiConn.Disconnect()
+ }
+}
+
+func (p *Server) ListenAndServe(addr string) {
+ if p.statsConn != nil {
+ statsRPC := NewStatsRPC(p.statsConn)
+ if err := p.rpc.Register(statsRPC); err != nil {
+ panic(err)
+ }
+ }
+ if p.binapiConn != nil {
+ ch, err := p.binapiConn.NewAPIChannel()
+ if err != nil {
+ panic(err)
+ }
+ binapiRPC := NewBinapiRPC(ch)
+ if err := p.rpc.Register(binapiRPC); err != nil {
+ panic(err)
+ }
+ }
+
+ p.rpc.HandleHTTP(rpc.DefaultRPCPath, rpc.DefaultDebugPath)
+
+ l, e := net.Listen("tcp", addr)
+ if e != nil {
+ log.Fatal("listen error:", e)
+ }
+ defer l.Close()
+
+ log.Printf("proxy serving on: %v", addr)
+
+ if err := http.Serve(l, nil); err != nil {
+ log.Fatalln(err)
+ }
+}
diff --git a/proxy/server.go b/proxy/server.go
new file mode 100644
index 0000000..df62356
--- /dev/null
+++ b/proxy/server.go
@@ -0,0 +1,109 @@
+package proxy
+
+import (
+ "fmt"
+ "log"
+ "reflect"
+
+ "git.fd.io/govpp.git/api"
+)
+
+type StatsRequest struct {
+ StatsType string
+}
+
+type StatsResponse struct {
+ SysStats *api.SystemStats
+ NodeStats *api.NodeStats
+ IfaceStats *api.InterfaceStats
+ ErrStats *api.ErrorStats
+ BufStats *api.BufferStats
+}
+
+// StatsRPC is a RPC server for proxying client request to api.StatsProvider.
+type StatsRPC struct {
+ stats api.StatsProvider
+}
+
+// NewStatsRPC returns new StatsRPC to be used as RPC server
+// proxying request to given api.StatsProvider.
+func NewStatsRPC(stats api.StatsProvider) *StatsRPC {
+ return &StatsRPC{stats: stats}
+}
+
+func (s *StatsRPC) GetStats(req StatsRequest, resp *StatsResponse) error {
+ log.Printf("StatsRPC.GetStats - REQ: %+v", req)
+
+ switch req.StatsType {
+ case "system":
+ resp.SysStats = new(api.SystemStats)
+ return s.stats.GetSystemStats(resp.SysStats)
+ case "node":
+ resp.NodeStats = new(api.NodeStats)
+ return s.stats.GetNodeStats(resp.NodeStats)
+ case "interface":
+ resp.IfaceStats = new(api.InterfaceStats)
+ return s.stats.GetInterfaceStats(resp.IfaceStats)
+ case "error":
+ resp.ErrStats = new(api.ErrorStats)
+ return s.stats.GetErrorStats(resp.ErrStats)
+ case "buffer":
+ resp.BufStats = new(api.BufferStats)
+ return s.stats.GetBufferStats(resp.BufStats)
+ default:
+ return fmt.Errorf("unknown stats type: %s", req.StatsType)
+ }
+}
+
+type BinapiRequest struct {
+ Msg api.Message
+ IsMulti bool
+ ReplyMsg api.Message
+}
+
+type BinapiResponse struct {
+ Msg api.Message
+ Msgs []api.Message
+}
+
+// BinapiRPC is a RPC server for proxying client request to api.Channel.
+type BinapiRPC struct {
+ binapi api.Channel
+}
+
+// NewBinapiRPC returns new BinapiRPC to be used as RPC server
+// proxying request to given api.Channel.
+func NewBinapiRPC(binapi api.Channel) *BinapiRPC {
+ return &BinapiRPC{binapi: binapi}
+}
+
+func (s *BinapiRPC) Invoke(req BinapiRequest, resp *BinapiResponse) error {
+ log.Printf("BinapiRPC.Invoke - REQ: %#v", req)
+
+ if req.IsMulti {
+ multi := s.binapi.SendMultiRequest(req.Msg)
+ for {
+ // create new message in response of type ReplyMsg
+ msg := reflect.New(reflect.TypeOf(req.ReplyMsg).Elem()).Interface().(api.Message)
+
+ stop, err := multi.ReceiveReply(msg)
+ if err != nil {
+ return err
+ } else if stop {
+ break
+ }
+
+ resp.Msgs = append(resp.Msgs, msg)
+ }
+ } else {
+ // create new message in response of type ReplyMsg
+ resp.Msg = reflect.New(reflect.TypeOf(req.ReplyMsg).Elem()).Interface().(api.Message)
+
+ err := s.binapi.SendRequest(req.Msg).ReceiveReply(resp.Msg)
+ if err != nil {
+ return err
+ }
+ }
+
+ return nil
+}