Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Minor flow cleanups/20240613/v2 #11311

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 74 additions & 33 deletions src/flow-manager.c
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/* Copyright (C) 2007-2023 Open Information Security Foundation
/* Copyright (C) 2007-2024 Open Information Security Foundation
*
* You can copy, redistribute or modify this Program under the terms of
* the GNU General Public License version 2 as published by the Free
Expand Down Expand Up @@ -175,10 +175,10 @@ void FlowDisableFlowManagerThread(void)
* \param f flow
* \param ts timestamp
*
* \retval 0 not timed out
* \retval 1 timed out
* \retval false not timed out
* \retval true timed out
*/
static int FlowManagerFlowTimeout(Flow *f, SCTime_t ts, uint32_t *next_ts, const bool emerg)
static bool FlowManagerFlowTimeout(Flow *f, SCTime_t ts, uint32_t *next_ts, const bool emerg)
{
uint32_t flow_times_out_at = f->timeout_at;
if (emerg) {
Expand All @@ -190,10 +190,10 @@ static int FlowManagerFlowTimeout(Flow *f, SCTime_t ts, uint32_t *next_ts, const

/* do the timeout check */
if ((uint64_t)flow_times_out_at >= SCTIME_SECS(ts)) {
return 0;
return false;
}

return 1;
return true;
}

/** \internal
Expand All @@ -203,14 +203,14 @@ static int FlowManagerFlowTimeout(Flow *f, SCTime_t ts, uint32_t *next_ts, const
* \param ts timestamp
* \param counters Flow timeout counters
*
* \retval 0 not timeout
* \retval 1 timeout (or not capture bypassed)
* \retval false not timeout
* \retval true timeout (or not capture bypassed)
*/
static inline int FlowBypassedTimeout(Flow *f, SCTime_t ts, FlowTimeoutCounters *counters)
static inline bool FlowBypassedTimeout(Flow *f, SCTime_t ts, FlowTimeoutCounters *counters)
{
#ifdef CAPTURE_OFFLOAD
if (f->flow_state != FLOW_STATE_CAPTURE_BYPASSED) {
return 1;
return true;
}

FlowBypassInfo *fc = FlowGetStorageById(f, GetFlowBypassInfoID());
Expand All @@ -233,30 +233,41 @@ static inline int FlowBypassedTimeout(Flow *f, SCTime_t ts, FlowTimeoutCounters
}
counters->bypassed_pkts += pkts_tosrc + pkts_todst;
counters->bypassed_bytes += bytes_tosrc + bytes_todst;
return 0;
} else {
SCLogDebug("No new packet, dead flow %"PRId64"", FlowGetId(f));
if (f->livedev) {
if (FLOW_IS_IPV4(f)) {
LiveDevSubBypassStats(f->livedev, 1, AF_INET);
} else if (FLOW_IS_IPV6(f)) {
LiveDevSubBypassStats(f->livedev, 1, AF_INET6);
}
return false;
}
SCLogDebug("No new packet, dead flow %" PRId64 "", FlowGetId(f));
if (f->livedev) {
if (FLOW_IS_IPV4(f)) {
LiveDevSubBypassStats(f->livedev, 1, AF_INET);
} else if (FLOW_IS_IPV6(f)) {
LiveDevSubBypassStats(f->livedev, 1, AF_INET6);
}
counters->bypassed_count++;
return 1;
}
counters->bypassed_count++;
}
#endif /* CAPTURE_OFFLOAD */
return 1;
return true;
}

typedef struct FlowManagerTimeoutThread {
/* used to temporarily store flows that have timed out and are
* removed from the hash */
* removed from the hash to reduce locking contention */
FlowQueuePrivate aside_queue;
} FlowManagerTimeoutThread;

/**
* \internal
*
* \brief Process the temporary Aside Queue
* This means that as long as a flow f is not waiting on detection
* engine to finish dealing with it, f will be put in the recycle
* queue for further processing later on.
*
* \param td FM Timeout Thread instance
* \param counters Flow Timeout counters to be updated
*
* \retval Number of flows that were recycled
*/
static uint32_t ProcessAsideQueue(FlowManagerTimeoutThread *td, FlowTimeoutCounters *counters)
{
FlowQueuePrivate recycle = { NULL, NULL, 0 };
Expand All @@ -269,11 +280,11 @@ static uint32_t ProcessAsideQueue(FlowManagerTimeoutThread *td, FlowTimeoutCount

if (f->proto == IPPROTO_TCP &&
!(f->flags & (FLOW_TIMEOUT_REASSEMBLY_DONE | FLOW_ACTION_DROP)) &&
!FlowIsBypassed(f) && FlowForceReassemblyNeedReassembly(f) == 1) {
!FlowIsBypassed(f) && FlowNeedsReassembly(f)) {
/* Send the flow to its thread */
FlowForceReassemblyForFlow(f);
FlowSendToLocalThread(f);
FLOWLOCK_UNLOCK(f);
/* flow ownership is passed to the worker thread */
/* flow ownership is already passed to the worker thread */

counters->flows_aside_needs_work++;
continue;
Expand Down Expand Up @@ -319,8 +330,7 @@ static void FlowManagerHashRowTimeout(FlowManagerTimeoutThread *td, Flow *f, SCT
* be modified when we have both the flow and hash row lock */

/* timeout logic goes here */
if (FlowManagerFlowTimeout(f, ts, next_ts, emergency) == 0) {

if (FlowManagerFlowTimeout(f, ts, next_ts, emergency) == false) {
counters->flows_notimeout++;

prev_f = f;
Expand Down Expand Up @@ -358,8 +368,17 @@ static void FlowManagerHashRowTimeout(FlowManagerTimeoutThread *td, Flow *f, SCT
counters->rows_maxlen = checked;
}

static void FlowManagerHashRowClearEvictedList(
FlowManagerTimeoutThread *td, Flow *f, SCTime_t ts, FlowTimeoutCounters *counters)
/**
* \internal
*
* \brief Clear evicted list from Flow Manager.
* All the evicted flows are removed from the Flow bucket and added
* to the temporary Aside Queue.
*
* \param td FM timeout thread instance
* \param f head of the evicted list
*/
static void FlowManagerHashRowClearEvictedList(FlowManagerTimeoutThread *td, Flow *f)
{
inashivb marked this conversation as resolved.
Show resolved Hide resolved
do {
FLOWLOCK_WRLOCK(f);
Expand Down Expand Up @@ -434,6 +453,7 @@ static uint32_t FlowTimeoutHash(FlowManagerTimeoutThread *td, SCTime_t ts, const
SC_ATOMIC_SET(fb->next_ts, next_ts);
}
if (fb->evicted == NULL && fb->head == NULL) {
/* row is empty */
SC_ATOMIC_SET(fb->next_ts, UINT_MAX);
}
} else {
Expand All @@ -443,7 +463,7 @@ static uint32_t FlowTimeoutHash(FlowManagerTimeoutThread *td, SCTime_t ts, const
FBLOCK_UNLOCK(fb);
/* processed evicted list */
if (evicted) {
FlowManagerHashRowClearEvictedList(td, evicted, ts, counters);
FlowManagerHashRowClearEvictedList(td, evicted);
}
} else {
rows_skipped++;
Expand All @@ -467,8 +487,19 @@ static uint32_t FlowTimeoutHash(FlowManagerTimeoutThread *td, SCTime_t ts, const
}

/** \internal
*
* \brief handle timeout for a slice of hash rows
* If we wrap around we call FlowTimeoutHash twice */
* If we wrap around we call FlowTimeoutHash twice
* \param td FM timeout thread
* \param ts timeout in seconds
* \param hash_min lower bound of the row slice
* \param hash_max upper bound of the row slice
* \param counters Flow timeout counters to be passed
* \param rows number of rows for this worker unit
* \param pos position of the beginning of row slice in the hash table
*
* \retval number of successfully timed out flows
*/
static uint32_t FlowTimeoutHashInChunks(FlowManagerTimeoutThread *td, SCTime_t ts,
const uint32_t hash_min, const uint32_t hash_max, FlowTimeoutCounters *counters,
const uint32_t rows, uint32_t *pos)
Expand Down Expand Up @@ -503,8 +534,10 @@ static uint32_t FlowTimeoutHashInChunks(FlowManagerTimeoutThread *td, SCTime_t t
* \brief move all flows out of a hash row
*
* \param f last flow in the hash row
* \param recycle_q Flow recycle queue
* \param mode emergency or not
*
* \retval cnt removed out flows
* \retval cnt number of flows removed from the hash and added to the recycle queue
*/
static uint32_t FlowManagerHashRowCleanup(Flow *f, FlowQueuePrivate *recycle_q, const int mode)
{
Expand Down Expand Up @@ -569,6 +602,7 @@ static uint32_t FlowCleanupHash(void)
FlowWakeupFlowRecyclerThread();
}
}
DEBUG_VALIDATE_BUG_ON(local_queue.len >= 25);
FlowQueueAppendPrivate(&flow_recycle_q, &local_queue);
inashivb marked this conversation as resolved.
Show resolved Hide resolved
FlowWakeupFlowRecyclerThread();

Expand Down Expand Up @@ -709,6 +743,13 @@ static TmEcode FlowManagerThreadDeinit(ThreadVars *t, void *data)
* a rapid increase of the busy score, which could lead to the flow manager
* suddenly scanning a much larger slice of the hash leading to a burst
* in scan/eviction work.
*
* \param rows number of rows for the work unit
* \param mp current memcap pressure value
* \param emergency emergency mode is set or not
* \param wu_sleep holds value of sleep time per worker unit
* \param wu_rows holds value of calculated rows to be processed per second
* \param rows_sec same as wu_rows, only used for counter updates
*/
static void GetWorkUnitSizing(const uint32_t rows, const uint32_t mp, const bool emergency,
uint64_t *wu_sleep, uint32_t *wu_rows, uint32_t *rows_sec)
Expand Down
55 changes: 27 additions & 28 deletions src/flow-timeout.c
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/* Copyright (C) 2007-2017 Open Information Security Foundation
/* Copyright (C) 2007-2024 Open Information Security Foundation
*
* You can copy, redistribute or modify this Program under the terms of
* the GNU General Public License version 2 as published by the Free
Expand Down Expand Up @@ -66,17 +66,18 @@

/**
* \internal
* \brief Pseudo packet setup for flow forced reassembly.
* \brief Pseudo packet setup to finish a flow when needed.
*
* \param p a dummy pseudo packet from packet pool. Not all pseudo
* packets need to force reassembly, in which case we just
* set dummy ack/seq values.
* \param direction Direction of the packet. 0 indicates toserver and 1
* indicates toclient.
* \param f Pointer to the flow.
* \param ssn Pointer to the tcp session.
* \param dummy Indicates to create a dummy pseudo packet. Not all pseudo
* packets need to force reassembly, in which case we just
* set dummy ack/seq values.
* \retval pseudo packet with everything set up
*/
static inline Packet *FlowForceReassemblyPseudoPacketSetup(
static inline Packet *FlowPseudoPacketSetup(
Packet *p, int direction, Flow *f, const TcpSession *ssn)
{
const int orig_dir = direction;
Expand Down Expand Up @@ -263,7 +264,7 @@ static inline Packet *FlowForceReassemblyPseudoPacketSetup(
return NULL;
}

Packet *FlowForceReassemblyPseudoPacketGet(int direction, Flow *f, const TcpSession *ssn)
Packet *FlowPseudoPacketGet(int direction, Flow *f, const TcpSession *ssn)
{
PacketPoolWait();
Packet *p = PacketPoolGetPacket();
Expand All @@ -273,22 +274,22 @@ Packet *FlowForceReassemblyPseudoPacketGet(int direction, Flow *f, const TcpSess

PACKET_PROFILING_START(p);

return FlowForceReassemblyPseudoPacketSetup(p, direction, f, ssn);
return FlowPseudoPacketSetup(p, direction, f, ssn);
}

/**
* \brief Check if a flow needs forced reassembly, or any other processing
*
* \param f *LOCKED* flow
*
* \retval 0 no
* \retval 1 yes
* \retval false no
* \retval true yes
*/
int FlowForceReassemblyNeedReassembly(Flow *f)
bool FlowNeedsReassembly(Flow *f)
{

if (f == NULL || f->protoctx == NULL) {
SCReturnInt(0);
return false;
}

TcpSession *ssn = (TcpSession *)f->protoctx;
Expand Down Expand Up @@ -320,17 +321,17 @@ int FlowForceReassemblyNeedReassembly(Flow *f)
/* nothing to do */
if (client == STREAM_HAS_UNPROCESSED_SEGMENTS_NONE &&
server == STREAM_HAS_UNPROCESSED_SEGMENTS_NONE) {
SCReturnInt(0);
return false;
}

f->ffr_ts = client;
f->ffr_tc = server;
SCReturnInt(1);
return true;
}

/**
* \internal
* \brief Forces reassembly for flow if it needs it.
* \brief Sends the flow to its respective thread's flow queue.
*
* The function requires flow to be locked beforehand.
*
Expand All @@ -339,10 +340,8 @@ int FlowForceReassemblyNeedReassembly(Flow *f)
* flag is set, choose the second thread_id (to client/source).
*
* \param f Pointer to the flow.
*
* \retval 0 This flow doesn't need any reassembly processing; 1 otherwise.
*/
void FlowForceReassemblyForFlow(Flow *f)
void FlowSendToLocalThread(Flow *f)
{
// Choose the thread_id based on whether the flow has been
// reversed.
Expand All @@ -352,7 +351,8 @@ void FlowForceReassemblyForFlow(Flow *f)

/**
* \internal
* \brief Forces reassembly for flows that need it.
* \brief Remove flows from the hash bucket as they have more work to be done in
* in the detection engine.
*
* When this function is called we're running in virtually dead engine,
* so locking the flows is not strictly required. The reasons it is still
Expand All @@ -362,10 +362,8 @@ void FlowForceReassemblyForFlow(Flow *f)
* - allow us to aggressively check using debug validation assertions
* - be robust in case of future changes
* - locking overhead is negligible when no other thread fights us
*
* \param q The queue to process flows from.
*/
static inline void FlowForceReassemblyForHash(void)
static inline void FlowRemoveHash(void)
{
for (uint32_t idx = 0; idx < flow_config.hash_size; idx++) {
FlowBucket *fb = &flow_hash[idx];
Expand All @@ -392,10 +390,10 @@ static inline void FlowForceReassemblyForHash(void)

/* in case of additional work, we pull the flow out of the
* hash and xfer ownership to the injected packet(s) */
if (FlowForceReassemblyNeedReassembly(f) == 1) {
if (FlowNeedsReassembly(f)) {
RemoveFromHash(f, prev_f);
f->flow_end_flags |= FLOW_END_FLAG_SHUTDOWN;
FlowForceReassemblyForFlow(f);
FlowSendToLocalThread(f);
FLOWLOCK_UNLOCK(f);
f = next_f;
continue;
Expand All @@ -412,10 +410,11 @@ static inline void FlowForceReassemblyForHash(void)
}

/**
* \brief Force reassembly for all the flows that have unprocessed segments.
* \brief Clean up all the flows that have unprocessed segments and have
* some work to do in the detection engine.
*/
void FlowForceReassembly(void)
void FlowWorkToDoCleanup(void)
{
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not really happy with this name. Suggestions welcome.

/* Carry out flow reassembly for unattended flows */
FlowForceReassemblyForHash();
/* Carry out cleanup of unattended flows */
FlowRemoveHash();
}
10 changes: 5 additions & 5 deletions src/flow-timeout.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/* Copyright (C) 2007-2012 Open Information Security Foundation
/* Copyright (C) 2007-2024 Open Information Security Foundation
*
* You can copy, redistribute or modify this Program under the terms of
* the GNU General Public License version 2 as published by the Free
Expand Down Expand Up @@ -26,9 +26,9 @@

#include "stream-tcp-private.h"

void FlowForceReassemblyForFlow(Flow *f);
int FlowForceReassemblyNeedReassembly(Flow *f);
void FlowForceReassembly(void);
Packet *FlowForceReassemblyPseudoPacketGet(int direction, Flow *f, const TcpSession *ssn);
void FlowSendToLocalThread(Flow *f);
bool FlowNeedsReassembly(Flow *f);
void FlowWorkToDoCleanup(void);
Packet *FlowPseudoPacketGet(int direction, Flow *f, const TcpSession *ssn);

#endif /* SURICATA_FLOW_TIMEOUT_H */
Loading
Loading