diff options
Diffstat (limited to 'proxy')
-rw-r--r-- | proxy/client.go | 78 | ||||
-rw-r--r-- | proxy/server.go | 128 |
2 files changed, 189 insertions, 17 deletions
diff --git a/proxy/client.go b/proxy/client.go index aea9a94..d1c5d73 100644 --- a/proxy/client.go +++ b/proxy/client.go @@ -1,4 +1,4 @@ -// Copyright (c) 2019 Cisco and/or its affiliates. +// Copyright (c) 2021 Cisco and/or its affiliates. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -15,6 +15,7 @@ package proxy import ( + "context" "fmt" "net/rpc" "reflect" @@ -132,6 +133,67 @@ type BinapiClient struct { timeout time.Duration } +// RPCStream is a stream for forwarding requests to BinapiRPC's stream. +type RPCStream struct { + rpc *rpc.Client + id uint32 +} + +func (s *RPCStream) SendMsg(msg api.Message) error { + req := RPCStreamReqResp{ + ID: s.id, + Msg: msg, + } + resp := RPCStreamReqResp{} + err := s.rpc.Call("BinapiRPC.SendMessage", req, &resp) + if err != nil { + return fmt.Errorf("RPC SendMessage call failed: %v", err) + } + return nil +} + +func (s *RPCStream) RecvMsg() (api.Message, error) { + req := RPCStreamReqResp{ + ID: s.id, + } + resp := RPCStreamReqResp{} + err := s.rpc.Call("BinapiRPC.ReceiveMessage", req, &resp) + if err != nil { + return nil, fmt.Errorf("RPC ReceiveMessage call failed: %v", err) + } + return resp.Msg, nil +} + +func (s *RPCStream) Close() error { + req := RPCStreamReqResp{ + ID: s.id, + } + resp := RPCStreamReqResp{} + err := s.rpc.Call("BinapiRPC.CloseStream", req, &resp) + if err != nil { + return fmt.Errorf("RPC CloseStream call failed: %v", err) + } + return nil +} + +func (b *BinapiClient) NewStream(_ context.Context, _ ...api.StreamOption) (api.Stream, error) { + stream := &RPCStream{ + rpc: b.rpc, + } + req := RPCStreamReqResp{} + resp := RPCStreamReqResp{} + err := stream.rpc.Call("BinapiRPC.NewAPIStream", req, &resp) + if err != nil { + return nil, fmt.Errorf("RPC NewAPIStream call failed: %v", err) + } + stream.id = resp.ID + return stream, err +} + +func (b *BinapiClient) Invoke(_ context.Context, request api.Message, reply api.Message) error { + return invokeInternal(b.rpc, request, reply, b.timeout) +} + func (b *BinapiClient) SendRequest(msg api.Message) api.RequestCtx { req := &requestCtx{ rpc: b.rpc, @@ -149,20 +211,24 @@ type requestCtx struct { } func (r *requestCtx) ReceiveReply(msg api.Message) error { + return invokeInternal(r.rpc, r.req, msg, r.timeout) +} + +func invokeInternal(rpc *rpc.Client, msgIn, msgOut api.Message, timeout time.Duration) error { req := BinapiRequest{ - Msg: r.req, - ReplyMsg: msg, - Timeout: r.timeout, + Msg: msgIn, + ReplyMsg: msgOut, + Timeout: timeout, } resp := BinapiResponse{} - err := r.rpc.Call("BinapiRPC.Invoke", req, &resp) + err := rpc.Call("BinapiRPC.Invoke", req, &resp) if err != nil { return fmt.Errorf("RPC call failed: %v", err) } // we set the value of msg to the value from response - reflect.ValueOf(msg).Elem().Set(reflect.ValueOf(resp.Msg).Elem()) + reflect.ValueOf(msgOut).Elem().Set(reflect.ValueOf(resp.Msg).Elem()) return nil } diff --git a/proxy/server.go b/proxy/server.go index e395468..243001a 100644 --- a/proxy/server.go +++ b/proxy/server.go @@ -1,4 +1,4 @@ -// Copyright (c) 2019 Cisco and/or its affiliates. +// Copyright (c) 2021 Cisco and/or its affiliates. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -15,6 +15,7 @@ package proxy import ( + "context" "errors" "fmt" "reflect" @@ -230,11 +231,17 @@ type BinapiCompatibilityResponse struct { IncompatibleMsgs map[string][]string } -// BinapiRPC is a RPC server for proxying client request to api.Channel. +// BinapiRPC is a RPC server for proxying client request to api.Channel +// or api.Stream. type BinapiRPC struct { binapiConn *core.Connection binapi adapter.VppAPI + streamsLock sync.Mutex + // local ID, different from api.Stream ID + maxStreamID uint32 + streams map[uint32]api.Stream + events chan core.ConnectionEvent done chan struct{} // non-zero if the RPC service is available @@ -324,6 +331,101 @@ func (s *BinapiRPC) serviceAvailable() bool { return atomic.LoadUint32(&s.available) == 1 } +type RPCStreamReqResp struct { + ID uint32 + Msg api.Message +} + +func (s *BinapiRPC) NewAPIStream(req RPCStreamReqResp, resp *RPCStreamReqResp) error { + if !s.serviceAvailable() { + log.Println(binapiErrorMsg) + return errors.New("server does not support RPC calls at this time, try again later") + } + log.Debugf("BinapiRPC.NewAPIStream - REQ: %#v", req) + + stream, err := s.binapiConn.NewStream(context.Background()) + if err != nil { + return err + } + + if s.streams == nil { + s.streams = make(map[uint32]api.Stream) + } + + s.streamsLock.Lock() + s.maxStreamID++ + s.streams[s.maxStreamID] = stream + resp.ID = s.maxStreamID + s.streamsLock.Unlock() + + return nil +} + +func (s *BinapiRPC) SendMessage(req RPCStreamReqResp, resp *RPCStreamReqResp) error { + if !s.serviceAvailable() { + log.Println(binapiErrorMsg) + return errors.New("server does not support RPC calls at this time, try again later") + } + log.Debugf("BinapiRPC.SendMessage - REQ: %#v", req) + + stream, err := s.getStream(req.ID) + if err != nil { + return err + } + + return stream.SendMsg(req.Msg) +} + +func (s *BinapiRPC) ReceiveMessage(req RPCStreamReqResp, resp *RPCStreamReqResp) error { + if !s.serviceAvailable() { + log.Println(binapiErrorMsg) + return errors.New("server does not support RPC calls at this time, try again later") + } + log.Debugf("BinapiRPC.ReceiveMessage - REQ: %#v", req) + + stream, err := s.getStream(req.ID) + if err != nil { + return err + } + + resp.Msg, err = stream.RecvMsg() + return err +} + +func (s *BinapiRPC) CloseStream(req RPCStreamReqResp, resp *RPCStreamReqResp) error { + if !s.serviceAvailable() { + log.Println(binapiErrorMsg) + return errors.New("server does not support RPC calls at this time, try again later") + } + log.Debugf("BinapiRPC.CloseStream - REQ: %#v", req) + + stream, err := s.getStream(req.ID) + if err != nil { + return err + } + + s.streamsLock.Lock() + delete(s.streams, req.ID) + s.streamsLock.Unlock() + + return stream.Close() +} + +func (s *BinapiRPC) getStream(id uint32) (api.Stream, error) { + s.streamsLock.Lock() + stream := s.streams[id] + s.streamsLock.Unlock() + + if stream == nil || reflect.ValueOf(stream).IsNil() { + s.streamsLock.Lock() + // delete the stream in case it is still in the map + delete(s.streams, id) + s.streamsLock.Unlock() + return nil, errors.New("BinapiRPC stream closed") + } + return stream, nil +} + func (s *BinapiRPC) Invoke(req BinapiRequest, resp *BinapiResponse) error { if !s.serviceAvailable() { log.Println(binapiErrorMsg) @@ -383,12 +485,9 @@ func (s *BinapiRPC) Compatibility(req BinapiCompatibilityRequest, resp *BinapiCo resp.IncompatibleMsgs = make(map[string][]string) for path, messages := range api.GetRegisteredMessages() { - if resp.IncompatibleMsgs[path] == nil { - resp.IncompatibleMsgs[path] = make([]string, 0, len(req.MsgNameCrcs)) - } - if resp.CompatibleMsgs[path] == nil { - resp.CompatibleMsgs[path] = make([]string, 0, len(req.MsgNameCrcs)) - } + resp.IncompatibleMsgs[path] = make([]string, 0, len(req.MsgNameCrcs)) + resp.CompatibleMsgs[path] = make([]string, 0, len(req.MsgNameCrcs)) + for _, msg := range req.MsgNameCrcs { val, ok := messages[msg] if !ok { @@ -402,11 +501,18 @@ func (s *BinapiRPC) Compatibility(req BinapiCompatibilityRequest, resp *BinapiCo } } } - for _, messages := range resp.IncompatibleMsgs { - if len(messages) > 0 { - return fmt.Errorf("compatibility check failed for messages: %v", resp.IncompatibleMsgs) + + compatible := false + for path, incompatibleMsgs := range resp.IncompatibleMsgs { + if len(incompatibleMsgs) == 0 { + compatible = true + } else { + log.Debugf("messages are incompatible for path %s", path) } } + if !compatible { + return errors.New("compatibility check failed") + } return nil } |