From 3dc9b9885e3d7041a7c349185dc88f3bed50be6d Mon Sep 17 00:00:00 2001 From: Ondrej Fabry Date: Fri, 5 Apr 2019 23:57:56 +0200 Subject: Split outgoing packet data by 4096 bytes Change-Id: I92e2b60c1460873e890b0e9b6736b5a221742349 Signed-off-by: Ondrej Fabry --- adapter/socketclient/socketclient.go | 111 +++++++++++++++++++++++------------ 1 file changed, 74 insertions(+), 37 deletions(-) diff --git a/adapter/socketclient/socketclient.go b/adapter/socketclient/socketclient.go index 4c576c3..8efc7ba 100644 --- a/adapter/socketclient/socketclient.go +++ b/adapter/socketclient/socketclient.go @@ -13,7 +13,6 @@ import ( "time" "github.com/fsnotify/fsnotify" - "github.com/lunixbochs/struc" logger "github.com/sirupsen/logrus" @@ -25,20 +24,24 @@ import ( const ( // DefaultSocketName is default VPP API socket file name DefaultSocketName = "/run/vpp-api.sock" - - sockCreateMsgId = 15 // hard-coded id for sockclnt_create message - govppClientName = "govppsock" // client name used for socket registration ) var ( - ConnectTimeout = time.Second * 3 - DisconnectTimeout = time.Second - + // DefaultConnectTimeout is default timeout for connecting + DefaultConnectTimeout = time.Second * 3 + // DefaultDisconnectTimeout is default timeout for discconnecting + DefaultDisconnectTimeout = time.Second // MaxWaitReady defines maximum duration before waiting for socket file // times out MaxWaitReady = time.Second * 15 + // ClientName is used for identifying client in socket registration + ClientName = "govppsock" +) - Debug = os.Getenv("DEBUG_GOVPP_SOCK") != "" +var ( + // Debug is global variable that determines debug mode + Debug = os.Getenv("DEBUG_GOVPP_SOCK") != "" + // DebugMsgIds is global variable that determines debug mode for msg ids DebugMsgIds = os.Getenv("DEBUG_GOVPP_SOCKMSG") != "" Log = logger.New() // global logger @@ -53,16 +56,22 @@ func init() { } type vppClient struct { - sockAddr string - conn *net.UnixConn - reader *bufio.Reader + sockAddr string + conn *net.UnixConn + reader *bufio.Reader + writer *bufio.Writer + + connectTimeout time.Duration + disconnectTimeout time.Duration + cb adapter.MsgCallback clientIndex uint32 msgTable map[string]uint16 sockDelMsgId uint16 writeMu sync.Mutex - quit chan struct{} - wg sync.WaitGroup + + quit chan struct{} + wg sync.WaitGroup } func NewVppClient(sockAddr string) *vppClient { @@ -70,13 +79,23 @@ func NewVppClient(sockAddr string) *vppClient { sockAddr = DefaultSocketName } return &vppClient{ - sockAddr: sockAddr, - cb: nilCallback, + sockAddr: sockAddr, + connectTimeout: DefaultConnectTimeout, + disconnectTimeout: DefaultDisconnectTimeout, + cb: func(msgID uint16, data []byte) { + Log.Warnf("no callback set, dropping message: ID=%v len=%d", msgID, len(data)) + }, } } -func nilCallback(msgID uint16, data []byte) { - Log.Warnf("no callback set, dropping message: ID=%v len=%d", msgID, len(data)) +// SetConnectTimeout sets timeout used during connecting. +func (c *vppClient) SetConnectTimeout(t time.Duration) { + c.connectTimeout = t +} + +// SetDisconnectTimeout sets timeout used during disconnecting. +func (c *vppClient) SetDisconnectTimeout(t time.Duration) { + c.disconnectTimeout = t } // WaitReady checks socket file existence and waits for it if necessary @@ -160,17 +179,24 @@ func (c *vppClient) connect(sockAddr string) error { c.conn = conn c.reader = bufio.NewReader(c.conn) + c.writer = bufio.NewWriter(c.conn) Log.Debugf("Connected to socket: %v", addr) return nil } +const ( + sockCreateMsgId = 15 // hard-coded sockclnt_create message ID + createMsgContext = byte(123) + deleteMsgContext = byte(124) +) + func (c *vppClient) open() error { msgCodec := new(codec.MsgCodec) req := &memclnt.SockclntCreate{ - Name: []byte(govppClientName), + Name: []byte(ClientName), } msg, err := msgCodec.EncodeMsg(req, sockCreateMsgId) if err != nil { @@ -178,14 +204,14 @@ func (c *vppClient) open() error { return err } // set non-0 context - msg[5] = 123 + msg[5] = createMsgContext if err := c.write(msg); err != nil { Log.Debugln("Write error: ", err) return err } - readDeadline := time.Now().Add(ConnectTimeout) + readDeadline := time.Now().Add(c.connectTimeout) if err := c.conn.SetReadDeadline(readDeadline); err != nil { return err } @@ -199,8 +225,6 @@ func (c *vppClient) open() error { return err } - //log.Printf("Client got (%d): % 0X", len(msgReply), msgReply) - reply := new(memclnt.SockclntCreateReply) if err := msgCodec.DecodeMsg(msgReply, reply); err != nil { Log.Println("Decode error:", err) @@ -266,7 +290,7 @@ func (c *vppClient) close() error { return err } // set non-0 context - msg[5] = 124 + msg[5] = deleteMsgContext Log.Debugf("sending socklntDel (%d byes): % 0X\n", len(msg), msg) if err := c.write(msg); err != nil { @@ -274,7 +298,7 @@ func (c *vppClient) close() error { return err } - readDeadline := time.Now().Add(DisconnectTimeout) + readDeadline := time.Now().Add(c.disconnectTimeout) if err := c.conn.SetReadDeadline(readDeadline); err != nil { return err } @@ -313,7 +337,7 @@ func (c *vppClient) GetMsgID(msgName string, msgCrc string) (uint16, error) { } type reqHeader struct { - //MsgID uint16 + // MsgID uint16 ClientIndex uint32 Context uint32 } @@ -341,7 +365,7 @@ func (c *vppClient) SendMsg(context uint32, data []byte) error { func (c *vppClient) write(msg []byte) error { h := &msgheader{ - Data_len: uint32(len(msg)), + DataLen: uint32(len(msg)), } buf := new(bytes.Buffer) if err := struc.Pack(buf, h); err != nil { @@ -353,17 +377,31 @@ func (c *vppClient) write(msg []byte) error { c.writeMu.Lock() defer c.writeMu.Unlock() - var w io.Writer = c.conn - - if n, err := w.Write(header); err != nil { + if n, err := c.writer.Write(header); err != nil { return err } else { Log.Debugf(" - header sent (%d/%d): % 0X", n, len(header), header) } - if n, err := w.Write(msg); err != nil { + + if err := c.writer.Flush(); err != nil { return err - } else { - Log.Debugf(" - msg sent (%d/%d): % 0X", n, len(msg), msg) + } + + for i := 0; i <= len(msg)/c.writer.Size(); i++ { + x := i*c.writer.Size() + c.writer.Size() + if x > len(msg) { + x = len(msg) + } + Log.Debugf("x=%v i=%v len=%v mod=%v\n", x, i, len(msg), len(msg)/c.writer.Size()) + if n, err := c.writer.Write(msg[i*c.writer.Size() : x]); err != nil { + return err + } else { + Log.Debugf(" - msg sent x=%d (%d/%d): % 0X", x, n, len(msg), msg) + } + if err := c.writer.Flush(); err != nil { + return err + } + } return nil @@ -404,10 +442,9 @@ func (c *vppClient) readerLoop() { } type msgheader struct { - Q int `struc:"uint64"` - Data_len uint32 `struc:"uint32"` - Gc_mark_timestamp uint32 `struc:"uint32"` - //data [0]uint8 + Q int `struc:"uint64"` + DataLen uint32 `struc:"uint32"` + GcMarkTimestamp uint32 `struc:"uint32"` } func (c *vppClient) read() ([]byte, error) { @@ -434,7 +471,7 @@ func (c *vppClient) read() ([]byte, error) { } Log.Debugf(" - decoded header: %+v", h) - msgLen := int(h.Data_len) + msgLen := int(h.DataLen) msg := make([]byte, msgLen) n, err = c.reader.Read(msg) -- cgit 1.2.3-korg