summaryrefslogtreecommitdiffstats
path: root/proxy/client.go
diff options
context:
space:
mode:
Diffstat (limited to 'proxy/client.go')
-rw-r--r--proxy/client.go78
1 files changed, 72 insertions, 6 deletions
diff --git a/proxy/client.go b/proxy/client.go
index aea9a94..d1c5d73 100644
--- a/proxy/client.go
+++ b/proxy/client.go
@@ -1,4 +1,4 @@
-// Copyright (c) 2019 Cisco and/or its affiliates.
+// Copyright (c) 2021 Cisco and/or its affiliates.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -15,6 +15,7 @@
package proxy
import (
+ "context"
"fmt"
"net/rpc"
"reflect"
@@ -132,6 +133,67 @@ type BinapiClient struct {
timeout time.Duration
}
+// RPCStream is a stream for forwarding requests to BinapiRPC's stream.
+type RPCStream struct {
+ rpc *rpc.Client
+ id uint32
+}
+
+func (s *RPCStream) SendMsg(msg api.Message) error {
+ req := RPCStreamReqResp{
+ ID: s.id,
+ Msg: msg,
+ }
+ resp := RPCStreamReqResp{}
+ err := s.rpc.Call("BinapiRPC.SendMessage", req, &resp)
+ if err != nil {
+ return fmt.Errorf("RPC SendMessage call failed: %v", err)
+ }
+ return nil
+}
+
+func (s *RPCStream) RecvMsg() (api.Message, error) {
+ req := RPCStreamReqResp{
+ ID: s.id,
+ }
+ resp := RPCStreamReqResp{}
+ err := s.rpc.Call("BinapiRPC.ReceiveMessage", req, &resp)
+ if err != nil {
+ return nil, fmt.Errorf("RPC ReceiveMessage call failed: %v", err)
+ }
+ return resp.Msg, nil
+}
+
+func (s *RPCStream) Close() error {
+ req := RPCStreamReqResp{
+ ID: s.id,
+ }
+ resp := RPCStreamReqResp{}
+ err := s.rpc.Call("BinapiRPC.CloseStream", req, &resp)
+ if err != nil {
+ return fmt.Errorf("RPC CloseStream call failed: %v", err)
+ }
+ return nil
+}
+
+func (b *BinapiClient) NewStream(_ context.Context, _ ...api.StreamOption) (api.Stream, error) {
+ stream := &RPCStream{
+ rpc: b.rpc,
+ }
+ req := RPCStreamReqResp{}
+ resp := RPCStreamReqResp{}
+ err := stream.rpc.Call("BinapiRPC.NewAPIStream", req, &resp)
+ if err != nil {
+ return nil, fmt.Errorf("RPC NewAPIStream call failed: %v", err)
+ }
+ stream.id = resp.ID
+ return stream, err
+}
+
+func (b *BinapiClient) Invoke(_ context.Context, request api.Message, reply api.Message) error {
+ return invokeInternal(b.rpc, request, reply, b.timeout)
+}
+
func (b *BinapiClient) SendRequest(msg api.Message) api.RequestCtx {
req := &requestCtx{
rpc: b.rpc,
@@ -149,20 +211,24 @@ type requestCtx struct {
}
func (r *requestCtx) ReceiveReply(msg api.Message) error {
+ return invokeInternal(r.rpc, r.req, msg, r.timeout)
+}
+
+func invokeInternal(rpc *rpc.Client, msgIn, msgOut api.Message, timeout time.Duration) error {
req := BinapiRequest{
- Msg: r.req,
- ReplyMsg: msg,
- Timeout: r.timeout,
+ Msg: msgIn,
+ ReplyMsg: msgOut,
+ Timeout: timeout,
}
resp := BinapiResponse{}
- err := r.rpc.Call("BinapiRPC.Invoke", req, &resp)
+ err := rpc.Call("BinapiRPC.Invoke", req, &resp)
if err != nil {
return fmt.Errorf("RPC call failed: %v", err)
}
// we set the value of msg to the value from response
- reflect.ValueOf(msg).Elem().Set(reflect.ValueOf(resp.Msg).Elem())
+ reflect.ValueOf(msgOut).Elem().Set(reflect.ValueOf(resp.Msg).Elem())
return nil
}