From 5257dbb8253fe5b70b75f9c064c4593ca7aee99f Mon Sep 17 00:00:00 2001 From: imarom Date: Wed, 4 Jan 2017 18:46:45 +0200 Subject: draft - unreviewed Signed-off-by: imarom --- src/stateless/rx/trex_stateless_capture.cpp | 28 ++- src/stateless/rx/trex_stateless_capture.h | 53 ++++-- src/stateless/rx/trex_stateless_rx_core.cpp | 65 ++++--- src/stateless/rx/trex_stateless_rx_core.h | 17 +- src/stateless/rx/trex_stateless_rx_defs.h | 2 + src/stateless/rx/trex_stateless_rx_port_mngr.cpp | 227 +---------------------- src/stateless/rx/trex_stateless_rx_port_mngr.h | 149 +-------------- 7 files changed, 127 insertions(+), 414 deletions(-) (limited to 'src/stateless/rx') diff --git a/src/stateless/rx/trex_stateless_capture.cpp b/src/stateless/rx/trex_stateless_capture.cpp index 83bb2d38..4ed126cc 100644 --- a/src/stateless/rx/trex_stateless_capture.cpp +++ b/src/stateless/rx/trex_stateless_capture.cpp @@ -53,22 +53,36 @@ TrexStatelessCapture::handle_pkt_rx(const rte_mbuf_t *m, int port) { m_pkt_buffer->push(m); } +void +TrexStatelessCaptureMngr::update_global_filter() { + CaptureFilter new_filter; + + for (TrexStatelessCapture *capture : m_captures) { + new_filter += capture->get_filter(); + } + + m_global_filter = new_filter; +} + capture_id_t TrexStatelessCaptureMngr::add(uint64_t limit, const CaptureFilter &filter) { if (m_captures.size() > MAX_CAPTURE_SIZE) { - throw TrexException(TrexException::T_CAPTURE_MAX_INSTANCES); + return CAPTURE_TOO_MANY_CAPTURES; } int new_id = m_id_counter++; TrexStatelessCapture *new_buffer = new TrexStatelessCapture(new_id, limit, filter); m_captures.push_back(new_buffer); + + /* update global filter */ + update_global_filter(); return new_id; } -void +capture_id_t TrexStatelessCaptureMngr::remove(capture_id_t id) { int index = -1; @@ -81,12 +95,18 @@ TrexStatelessCaptureMngr::remove(capture_id_t id) { /* does not exist */ if (index == -1) { - throw TrexException(TrexException::T_CAPTURE_INVALID_ID); + return CAPTURE_ID_NOT_FOUND; } TrexStatelessCapture *capture = m_captures[index]; m_captures.erase(m_captures.begin() + index); - delete capture; + + delete capture; + + /* update global filter */ + update_global_filter(); + + return id; } void diff --git a/src/stateless/rx/trex_stateless_capture.h b/src/stateless/rx/trex_stateless_capture.h index f7cd451f..4d0b6a78 100644 --- a/src/stateless/rx/trex_stateless_capture.h +++ b/src/stateless/rx/trex_stateless_capture.h @@ -31,16 +31,16 @@ limitations under the License. class CaptureFilter { public: CaptureFilter() { - tx_active = 0; - rx_active = 0; + m_tx_active = 0; + m_rx_active = 0; } void add_tx(uint8_t port_id) { - tx_active |= (1LL << port_id); + m_tx_active |= (1LL << port_id); } void add_rx(uint8_t port_id) { - rx_active |= (1LL << port_id); + m_rx_active |= (1LL << port_id); } void add(uint8_t port_id) { @@ -63,21 +63,36 @@ public: bool in_rx(uint8_t port_id) const { uint64_t bit = (1LL << port_id); - return ((rx_active & bit) == bit); + return ((m_rx_active & bit) == bit); } bool in_tx(uint8_t port_id) const { uint64_t bit = (1LL << port_id); - return ((tx_active & bit) == bit); + return ((m_tx_active & bit) == bit); + } + + bool in_any(uint8_t port_id) const { + return ( in_tx(port_id) || in_rx(port_id) ); + } + + CaptureFilter& operator +=(const CaptureFilter &other) { + m_tx_active |= other.m_tx_active; + m_rx_active |= other.m_rx_active; + + return *this; } private: - uint64_t tx_active; - uint64_t rx_active; + uint64_t m_tx_active; + uint64_t m_rx_active; }; -typedef uint64_t capture_id_t; +typedef int64_t capture_id_t; +enum { + CAPTURE_ID_NOT_FOUND = -1, + CAPTURE_TOO_MANY_CAPTURES = -2, +}; class TrexStatelessCapture { public: @@ -93,6 +108,10 @@ public: return m_id; } + const CaptureFilter & get_filter() const { + return m_filter; + } + private: TrexPktBuffer *m_pkt_buffer; CaptureFilter m_filter; @@ -122,9 +141,11 @@ public: /** - * stops capture mode + * stops capture mode + * on success, will return the ID of the removed one + * o.w it will be an error */ - void remove(capture_id_t id); + capture_id_t remove(capture_id_t id); /** * removes all captures @@ -139,8 +160,8 @@ public: * * @return bool */ - bool is_active() const { - return (m_captures.size() != 0); + bool is_active(uint8_t port) const { + return m_global_filter.in_any(port); } /** @@ -153,7 +174,7 @@ public: */ void handle_pkt_rx(const rte_mbuf_t *m, int port) { /* fast path */ - if (!is_active()) { + if (!is_active(port)) { return; } @@ -169,11 +190,15 @@ private: } void handle_pkt_rx_slow_path(const rte_mbuf_t *m, int port); + void update_global_filter(); std::vector m_captures; capture_id_t m_id_counter; + /* a union of all the filters curently active */ + CaptureFilter m_global_filter; + static const int MAX_CAPTURE_SIZE = 10; }; diff --git a/src/stateless/rx/trex_stateless_rx_core.cpp b/src/stateless/rx/trex_stateless_rx_core.cpp index d27485de..f1ba303a 100644 --- a/src/stateless/rx/trex_stateless_rx_core.cpp +++ b/src/stateless/rx/trex_stateless_rx_core.cpp @@ -69,11 +69,11 @@ void CRFC2544Info::export_data(rfc2544_info_t_ &obj) { void CRxCoreStateless::create(const CRxSlCfg &cfg) { m_capture = false; m_max_ports = cfg.m_max_ports; - + m_tx_cores = cfg.m_tx_cores; + CMessagingManager * cp_rx = CMsgIns::Ins()->getCpRx(); m_ring_from_cp = cp_rx->getRingCpToDp(0); - m_ring_to_cp = cp_rx->getRingDpToCp(0); m_state = STATE_IDLE; for (int i = 0; i < MAX_FLOW_STATS_PAYLOAD; i++) { @@ -130,6 +130,36 @@ bool CRxCoreStateless::periodic_check_for_cp_messages() { } +void +CRxCoreStateless::periodic_check_for_dp_messages() { + + for (int i = 0; i < m_tx_cores; i++) { + periodic_check_for_dp_messages_core(i); + } + +} + +void +CRxCoreStateless::periodic_check_for_dp_messages_core(uint32_t core_id) { + + CNodeRing *ring = CMsgIns::Ins()->getRxDp()->getRingDpToCp(core_id); + + /* fast path */ + if ( likely ( ring->isEmpty() ) ) { + return; + } + + while (true) { + CGenNode *node = NULL; + + if (ring->Dequeue(node) != 0) { + break; + } + + //assert(node); + } +} + void CRxCoreStateless::recalculate_next_state() { if (m_state == STATE_QUIT) { return; @@ -175,16 +205,6 @@ void CRxCoreStateless::idle_state_loop() { } } -/** - * for each port give a tick (for flushing if needed) - * - */ -void CRxCoreStateless::port_manager_tick() { - for (int i = 0; i < m_max_ports; i++) { - m_rx_port_mngr[i].tick(); - } -} - /** * for each port handle the grat ARP mechansim * @@ -199,7 +219,6 @@ void CRxCoreStateless::handle_work_stage() { /* set the next sync time to */ dsec_t sync_time_sec = now_sec() + (1.0 / 1000); - dsec_t tick_time_sec = now_sec() + 1.0; dsec_t grat_arp_sec = now_sec() + (double)CGlobalInfo::m_options.m_arp_ref_per; while (m_state == STATE_WORKING) { @@ -211,14 +230,10 @@ void CRxCoreStateless::handle_work_stage() { if ( (now - sync_time_sec) > 0 ) { periodic_check_for_cp_messages(); + //periodic_check_for_dp_messages(); sync_time_sec = now + (1.0 / 1000); } - if ( (now - tick_time_sec) > 0) { - port_manager_tick(); - tick_time_sec = now + 1.0; - } - if ( (now - grat_arp_sec) > 0) { handle_grat_arp(); grat_arp_sec = now + (double)CGlobalInfo::m_options.m_arp_ref_per; @@ -317,16 +332,14 @@ double CRxCoreStateless::get_cpu_util() { } -void -CRxCoreStateless::start_recorder(uint8_t port_id, const std::string &pcap_filename, uint64_t limit) { - m_rx_port_mngr[port_id].start_recorder(pcap_filename, limit); - recalculate_next_state(); +capture_id_t +CRxCoreStateless::start_capture(uint64_t limit, const CaptureFilter &filter) { + return TrexStatelessCaptureMngr::getInstance().add(limit, filter); } -void -CRxCoreStateless::stop_recorder(uint8_t port_id) { - m_rx_port_mngr[port_id].stop_recorder(); - recalculate_next_state(); +capture_id_t +CRxCoreStateless::stop_capture(capture_id_t capture_id) { + return TrexStatelessCaptureMngr::getInstance().remove(capture_id); } void diff --git a/src/stateless/rx/trex_stateless_rx_core.h b/src/stateless/rx/trex_stateless_rx_core.h index 4eed59a1..21ed51ba 100644 --- a/src/stateless/rx/trex_stateless_rx_core.h +++ b/src/stateless/rx/trex_stateless_rx_core.h @@ -27,6 +27,7 @@ #include "pal/linux/sanb_atomic.h" #include "utl_cpuu.h" #include "trex_stateless_rx_port_mngr.h" +#include "trex_stateless_capture.h" class TrexStatelessCpToRxMsgBase; @@ -127,16 +128,16 @@ class CRxCoreStateless { double get_cpu_util(); void update_cpu_util(); - const RXPacketBuffer *get_rx_queue_pkts(uint8_t port_id) { + const TrexPktBuffer *get_rx_queue_pkts(uint8_t port_id) { return m_rx_port_mngr[port_id].get_pkt_buffer(); } /** - * start capturing of RX packets on a specific port + * start capturing packets * */ - void start_recorder(uint8_t port_id, const std::string &pcap_filename, uint64_t limit); - void stop_recorder(uint8_t port_id); + capture_id_t start_capture(uint64_t limit, const CaptureFilter &filter); + capture_id_t stop_capture(capture_id_t capture_id); /** * start RX queueing of packets @@ -162,7 +163,12 @@ class CRxCoreStateless { private: void handle_cp_msg(TrexStatelessCpToRxMsgBase *msg); + bool periodic_check_for_cp_messages(); + + void periodic_check_for_dp_messages(); + void periodic_check_for_dp_messages_core(uint32_t core_id); + void tickle(); void idle_state_loop(); @@ -172,7 +178,6 @@ class CRxCoreStateless { void capture_pkt(rte_mbuf_t *m); void handle_rx_queue_msgs(uint8_t thread_id, CNodeRing * r); void handle_work_stage(); - void port_manager_tick(); void handle_grat_arp(); int process_all_pending_pkts(bool flush_rx = false); @@ -186,10 +191,10 @@ class CRxCoreStateless { private: TrexMonitor m_monitor; uint32_t m_max_ports; + uint32_t m_tx_cores; bool m_capture; state_e m_state; CNodeRing *m_ring_from_cp; - CNodeRing *m_ring_to_cp; CCpuUtlDp m_cpu_dp_u; CCpuUtlCp m_cpu_cp_u; diff --git a/src/stateless/rx/trex_stateless_rx_defs.h b/src/stateless/rx/trex_stateless_rx_defs.h index aefcc133..367cf4e3 100644 --- a/src/stateless/rx/trex_stateless_rx_defs.h +++ b/src/stateless/rx/trex_stateless_rx_defs.h @@ -38,10 +38,12 @@ class CRxSlCfg { m_max_ports = 0; m_cps = 0.0; m_num_crc_fix_bytes = 0; + m_tx_cores = 0; } public: uint32_t m_max_ports; + uint32_t m_tx_cores; double m_cps; CPortLatencyHWBase * m_ports[TREX_MAX_PORTS]; uint8_t m_num_crc_fix_bytes; diff --git a/src/stateless/rx/trex_stateless_rx_port_mngr.cpp b/src/stateless/rx/trex_stateless_rx_port_mngr.cpp index e16b3d0c..d2e0b4e8 100644 --- a/src/stateless/rx/trex_stateless_rx_port_mngr.cpp +++ b/src/stateless/rx/trex_stateless_rx_port_mngr.cpp @@ -20,48 +20,10 @@ */ #include "bp_sim.h" #include "trex_stateless_rx_port_mngr.h" -#include "common/captureFile.h" #include "trex_stateless_rx_core.h" #include "common/Network/Packet/Arp.h" #include "pkt_gen.h" - -/** - * copy MBUF to a flat buffer - * - * @author imarom (12/20/2016) - * - * @param dest - buffer with at least rte_pktmbuf_pkt_len(m) - * bytes - * @param m - MBUF to copy - * - * @return uint8_t* - */ -void copy_mbuf(uint8_t *dest, const rte_mbuf_t *m) { - - int index = 0; - for (const rte_mbuf_t *it = m; it != NULL; it = it->next) { - const uint8_t *src = rte_pktmbuf_mtod(it, const uint8_t *); - memcpy(dest + index, src, it->data_len); - index += it->data_len; - } -} - -/************************************** - * RX packet - * - *************************************/ -RXPacket::RXPacket(const rte_mbuf_t *m) { - - /* allocate buffer */ - m_size = m->pkt_len; - m_raw = new uint8_t[m_size]; - - /* copy data */ - copy_mbuf(m_raw, m); - - /* generate a packet timestamp */ - m_timestamp = now_sec(); -} +#include "trex_stateless_capture.h" /************************************** * latency RX feature @@ -231,79 +193,12 @@ RXLatency::to_json() const { * *************************************/ -RXPacketBuffer::RXPacketBuffer(uint64_t size) { - m_buffer = nullptr; - m_head = 0; - m_tail = 0; - m_size = (size + 1); // for the empty/full difference 1 slot reserved - - /* generate queue */ - m_buffer = new RXPacket*[m_size](); // zeroed -} - -RXPacketBuffer::~RXPacketBuffer() { - assert(m_buffer); - - while (!is_empty()) { - RXPacket *pkt = pop(); - delete pkt; - } - delete [] m_buffer; -} - -void -RXPacketBuffer::push(const rte_mbuf_t *m) { - /* if full - pop the oldest */ - if (is_full()) { - delete pop(); - } - - /* push packet */ - m_buffer[m_head] = new RXPacket(m); - m_head = next(m_head); -} - -RXPacket * -RXPacketBuffer::pop() { - assert(!is_empty()); - - RXPacket *pkt = m_buffer[m_tail]; - m_tail = next(m_tail); - - return pkt; -} - -uint64_t -RXPacketBuffer::get_element_count() const { - if (m_head >= m_tail) { - return (m_head - m_tail); - } else { - return ( get_capacity() - (m_tail - m_head - 1) ); - } -} - -Json::Value -RXPacketBuffer::to_json() const { - - Json::Value output = Json::arrayValue; - - int tmp = m_tail; - while (tmp != m_head) { - RXPacket *pkt = m_buffer[tmp]; - output.append(pkt->to_json()); - tmp = next(tmp); - } - - return output; -} - - void RXQueue::start(uint64_t size) { if (m_pkt_buffer) { delete m_pkt_buffer; } - m_pkt_buffer = new RXPacketBuffer(size); + m_pkt_buffer = new TrexPktBuffer(size, TrexPktBuffer::MODE_DROP_HEAD); } void @@ -314,7 +209,7 @@ RXQueue::stop() { } } -const RXPacketBuffer * +const TrexPktBuffer * RXQueue::fetch() { /* if no buffer or the buffer is empty - give a NULL one */ @@ -323,10 +218,10 @@ RXQueue::fetch() { } /* hold a pointer to the old one */ - RXPacketBuffer *old_buffer = m_pkt_buffer; + TrexPktBuffer *old_buffer = m_pkt_buffer; /* replace the old one with a new one and freeze the old */ - m_pkt_buffer = new RXPacketBuffer(old_buffer->get_capacity()); + m_pkt_buffer = new TrexPktBuffer(old_buffer->get_capacity(), old_buffer->get_mode()); return old_buffer; } @@ -348,97 +243,6 @@ RXQueue::to_json() const { return output; } -/************************************** - * RX feature recorder - * - *************************************/ - -RXPacketRecorder::RXPacketRecorder() { - m_writer = NULL; - m_count = 0; - m_limit = 0; - m_epoch = -1; - - m_pending_flush = false; -} - -void -RXPacketRecorder::start(const std::string &pcap, uint64_t limit) { - m_writer = CCapWriterFactory::CreateWriter(LIBPCAP, (char *)pcap.c_str()); - if (m_writer == NULL) { - std::stringstream ss; - ss << "unable to create PCAP file: " << pcap; - throw TrexException(ss.str()); - } - - assert(limit > 0); - - m_limit = limit; - m_count = 0; - m_pending_flush = false; - m_pcap_filename = pcap; -} - -void -RXPacketRecorder::stop() { - if (!m_writer) { - return; - } - - delete m_writer; - m_writer = NULL; -} - -void -RXPacketRecorder::flush_to_disk() { - - if (m_writer && m_pending_flush) { - m_writer->flush_to_disk(); - m_pending_flush = false; - } -} - -void -RXPacketRecorder::handle_pkt(const rte_mbuf_t *m) { - if (!m_writer) { - return; - } - - dsec_t now = now_sec(); - if (m_epoch < 0) { - m_epoch = now; - } - - dsec_t dt = now - m_epoch; - - CPktNsecTimeStamp t_c(dt); - m_pkt.time_nsec = t_c.m_time_nsec; - m_pkt.time_sec = t_c.m_time_sec; - - copy_mbuf((uint8_t *)m_pkt.raw, m); - m_pkt.pkt_len = m->pkt_len; - - m_writer->write_packet(&m_pkt); - m_count++; - m_pending_flush = true; - - if (m_count == m_limit) { - stop(); - } - -} - -Json::Value -RXPacketRecorder::to_json() const { - Json::Value output = Json::objectValue; - - output["pcap_filename"] = m_pcap_filename; - output["limit"] = Json::UInt64(m_limit); - output["count"] = Json::UInt64(m_count); - - return output; -} - /************************************** * RX feature server (ARP, ICMP) and etc. @@ -786,10 +590,6 @@ void RXPortManager::handle_pkt(const rte_mbuf_t *m) { m_latency.handle_pkt(m); } - if (is_feature_set(RECORDER)) { - m_recorder.handle_pkt(m); - } - if (is_feature_set(QUEUE)) { m_queue.handle_pkt(m); } @@ -797,6 +597,9 @@ void RXPortManager::handle_pkt(const rte_mbuf_t *m) { if (is_feature_set(SERVER)) { m_server.handle_pkt(m); } + + /* capture */ + TrexStatelessCaptureMngr::getInstance().handle_pkt_rx(m, m_port_id); } int RXPortManager::process_all_pending_pkts(bool flush_rx) { @@ -834,13 +637,6 @@ int RXPortManager::process_all_pending_pkts(bool flush_rx) { return cnt_p; } -void -RXPortManager::tick() { - if (is_feature_set(RECORDER)) { - m_recorder.flush_to_disk(); - } -} - void RXPortManager::send_next_grat_arp() { if (is_feature_set(GRAT_ARP)) { @@ -887,13 +683,6 @@ RXPortManager::to_json() const { output["latency"]["is_active"] = false; } - if (is_feature_set(RECORDER)) { - output["sniffer"] = m_recorder.to_json(); - output["sniffer"]["is_active"] = true; - } else { - output["sniffer"]["is_active"] = false; - } - if (is_feature_set(QUEUE)) { output["queue"] = m_queue.to_json(); output["queue"]["is_active"] = true; diff --git a/src/stateless/rx/trex_stateless_rx_port_mngr.h b/src/stateless/rx/trex_stateless_rx_port_mngr.h index 6efdae64..0cc60716 100644 --- a/src/stateless/rx/trex_stateless_rx_port_mngr.h +++ b/src/stateless/rx/trex_stateless_rx_port_mngr.h @@ -25,8 +25,7 @@ #include #include "common/base64.h" -#include "common/captureFile.h" - +#include "trex_stateless_pkt.h" class CPortLatencyHWBase; class CRFC2544Info; @@ -80,97 +79,12 @@ public: CRxCoreErrCntrs *m_err_cntrs; }; -/** - * describes a single saved RX packet - * - */ -class RXPacket { -public: - - RXPacket(const rte_mbuf_t *m); - - /* slow path and also RVO - pass by value is ok */ - Json::Value to_json() { - Json::Value output; - output["ts"] = m_timestamp; - output["binary"] = base64_encode(m_raw, m_size); - return output; - } - - ~RXPacket() { - if (m_raw) { - delete [] m_raw; - } - } - -private: - - uint8_t *m_raw; - uint16_t m_size; - dsec_t m_timestamp; -}; - /************************************** * RX feature queue * *************************************/ -class RXPacketBuffer { -public: - - RXPacketBuffer(uint64_t size); - ~RXPacketBuffer(); - - /** - * push a packet to the buffer - * - */ - void push(const rte_mbuf_t *m); - - /** - * generate a JSON output of the queue - * - */ - Json::Value to_json() const; - - - bool is_empty() const { - return (m_head == m_tail); - } - - bool is_full() const { - return ( next(m_head) == m_tail); - } - - /** - * return the total amount of space possible - */ - uint64_t get_capacity() const { - /* one slot is used for diff between full/empty */ - return (m_size - 1); - } - - /** - * returns how many elements are in the queue - */ - uint64_t get_element_count() const; - -private: - int next(int v) const { - return ( (v + 1) % m_size ); - } - - /* pop in case of full queue - internal usage */ - RXPacket * pop(); - - int m_head; - int m_tail; - int m_size; - RXPacket **m_buffer; -}; - - class RXQueue { public: RXQueue() { @@ -191,7 +105,7 @@ public: * fetch the current buffer * return NULL if no packets */ - const RXPacketBuffer * fetch(); + const TrexPktBuffer * fetch(); /** * stop RX queue @@ -204,42 +118,7 @@ public: Json::Value to_json() const; private: - RXPacketBuffer *m_pkt_buffer; -}; - -/************************************** - * RX feature PCAP recorder - * - *************************************/ - -class RXPacketRecorder { -public: - RXPacketRecorder(); - - ~RXPacketRecorder() { - stop(); - } - - void start(const std::string &pcap, uint64_t limit); - void stop(); - void handle_pkt(const rte_mbuf_t *m); - - /** - * flush any cached packets to disk - * - */ - void flush_to_disk(); - - Json::Value to_json() const; - -private: - CFileWriterBase *m_writer; - std::string m_pcap_filename; - CCapPktRaw m_pkt; - dsec_t m_epoch; - uint64_t m_limit; - uint64_t m_count; - bool m_pending_flush; + TrexPktBuffer *m_pkt_buffer; }; @@ -311,7 +190,6 @@ public: enum feature_t { NO_FEATURES = 0x0, LATENCY = 0x1, - RECORDER = 0x2, QUEUE = 0x4, SERVER = 0x8, GRAT_ARP = 0x10, @@ -354,17 +232,6 @@ public: unset_feature(LATENCY); } - /* recorder */ - void start_recorder(const std::string &pcap, uint64_t limit_pkts) { - m_recorder.start(pcap, limit_pkts); - set_feature(RECORDER); - } - - void stop_recorder() { - m_recorder.stop(); - unset_feature(RECORDER); - } - /* queue */ void start_queue(uint32_t size) { m_queue.start(size); @@ -376,7 +243,7 @@ public: unset_feature(QUEUE); } - const RXPacketBuffer *get_pkt_buffer() { + const TrexPktBuffer *get_pkt_buffer() { if (!is_feature_set(QUEUE)) { return nullptr; } @@ -414,13 +281,6 @@ public: */ void handle_pkt(const rte_mbuf_t *m); - /** - * maintenance - * - * @author imarom (11/24/2016) - */ - void tick(); - /** * send next grat arp (if on) * @@ -482,7 +342,6 @@ private: uint32_t m_features; uint8_t m_port_id; RXLatency m_latency; - RXPacketRecorder m_recorder; RXQueue m_queue; RXServer m_server; RXGratARP m_grat_arp; -- cgit 1.2.3-korg