aboutsummaryrefslogtreecommitdiffstats
path: root/proxy/server.go
diff options
context:
space:
mode:
authormhalaj1 <matus.halaj@pantheon.tech>2021-07-22 11:58:59 +0200
committerOndrej Fabry <ofabry@cisco.com>2021-08-10 10:00:27 +0000
commitc0da1f2999a6b08d003c0fed1a23e1ca60dd1571 (patch)
tree03fba410b5d2331e00f7cc32261cf75db5f9e3a7 /proxy/server.go
parent24f179dbb9534ed7c05bee2a80f18b55443706ab (diff)
proxy update to vpp 20.05
Signed-off-by: mhalaj1 <matus.halaj@pantheon.tech> Change-Id: I1c7b11950756d0fe789eb7badc3e883c12826671
Diffstat (limited to 'proxy/server.go')
-rw-r--r--proxy/server.go128
1 files changed, 117 insertions, 11 deletions
diff --git a/proxy/server.go b/proxy/server.go
index e395468..243001a 100644
--- a/proxy/server.go
+++ b/proxy/server.go
@@ -1,4 +1,4 @@
-// Copyright (c) 2019 Cisco and/or its affiliates.
+// Copyright (c) 2021 Cisco and/or its affiliates.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -15,6 +15,7 @@
package proxy
import (
+ "context"
"errors"
"fmt"
"reflect"
@@ -230,11 +231,17 @@ type BinapiCompatibilityResponse struct {
IncompatibleMsgs map[string][]string
}
-// BinapiRPC is a RPC server for proxying client request to api.Channel.
+// BinapiRPC is a RPC server for proxying client request to api.Channel
+// or api.Stream.
type BinapiRPC struct {
binapiConn *core.Connection
binapi adapter.VppAPI
+ streamsLock sync.Mutex
+ // local ID, different from api.Stream ID
+ maxStreamID uint32
+ streams map[uint32]api.Stream
+
events chan core.ConnectionEvent
done chan struct{}
// non-zero if the RPC service is available
@@ -324,6 +331,101 @@ func (s *BinapiRPC) serviceAvailable() bool {
return atomic.LoadUint32(&s.available) == 1
}
+type RPCStreamReqResp struct {
+ ID uint32
+ Msg api.Message
+}
+
+func (s *BinapiRPC) NewAPIStream(req RPCStreamReqResp, resp *RPCStreamReqResp) error {
+ if !s.serviceAvailable() {
+ log.Println(binapiErrorMsg)
+ return errors.New("server does not support RPC calls at this time, try again later")
+ }
+ log.Debugf("BinapiRPC.NewAPIStream - REQ: %#v", req)
+
+ stream, err := s.binapiConn.NewStream(context.Background())
+ if err != nil {
+ return err
+ }
+
+ if s.streams == nil {
+ s.streams = make(map[uint32]api.Stream)
+ }
+
+ s.streamsLock.Lock()
+ s.maxStreamID++
+ s.streams[s.maxStreamID] = stream
+ resp.ID = s.maxStreamID
+ s.streamsLock.Unlock()
+
+ return nil
+}
+
+func (s *BinapiRPC) SendMessage(req RPCStreamReqResp, resp *RPCStreamReqResp) error {
+ if !s.serviceAvailable() {
+ log.Println(binapiErrorMsg)
+ return errors.New("server does not support RPC calls at this time, try again later")
+ }
+ log.Debugf("BinapiRPC.SendMessage - REQ: %#v", req)
+
+ stream, err := s.getStream(req.ID)
+ if err != nil {
+ return err
+ }
+
+ return stream.SendMsg(req.Msg)
+}
+
+func (s *BinapiRPC) ReceiveMessage(req RPCStreamReqResp, resp *RPCStreamReqResp) error {
+ if !s.serviceAvailable() {
+ log.Println(binapiErrorMsg)
+ return errors.New("server does not support RPC calls at this time, try again later")
+ }
+ log.Debugf("BinapiRPC.ReceiveMessage - REQ: %#v", req)
+
+ stream, err := s.getStream(req.ID)
+ if err != nil {
+ return err
+ }
+
+ resp.Msg, err = stream.RecvMsg()
+ return err
+}
+
+func (s *BinapiRPC) CloseStream(req RPCStreamReqResp, resp *RPCStreamReqResp) error {
+ if !s.serviceAvailable() {
+ log.Println(binapiErrorMsg)
+ return errors.New("server does not support RPC calls at this time, try again later")
+ }
+ log.Debugf("BinapiRPC.CloseStream - REQ: %#v", req)
+
+ stream, err := s.getStream(req.ID)
+ if err != nil {
+ return err
+ }
+
+ s.streamsLock.Lock()
+ delete(s.streams, req.ID)
+ s.streamsLock.Unlock()
+
+ return stream.Close()
+}
+
+func (s *BinapiRPC) getStream(id uint32) (api.Stream, error) {
+ s.streamsLock.Lock()
+ stream := s.streams[id]
+ s.streamsLock.Unlock()
+
+ if stream == nil || reflect.ValueOf(stream).IsNil() {
+ s.streamsLock.Lock()
+ // delete the stream in case it is still in the map
+ delete(s.streams, id)
+ s.streamsLock.Unlock()
+ return nil, errors.New("BinapiRPC stream closed")
+ }
+ return stream, nil
+}
+
func (s *BinapiRPC) Invoke(req BinapiRequest, resp *BinapiResponse) error {
if !s.serviceAvailable() {
log.Println(binapiErrorMsg)
@@ -383,12 +485,9 @@ func (s *BinapiRPC) Compatibility(req BinapiCompatibilityRequest, resp *BinapiCo
resp.IncompatibleMsgs = make(map[string][]string)
for path, messages := range api.GetRegisteredMessages() {
- if resp.IncompatibleMsgs[path] == nil {
- resp.IncompatibleMsgs[path] = make([]string, 0, len(req.MsgNameCrcs))
- }
- if resp.CompatibleMsgs[path] == nil {
- resp.CompatibleMsgs[path] = make([]string, 0, len(req.MsgNameCrcs))
- }
+ resp.IncompatibleMsgs[path] = make([]string, 0, len(req.MsgNameCrcs))
+ resp.CompatibleMsgs[path] = make([]string, 0, len(req.MsgNameCrcs))
+
for _, msg := range req.MsgNameCrcs {
val, ok := messages[msg]
if !ok {
@@ -402,11 +501,18 @@ func (s *BinapiRPC) Compatibility(req BinapiCompatibilityRequest, resp *BinapiCo
}
}
}
- for _, messages := range resp.IncompatibleMsgs {
- if len(messages) > 0 {
- return fmt.Errorf("compatibility check failed for messages: %v", resp.IncompatibleMsgs)
+
+ compatible := false
+ for path, incompatibleMsgs := range resp.IncompatibleMsgs {
+ if len(incompatibleMsgs) == 0 {
+ compatible = true
+ } else {
+ log.Debugf("messages are incompatible for path %s", path)
}
}
+ if !compatible {
+ return errors.New("compatibility check failed")
+ }
return nil
}