Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Minor flow cleanups/20240613/v1 #11302

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 72 additions & 31 deletions src/flow-manager.c
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/* Copyright (C) 2007-2023 Open Information Security Foundation
/* Copyright (C) 2007-2024 Open Information Security Foundation
*
* You can copy, redistribute or modify this Program under the terms of
* the GNU General Public License version 2 as published by the Free
Expand Down Expand Up @@ -175,10 +175,10 @@ void FlowDisableFlowManagerThread(void)
* \param f flow
* \param ts timestamp
*
* \retval 0 not timed out
* \retval 1 timed out
* \retval false not timed out
* \retval true timed out
*/
static int FlowManagerFlowTimeout(Flow *f, SCTime_t ts, uint32_t *next_ts, const bool emerg)
static bool FlowManagerFlowTimeout(Flow *f, SCTime_t ts, uint32_t *next_ts, const bool emerg)
{
uint32_t flow_times_out_at = f->timeout_at;
if (emerg) {
Expand All @@ -190,10 +190,10 @@ static int FlowManagerFlowTimeout(Flow *f, SCTime_t ts, uint32_t *next_ts, const

/* do the timeout check */
if ((uint64_t)flow_times_out_at >= SCTIME_SECS(ts)) {
return 0;
return false;
}

return 1;
return true;
}

/** \internal
Expand All @@ -203,14 +203,14 @@ static int FlowManagerFlowTimeout(Flow *f, SCTime_t ts, uint32_t *next_ts, const
* \param ts timestamp
* \param counters Flow timeout counters
*
* \retval 0 not timeout
* \retval 1 timeout (or not capture bypassed)
* \retval false not timeout
* \retval true timeout (or not capture bypassed)
*/
static inline int FlowBypassedTimeout(Flow *f, SCTime_t ts, FlowTimeoutCounters *counters)
static inline bool FlowBypassedTimeout(Flow *f, SCTime_t ts, FlowTimeoutCounters *counters)
{
#ifdef CAPTURE_OFFLOAD
if (f->flow_state != FLOW_STATE_CAPTURE_BYPASSED) {
return 1;
return true;
}

FlowBypassInfo *fc = FlowGetStorageById(f, GetFlowBypassInfoID());
Expand All @@ -233,30 +233,41 @@ static inline int FlowBypassedTimeout(Flow *f, SCTime_t ts, FlowTimeoutCounters
}
counters->bypassed_pkts += pkts_tosrc + pkts_todst;
counters->bypassed_bytes += bytes_tosrc + bytes_todst;
return 0;
} else {
SCLogDebug("No new packet, dead flow %"PRId64"", FlowGetId(f));
if (f->livedev) {
if (FLOW_IS_IPV4(f)) {
LiveDevSubBypassStats(f->livedev, 1, AF_INET);
} else if (FLOW_IS_IPV6(f)) {
LiveDevSubBypassStats(f->livedev, 1, AF_INET6);
}
return false;
}
SCLogDebug("No new packet, dead flow %"PRId64"", FlowGetId(f));
if (f->livedev) {
if (FLOW_IS_IPV4(f)) {
LiveDevSubBypassStats(f->livedev, 1, AF_INET);
} else if (FLOW_IS_IPV6(f)) {
LiveDevSubBypassStats(f->livedev, 1, AF_INET6);
}
counters->bypassed_count++;
return 1;
}
counters->bypassed_count++;
}
#endif /* CAPTURE_OFFLOAD */
return 1;
return true;
}

typedef struct FlowManagerTimeoutThread {
/* used to temporarily store flows that have timed out and are
* removed from the hash */
* removed from the hash to reduce locking contention */
FlowQueuePrivate aside_queue;
} FlowManagerTimeoutThread;

/**
* \internal
*
* \brief Process the temporary Aside Queue
* This means that as long as a flow f is not waiting on detection
* engine to finish dealing with it, f will be put in the recycle
* queue for further processing later on.
*
* \param td FM Timeout Thread instance
* \param counters Flow Timeout counters to be updated
*
* \retval Number of flows that were recycled
*/
static uint32_t ProcessAsideQueue(FlowManagerTimeoutThread *td, FlowTimeoutCounters *counters)
{
FlowQueuePrivate recycle = { NULL, NULL, 0 };
Expand All @@ -273,7 +284,7 @@ static uint32_t ProcessAsideQueue(FlowManagerTimeoutThread *td, FlowTimeoutCount
/* Send the flow to its thread */
FlowForceReassemblyForFlow(f);
FLOWLOCK_UNLOCK(f);
/* flow ownership is passed to the worker thread */
/* flow ownership is already passed to the worker thread */

counters->flows_aside_needs_work++;
continue;
Expand Down Expand Up @@ -319,8 +330,7 @@ static void FlowManagerHashRowTimeout(FlowManagerTimeoutThread *td, Flow *f, SCT
* be modified when we have both the flow and hash row lock */

/* timeout logic goes here */
if (FlowManagerFlowTimeout(f, ts, next_ts, emergency) == 0) {

if (FlowManagerFlowTimeout(f, ts, next_ts, emergency) == false) {
counters->flows_notimeout++;

prev_f = f;
Expand Down Expand Up @@ -358,8 +368,17 @@ static void FlowManagerHashRowTimeout(FlowManagerTimeoutThread *td, Flow *f, SCT
counters->rows_maxlen = checked;
}

static void FlowManagerHashRowClearEvictedList(
FlowManagerTimeoutThread *td, Flow *f, SCTime_t ts, FlowTimeoutCounters *counters)
/**
* \internal
*
* \brief Clear evicted list from Flow Manager.
* All the evicted flows are removed from the Flow bucket and added
* to the temporary Aside Queue.
*
* \param td FM timeout thread instance
* \param f head of the evicted list
*/
static void FlowManagerHashRowClearEvictedList(FlowManagerTimeoutThread *td, Flow *f)
{
do {
FLOWLOCK_WRLOCK(f);
Expand Down Expand Up @@ -434,6 +453,7 @@ static uint32_t FlowTimeoutHash(FlowManagerTimeoutThread *td, SCTime_t ts, const
SC_ATOMIC_SET(fb->next_ts, next_ts);
}
if (fb->evicted == NULL && fb->head == NULL) {
/* row is empty */
SC_ATOMIC_SET(fb->next_ts, UINT_MAX);
}
} else {
Expand All @@ -443,7 +463,7 @@ static uint32_t FlowTimeoutHash(FlowManagerTimeoutThread *td, SCTime_t ts, const
FBLOCK_UNLOCK(fb);
/* processed evicted list */
if (evicted) {
FlowManagerHashRowClearEvictedList(td, evicted, ts, counters);
FlowManagerHashRowClearEvictedList(td, evicted);
}
} else {
rows_skipped++;
Expand All @@ -467,8 +487,19 @@ static uint32_t FlowTimeoutHash(FlowManagerTimeoutThread *td, SCTime_t ts, const
}

/** \internal
*
* \brief handle timeout for a slice of hash rows
* If we wrap around we call FlowTimeoutHash twice */
* If we wrap around we call FlowTimeoutHash twice
* \param td FM timeout thread
* \param ts timeout in seconds
* \param hash_min lower bound of the row slice
* \param hash_max upper bound of the row slice
* \param counters Flow timeout counters to be passed
* \param rows number of rows for this worker unit
* \param pos position of the beginning of row slice in the hash table
*
* \retval number of successfully timed out flows
*/
static uint32_t FlowTimeoutHashInChunks(FlowManagerTimeoutThread *td, SCTime_t ts,
const uint32_t hash_min, const uint32_t hash_max, FlowTimeoutCounters *counters,
const uint32_t rows, uint32_t *pos)
Expand Down Expand Up @@ -503,8 +534,10 @@ static uint32_t FlowTimeoutHashInChunks(FlowManagerTimeoutThread *td, SCTime_t t
* \brief move all flows out of a hash row
*
* \param f last flow in the hash row
* \param recycle_q Flow recycle queue
* \param mode emergency or not
*
* \retval cnt removed out flows
* \retval cnt number of flows removed from the hash and added to the recycle queue
*/
static uint32_t FlowManagerHashRowCleanup(Flow *f, FlowQueuePrivate *recycle_q, const int mode)
{
Expand Down Expand Up @@ -569,6 +602,7 @@ static uint32_t FlowCleanupHash(void)
FlowWakeupFlowRecyclerThread();
}
}
DEBUG_VALIDATE_BUG_ON(local_queue.len >= 25);
FlowQueueAppendPrivate(&flow_recycle_q, &local_queue);
FlowWakeupFlowRecyclerThread();

Expand Down Expand Up @@ -709,6 +743,13 @@ static TmEcode FlowManagerThreadDeinit(ThreadVars *t, void *data)
* a rapid increase of the busy score, which could lead to the flow manager
* suddenly scanning a much larger slice of the hash leading to a burst
* in scan/eviction work.
*
* \param rows number of rows for the work unit
* \param mp current memcap pressure value
* \param emergency emergency mode is set or not
* \param wu_sleep holds value of sleep time per worker unit
* \param wu_rows holds value of calculated rows to be processed per second
* \param rows_sec same as wu_rows, only used for counter updates
*/
static void GetWorkUnitSizing(const uint32_t rows, const uint32_t mp, const bool emergency,
uint64_t *wu_sleep, uint32_t *wu_rows, uint32_t *rows_sec)
Expand Down
6 changes: 2 additions & 4 deletions src/tmqh-packetpool.c
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,8 @@ static void PacketPoolGetReturnedPackets(PktPool *pool)
Packet *PacketPoolGetPacket(void)
{
PktPool *pool = GetThreadPacketPool();
#ifdef DEBUG_VALIDATION
BUG_ON(pool->initialized == 0);
BUG_ON(pool->destroyed == 1);
#endif /* DEBUG_VALIDATION */
DEBUG_VALIDATE_BUG_ON(pool->initialized == 0);
DEBUG_VALIDATE_BUG_ON(pool->destroyed == 1);
if (pool->head) {
/* Stack is not empty. */
Packet *p = pool->head;
Expand Down
Loading