summaryrefslogtreecommitdiffstats
path: root/adapter/socketclient
diff options
context:
space:
mode:
Diffstat (limited to 'adapter/socketclient')
-rw-r--r--adapter/socketclient/socketclient.go111
1 files changed, 74 insertions, 37 deletions
diff --git a/adapter/socketclient/socketclient.go b/adapter/socketclient/socketclient.go
index 4c576c3..8efc7ba 100644
--- a/adapter/socketclient/socketclient.go
+++ b/adapter/socketclient/socketclient.go
@@ -13,7 +13,6 @@ import (
"time"
"github.com/fsnotify/fsnotify"
-
"github.com/lunixbochs/struc"
logger "github.com/sirupsen/logrus"
@@ -25,20 +24,24 @@ import (
const (
// DefaultSocketName is default VPP API socket file name
DefaultSocketName = "/run/vpp-api.sock"
-
- sockCreateMsgId = 15 // hard-coded id for sockclnt_create message
- govppClientName = "govppsock" // client name used for socket registration
)
var (
- ConnectTimeout = time.Second * 3
- DisconnectTimeout = time.Second
-
+ // DefaultConnectTimeout is default timeout for connecting
+ DefaultConnectTimeout = time.Second * 3
+ // DefaultDisconnectTimeout is default timeout for discconnecting
+ DefaultDisconnectTimeout = time.Second
// MaxWaitReady defines maximum duration before waiting for socket file
// times out
MaxWaitReady = time.Second * 15
+ // ClientName is used for identifying client in socket registration
+ ClientName = "govppsock"
+)
- Debug = os.Getenv("DEBUG_GOVPP_SOCK") != ""
+var (
+ // Debug is global variable that determines debug mode
+ Debug = os.Getenv("DEBUG_GOVPP_SOCK") != ""
+ // DebugMsgIds is global variable that determines debug mode for msg ids
DebugMsgIds = os.Getenv("DEBUG_GOVPP_SOCKMSG") != ""
Log = logger.New() // global logger
@@ -53,16 +56,22 @@ func init() {
}
type vppClient struct {
- sockAddr string
- conn *net.UnixConn
- reader *bufio.Reader
+ sockAddr string
+ conn *net.UnixConn
+ reader *bufio.Reader
+ writer *bufio.Writer
+
+ connectTimeout time.Duration
+ disconnectTimeout time.Duration
+
cb adapter.MsgCallback
clientIndex uint32
msgTable map[string]uint16
sockDelMsgId uint16
writeMu sync.Mutex
- quit chan struct{}
- wg sync.WaitGroup
+
+ quit chan struct{}
+ wg sync.WaitGroup
}
func NewVppClient(sockAddr string) *vppClient {
@@ -70,13 +79,23 @@ func NewVppClient(sockAddr string) *vppClient {
sockAddr = DefaultSocketName
}
return &vppClient{
- sockAddr: sockAddr,
- cb: nilCallback,
+ sockAddr: sockAddr,
+ connectTimeout: DefaultConnectTimeout,
+ disconnectTimeout: DefaultDisconnectTimeout,
+ cb: func(msgID uint16, data []byte) {
+ Log.Warnf("no callback set, dropping message: ID=%v len=%d", msgID, len(data))
+ },
}
}
-func nilCallback(msgID uint16, data []byte) {
- Log.Warnf("no callback set, dropping message: ID=%v len=%d", msgID, len(data))
+// SetConnectTimeout sets timeout used during connecting.
+func (c *vppClient) SetConnectTimeout(t time.Duration) {
+ c.connectTimeout = t
+}
+
+// SetDisconnectTimeout sets timeout used during disconnecting.
+func (c *vppClient) SetDisconnectTimeout(t time.Duration) {
+ c.disconnectTimeout = t
}
// WaitReady checks socket file existence and waits for it if necessary
@@ -160,17 +179,24 @@ func (c *vppClient) connect(sockAddr string) error {
c.conn = conn
c.reader = bufio.NewReader(c.conn)
+ c.writer = bufio.NewWriter(c.conn)
Log.Debugf("Connected to socket: %v", addr)
return nil
}
+const (
+ sockCreateMsgId = 15 // hard-coded sockclnt_create message ID
+ createMsgContext = byte(123)
+ deleteMsgContext = byte(124)
+)
+
func (c *vppClient) open() error {
msgCodec := new(codec.MsgCodec)
req := &memclnt.SockclntCreate{
- Name: []byte(govppClientName),
+ Name: []byte(ClientName),
}
msg, err := msgCodec.EncodeMsg(req, sockCreateMsgId)
if err != nil {
@@ -178,14 +204,14 @@ func (c *vppClient) open() error {
return err
}
// set non-0 context
- msg[5] = 123
+ msg[5] = createMsgContext
if err := c.write(msg); err != nil {
Log.Debugln("Write error: ", err)
return err
}
- readDeadline := time.Now().Add(ConnectTimeout)
+ readDeadline := time.Now().Add(c.connectTimeout)
if err := c.conn.SetReadDeadline(readDeadline); err != nil {
return err
}
@@ -199,8 +225,6 @@ func (c *vppClient) open() error {
return err
}
- //log.Printf("Client got (%d): % 0X", len(msgReply), msgReply)
-
reply := new(memclnt.SockclntCreateReply)
if err := msgCodec.DecodeMsg(msgReply, reply); err != nil {
Log.Println("Decode error:", err)
@@ -266,7 +290,7 @@ func (c *vppClient) close() error {
return err
}
// set non-0 context
- msg[5] = 124
+ msg[5] = deleteMsgContext
Log.Debugf("sending socklntDel (%d byes): % 0X\n", len(msg), msg)
if err := c.write(msg); err != nil {
@@ -274,7 +298,7 @@ func (c *vppClient) close() error {
return err
}
- readDeadline := time.Now().Add(DisconnectTimeout)
+ readDeadline := time.Now().Add(c.disconnectTimeout)
if err := c.conn.SetReadDeadline(readDeadline); err != nil {
return err
}
@@ -313,7 +337,7 @@ func (c *vppClient) GetMsgID(msgName string, msgCrc string) (uint16, error) {
}
type reqHeader struct {
- //MsgID uint16
+ // MsgID uint16
ClientIndex uint32
Context uint32
}
@@ -341,7 +365,7 @@ func (c *vppClient) SendMsg(context uint32, data []byte) error {
func (c *vppClient) write(msg []byte) error {
h := &msgheader{
- Data_len: uint32(len(msg)),
+ DataLen: uint32(len(msg)),
}
buf := new(bytes.Buffer)
if err := struc.Pack(buf, h); err != nil {
@@ -353,17 +377,31 @@ func (c *vppClient) write(msg []byte) error {
c.writeMu.Lock()
defer c.writeMu.Unlock()
- var w io.Writer = c.conn
-
- if n, err := w.Write(header); err != nil {
+ if n, err := c.writer.Write(header); err != nil {
return err
} else {
Log.Debugf(" - header sent (%d/%d): % 0X", n, len(header), header)
}
- if n, err := w.Write(msg); err != nil {
+
+ if err := c.writer.Flush(); err != nil {
return err
- } else {
- Log.Debugf(" - msg sent (%d/%d): % 0X", n, len(msg), msg)
+ }
+
+ for i := 0; i <= len(msg)/c.writer.Size(); i++ {
+ x := i*c.writer.Size() + c.writer.Size()
+ if x > len(msg) {
+ x = len(msg)
+ }
+ Log.Debugf("x=%v i=%v len=%v mod=%v\n", x, i, len(msg), len(msg)/c.writer.Size())
+ if n, err := c.writer.Write(msg[i*c.writer.Size() : x]); err != nil {
+ return err
+ } else {
+ Log.Debugf(" - msg sent x=%d (%d/%d): % 0X", x, n, len(msg), msg)
+ }
+ if err := c.writer.Flush(); err != nil {
+ return err
+ }
+
}
return nil
@@ -404,10 +442,9 @@ func (c *vppClient) readerLoop() {
}
type msgheader struct {
- Q int `struc:"uint64"`
- Data_len uint32 `struc:"uint32"`
- Gc_mark_timestamp uint32 `struc:"uint32"`
- //data [0]uint8
+ Q int `struc:"uint64"`
+ DataLen uint32 `struc:"uint32"`
+ GcMarkTimestamp uint32 `struc:"uint32"`
}
func (c *vppClient) read() ([]byte, error) {
@@ -434,7 +471,7 @@ func (c *vppClient) read() ([]byte, error) {
}
Log.Debugf(" - decoded header: %+v", h)
- msgLen := int(h.Data_len)
+ msgLen := int(h.DataLen)
msg := make([]byte, msgLen)
n, err = c.reader.Read(msg)