aboutsummaryrefslogtreecommitdiffstats
path: root/proxy
diff options
context:
space:
mode:
Diffstat (limited to 'proxy')
-rw-r--r--proxy/client.go92
-rw-r--r--proxy/proxy.go2
-rw-r--r--proxy/server.go168
3 files changed, 228 insertions, 34 deletions
diff --git a/proxy/client.go b/proxy/client.go
index 6f29c71..e333eb9 100644
--- a/proxy/client.go
+++ b/proxy/client.go
@@ -1,4 +1,4 @@
-// Copyright (c) 2019 Cisco and/or its affiliates.
+// Copyright (c) 2021 Cisco and/or its affiliates.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -15,13 +15,14 @@
package proxy
import (
+ "context"
"fmt"
"net/rpc"
"reflect"
"time"
- "git.fd.io/govpp.git/api"
- "git.fd.io/govpp.git/core"
+ "go.fd.io/govpp/api"
+ "go.fd.io/govpp/core"
)
type Client struct {
@@ -117,11 +118,82 @@ func (s *StatsClient) GetBufferStats(bufStats *api.BufferStats) error {
return nil
}
+func (s *StatsClient) GetMemoryStats(memStats *api.MemoryStats) error {
+ req := StatsRequest{StatsType: "memory"}
+ resp := StatsResponse{MemStats: new(api.MemoryStats)}
+ if err := s.rpc.Call("StatsRPC.GetStats", req, &resp); err != nil {
+ return err
+ }
+ *memStats = *resp.MemStats
+ return nil
+}
+
type BinapiClient struct {
rpc *rpc.Client
timeout time.Duration
}
+// RPCStream is a stream for forwarding requests to BinapiRPC's stream.
+type RPCStream struct {
+ rpc *rpc.Client
+ id uint32
+}
+
+func (s *RPCStream) SendMsg(msg api.Message) error {
+ req := RPCStreamReqResp{
+ ID: s.id,
+ Msg: msg,
+ }
+ resp := RPCStreamReqResp{}
+ err := s.rpc.Call("BinapiRPC.SendMessage", req, &resp)
+ if err != nil {
+ return fmt.Errorf("RPC SendMessage call failed: %v", err)
+ }
+ return nil
+}
+
+func (s *RPCStream) RecvMsg() (api.Message, error) {
+ req := RPCStreamReqResp{
+ ID: s.id,
+ }
+ resp := RPCStreamReqResp{}
+ err := s.rpc.Call("BinapiRPC.ReceiveMessage", req, &resp)
+ if err != nil {
+ return nil, fmt.Errorf("RPC ReceiveMessage call failed: %v", err)
+ }
+ return resp.Msg, nil
+}
+
+func (s *RPCStream) Close() error {
+ req := RPCStreamReqResp{
+ ID: s.id,
+ }
+ resp := RPCStreamReqResp{}
+ err := s.rpc.Call("BinapiRPC.CloseStream", req, &resp)
+ if err != nil {
+ return fmt.Errorf("RPC CloseStream call failed: %v", err)
+ }
+ return nil
+}
+
+func (b *BinapiClient) NewStream(_ context.Context, _ ...api.StreamOption) (api.Stream, error) {
+ stream := &RPCStream{
+ rpc: b.rpc,
+ }
+ req := RPCStreamReqResp{}
+ resp := RPCStreamReqResp{}
+ err := stream.rpc.Call("BinapiRPC.NewAPIStream", req, &resp)
+ if err != nil {
+ return nil, fmt.Errorf("RPC NewAPIStream call failed: %v", err)
+ }
+ stream.id = resp.ID
+ return stream, err
+}
+
+func (b *BinapiClient) Invoke(_ context.Context, request api.Message, reply api.Message) error {
+ return invokeInternal(b.rpc, request, reply, b.timeout)
+}
+
func (b *BinapiClient) SendRequest(msg api.Message) api.RequestCtx {
req := &requestCtx{
rpc: b.rpc,
@@ -139,20 +211,24 @@ type requestCtx struct {
}
func (r *requestCtx) ReceiveReply(msg api.Message) error {
+ return invokeInternal(r.rpc, r.req, msg, r.timeout)
+}
+
+func invokeInternal(rpc *rpc.Client, msgIn, msgOut api.Message, timeout time.Duration) error {
req := BinapiRequest{
- Msg: r.req,
- ReplyMsg: msg,
- Timeout: r.timeout,
+ Msg: msgIn,
+ ReplyMsg: msgOut,
+ Timeout: timeout,
}
resp := BinapiResponse{}
- err := r.rpc.Call("BinapiRPC.Invoke", req, &resp)
+ err := rpc.Call("BinapiRPC.Invoke", req, &resp)
if err != nil {
return fmt.Errorf("RPC call failed: %v", err)
}
// we set the value of msg to the value from response
- reflect.ValueOf(msg).Elem().Set(reflect.ValueOf(resp.Msg).Elem())
+ reflect.ValueOf(msgOut).Elem().Set(reflect.ValueOf(resp.Msg).Elem())
return nil
}
diff --git a/proxy/proxy.go b/proxy/proxy.go
index 49c650d..5d7c499 100644
--- a/proxy/proxy.go
+++ b/proxy/proxy.go
@@ -21,7 +21,7 @@ import (
"net/http"
"net/rpc"
- "git.fd.io/govpp.git/adapter"
+ "go.fd.io/govpp/adapter"
)
// Server defines a proxy server that serves client requests to stats and binapi.
diff --git a/proxy/server.go b/proxy/server.go
index c2c4fe3..11c443f 100644
--- a/proxy/server.go
+++ b/proxy/server.go
@@ -1,4 +1,4 @@
-// Copyright (c) 2019 Cisco and/or its affiliates.
+// Copyright (c) 2021 Cisco and/or its affiliates.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -15,6 +15,7 @@
package proxy
import (
+ "context"
"errors"
"fmt"
"reflect"
@@ -22,9 +23,9 @@ import (
"sync/atomic"
"time"
- "git.fd.io/govpp.git/adapter"
- "git.fd.io/govpp.git/api"
- "git.fd.io/govpp.git/core"
+ "go.fd.io/govpp/adapter"
+ "go.fd.io/govpp/api"
+ "go.fd.io/govpp/core"
)
const (
@@ -55,6 +56,7 @@ type StatsResponse struct {
IfaceStats *api.InterfaceStats
ErrStats *api.ErrorStats
BufStats *api.BufferStats
+ MemStats *api.MemoryStats
}
// StatsRPC is a RPC server for proxying client request to api.StatsProvider.
@@ -84,7 +86,7 @@ func NewStatsRPC(stats adapter.StatsAPI) (*StatsRPC, error) {
func (s *StatsRPC) watchConnection() {
heartbeatTicker := time.NewTicker(10 * time.Second).C
atomic.StoreUint32(&s.available, 1)
- log.Println("enabling statsRPC service")
+ log.Debugln("enabling statsRPC service")
count := 0
prev := new(api.SystemStats)
@@ -176,7 +178,7 @@ func (s *StatsRPC) serviceAvailable() bool {
func (s *StatsRPC) GetStats(req StatsRequest, resp *StatsResponse) error {
if !s.serviceAvailable() {
- log.Println(statsErrorMsg)
+ log.Print(statsErrorMsg)
return errors.New("server does not support 'get stats' at this time, try again later")
}
log.Debugf("StatsRPC.GetStats - REQ: %+v", req)
@@ -200,6 +202,9 @@ func (s *StatsRPC) GetStats(req StatsRequest, resp *StatsResponse) error {
case "buffer":
resp.BufStats = new(api.BufferStats)
return s.statsConn.GetBufferStats(resp.BufStats)
+ case "memory":
+ resp.MemStats = new(api.MemoryStats)
+ return s.statsConn.GetMemoryStats(resp.MemStats)
default:
return fmt.Errorf("unknown stats type: %s", req.StatsType)
}
@@ -222,15 +227,21 @@ type BinapiCompatibilityRequest struct {
}
type BinapiCompatibilityResponse struct {
- CompatibleMsgs []string
- IncompatibleMsgs []string
+ CompatibleMsgs map[string][]string
+ IncompatibleMsgs map[string][]string
}
-// BinapiRPC is a RPC server for proxying client request to api.Channel.
+// BinapiRPC is a RPC server for proxying client request to api.Channel
+// or api.Stream.
type BinapiRPC struct {
binapiConn *core.Connection
binapi adapter.VppAPI
+ streamsLock sync.Mutex
+ // local ID, different from api.Stream ID
+ maxStreamID uint32
+ streams map[uint32]api.Stream
+
events chan core.ConnectionEvent
done chan struct{}
// non-zero if the RPC service is available
@@ -263,7 +274,7 @@ func (s *BinapiRPC) watchConnection() {
case core.Connected:
if !s.serviceAvailable() {
atomic.StoreUint32(&s.available, 1)
- log.Println("enabling binapiRPC service")
+ log.Debugln("enabling binapiRPC service")
}
case core.Disconnected:
if s.serviceAvailable() {
@@ -320,9 +331,104 @@ func (s *BinapiRPC) serviceAvailable() bool {
return atomic.LoadUint32(&s.available) == 1
}
+type RPCStreamReqResp struct {
+ ID uint32
+ Msg api.Message
+}
+
+func (s *BinapiRPC) NewAPIStream(req RPCStreamReqResp, resp *RPCStreamReqResp) error {
+ if !s.serviceAvailable() {
+ log.Print(binapiErrorMsg)
+ return errors.New("server does not support RPC calls at this time, try again later")
+ }
+ log.Debugf("BinapiRPC.NewAPIStream - REQ: %#v", req)
+
+ stream, err := s.binapiConn.NewStream(context.Background())
+ if err != nil {
+ return err
+ }
+
+ if s.streams == nil {
+ s.streams = make(map[uint32]api.Stream)
+ }
+
+ s.streamsLock.Lock()
+ s.maxStreamID++
+ s.streams[s.maxStreamID] = stream
+ resp.ID = s.maxStreamID
+ s.streamsLock.Unlock()
+
+ return nil
+}
+
+func (s *BinapiRPC) SendMessage(req RPCStreamReqResp, resp *RPCStreamReqResp) error {
+ if !s.serviceAvailable() {
+ log.Print(binapiErrorMsg)
+ return errors.New("server does not support RPC calls at this time, try again later")
+ }
+ log.Debugf("BinapiRPC.SendMessage - REQ: %#v", req)
+
+ stream, err := s.getStream(req.ID)
+ if err != nil {
+ return err
+ }
+
+ return stream.SendMsg(req.Msg)
+}
+
+func (s *BinapiRPC) ReceiveMessage(req RPCStreamReqResp, resp *RPCStreamReqResp) error {
+ if !s.serviceAvailable() {
+ log.Print(binapiErrorMsg)
+ return errors.New("server does not support RPC calls at this time, try again later")
+ }
+ log.Debugf("BinapiRPC.ReceiveMessage - REQ: %#v", req)
+
+ stream, err := s.getStream(req.ID)
+ if err != nil {
+ return err
+ }
+
+ resp.Msg, err = stream.RecvMsg()
+ return err
+}
+
+func (s *BinapiRPC) CloseStream(req RPCStreamReqResp, resp *RPCStreamReqResp) error {
+ if !s.serviceAvailable() {
+ log.Print(binapiErrorMsg)
+ return errors.New("server does not support RPC calls at this time, try again later")
+ }
+ log.Debugf("BinapiRPC.CloseStream - REQ: %#v", req)
+
+ stream, err := s.getStream(req.ID)
+ if err != nil {
+ return err
+ }
+
+ s.streamsLock.Lock()
+ delete(s.streams, req.ID)
+ s.streamsLock.Unlock()
+
+ return stream.Close()
+}
+
+func (s *BinapiRPC) getStream(id uint32) (api.Stream, error) {
+ s.streamsLock.Lock()
+ stream := s.streams[id]
+ s.streamsLock.Unlock()
+
+ if stream == nil || reflect.ValueOf(stream).IsNil() {
+ s.streamsLock.Lock()
+ // delete the stream in case it is still in the map
+ delete(s.streams, id)
+ s.streamsLock.Unlock()
+ return nil, errors.New("BinapiRPC stream closed")
+ }
+ return stream, nil
+}
+
func (s *BinapiRPC) Invoke(req BinapiRequest, resp *BinapiResponse) error {
if !s.serviceAvailable() {
- log.Println(binapiErrorMsg)
+ log.Print(binapiErrorMsg)
return errors.New("server does not support 'invoke' at this time, try again later")
}
log.Debugf("BinapiRPC.Invoke - REQ: %#v", req)
@@ -364,7 +470,7 @@ func (s *BinapiRPC) Invoke(req BinapiRequest, resp *BinapiResponse) error {
func (s *BinapiRPC) Compatibility(req BinapiCompatibilityRequest, resp *BinapiCompatibilityResponse) error {
if !s.serviceAvailable() {
- log.Println(binapiErrorMsg)
+ log.Print(binapiErrorMsg)
return errors.New("server does not support 'compatibility check' at this time, try again later")
}
log.Debugf("BinapiRPC.Compatiblity - REQ: %#v", req)
@@ -375,25 +481,37 @@ func (s *BinapiRPC) Compatibility(req BinapiCompatibilityRequest, resp *BinapiCo
}
defer ch.Close()
- resp.CompatibleMsgs = make([]string, 0, len(req.MsgNameCrcs))
- resp.IncompatibleMsgs = make([]string, 0, len(req.MsgNameCrcs))
+ resp.CompatibleMsgs = make(map[string][]string)
+ resp.IncompatibleMsgs = make(map[string][]string)
- for _, msg := range req.MsgNameCrcs {
- val, ok := api.GetRegisteredMessages()[msg]
- if !ok {
- resp.IncompatibleMsgs = append(resp.IncompatibleMsgs, msg)
- continue
+ for path, messages := range api.GetRegisteredMessages() {
+ resp.IncompatibleMsgs[path] = make([]string, 0, len(req.MsgNameCrcs))
+ resp.CompatibleMsgs[path] = make([]string, 0, len(req.MsgNameCrcs))
+
+ for _, msg := range req.MsgNameCrcs {
+ val, ok := messages[msg]
+ if !ok {
+ resp.IncompatibleMsgs[path] = append(resp.IncompatibleMsgs[path], msg)
+ continue
+ }
+ if err = ch.CheckCompatiblity(val); err != nil {
+ resp.IncompatibleMsgs[path] = append(resp.IncompatibleMsgs[path], msg)
+ } else {
+ resp.CompatibleMsgs[path] = append(resp.CompatibleMsgs[path], msg)
+ }
}
+ }
- if err = ch.CheckCompatiblity(val); err != nil {
- resp.IncompatibleMsgs = append(resp.IncompatibleMsgs, msg)
+ compatible := false
+ for path, incompatibleMsgs := range resp.IncompatibleMsgs {
+ if len(incompatibleMsgs) == 0 {
+ compatible = true
} else {
- resp.CompatibleMsgs = append(resp.CompatibleMsgs, msg)
+ log.Debugf("messages are incompatible for path %s", path)
}
}
-
- if len(resp.IncompatibleMsgs) > 0 {
- return fmt.Errorf("compatibility check failed for messages: %v", resp.IncompatibleMsgs)
+ if !compatible {
+ return errors.New("compatibility check failed")
}
return nil