From c0da1f2999a6b08d003c0fed1a23e1ca60dd1571 Mon Sep 17 00:00:00 2001 From: mhalaj1 Date: Thu, 22 Jul 2021 11:58:59 +0200 Subject: proxy update to vpp 20.05 Signed-off-by: mhalaj1 Change-Id: I1c7b11950756d0fe789eb7badc3e883c12826671 --- proxy/server.go | 128 +++++++++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 117 insertions(+), 11 deletions(-) (limited to 'proxy/server.go') diff --git a/proxy/server.go b/proxy/server.go index e395468..243001a 100644 --- a/proxy/server.go +++ b/proxy/server.go @@ -1,4 +1,4 @@ -// Copyright (c) 2019 Cisco and/or its affiliates. +// Copyright (c) 2021 Cisco and/or its affiliates. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -15,6 +15,7 @@ package proxy import ( + "context" "errors" "fmt" "reflect" @@ -230,11 +231,17 @@ type BinapiCompatibilityResponse struct { IncompatibleMsgs map[string][]string } -// BinapiRPC is a RPC server for proxying client request to api.Channel. +// BinapiRPC is a RPC server for proxying client request to api.Channel +// or api.Stream. type BinapiRPC struct { binapiConn *core.Connection binapi adapter.VppAPI + streamsLock sync.Mutex + // local ID, different from api.Stream ID + maxStreamID uint32 + streams map[uint32]api.Stream + events chan core.ConnectionEvent done chan struct{} // non-zero if the RPC service is available @@ -324,6 +331,101 @@ func (s *BinapiRPC) serviceAvailable() bool { return atomic.LoadUint32(&s.available) == 1 } +type RPCStreamReqResp struct { + ID uint32 + Msg api.Message +} + +func (s *BinapiRPC) NewAPIStream(req RPCStreamReqResp, resp *RPCStreamReqResp) error { + if !s.serviceAvailable() { + log.Println(binapiErrorMsg) + return errors.New("server does not support RPC calls at this time, try again later") + } + log.Debugf("BinapiRPC.NewAPIStream - REQ: %#v", req) + + stream, err := s.binapiConn.NewStream(context.Background()) + if err != nil { + return err + } + + if s.streams == nil { + s.streams = make(map[uint32]api.Stream) + } + + s.streamsLock.Lock() + s.maxStreamID++ + s.streams[s.maxStreamID] = stream + resp.ID = s.maxStreamID + s.streamsLock.Unlock() + + return nil +} + +func (s *BinapiRPC) SendMessage(req RPCStreamReqResp, resp *RPCStreamReqResp) error { + if !s.serviceAvailable() { + log.Println(binapiErrorMsg) + return errors.New("server does not support RPC calls at this time, try again later") + } + log.Debugf("BinapiRPC.SendMessage - REQ: %#v", req) + + stream, err := s.getStream(req.ID) + if err != nil { + return err + } + + return stream.SendMsg(req.Msg) +} + +func (s *BinapiRPC) ReceiveMessage(req RPCStreamReqResp, resp *RPCStreamReqResp) error { + if !s.serviceAvailable() { + log.Println(binapiErrorMsg) + return errors.New("server does not support RPC calls at this time, try again later") + } + log.Debugf("BinapiRPC.ReceiveMessage - REQ: %#v", req) + + stream, err := s.getStream(req.ID) + if err != nil { + return err + } + + resp.Msg, err = stream.RecvMsg() + return err +} + +func (s *BinapiRPC) CloseStream(req RPCStreamReqResp, resp *RPCStreamReqResp) error { + if !s.serviceAvailable() { + log.Println(binapiErrorMsg) + return errors.New("server does not support RPC calls at this time, try again later") + } + log.Debugf("BinapiRPC.CloseStream - REQ: %#v", req) + + stream, err := s.getStream(req.ID) + if err != nil { + return err + } + + s.streamsLock.Lock() + delete(s.streams, req.ID) + s.streamsLock.Unlock() + + return stream.Close() +} + +func (s *BinapiRPC) getStream(id uint32) (api.Stream, error) { + s.streamsLock.Lock() + stream := s.streams[id] + s.streamsLock.Unlock() + + if stream == nil || reflect.ValueOf(stream).IsNil() { + s.streamsLock.Lock() + // delete the stream in case it is still in the map + delete(s.streams, id) + s.streamsLock.Unlock() + return nil, errors.New("BinapiRPC stream closed") + } + return stream, nil +} + func (s *BinapiRPC) Invoke(req BinapiRequest, resp *BinapiResponse) error { if !s.serviceAvailable() { log.Println(binapiErrorMsg) @@ -383,12 +485,9 @@ func (s *BinapiRPC) Compatibility(req BinapiCompatibilityRequest, resp *BinapiCo resp.IncompatibleMsgs = make(map[string][]string) for path, messages := range api.GetRegisteredMessages() { - if resp.IncompatibleMsgs[path] == nil { - resp.IncompatibleMsgs[path] = make([]string, 0, len(req.MsgNameCrcs)) - } - if resp.CompatibleMsgs[path] == nil { - resp.CompatibleMsgs[path] = make([]string, 0, len(req.MsgNameCrcs)) - } + resp.IncompatibleMsgs[path] = make([]string, 0, len(req.MsgNameCrcs)) + resp.CompatibleMsgs[path] = make([]string, 0, len(req.MsgNameCrcs)) + for _, msg := range req.MsgNameCrcs { val, ok := messages[msg] if !ok { @@ -402,11 +501,18 @@ func (s *BinapiRPC) Compatibility(req BinapiCompatibilityRequest, resp *BinapiCo } } } - for _, messages := range resp.IncompatibleMsgs { - if len(messages) > 0 { - return fmt.Errorf("compatibility check failed for messages: %v", resp.IncompatibleMsgs) + + compatible := false + for path, incompatibleMsgs := range resp.IncompatibleMsgs { + if len(incompatibleMsgs) == 0 { + compatible = true + } else { + log.Debugf("messages are incompatible for path %s", path) } } + if !compatible { + return errors.New("compatibility check failed") + } return nil } -- cgit 1.2.3-korg