Skip to content

Commit

Permalink
Introduce ringbuf queue in access log module (#170)
Browse files Browse the repository at this point in the history
  • Loading branch information
mrproliu authored Dec 16, 2024
1 parent 089d9c9 commit 023d6c2
Show file tree
Hide file tree
Showing 17 changed files with 324 additions and 107 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/rover.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ jobs:
if: ${{ failure() }}
name: Upload Logs
with:
name: logs
name: logs ${{ matrix.test.name }}
path: "${{ env.SW_INFRA_E2E_LOG_DIR }}"

required:
Expand Down
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ Release Notes.
* Support parallel parsing protocol data in the access log module.
* Upgrade Go library to `1.22`, eBPF library to `0.16.0`.
* Reduce missing details issue in the access log module.
* Introduce ringbuf queue to improve performance in the access log module.

#### Bug Fixes
* Fix the base image cannot run in the arm64.
Expand Down
63 changes: 29 additions & 34 deletions bpf/accesslog/common/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "socket.h"
#include "data_args.h"
#include "socket_opts.h"
#include "queue.h"

// syscall:connect
struct connect_args_t {
Expand Down Expand Up @@ -106,19 +107,7 @@ struct socket_connect_event_t {
__u64 conntrack_upstream_iph;
__u32 conntrack_upstream_port;
};
struct {
__uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
} socket_connection_event_queue SEC(".maps");
struct {
__uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
__type(key, __u32);
__type(value, struct socket_connect_event_t);
__uint(max_entries, 1);
} socket_connect_event_per_cpu_map SEC(".maps");
static __inline struct socket_connect_event_t* create_socket_connect_event() {
__u32 kZero = 0;
return bpf_map_lookup_elem(&socket_connect_event_per_cpu_map, &kZero);
}
DATA_QUEUE(socket_connection_event_queue, 1024 * 1024);

// active connection cached into the hashmap
// if connection closed, then deleted
Expand Down Expand Up @@ -170,9 +159,7 @@ struct socket_close_event_t {
// close success
__u32 success;
};
struct {
__uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
} socket_close_event_queue SEC(".maps");
DATA_QUEUE(socket_close_event_queue, 1024 * 1024);

static __inline bool family_should_trace(const __u32 family) {
return family != AF_UNKNOWN && family != AF_INET && family != AF_INET6 ? false : true;
Expand All @@ -182,7 +169,8 @@ static __always_inline void submit_new_connection(void* ctx, bool success, __u32
struct sockaddr* addr, const struct socket* socket, struct connect_track_remote* conntrack, __u8 role) {
// send to the user-space the connection event
__u64 curr_nacs = bpf_ktime_get_ns();
struct socket_connect_event_t *event = create_socket_connect_event();
struct socket_connect_event_t *event;
event = rover_reserve_buf(&socket_connection_event_queue, sizeof(*event));
if (event == NULL) {
return;
}
Expand All @@ -195,12 +183,11 @@ static __always_inline void submit_new_connection(void* ctx, bool success, __u32
event->end_time = curr_nacs;
event->func_name = func_name;
if (func_name == SOCKET_OPTS_TYPE_CONNECT) {
event->role = CONNECTION_ROLE_TYPE_CLIENT;
role = CONNECTION_ROLE_TYPE_CLIENT;
} else if (func_name == SOCKET_OPTS_TYPE_ACCEPT) {
event->role = CONNECTION_ROLE_TYPE_SERVER;
} else {
event->role = role;
role = CONNECTION_ROLE_TYPE_SERVER;
}
event->role = role;
event->pid = tgid;
event->sockfd = fd;

Expand All @@ -216,6 +203,7 @@ static __always_inline void submit_new_connection(void* ctx, bool success, __u32
event->success = success;

__u16 port;
__u8 socket_family;
event->local_port = 0;
event->remote_port = 0;
if (socket == NULL) {
Expand All @@ -236,6 +224,7 @@ static __always_inline void submit_new_connection(void* ctx, bool success, __u32
short unsigned int skc_family;
BPF_CORE_READ_INTO(&skc_family, s, __sk_common.skc_family);
event->socket_family = skc_family;
socket_family = skc_family;

if (event->socket_family == AF_INET) {
BPF_CORE_READ_INTO(&port, s, __sk_common.skc_num);
Expand All @@ -254,6 +243,7 @@ static __always_inline void submit_new_connection(void* ctx, bool success, __u32
}
} else if (addr != NULL) {
event->socket_family = _(addr->sa_family);
socket_family = event->socket_family;
if (event->socket_family == AF_INET) {
struct sockaddr_in *daddr = (struct sockaddr_in *)addr;
bpf_probe_read(&event->remote_addr_v4, sizeof(event->remote_addr_v4), &daddr->sin_addr.s_addr);
Expand All @@ -270,21 +260,22 @@ static __always_inline void submit_new_connection(void* ctx, bool success, __u32
}
} else {
event->socket_family = AF_UNKNOWN;
socket_family = AF_UNKNOWN;
}

bpf_perf_event_output(ctx, &socket_connection_event_queue, BPF_F_CURRENT_CPU, event, sizeof(*event));
rover_submit_buf(ctx, &socket_connection_event_queue, event, sizeof(*event));
if (success == false) {
return;
}

// if connect success, then add the activate connection into the kernel
// active connection save
struct active_connection_t con = {};
con.random_id = event->random_id;
con.random_id = random_id;
con.pid = tgid;
con.sockfd = fd;
con.role = event->role;
con.socket_family = event->socket_family;
con.role = role;
con.socket_family = socket_family;
bpf_map_update_elem(&active_connection_map, &conid, &con, 0);
}

Expand Down Expand Up @@ -312,17 +303,21 @@ static __inline void submit_connection_when_not_exists(void *ctx, __u64 id, stru
}

static __inline void notify_close_connection(void* ctx, __u64 conid, struct active_connection_t* con, __u64 start_time, __u64 end_time, int ret) {
struct socket_close_event_t close_event = {};
struct socket_close_event_t *close_event;
close_event = rover_reserve_buf(&socket_close_event_queue, sizeof(*close_event));
if (close_event == NULL) {
return;
}

close_event.conid = conid;
close_event.random_id = con->random_id;
close_event.start_time = start_time;
close_event.end_time = end_time;
close_event.pid = con->pid;
close_event.sockfd = con->sockfd;
close_event.success = ret > 0 ? true : false;
close_event->conid = conid;
close_event->random_id = con->random_id;
close_event->start_time = start_time;
close_event->end_time = end_time;
close_event->pid = con->pid;
close_event->sockfd = con->sockfd;
close_event->success = ret > 0 ? true : false;

bpf_perf_event_output(ctx, &socket_close_event_queue, BPF_F_CURRENT_CPU, &close_event, sizeof(close_event));
rover_submit_buf(ctx, &socket_close_event_queue, close_event, sizeof(*close_event));
}

static __inline void submit_close_connection(void* ctx, __u32 tgid, __u32 fd, __u64 start_nacs, int ret) {
Expand Down
17 changes: 5 additions & 12 deletions bpf/accesslog/syscalls/transfer.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "socket_opts.h"
#include "socket_data.h"
#include "socket_reader.h"
#include "queue.h"
#include "protocol_analyzer.h"
#include "../common/connection.h"
#include "../common/data_args.h"
Expand Down Expand Up @@ -68,15 +69,7 @@ struct socket_detail_t {
__u8 ssl;
};

struct {
__uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
__type(key, __u32);
__type(value, struct socket_detail_t);
__uint(max_entries, 1);
} socket_detail_event_per_cpu_map SEC(".maps");
struct {
__uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
} socket_detail_data_queue SEC(".maps");
DATA_QUEUE(socket_detail_queue, 1024 * 1024);

static __always_inline void process_write_data(void *ctx, __u64 id, struct sock_data_args_t *args, ssize_t bytes_count,
__u32 data_direction, const bool vecs, __u8 func_name, bool ssl) {
Expand Down Expand Up @@ -144,8 +137,8 @@ static __always_inline void process_write_data(void *ctx, __u64 id, struct sock_
// 1. when the SSL connection sends SSL(unencrypted) message
// 2. when the not SSL connection sends plain data
if (conn->ssl == ssl) {
__u32 kZero = 0;
struct socket_detail_t *detail = bpf_map_lookup_elem(&socket_detail_event_per_cpu_map, &kZero);
struct socket_detail_t *detail;
detail = rover_reserve_buf(&socket_detail_queue, sizeof(*detail));
if (detail != NULL) {
detail->connection_id = conid;
detail->random_id = conn->random_id;
Expand Down Expand Up @@ -177,7 +170,7 @@ static __always_inline void process_write_data(void *ctx, __u64 id, struct sock_
detail->l2_enter_queue_count = args->l2_enter_queue_count;
detail->l4_package_rcv_from_queue_time = args->total_package_receive_from_queue_time;

bpf_perf_event_output(ctx, &socket_detail_data_queue, BPF_F_CURRENT_CPU, detail, sizeof(*detail));
rover_submit_buf(ctx, &socket_detail_queue, detail, sizeof(*detail));
}
}

Expand Down
61 changes: 61 additions & 0 deletions bpf/include/queue.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include "api.h"

#define DATA_QUEUE(name, size) \
struct { \
__uint(type, BPF_MAP_TYPE_RINGBUF); \
__uint(max_entries, size); \
} name SEC(".maps"); \
const void *rover_data_queue_##name __attribute__((unused));

struct {
__uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
__uint(max_entries, 1);
__uint(key_size, sizeof(__u32));
__uint(value_size, 10240); // all events are less than 10KB
} rover_data_heap SEC(".maps");

static __always_inline void *rover_reserve_buf(void *map, __u64 size) {
static const int zero = 0;

if (bpf_core_enum_value_exists(enum bpf_func_id,
BPF_FUNC_ringbuf_reserve))
return bpf_ringbuf_reserve(map, size, 0);

return bpf_map_lookup_elem(&rover_data_heap, &zero);
}

static __always_inline void rover_discard_buf(void *buf)
{
if (bpf_core_enum_value_exists(enum bpf_func_id,
BPF_FUNC_ringbuf_discard))
bpf_ringbuf_discard(buf, 0);
}

static __always_inline long rover_submit_buf(void *ctx, void *map, void *buf, __u64 size) {
if (bpf_core_enum_value_exists(enum bpf_func_id,
BPF_FUNC_ringbuf_submit)) {
bpf_ringbuf_submit(buf, 0);
return 0;
}

return bpf_perf_event_output(ctx, map, BPF_F_CURRENT_CPU, buf, size);
}
Loading

0 comments on commit 023d6c2

Please sign in to comment.