diff --git a/src/vma/dev/buffer_pool.h b/src/vma/dev/buffer_pool.h index 606f37b095..1c229a7fa6 100644 --- a/src/vma/dev/buffer_pool.h +++ b/src/vma/dev/buffer_pool.h @@ -41,6 +41,12 @@ inline static void free_lwip_pbuf(struct pbuf_custom *pbuf_custom) { + mem_buf_desc_t* p_desc = (mem_buf_desc_t *)pbuf_custom; + + if (p_desc->m_flags & mem_buf_desc_t::ZCOPY) { + p_desc->tx.zc.callback(p_desc); + } + pbuf_custom->pbuf.type = 0; pbuf_custom->pbuf.flags = 0; pbuf_custom->pbuf.ref = 0; } diff --git a/src/vma/lwip/pbuf.c b/src/vma/lwip/pbuf.c index 7a90513d1a..6c499fcb4f 100644 --- a/src/vma/lwip/pbuf.c +++ b/src/vma/lwip/pbuf.c @@ -200,6 +200,9 @@ pbuf_header(struct pbuf *p, s16_t header_size_increment) return 1; /* AlexV: we need to check that the header EXPANTION is legal for PBUF_REF & PBUF_ROM pbufs! */ p->payload = (u8_t *)p->payload - header_size_increment; + } else if (type == PBUF_ZEROCOPY) { + /* temporary do the same as for PBUF_RAM until zcopy support is not ready */ + p->payload = (u8_t *)p->payload - header_size_increment; } else { /* Unknown type */ LWIP_ASSERT("bad pbuf type", 0); diff --git a/src/vma/lwip/tcp_out.c b/src/vma/lwip/tcp_out.c index 2c6af15e0b..1c7d9b108f 100644 --- a/src/vma/lwip/tcp_out.c +++ b/src/vma/lwip/tcp_out.c @@ -440,7 +440,7 @@ tcp_write(struct tcp_pcb *pcb, const void *arg, u32_t len, u8_t apiflags) struct iovec piov[piov_max_size]; int piov_cur_index = 0; int piov_cur_len = 0; - pbuf_type type = PBUF_RAM; + pbuf_type type = (apiflags & TCP_WRITE_ZEROCOPY ? PBUF_ZEROCOPY : PBUF_RAM); int byte_queued = pcb->snd_nxt - pcb->lastack; if ( len < pcb->mss && !(apiflags & TCP_WRITE_DUMMY)) diff --git a/src/vma/proto/dst_entry_tcp.cpp b/src/vma/proto/dst_entry_tcp.cpp index f7bed9abdd..9629e6be55 100644 --- a/src/vma/proto/dst_entry_tcp.cpp +++ b/src/vma/proto/dst_entry_tcp.cpp @@ -463,7 +463,7 @@ void dst_entry_tcp::put_buffer(mem_buf_desc_t * p_desc) if (p_desc->lwip_pbuf.pbuf.ref == 0) { p_desc->p_next_desc = NULL; - g_buffer_pool_tx->put_buffers_thread_safe(p_desc); + buffer_pool::free_tx_lwip_pbuf_custom(&p_desc->lwip_pbuf.pbuf); } } } diff --git a/src/vma/sock/sockinfo_tcp.cpp b/src/vma/sock/sockinfo_tcp.cpp index cb9931230c..e34329eac6 100644 --- a/src/vma/sock/sockinfo_tcp.cpp +++ b/src/vma/sock/sockinfo_tcp.cpp @@ -824,6 +824,10 @@ ssize_t sockinfo_tcp::tx(vma_tx_call_attr_t &tx_arg) } #endif + if ((__flags & MSG_ZEROCOPY) && (m_b_zc)) { + apiflags |= VMA_TX_PACKET_ZEROCOPY; + } + for (int i = 0; i < sz_iov; i++) { si_tcp_logfunc("iov:%d base=%p len=%d", i, p_iov[i].iov_base, p_iov[i].iov_len); @@ -948,6 +952,14 @@ ssize_t sockinfo_tcp::tx(vma_tx_call_attr_t &tx_arg) m_p_socket_stats->n_tx_ready_byte_count += total_tx; } + /* Each send call with MSG_ZEROCOPY that successfully sends + * data increments the counter. + * The counter is not incremented on failure or if called with length zero. + */ + if ((apiflags & VMA_TX_PACKET_ZEROCOPY) && (total_tx > 0)) { + atomic_fetch_and_inc(&m_zckey); + } + unlock_tcp_con(); #ifdef VMA_TIME_MEASURE @@ -4540,9 +4552,11 @@ struct pbuf * sockinfo_tcp::tcp_tx_pbuf_alloc(void* p_conn, pbuf_type type) dst_entry_tcp *p_dst = (dst_entry_tcp *)(p_si_tcp->m_p_connected_dst_entry); mem_buf_desc_t* p_desc = NULL; - NOT_IN_USE(type); if (likely(p_dst)) { p_desc = p_dst->get_buffer(); + if (p_desc && (type == PBUF_ZEROCOPY)) { + p_desc = p_si_tcp->tcp_tx_zc_alloc(p_desc); + } } return (struct pbuf *)p_desc; } @@ -4565,11 +4579,86 @@ void sockinfo_tcp::tcp_tx_pbuf_free(void* p_conn, struct pbuf *p_buff) if (p_desc->lwip_pbuf.pbuf.ref == 0) { p_desc->p_next_desc = NULL; - g_buffer_pool_tx->put_buffers_thread_safe(p_desc); + buffer_pool::free_tx_lwip_pbuf_custom(p_buff); } } } +struct mem_buf_desc_t* sockinfo_tcp::tcp_tx_zc_alloc(mem_buf_desc_t* p_desc) +{ + p_desc->m_flags |= mem_buf_desc_t::ZCOPY; + p_desc->tx.zc.id = atomic_read(&m_zckey); + p_desc->tx.zc.count = 1; + p_desc->tx.zc.len = p_desc->lwip_pbuf.pbuf.len; + p_desc->tx.zc.ctx = (void *)this; + p_desc->tx.zc.callback = tcp_tx_zc_callback; + + return p_desc; +} + +void sockinfo_tcp::tcp_tx_zc_callback(mem_buf_desc_t* p_desc) +{ + uint32_t lo, hi; + uint16_t count; + uint32_t prev_lo, prev_hi; + mem_buf_desc_t* err_queue = NULL; + sockinfo_tcp* sock = NULL; + + if (!p_desc || !p_desc->tx.zc.ctx) { + return; + } + + sock = (sockinfo_tcp *)p_desc->tx.zc.ctx; + + if (sock->m_state != SOCKINFO_OPENED) { + return; + } + + count = p_desc->tx.zc.count; + lo = p_desc->tx.zc.id; + hi = lo + count - 1; + memset(&p_desc->ee, 0, sizeof(p_desc->ee)); + p_desc->ee.ee_errno = 0; + p_desc->ee.ee_origin = SO_EE_ORIGIN_ZEROCOPY; + p_desc->ee.ee_data = hi; + p_desc->ee.ee_info = lo; +// p_desc->ee.ee_code |= SO_EE_CODE_ZEROCOPY_COPIED; + + /* Update last error queue element in case it has the same type */ + err_queue = sock->m_error_queue.back(); + if (err_queue && + (err_queue->ee.ee_origin == p_desc->ee.ee_origin) && + (err_queue->ee.ee_code == p_desc->ee.ee_code)) { + uint64_t sum_count = 0; + + prev_hi = err_queue->ee.ee_data; + prev_lo = err_queue->ee.ee_info; + sum_count = prev_hi - prev_lo + 1ULL + count; + + if (lo == prev_lo) { + err_queue->ee.ee_data = hi; + } else if ((sum_count >= (1ULL << 32)) || (lo != prev_hi + 1)) { + err_queue = NULL; + } else { + err_queue->ee.ee_data += count; + } + } + + /* Add information into error queue element */ + if (!err_queue) { + err_queue = p_desc->clone(); + sock->m_error_queue.push_back(err_queue); + } + + /* Clean up */ + p_desc->m_flags &= ~mem_buf_desc_t::ZCOPY; + memset(&p_desc->tx.zc, 0, sizeof(p_desc->tx.zc)); + + /* Signal events on socket */ + NOTIFY_ON_EVENTS(sock, EPOLLERR); + sock->do_wakeup(); +} + struct tcp_seg * sockinfo_tcp::tcp_seg_alloc(void* p_conn) { sockinfo_tcp *p_si_tcp = (sockinfo_tcp *)(((struct tcp_pcb*)p_conn)->my_container); diff --git a/src/vma/sock/sockinfo_tcp.h b/src/vma/sock/sockinfo_tcp.h index 9324546dd7..19f7e45986 100644 --- a/src/vma/sock/sockinfo_tcp.h +++ b/src/vma/sock/sockinfo_tcp.h @@ -185,6 +185,9 @@ class sockinfo_tcp : public sockinfo, public timer_handler static struct tcp_seg * tcp_seg_alloc(void* p_conn); static void tcp_seg_free(void* p_conn, struct tcp_seg * seg); + struct mem_buf_desc_t* tcp_tx_zc_alloc(mem_buf_desc_t* p_desc); + static void tcp_tx_zc_callback(mem_buf_desc_t* p_desc); + bool inline is_readable(uint64_t *p_poll_sn, fd_array_t *p_fd_array = NULL); bool inline is_writeable(); bool inline is_errorable(int *errors);