Skip to content

Commit

Permalink
issue: 1792164 Add MSG_ZEROCOPY processing
Browse files Browse the repository at this point in the history
These changes make the MSG_ZEROCOPY send flow workable,
including the notification mechanism.
The notification is needed to tell the process when it is safe to reuse a
previously passed buffer. Completion notifications are queued
on the socket error queue.

However, internal copy avoidance is not implemented yet, so all data
is still copied into internal buffers, exactly as without MSG_ZEROCOPY.

Full zcopy support will be implemented later.

Signed-off-by: Igor Ivanov <[email protected]>
  • Loading branch information
igor-ivanov committed Jun 11, 2020
1 parent 687271f commit de11e9d
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 4 deletions.
6 changes: 6 additions & 0 deletions src/vma/dev/buffer_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@

inline static void free_lwip_pbuf(struct pbuf_custom *pbuf_custom)
{
mem_buf_desc_t* p_desc = (mem_buf_desc_t *)pbuf_custom;

if (p_desc->m_flags & mem_buf_desc_t::ZCOPY) {
p_desc->tx.zc.callback(p_desc);
}
pbuf_custom->pbuf.type = 0;
pbuf_custom->pbuf.flags = 0;
pbuf_custom->pbuf.ref = 0;
}
Expand Down
3 changes: 3 additions & 0 deletions src/vma/lwip/pbuf.c
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,9 @@ pbuf_header(struct pbuf *p, s16_t header_size_increment)
return 1;
/* AlexV: we need to check that the header EXPANTION is legal for PBUF_REF & PBUF_ROM pbufs! */
p->payload = (u8_t *)p->payload - header_size_increment;
} else if (type == PBUF_ZEROCOPY) {
/* temporarily do the same as for PBUF_RAM until zcopy support is ready */
p->payload = (u8_t *)p->payload - header_size_increment;
} else {
/* Unknown type */
LWIP_ASSERT("bad pbuf type", 0);
Expand Down
2 changes: 1 addition & 1 deletion src/vma/lwip/tcp_out.c
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ tcp_write(struct tcp_pcb *pcb, const void *arg, u32_t len, u8_t apiflags)
struct iovec piov[piov_max_size];
int piov_cur_index = 0;
int piov_cur_len = 0;
pbuf_type type = PBUF_RAM;
pbuf_type type = (apiflags & TCP_WRITE_ZEROCOPY ? PBUF_ZEROCOPY : PBUF_RAM);

int byte_queued = pcb->snd_nxt - pcb->lastack;
if ( len < pcb->mss && !(apiflags & TCP_WRITE_DUMMY))
Expand Down
2 changes: 1 addition & 1 deletion src/vma/proto/dst_entry_tcp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ void dst_entry_tcp::put_buffer(mem_buf_desc_t * p_desc)

if (p_desc->lwip_pbuf.pbuf.ref == 0) {
p_desc->p_next_desc = NULL;
g_buffer_pool_tx->put_buffers_thread_safe(p_desc);
buffer_pool::free_tx_lwip_pbuf_custom(&p_desc->lwip_pbuf.pbuf);
}
}
}
93 changes: 91 additions & 2 deletions src/vma/sock/sockinfo_tcp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -824,6 +824,10 @@ ssize_t sockinfo_tcp::tx(vma_tx_call_attr_t &tx_arg)
}
#endif

if ((__flags & MSG_ZEROCOPY) && (m_b_zc)) {
apiflags |= VMA_TX_PACKET_ZEROCOPY;
}

for (int i = 0; i < sz_iov; i++) {
si_tcp_logfunc("iov:%d base=%p len=%d", i, p_iov[i].iov_base, p_iov[i].iov_len);

Expand Down Expand Up @@ -948,6 +952,14 @@ ssize_t sockinfo_tcp::tx(vma_tx_call_attr_t &tx_arg)
m_p_socket_stats->n_tx_ready_byte_count += total_tx;
}

/* Each send call with MSG_ZEROCOPY that successfully sends
* data increments the counter.
* The counter is not incremented on failure or if called with length zero.
*/
if ((apiflags & VMA_TX_PACKET_ZEROCOPY) && (total_tx > 0)) {
atomic_fetch_and_inc(&m_zckey);
}

unlock_tcp_con();

#ifdef VMA_TIME_MEASURE
Expand Down Expand Up @@ -4540,9 +4552,11 @@ struct pbuf * sockinfo_tcp::tcp_tx_pbuf_alloc(void* p_conn, pbuf_type type)
dst_entry_tcp *p_dst = (dst_entry_tcp *)(p_si_tcp->m_p_connected_dst_entry);
mem_buf_desc_t* p_desc = NULL;

NOT_IN_USE(type);
if (likely(p_dst)) {
p_desc = p_dst->get_buffer();
if (p_desc && (type == PBUF_ZEROCOPY)) {
p_desc = p_si_tcp->tcp_tx_zc_alloc(p_desc);
}
}
return (struct pbuf *)p_desc;
}
Expand All @@ -4565,11 +4579,86 @@ void sockinfo_tcp::tcp_tx_pbuf_free(void* p_conn, struct pbuf *p_buff)

if (p_desc->lwip_pbuf.pbuf.ref == 0) {
p_desc->p_next_desc = NULL;
g_buffer_pool_tx->put_buffers_thread_safe(p_desc);
buffer_pool::free_tx_lwip_pbuf_custom(p_buff);
}
}
}

/**
 * Prepare a TX buffer descriptor for zero-copy completion tracking.
 *
 * Records on the descriptor: the current value of the socket's zero-copy
 * key (m_zckey) as the notification id, a count of one, the length of the
 * embedded lwip pbuf, this socket as the callback context and
 * tcp_tx_zc_callback as the completion callback. Finally the descriptor is
 * flagged ZCOPY.
 *
 * @param p_desc  descriptor to prepare; dereferenced unconditionally, so it
 *                must not be NULL.
 * @return the same descriptor that was passed in.
 */
struct mem_buf_desc_t* sockinfo_tcp::tcp_tx_zc_alloc(mem_buf_desc_t* p_desc)
{
    /* Completion routing: who to call and with which socket. */
    p_desc->tx.zc.callback = tcp_tx_zc_callback;
    p_desc->tx.zc.ctx = static_cast<void *>(this);

    /* Notification payload: id range start, number of ids, data length. */
    p_desc->tx.zc.id = atomic_read(&m_zckey);
    p_desc->tx.zc.count = 1;
    p_desc->tx.zc.len = p_desc->lwip_pbuf.pbuf.len;

    /* Mark the descriptor only once all zero-copy fields are filled in. */
    p_desc->m_flags |= mem_buf_desc_t::ZCOPY;

    return p_desc;
}

/**
 * Zero-copy completion callback for a TX buffer descriptor.
 *
 * Builds a SO_EE_ORIGIN_ZEROCOPY extended error describing the completed
 * id range [lo, hi] (ee_info = lo, ee_data = hi) and either merges it into
 * the last element of the owning socket's error queue or appends a clone of
 * the descriptor to that queue. Afterwards EPOLLERR is signalled on the
 * socket and the socket is woken up.
 *
 * No notification is produced when the descriptor has no owning socket,
 * when its id count is zero (the range would wrap to hi == lo - 1), or when
 * the socket is not in SOCKINFO_OPENED state.
 *
 * The zero-copy state of the descriptor (ZCOPY flag and tx.zc) is cleared on
 * every path with a non-NULL descriptor. Leaving it set on the early-exit
 * paths would keep a stale context/callback on the descriptor, which would
 * then be invoked again the next time the buffer is released.
 *
 * @param p_desc  descriptor whose transmission completed; NULL is ignored.
 */
void sockinfo_tcp::tcp_tx_zc_callback(mem_buf_desc_t* p_desc)
{
    if (!p_desc) {
        return;
    }

    sockinfo_tcp* sock = (sockinfo_tcp *)p_desc->tx.zc.ctx;
    const uint16_t count = p_desc->tx.zc.count;
    const bool notify = sock && (count > 0) && (sock->m_state == SOCKINFO_OPENED);

    if (notify) {
        const uint32_t lo = p_desc->tx.zc.id;
        const uint32_t hi = lo + count - 1;

        memset(&p_desc->ee, 0, sizeof(p_desc->ee));
        p_desc->ee.ee_errno = 0;
        p_desc->ee.ee_origin = SO_EE_ORIGIN_ZEROCOPY;
        p_desc->ee.ee_data = hi;
        p_desc->ee.ee_info = lo;
        // p_desc->ee.ee_code |= SO_EE_CODE_ZEROCOPY_COPIED;

        /* Update last error queue element in case it has the same type */
        mem_buf_desc_t* err_queue = sock->m_error_queue.back();
        if (err_queue &&
            (err_queue->ee.ee_origin == p_desc->ee.ee_origin) &&
            (err_queue->ee.ee_code == p_desc->ee.ee_code)) {
            const uint32_t prev_hi = err_queue->ee.ee_data;
            const uint32_t prev_lo = err_queue->ee.ee_info;
            /* 64-bit sum so that an overflow of the 32-bit range is detectable */
            const uint64_t sum_count = prev_hi - prev_lo + 1ULL + count;

            if (lo == prev_lo) {
                /* Same starting id as the queued element: overwrite its upper bound */
                err_queue->ee.ee_data = hi;
            } else if ((sum_count >= (1ULL << 32)) || (lo != prev_hi + 1)) {
                /* Not contiguous or the merged range would not fit: use a new element */
                err_queue = NULL;
            } else {
                /* Contiguous with the queued range: extend it */
                err_queue->ee.ee_data += count;
            }
        }

        /* Add information into error queue element */
        if (!err_queue) {
            err_queue = p_desc->clone();
            sock->m_error_queue.push_back(err_queue);
        }
    }

    /* Clean up: always detach the zero-copy state from the descriptor */
    p_desc->m_flags &= ~mem_buf_desc_t::ZCOPY;
    memset(&p_desc->tx.zc, 0, sizeof(p_desc->tx.zc));

    /* Signal events on socket */
    if (notify) {
        NOTIFY_ON_EVENTS(sock, EPOLLERR);
        sock->do_wakeup();
    }
}

struct tcp_seg * sockinfo_tcp::tcp_seg_alloc(void* p_conn)
{
sockinfo_tcp *p_si_tcp = (sockinfo_tcp *)(((struct tcp_pcb*)p_conn)->my_container);
Expand Down
3 changes: 3 additions & 0 deletions src/vma/sock/sockinfo_tcp.h
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,9 @@ class sockinfo_tcp : public sockinfo, public timer_handler
static struct tcp_seg * tcp_seg_alloc(void* p_conn);
static void tcp_seg_free(void* p_conn, struct tcp_seg * seg);

struct mem_buf_desc_t* tcp_tx_zc_alloc(mem_buf_desc_t* p_desc);
static void tcp_tx_zc_callback(mem_buf_desc_t* p_desc);

bool inline is_readable(uint64_t *p_poll_sn, fd_array_t *p_fd_array = NULL);
bool inline is_writeable();
bool inline is_errorable(int *errors);
Expand Down

0 comments on commit de11e9d

Please sign in to comment.