diff options
author | Ondrej Fabry <ofabry@cisco.com> | 2019-10-08 14:49:16 +0200 |
---|---|---|
committer | Ondrej Fabry <ofabry@cisco.com> | 2019-10-10 11:30:49 +0200 |
commit | 454aff1907d87c79ffb11e151e25953241c7f1be (patch) | |
tree | 07b2287a252f5b981de4fd0b53a7b207fb24a9a0 | |
parent | 809b69ea4a90455445c34bbad7d8e5fea5cf3462 (diff) |
Introduce proxy for VPP
- proxy server defines RPC service for proxying binapi/stats to VPP
- use cmd/vpp-proxy for proxy server/client
Change-Id: I6e698e166ecf6db7109ae5adf8a93f308d3f3f2a
Signed-off-by: Ondrej Fabry <ofabry@cisco.com>
-rw-r--r-- | cmd/vpp-proxy/main.go | 131 | ||||
-rw-r--r-- | proxy/client.go | 178 | ||||
-rw-r--r-- | proxy/proxy.go | 102 | ||||
-rw-r--r-- | proxy/server.go | 109 |
4 files changed, 520 insertions, 0 deletions
diff --git a/cmd/vpp-proxy/main.go b/cmd/vpp-proxy/main.go new file mode 100644 index 0000000..7aea885 --- /dev/null +++ b/cmd/vpp-proxy/main.go @@ -0,0 +1,131 @@ +// Copyright (c) 2019 Cisco and/or its affiliates. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "context" + "encoding/gob" + "flag" + "io" + "log" + + "git.fd.io/govpp.git/adapter/socketclient" + "git.fd.io/govpp.git/adapter/statsclient" + "git.fd.io/govpp.git/api" + "git.fd.io/govpp.git/examples/binapi/interfaces" + "git.fd.io/govpp.git/examples/binapi/vpe" + "git.fd.io/govpp.git/proxy" +) + +var ( + binapiSocket = flag.String("binapi-socket", socketclient.DefaultSocketName, "Path to VPP binapi socket") + statsSocket = flag.String("stats-socket", statsclient.DefaultSocketName, "Path to VPP stats socket") + proxyAddr = flag.String("addr", ":7878", "Address on which proxy serves RPC.") +) + +func init() { + for _, msg := range api.GetRegisteredMessages() { + gob.Register(msg) + } +} + +func main() { + flag.Parse() + + switch cmd := flag.Arg(0); cmd { + case "server": + runServer() + case "client": + runClient() + default: + log.Printf("invalid command: %q, (available commands: client, server)", cmd) + } +} + +func runClient() { + // connect to proxy server + client, err := proxy.Connect(*proxyAddr) + if err != nil { + log.Fatalln("connecting to proxy failed:", err) + } + + // proxy stats + statsProvider, err := client.NewStatsClient() + if err != nil { + log.Fatalln(err) + } + + var sysStats api.SystemStats + if err := statsProvider.GetSystemStats(&sysStats); err != nil { + log.Fatalln("getting stats failed:", err) + } + log.Printf("SystemStats: %+v", sysStats) + + var ifaceStats api.InterfaceStats + if err := statsProvider.GetInterfaceStats(&ifaceStats); err != nil { + log.Fatalln("getting stats failed:", err) + } + log.Printf("InterfaceStats: %+v", ifaceStats) + + // proxy binapi + binapiChannel, err := client.NewBinapiClient() + if err != nil { + log.Fatalln(err) + } + + // - using binapi message directly + req := &vpe.CliInband{Cmd: "show version"} + reply := new(vpe.CliInbandReply) + if err := binapiChannel.SendRequest(req).ReceiveReply(reply); err != nil { + log.Fatalln("binapi request failed:", err) + } + log.Printf("VPP version: %+v", reply.Reply) + + // - or using generated rpc service + svc := interfaces.NewServiceClient(binapiChannel) + stream, err := svc.DumpSwInterface(context.Background(), &interfaces.SwInterfaceDump{}) + if err != nil { + log.Fatalln("binapi request failed:", err) + } + for { + iface, err := stream.Recv() + if err == io.EOF { + break + } + if err != nil { + log.Fatalln(err) + } + log.Printf("- interface: %+v", iface) + } +} + +func runServer() { + p := proxy.NewServer() + + statsAdapter := statsclient.NewStatsClient(*statsSocket) + binapiAdapter := socketclient.NewVppClient(*binapiSocket) + + if err := p.ConnectStats(statsAdapter); err != nil { + log.Fatalln("connecting to stats failed:", err) + } + defer p.DisconnectStats() + + if err := p.ConnectBinapi(binapiAdapter); err != nil { + log.Fatalln("connecting to binapi failed:", err) + } + defer p.DisconnectBinapi() + + p.ListenAndServe(*proxyAddr) +} diff --git a/proxy/client.go b/proxy/client.go new file mode 100644 index 0000000..72ee9e9 --- /dev/null +++ b/proxy/client.go @@ -0,0 +1,178 @@ +package proxy + +import ( + "fmt" + "log" + "net/rpc" + "reflect" + "time" + + "git.fd.io/govpp.git/api" +) + +type Client struct { + serverAddr string + rpc *rpc.Client +} + +// Connect dials remote proxy server on given address and +// returns new client if successful. +func Connect(addr string) (*Client, error) { + client, err := rpc.DialHTTP("tcp", addr) + if err != nil { + log.Fatal("Connection error: ", err) + } + c := &Client{ + serverAddr: addr, + rpc: client, + } + return c, nil +} + +// NewStatsClient returns new StatsClient which implements api.StatsProvider. +func (c *Client) NewStatsClient() (*StatsClient, error) { + stats := &StatsClient{ + rpc: c.rpc, + } + return stats, nil +} + +// NewBinapiClient returns new BinapiClient which implements api.Channel. +func (c *Client) NewBinapiClient() (*BinapiClient, error) { + binapi := &BinapiClient{ + rpc: c.rpc, + } + return binapi, nil +} + +type StatsClient struct { + rpc *rpc.Client +} + +func (s *StatsClient) GetSystemStats(sysStats *api.SystemStats) error { + req := StatsRequest{StatsType: "system"} + resp := StatsResponse{SysStats: sysStats} + return s.rpc.Call("StatsRPC.GetStats", req, &resp) +} + +func (s *StatsClient) GetNodeStats(nodeStats *api.NodeStats) error { + req := StatsRequest{StatsType: "node"} + resp := StatsResponse{NodeStats: nodeStats} + return s.rpc.Call("StatsRPC.GetStats", req, &resp) +} + +func (s *StatsClient) GetInterfaceStats(ifaceStats *api.InterfaceStats) error { + req := StatsRequest{StatsType: "interface"} + resp := StatsResponse{IfaceStats: ifaceStats} + return s.rpc.Call("StatsRPC.GetStats", req, &resp) +} + +func (s *StatsClient) GetErrorStats(errStats *api.ErrorStats) error { + req := StatsRequest{StatsType: "error"} + resp := StatsResponse{ErrStats: errStats} + return s.rpc.Call("StatsRPC.GetStats", req, &resp) +} + +func (s *StatsClient) GetBufferStats(bufStats *api.BufferStats) error { + req := StatsRequest{StatsType: "buffer"} + resp := StatsResponse{BufStats: bufStats} + return s.rpc.Call("StatsRPC.GetStats", req, &resp) +} + +type BinapiClient struct { + rpc *rpc.Client +} + +func (b *BinapiClient) SendRequest(msg api.Message) api.RequestCtx { + req := &requestCtx{ + rpc: b.rpc, + req: msg, + } + log.Printf("SendRequest: %T %+v", msg, msg) + return req +} + +type requestCtx struct { + rpc *rpc.Client + req api.Message +} + +func (r *requestCtx) ReceiveReply(msg api.Message) error { + req := BinapiRequest{ + Msg: r.req, + ReplyMsg: msg, + } + resp := BinapiResponse{} + + err := r.rpc.Call("BinapiRPC.Invoke", req, &resp) + if err != nil { + return fmt.Errorf("RPC call failed: %v", err) + } + + // we set the value of msg to the value from response + reflect.ValueOf(msg).Elem().Set(reflect.ValueOf(resp.Msg).Elem()) + + return nil +} + +func (b *BinapiClient) SendMultiRequest(msg api.Message) api.MultiRequestCtx { + req := &multiRequestCtx{ + rpc: b.rpc, + req: msg, + } + log.Printf("SendMultiRequest: %T %+v", msg, msg) + return req +} + +type multiRequestCtx struct { + rpc *rpc.Client + req api.Message + + index int + replies []api.Message +} + +func (r *multiRequestCtx) ReceiveReply(msg api.Message) (stop bool, err error) { + // we call Invoke only on first ReceiveReply + if r.index == 0 { + req := BinapiRequest{ + Msg: r.req, + ReplyMsg: msg, + IsMulti: true, + } + resp := BinapiResponse{} + + err := r.rpc.Call("BinapiRPC.Invoke", req, &resp) + if err != nil { + return false, fmt.Errorf("RPC call failed: %v", err) + } + + r.replies = resp.Msgs + } + + if r.index >= len(r.replies) { + return true, nil + } + + // we set the value of msg to the value from response + reflect.ValueOf(msg).Elem().Set(reflect.ValueOf(r.replies[r.index]).Elem()) + r.index++ + + return false, nil +} + +func (b *BinapiClient) SubscribeNotification(notifChan chan api.Message, event api.Message) (api.SubscriptionCtx, error) { + panic("implement me") +} + +func (b *BinapiClient) SetReplyTimeout(timeout time.Duration) { + panic("implement me") +} + +func (b *BinapiClient) CheckCompatiblity(msgs ...api.Message) error { + return nil // TODO: proxy this +} + +func (b *BinapiClient) Close() { + b.rpc.Close() +} diff --git a/proxy/proxy.go b/proxy/proxy.go new file mode 100644 index 0000000..1f8f824 --- /dev/null +++ b/proxy/proxy.go @@ -0,0 +1,102 @@ +// Copyright (c) 2019 Cisco and/or its affiliates. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at: +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package proxy + +import ( + "log" + "net" + "net/http" + "net/rpc" + + "git.fd.io/govpp.git/adapter" + "git.fd.io/govpp.git/core" +) + +// Server defines a proxy server that serves client requests to stats and binapi. +type Server struct { + rpc *rpc.Server + + statsConn *core.StatsConnection + binapiConn *core.Connection +} + +func NewServer() *Server { + return &Server{ + rpc: rpc.NewServer(), + } +} + +func (p *Server) ConnectStats(stats adapter.StatsAPI) error { + var err error + p.statsConn, err = core.ConnectStats(stats) + if err != nil { + return err + } + return nil +} + +func (p *Server) DisconnectStats() { + if p.statsConn != nil { + p.statsConn.Disconnect() + } +} + +func (p *Server) ConnectBinapi(binapi adapter.VppAPI) error { + var err error + p.binapiConn, err = core.Connect(binapi) + if err != nil { + return err + } + return nil +} + +func (p *Server) DisconnectBinapi() { + if p.binapiConn != nil { + p.binapiConn.Disconnect() + } +} + +func (p *Server) ListenAndServe(addr string) { + if p.statsConn != nil { + statsRPC := NewStatsRPC(p.statsConn) + if err := p.rpc.Register(statsRPC); err != nil { + panic(err) + } + } + if p.binapiConn != nil { + ch, err := p.binapiConn.NewAPIChannel() + if err != nil { + panic(err) + } + binapiRPC := NewBinapiRPC(ch) + if err := p.rpc.Register(binapiRPC); err != nil { + panic(err) + } + } + + p.rpc.HandleHTTP(rpc.DefaultRPCPath, rpc.DefaultDebugPath) + + l, e := net.Listen("tcp", addr) + if e != nil { + log.Fatal("listen error:", e) + } + defer l.Close() + + log.Printf("proxy serving on: %v", addr) + + if err := http.Serve(l, nil); err != nil { + log.Fatalln(err) + } +} diff --git a/proxy/server.go b/proxy/server.go new file mode 100644 index 0000000..df62356 --- /dev/null +++ b/proxy/server.go @@ -0,0 +1,109 @@ +package proxy + +import ( + "fmt" + "log" + "reflect" + + "git.fd.io/govpp.git/api" +) + +type StatsRequest struct { + StatsType string +} + +type StatsResponse struct { + SysStats *api.SystemStats + NodeStats *api.NodeStats + IfaceStats *api.InterfaceStats + ErrStats *api.ErrorStats + BufStats *api.BufferStats +} + +// StatsRPC is a RPC server for proxying client request to api.StatsProvider. +type StatsRPC struct { + stats api.StatsProvider +} + +// NewStatsRPC returns new StatsRPC to be used as RPC server +// proxying request to given api.StatsProvider. +func NewStatsRPC(stats api.StatsProvider) *StatsRPC { + return &StatsRPC{stats: stats} +} + +func (s *StatsRPC) GetStats(req StatsRequest, resp *StatsResponse) error { + log.Printf("StatsRPC.GetStats - REQ: %+v", req) + + switch req.StatsType { + case "system": + resp.SysStats = new(api.SystemStats) + return s.stats.GetSystemStats(resp.SysStats) + case "node": + resp.NodeStats = new(api.NodeStats) + return s.stats.GetNodeStats(resp.NodeStats) + case "interface": + resp.IfaceStats = new(api.InterfaceStats) + return s.stats.GetInterfaceStats(resp.IfaceStats) + case "error": + resp.ErrStats = new(api.ErrorStats) + return s.stats.GetErrorStats(resp.ErrStats) + case "buffer": + resp.BufStats = new(api.BufferStats) + return s.stats.GetBufferStats(resp.BufStats) + default: + return fmt.Errorf("unknown stats type: %s", req.StatsType) + } +} + +type BinapiRequest struct { + Msg api.Message + IsMulti bool + ReplyMsg api.Message +} + +type BinapiResponse struct { + Msg api.Message + Msgs []api.Message +} + +// BinapiRPC is a RPC server for proxying client request to api.Channel. +type BinapiRPC struct { + binapi api.Channel +} + +// NewBinapiRPC returns new BinapiRPC to be used as RPC server +// proxying request to given api.Channel. +func NewBinapiRPC(binapi api.Channel) *BinapiRPC { + return &BinapiRPC{binapi: binapi} +} + +func (s *BinapiRPC) Invoke(req BinapiRequest, resp *BinapiResponse) error { + log.Printf("BinapiRPC.Invoke - REQ: %#v", req) + + if req.IsMulti { + multi := s.binapi.SendMultiRequest(req.Msg) + for { + // create new message in response of type ReplyMsg + msg := reflect.New(reflect.TypeOf(req.ReplyMsg).Elem()).Interface().(api.Message) + + stop, err := multi.ReceiveReply(msg) + if err != nil { + return err + } else if stop { + break + } + + resp.Msgs = append(resp.Msgs, msg) + } + } else { + // create new message in response of type ReplyMsg + resp.Msg = reflect.New(reflect.TypeOf(req.ReplyMsg).Elem()).Interface().(api.Message) + + err := s.binapi.SendRequest(req.Msg).ReceiveReply(resp.Msg) + if err != nil { + return err + } + } + + return nil +} |