Skip to content

Commit

Permalink
issue: 1792164 Extend VMA_INTERNAL_THREAD_ARM_CQ variable
Browse files Browse the repository at this point in the history
Flexible tunning is added to control RX and TX polling.

Signed-off-by: Igor Ivanov <[email protected]>
  • Loading branch information
igor-ivanov committed Jul 1, 2020
1 parent b0a9bb0 commit 23f0f7f
Show file tree
Hide file tree
Showing 9 changed files with 145 additions and 91 deletions.
6 changes: 5 additions & 1 deletion README.txt
Original file line number Diff line number Diff line change
Expand Up @@ -830,11 +830,15 @@ timer expiration (once every 100ms). Application threads may be blocked till in
Default value is 0 (deferred handling)

VMA_INTERNAL_THREAD_ARM_CQ
Wakeup the internal thread for each packet that the CQ receive.
Wakeup the internal thread for activity on TX/RX CQ.
Poll and process the packet and bring it to the socket layer.
This can minimize latency in case of a busy application which is not available to
receive the packet when it arrived.
However, this might decrease performance in case of high pps rate application.
Disable Arm CQ is 0
Check RX CQ is 1
Check TX CQ is 2
Check all CQs is 3
Default value is 0 (Disabled)

VMA_WAIT_AFTER_JOIN_MSEC
Expand Down
84 changes: 47 additions & 37 deletions src/vma/dev/net_device_val.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,9 @@ const char* ring_alloc_logic_attr::to_str()
return m_str;
}

net_device_val::net_device_val(struct net_device_val_desc *desc) : m_lock("net_device_val lock")
net_device_val::net_device_val(struct net_device_val_desc *desc) :
m_lock("net_device_val lock"),
m_sysvar_internal_thread_arm_cq(safe_mce_sys().internal_thread_arm_cq)
{
bool valid = false;
ib_ctx_handler* ib_ctx;
Expand Down Expand Up @@ -1133,28 +1135,32 @@ int net_device_val::global_ring_poll_and_process_element(uint64_t *p_poll_sn, vo
auto_unlocker lock(m_lock);
rings_hash_map_t::iterator ring_iter;
for (ring_iter = m_h_ring_map.begin(); ring_iter != m_h_ring_map.end(); ring_iter++) {
int ret = THE_RING->poll_and_process_element_rx(p_poll_sn, pv_fd_ready_array);
BULLSEYE_EXCLUDE_BLOCK_START
if (ret < 0 && errno != EAGAIN) {
nd_logerr("Error in RX ring->poll_and_process_element() of %p (errno=%d %m)", THE_RING, errno);
return ret;
}
BULLSEYE_EXCLUDE_BLOCK_END
if (ret > 0) {
nd_logfunc("ring[%p] RX Returned with: %d (sn=%d)", THE_RING, ret, *p_poll_sn);
ret_total += ret;
if (m_sysvar_internal_thread_arm_cq & mce_sys_var::ARM_CQ_RX) {
int ret = THE_RING->poll_and_process_element_rx(p_poll_sn, pv_fd_ready_array);
BULLSEYE_EXCLUDE_BLOCK_START
if (ret < 0 && errno != EAGAIN) {
nd_logerr("Error in RX ring->poll_and_process_element() of %p (errno=%d %m)", THE_RING, errno);
return ret;
}
BULLSEYE_EXCLUDE_BLOCK_END
if (ret > 0) {
nd_logfunc("ring[%p] RX Returned with: %d (sn=%d)", THE_RING, ret, *p_poll_sn);
ret_total += ret;
}
}

ret = THE_RING->poll_and_process_element_tx(p_poll_sn);
BULLSEYE_EXCLUDE_BLOCK_START
if (ret < 0 && errno != EAGAIN) {
nd_logerr("Error in TX ring->poll_and_process_element() of %p (errno=%d %m)", THE_RING, errno);
return ret;
}
BULLSEYE_EXCLUDE_BLOCK_END
if (ret > 0) {
nd_logfunc("ring[%p] TX Returned with: %d (sn=%d)", THE_RING, ret, *p_poll_sn);
ret_total += ret;
if (m_sysvar_internal_thread_arm_cq & mce_sys_var::ARM_CQ_TX) {
int ret = THE_RING->poll_and_process_element_tx(p_poll_sn);
BULLSEYE_EXCLUDE_BLOCK_START
if (ret < 0 && errno != EAGAIN) {
nd_logerr("Error in TX ring->poll_and_process_element() of %p (errno=%d %m)", THE_RING, errno);
return ret;
}
BULLSEYE_EXCLUDE_BLOCK_END
if (ret > 0) {
nd_logfunc("ring[%p] TX Returned with: %d (sn=%d)", THE_RING, ret, *p_poll_sn);
ret_total += ret;
}
}
}
return ret_total;
Expand All @@ -1166,25 +1172,29 @@ int net_device_val::global_ring_request_notification(uint64_t poll_sn)
auto_unlocker lock(m_lock);
rings_hash_map_t::iterator ring_iter;
for (ring_iter = m_h_ring_map.begin(); ring_iter != m_h_ring_map.end(); ring_iter++) {
int ret = THE_RING->request_notification(CQT_RX, poll_sn);
BULLSEYE_EXCLUDE_BLOCK_START
if (ret < 0) {
nd_logerr("Error RX ring[%p]->request_notification() (errno=%d %m)", THE_RING, errno);
return ret;
if (m_sysvar_internal_thread_arm_cq & mce_sys_var::ARM_CQ_RX) {
int ret = THE_RING->request_notification(CQT_RX, poll_sn);
BULLSEYE_EXCLUDE_BLOCK_START
if (ret < 0) {
nd_logerr("Error RX ring[%p]->request_notification() (errno=%d %m)", THE_RING, errno);
return ret;
}
BULLSEYE_EXCLUDE_BLOCK_END
nd_logfunc("ring[%p] RX Returned with: %d (sn=%d)", THE_RING, ret, poll_sn);
ret_total += ret;
}
BULLSEYE_EXCLUDE_BLOCK_END
nd_logfunc("ring[%p] RX Returned with: %d (sn=%d)", THE_RING, ret, poll_sn);
ret_total += ret;

ret = THE_RING->request_notification(CQT_TX, poll_sn);
BULLSEYE_EXCLUDE_BLOCK_START
if (ret < 0) {
nd_logerr("Error TX ring[%p]->request_notification() (errno=%d %m)", THE_RING, errno);
return ret;
if (m_sysvar_internal_thread_arm_cq & mce_sys_var::ARM_CQ_TX) {
int ret = THE_RING->request_notification(CQT_TX, poll_sn);
BULLSEYE_EXCLUDE_BLOCK_START
if (ret < 0) {
nd_logerr("Error TX ring[%p]->request_notification() (errno=%d %m)", THE_RING, errno);
return ret;
}
BULLSEYE_EXCLUDE_BLOCK_END
nd_logfunc("ring[%p] TX Returned with: %d (sn=%d)", THE_RING, ret, poll_sn);
ret_total += ret;
}
BULLSEYE_EXCLUDE_BLOCK_END
nd_logfunc("ring[%p] TX Returned with: %d (sn=%d)", THE_RING, ret, poll_sn);
ret_total += ret;
}
return ret_total;
}
Expand Down
3 changes: 3 additions & 0 deletions src/vma/dev/net_device_val.h
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,9 @@ class net_device_val
std::string m_name; /* container for ifname */
char m_str[BUFF_SIZE]; /* detailed information about device */
char m_base_name[IFNAMSIZ]; /* base name of device basing ifname */

/* Global environment variables section */
const int m_sysvar_internal_thread_arm_cq;
};

class net_device_val_eth : public net_device_val
Expand Down
2 changes: 1 addition & 1 deletion src/vma/event/event_handler_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ void event_handler_manager::register_command_event(int fd, command* cmd)

event_handler_manager::event_handler_manager() :
m_reg_action_q_lock("reg_action_q_lock"),
m_b_sysvar_internal_thread_arm_cq_enabled(safe_mce_sys().internal_thread_arm_cq_enabled),
m_b_sysvar_internal_thread_arm_cq_enabled(safe_mce_sys().internal_thread_arm_cq),
m_n_sysvar_vma_time_measure_num_samples(safe_mce_sys().vma_time_measure_num_samples),
m_n_sysvar_timer_resolution_msec(safe_mce_sys().timer_resolution_msec)
{
Expand Down
95 changes: 53 additions & 42 deletions src/vma/iomux/epfd_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,13 @@ int epfd_info::remove_fd_from_epoll_os(int fd)

epfd_info::epfd_info(int epfd, int size) :
lock_mutex_recursive("epfd_info"), m_epfd(epfd), m_size(size), m_ring_map_lock("epfd_ring_map_lock"),
m_lock_poll_os("epfd_lock_poll_os"), m_sysvar_thread_mode(safe_mce_sys().thread_mode),
m_b_os_data_available(false)
m_lock_poll_os("epfd_lock_poll_os"),
m_b_os_data_available(false),
m_sysvar_thread_mode(safe_mce_sys().thread_mode),
m_sysvar_internal_thread_arm_cq(safe_mce_sys().internal_thread_arm_cq)
{
__log_funcall("");

int max_sys_fd = get_sys_max_fd_num();
if (m_size<=max_sys_fd)
{
Expand Down Expand Up @@ -609,30 +612,34 @@ int epfd_info::ring_poll_and_process_element(uint64_t *p_poll_sn, void* pv_fd_re
m_ring_map_lock.lock();

for (ring_map_t::iterator iter = m_ring_map.begin(); iter != m_ring_map.end(); iter++) {
int ret = iter->first->poll_and_process_element_rx(p_poll_sn, pv_fd_ready_array);
BULLSEYE_EXCLUDE_BLOCK_START
if (ret < 0 && errno != EAGAIN) {
__log_err("Error in RX ring->poll_and_process_element() of %p (errno=%d %m)", iter->first, errno);
m_ring_map_lock.unlock();
return ret;
}
BULLSEYE_EXCLUDE_BLOCK_END
if (ret > 0) {
__log_func("ring[%p] RX Returned with: %d (sn=%d)", iter->first, ret, *p_poll_sn);
ret_total += ret;
if (m_sysvar_internal_thread_arm_cq & mce_sys_var::ARM_CQ_RX) {
int ret = iter->first->poll_and_process_element_rx(p_poll_sn, pv_fd_ready_array);
BULLSEYE_EXCLUDE_BLOCK_START
if (ret < 0 && errno != EAGAIN) {
__log_err("Error in RX ring->poll_and_process_element() of %p (errno=%d %m)", iter->first, errno);
m_ring_map_lock.unlock();
return ret;
}
BULLSEYE_EXCLUDE_BLOCK_END
if (ret > 0) {
__log_func("ring[%p] RX Returned with: %d (sn=%d)", iter->first, ret, *p_poll_sn);
ret_total += ret;
}
}

ret = iter->first->poll_and_process_element_tx(p_poll_sn);
BULLSEYE_EXCLUDE_BLOCK_START
if (ret < 0 && errno != EAGAIN) {
__log_err("Error in TX ring->poll_and_process_element() of %p (errno=%d %m)", iter->first, errno);
m_ring_map_lock.unlock();
return ret;
}
BULLSEYE_EXCLUDE_BLOCK_END
if (ret > 0) {
__log_func("ring[%p] TX Returned with: %d (sn=%d)", iter->first, ret, *p_poll_sn);
ret_total += ret;
if (m_sysvar_internal_thread_arm_cq & mce_sys_var::ARM_CQ_TX) {
int ret = iter->first->poll_and_process_element_tx(p_poll_sn);
BULLSEYE_EXCLUDE_BLOCK_START
if (ret < 0 && errno != EAGAIN) {
__log_err("Error in TX ring->poll_and_process_element() of %p (errno=%d %m)", iter->first, errno);
m_ring_map_lock.unlock();
return ret;
}
BULLSEYE_EXCLUDE_BLOCK_END
if (ret > 0) {
__log_func("ring[%p] TX Returned with: %d (sn=%d)", iter->first, ret, *p_poll_sn);
ret_total += ret;
}
}
}

Expand Down Expand Up @@ -660,27 +667,31 @@ int epfd_info::ring_request_notification(uint64_t poll_sn)
m_ring_map_lock.lock();

for (ring_map_t::iterator iter = m_ring_map.begin(); iter != m_ring_map.end(); iter++) {
int ret = iter->first->request_notification(CQT_RX, poll_sn);
BULLSEYE_EXCLUDE_BLOCK_START
if (ret < 0) {
__log_err("Error RX ring[%p]->request_notification() (errno=%d %m)", iter->first, errno);
m_ring_map_lock.unlock();
return ret;
if (m_sysvar_internal_thread_arm_cq & mce_sys_var::ARM_CQ_RX) {
int ret = iter->first->request_notification(CQT_RX, poll_sn);
BULLSEYE_EXCLUDE_BLOCK_START
if (ret < 0) {
__log_err("Error RX ring[%p]->request_notification() (errno=%d %m)", iter->first, errno);
m_ring_map_lock.unlock();
return ret;
}
BULLSEYE_EXCLUDE_BLOCK_END
__log_func("ring[%p] RX Returned with: %d (sn=%d)", iter->first, ret, poll_sn);
ret_total += ret;
}
BULLSEYE_EXCLUDE_BLOCK_END
__log_func("ring[%p] RX Returned with: %d (sn=%d)", iter->first, ret, poll_sn);
ret_total += ret;

ret = iter->first->request_notification(CQT_TX, poll_sn);
BULLSEYE_EXCLUDE_BLOCK_START
if (ret < 0) {
__log_err("Error TX ring[%p]->request_notification() (errno=%d %m)", iter->first, errno);
m_ring_map_lock.unlock();
return ret;
if (m_sysvar_internal_thread_arm_cq & mce_sys_var::ARM_CQ_TX) {
int ret = iter->first->request_notification(CQT_TX, poll_sn);
BULLSEYE_EXCLUDE_BLOCK_START
if (ret < 0) {
__log_err("Error TX ring[%p]->request_notification() (errno=%d %m)", iter->first, errno);
m_ring_map_lock.unlock();
return ret;
}
BULLSEYE_EXCLUDE_BLOCK_END
__log_func("ring[%p] TX Returned with: %d (sn=%d)", iter->first, ret, poll_sn);
ret_total += ret;
}
BULLSEYE_EXCLUDE_BLOCK_END
__log_func("ring[%p] TX Returned with: %d (sn=%d)", iter->first, ret, poll_sn);
ret_total += ret;
}

m_ring_map_lock.unlock();
Expand Down
5 changes: 4 additions & 1 deletion src/vma/iomux/epfd_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,13 +129,16 @@ class epfd_info : public lock_mutex_recursive, public cleanable_obj, public wake
ring_map_t m_ring_map;
lock_mutex_recursive m_ring_map_lock;
lock_spin m_lock_poll_os;
const thread_mode_t m_sysvar_thread_mode;
ready_cq_fd_q_t m_ready_cq_fd_q;
epoll_stats_t m_local_stats;
epoll_stats_t *m_stats;
int m_log_invalid_events;
bool m_b_os_data_available; // true when non offloaded data is available

/* Global environment variables section */
const thread_mode_t m_sysvar_thread_mode;
const int m_sysvar_internal_thread_arm_cq;

int add_fd(int fd, epoll_event *event);
int del_fd(int fd, bool passthrough = false);
int mod_fd(int fd, epoll_event *event);
Expand Down
2 changes: 1 addition & 1 deletion src/vma/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,7 @@ void print_vma_global_settings()
VLOG_PARAM_NUMBER("Delay after join (msec)", safe_mce_sys().wait_after_join_msec, MCE_DEFAULT_WAIT_AFTER_JOIN_MSEC, SYS_VAR_WAIT_AFTER_JOIN_MSEC);
VLOG_STR_PARAM_STRING("Internal Thread Affinity", safe_mce_sys().internal_thread_affinity_str, MCE_DEFAULT_INTERNAL_THREAD_AFFINITY_STR, SYS_VAR_INTERNAL_THREAD_AFFINITY, safe_mce_sys().internal_thread_affinity_str);
VLOG_STR_PARAM_STRING("Internal Thread Cpuset", safe_mce_sys().internal_thread_cpuset, MCE_DEFAULT_INTERNAL_THREAD_CPUSET, SYS_VAR_INTERNAL_THREAD_CPUSET, safe_mce_sys().internal_thread_cpuset);
VLOG_PARAM_STRING("Internal Thread Arm CQ", safe_mce_sys().internal_thread_arm_cq_enabled, MCE_DEFAULT_INTERNAL_THREAD_ARM_CQ_ENABLED, SYS_VAR_INTERNAL_THREAD_ARM_CQ, safe_mce_sys().internal_thread_arm_cq_enabled ? "Enabled " : "Disabled");
VLOG_PARAM_STRING("Internal Thread Arm CQ", safe_mce_sys().internal_thread_arm_cq, MCE_DEFAULT_INTERNAL_THREAD_ARM_CQ, SYS_VAR_INTERNAL_THREAD_ARM_CQ, safe_mce_sys().arm_cq_str((mce_sys_var::arm_cq_t)safe_mce_sys().internal_thread_arm_cq));
VLOG_PARAM_NUMSTR("Internal Thread TCP Handling", safe_mce_sys().internal_thread_tcp_timer_handling, MCE_DEFAULT_INTERNAL_THREAD_TCP_TIMER_HANDLING, SYS_VAR_INTERNAL_THREAD_TCP_TIMER_HANDLING, internal_thread_tcp_timer_handling_str(safe_mce_sys().internal_thread_tcp_timer_handling));
VLOG_PARAM_STRING("Thread mode", safe_mce_sys().thread_mode, MCE_DEFAULT_THREAD_MODE, SYS_VAR_THREAD_MODE, thread_mode_str(safe_mce_sys().thread_mode));
VLOG_PARAM_NUMSTR("Buffer batching mode", safe_mce_sys().buffer_batching_mode, MCE_DEFAULT_BUFFER_BATCHING_MODE, SYS_VAR_BUFFER_BATCHING_MODE, buffer_batching_mode_str(safe_mce_sys().buffer_batching_mode));
Expand Down
16 changes: 10 additions & 6 deletions src/vma/util/sys_vars.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -583,7 +583,7 @@ void mce_sys_var::get_env_params()
progress_engine_wce_max = MCE_DEFAULT_PROGRESS_ENGINE_WCE_MAX;
cq_keep_qp_full = MCE_DEFAULT_CQ_KEEP_QP_FULL;
qp_compensation_level = MCE_DEFAULT_QP_COMPENSATION_LEVEL;
internal_thread_arm_cq_enabled = MCE_DEFAULT_INTERNAL_THREAD_ARM_CQ_ENABLED;
internal_thread_arm_cq = MCE_DEFAULT_INTERNAL_THREAD_ARM_CQ;

offloaded_sockets = MCE_DEFAULT_OFFLOADED_SOCKETS;
timer_resolution_msec = MCE_DEFAULT_TIMER_RESOLUTION_MSEC;
Expand Down Expand Up @@ -1161,12 +1161,16 @@ void mce_sys_var::get_env_params()
tcp_timer_resolution_msec = timer_resolution_msec;
}

if ((env_ptr = getenv(SYS_VAR_INTERNAL_THREAD_ARM_CQ)) != NULL)
internal_thread_arm_cq_enabled = atoi(env_ptr) ? true : false;
if ((env_ptr = getenv(SYS_VAR_INTERNAL_THREAD_ARM_CQ)) != NULL) {
internal_thread_arm_cq = (arm_cq_t)atoi(env_ptr);
if (internal_thread_arm_cq < 0 || internal_thread_arm_cq > mce_sys_var::ARM_CQ_ALL) {
internal_thread_arm_cq = MCE_DEFAULT_INTERNAL_THREAD_ARM_CQ;
}
}

if ((env_ptr = getenv(SYS_VAR_INTERNAL_THREAD_CPUSET)) != NULL) {
snprintf(internal_thread_cpuset, FILENAME_MAX, "%s", env_ptr);
}
if ((env_ptr = getenv(SYS_VAR_INTERNAL_THREAD_CPUSET)) != NULL) {
snprintf(internal_thread_cpuset, FILENAME_MAX, "%s", env_ptr);
}

// handle internal thread affinity - default is CPU-0
if ((env_ptr = getenv(SYS_VAR_INTERNAL_THREAD_AFFINITY)) != NULL) {
Expand Down
23 changes: 21 additions & 2 deletions src/vma/util/sys_vars.h
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,25 @@ struct mce_sys_var {
HYPER_VMWARE
};

enum arm_cq_t {
ARM_CQ_OFF = 0,
ARM_CQ_RX = 0x01,
ARM_CQ_TX = 0x02,
ARM_CQ_ALL = 0x03
} ;

inline const char* arm_cq_str(arm_cq_t value)
{
switch (value) {
case ARM_CQ_OFF: return "(Disabled)";
case ARM_CQ_RX: return "(Arm RX CQ)";
case ARM_CQ_TX: return "(Arm TX CQ)";
case ARM_CQ_ALL: return "(Arm All)";
default: break;
}
return "unsupported";
}

public:
void get_env_params();

Expand Down Expand Up @@ -406,7 +425,7 @@ struct mce_sys_var {
char internal_thread_cpuset[FILENAME_MAX];
char internal_thread_affinity_str[FILENAME_MAX];
cpu_set_t internal_thread_affinity;
bool internal_thread_arm_cq_enabled;
int internal_thread_arm_cq;
internal_thread_tcp_timer_handling_t internal_thread_tcp_timer_handling;
bool handle_bf;

Expand Down Expand Up @@ -647,7 +666,7 @@ extern mce_sys_var & safe_mce_sys();
#define MCE_DEFAULT_PROGRESS_ENGINE_WCE_MAX (10000)
#define MCE_DEFAULT_CQ_KEEP_QP_FULL (true)
#define MCE_DEFAULT_QP_COMPENSATION_LEVEL (256)
#define MCE_DEFAULT_INTERNAL_THREAD_ARM_CQ_ENABLED (false)
#define MCE_DEFAULT_INTERNAL_THREAD_ARM_CQ (mce_sys_var::ARM_CQ_OFF)
#define MCE_DEFAULT_QP_FORCE_MC_ATTACH (false)
#define MCE_DEFAULT_OFFLOADED_SOCKETS (true)
#define MCE_DEFAULT_TIMER_RESOLUTION_MSEC (10)
Expand Down

0 comments on commit 23f0f7f

Please sign in to comment.