From 023d6c2f0588016b1ed6e9046fc5d27a4985728d Mon Sep 17 00:00:00 2001 From: mrproliu <741550557@qq.com> Date: Mon, 16 Dec 2024 15:36:22 +0800 Subject: [PATCH] Introduce ringbuf queue in access log module (#170) --- .github/workflows/rover.yaml | 2 +- CHANGES.md | 1 + bpf/accesslog/common/connection.h | 63 +++++---- bpf/accesslog/syscalls/transfer.h | 17 +-- bpf/include/queue.h | 61 +++++++++ bpf/include/socket_data.h | 73 ++++++----- pkg/accesslog/bpf/loader.go | 2 +- pkg/accesslog/collector/protocols/queue.go | 4 +- .../continuous/checker/bpf/network/network.go | 3 +- .../task/network/analyze/layer7/events.go | 2 +- .../layer7/protocols/http1/reader/reader.go | 12 +- pkg/profiling/task/network/bpf/bpf.go | 2 +- pkg/profiling/task/offcpu/runner.go | 2 +- pkg/tools/btf/ebpf.go | 41 +++++- pkg/tools/btf/linker.go | 18 ++- pkg/tools/btf/queue.go | 122 ++++++++++++++++++ pkg/tools/buffer/buffer.go | 6 +- 17 files changed, 324 insertions(+), 107 deletions(-) create mode 100644 bpf/include/queue.h diff --git a/.github/workflows/rover.yaml b/.github/workflows/rover.yaml index a72e3bff..f33567ec 100644 --- a/.github/workflows/rover.yaml +++ b/.github/workflows/rover.yaml @@ -342,7 +342,7 @@ jobs: if: ${{ failure() }} name: Upload Logs with: - name: logs + name: logs ${{ matrix.test.name }} path: "${{ env.SW_INFRA_E2E_LOG_DIR }}" required: diff --git a/CHANGES.md b/CHANGES.md index ec2109da..a27176d5 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -16,6 +16,7 @@ Release Notes. * Support parallel parsing protocol data in the access log module. * Upgrade Go library to `1.22`, eBPF library to `0.16.0`. * Reduce missing details issue in the access log module. +* Introduce ringbuf queue to improve performance in the access log module. #### Bug Fixes * Fix the base image cannot run in the arm64. diff --git a/bpf/accesslog/common/connection.h b/bpf/accesslog/common/connection.h index 31e9f78a..cb799e88 100644 --- a/bpf/accesslog/common/connection.h +++ b/bpf/accesslog/common/connection.h @@ -21,6 +21,7 @@ #include "socket.h" #include "data_args.h" #include "socket_opts.h" +#include "queue.h" // syscall:connect struct connect_args_t { @@ -106,19 +107,7 @@ struct socket_connect_event_t { __u64 conntrack_upstream_iph; __u32 conntrack_upstream_port; }; -struct { - __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY); -} socket_connection_event_queue SEC(".maps"); -struct { - __uint(type, BPF_MAP_TYPE_PERCPU_ARRAY); - __type(key, __u32); - __type(value, struct socket_connect_event_t); - __uint(max_entries, 1); -} socket_connect_event_per_cpu_map SEC(".maps"); -static __inline struct socket_connect_event_t* create_socket_connect_event() { - __u32 kZero = 0; - return bpf_map_lookup_elem(&socket_connect_event_per_cpu_map, &kZero); -} +DATA_QUEUE(socket_connection_event_queue, 1024 * 1024); // active connection cached into the hashmap // if connection closed, then deleted @@ -170,9 +159,7 @@ struct socket_close_event_t { // close success __u32 success; }; -struct { - __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY); -} socket_close_event_queue SEC(".maps"); +DATA_QUEUE(socket_close_event_queue, 1024 * 1024); static __inline bool family_should_trace(const __u32 family) { return family != AF_UNKNOWN && family != AF_INET && family != AF_INET6 ? false : true; @@ -182,7 +169,8 @@ static __always_inline void submit_new_connection(void* ctx, bool success, __u32 struct sockaddr* addr, const struct socket* socket, struct connect_track_remote* conntrack, __u8 role) { // send to the user-space the connection event __u64 curr_nacs = bpf_ktime_get_ns(); - struct socket_connect_event_t *event = create_socket_connect_event(); + struct socket_connect_event_t *event; + event = rover_reserve_buf(&socket_connection_event_queue, sizeof(*event)); if (event == NULL) { return; } @@ -195,12 +183,11 @@ static __always_inline void submit_new_connection(void* ctx, bool success, __u32 event->end_time = curr_nacs; event->func_name = func_name; if (func_name == SOCKET_OPTS_TYPE_CONNECT) { - event->role = CONNECTION_ROLE_TYPE_CLIENT; + role = CONNECTION_ROLE_TYPE_CLIENT; } else if (func_name == SOCKET_OPTS_TYPE_ACCEPT) { - event->role = CONNECTION_ROLE_TYPE_SERVER; - } else { - event->role = role; + role = CONNECTION_ROLE_TYPE_SERVER; } + event->role = role; event->pid = tgid; event->sockfd = fd; @@ -216,6 +203,7 @@ static __always_inline void submit_new_connection(void* ctx, bool success, __u32 event->success = success; __u16 port; + __u8 socket_family; event->local_port = 0; event->remote_port = 0; if (socket == NULL) { @@ -236,6 +224,7 @@ static __always_inline void submit_new_connection(void* ctx, bool success, __u32 short unsigned int skc_family; BPF_CORE_READ_INTO(&skc_family, s, __sk_common.skc_family); event->socket_family = skc_family; + socket_family = skc_family; if (event->socket_family == AF_INET) { BPF_CORE_READ_INTO(&port, s, __sk_common.skc_num); @@ -254,6 +243,7 @@ static __always_inline void submit_new_connection(void* ctx, bool success, __u32 } } else if (addr != NULL) { event->socket_family = _(addr->sa_family); + socket_family = event->socket_family; if (event->socket_family == AF_INET) { struct sockaddr_in *daddr = (struct sockaddr_in *)addr; bpf_probe_read(&event->remote_addr_v4, sizeof(event->remote_addr_v4), &daddr->sin_addr.s_addr); @@ -270,9 +260,10 @@ static __always_inline void submit_new_connection(void* ctx, bool success, __u32 } } else { event->socket_family = AF_UNKNOWN; + socket_family = AF_UNKNOWN; } - bpf_perf_event_output(ctx, &socket_connection_event_queue, BPF_F_CURRENT_CPU, event, sizeof(*event)); + rover_submit_buf(ctx, &socket_connection_event_queue, event, sizeof(*event)); if (success == false) { return; } @@ -280,11 +271,11 @@ static __always_inline void submit_new_connection(void* ctx, bool success, __u32 // if connect success, then add the activate connection into the kernel // active connection save struct active_connection_t con = {}; - con.random_id = event->random_id; + con.random_id = random_id; con.pid = tgid; con.sockfd = fd; - con.role = event->role; - con.socket_family = event->socket_family; + con.role = role; + con.socket_family = socket_family; bpf_map_update_elem(&active_connection_map, &conid, &con, 0); } @@ -312,17 +303,21 @@ static __inline void submit_connection_when_not_exists(void *ctx, __u64 id, stru } static __inline void notify_close_connection(void* ctx, __u64 conid, struct active_connection_t* con, __u64 start_time, __u64 end_time, int ret) { - struct socket_close_event_t close_event = {}; + struct socket_close_event_t *close_event; + close_event = rover_reserve_buf(&socket_close_event_queue, sizeof(*close_event)); + if (close_event == NULL) { + return; + } - close_event.conid = conid; - close_event.random_id = con->random_id; - close_event.start_time = start_time; - close_event.end_time = end_time; - close_event.pid = con->pid; - close_event.sockfd = con->sockfd; - close_event.success = ret > 0 ? true : false; + close_event->conid = conid; + close_event->random_id = con->random_id; + close_event->start_time = start_time; + close_event->end_time = end_time; + close_event->pid = con->pid; + close_event->sockfd = con->sockfd; + close_event->success = ret > 0 ? true : false; - bpf_perf_event_output(ctx, &socket_close_event_queue, BPF_F_CURRENT_CPU, &close_event, sizeof(close_event)); + rover_submit_buf(ctx, &socket_close_event_queue, close_event, sizeof(*close_event)); } static __inline void submit_close_connection(void* ctx, __u32 tgid, __u32 fd, __u64 start_nacs, int ret) { diff --git a/bpf/accesslog/syscalls/transfer.h b/bpf/accesslog/syscalls/transfer.h index e1f1ba79..053ce015 100644 --- a/bpf/accesslog/syscalls/transfer.h +++ b/bpf/accesslog/syscalls/transfer.h @@ -19,6 +19,7 @@ #include "socket_opts.h" #include "socket_data.h" #include "socket_reader.h" +#include "queue.h" #include "protocol_analyzer.h" #include "../common/connection.h" #include "../common/data_args.h" @@ -68,15 +69,7 @@ struct socket_detail_t { __u8 ssl; }; -struct { - __uint(type, BPF_MAP_TYPE_PERCPU_ARRAY); - __type(key, __u32); - __type(value, struct socket_detail_t); - __uint(max_entries, 1); -} socket_detail_event_per_cpu_map SEC(".maps"); -struct { - __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY); -} socket_detail_data_queue SEC(".maps"); +DATA_QUEUE(socket_detail_queue, 1024 * 1024); static __always_inline void process_write_data(void *ctx, __u64 id, struct sock_data_args_t *args, ssize_t bytes_count, __u32 data_direction, const bool vecs, __u8 func_name, bool ssl) { @@ -144,8 +137,8 @@ static __always_inline void process_write_data(void *ctx, __u64 id, struct sock_ // 1. when the SSL connection sends SSL(unencrypted) message // 2. when the not SSL connection sends plain data if (conn->ssl == ssl) { - __u32 kZero = 0; - struct socket_detail_t *detail = bpf_map_lookup_elem(&socket_detail_event_per_cpu_map, &kZero); + struct socket_detail_t *detail; + detail = rover_reserve_buf(&socket_detail_queue, sizeof(*detail)); if (detail != NULL) { detail->connection_id = conid; detail->random_id = conn->random_id; @@ -177,7 +170,7 @@ static __always_inline void process_write_data(void *ctx, __u64 id, struct sock_ detail->l2_enter_queue_count = args->l2_enter_queue_count; detail->l4_package_rcv_from_queue_time = args->total_package_receive_from_queue_time; - bpf_perf_event_output(ctx, &socket_detail_data_queue, BPF_F_CURRENT_CPU, detail, sizeof(*detail)); + rover_submit_buf(ctx, &socket_detail_queue, detail, sizeof(*detail)); } } diff --git a/bpf/include/queue.h b/bpf/include/queue.h new file mode 100644 index 00000000..c6b532c1 --- /dev/null +++ b/bpf/include/queue.h @@ -0,0 +1,61 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "api.h" + +#define DATA_QUEUE(name, size) \ + struct { \ + __uint(type, BPF_MAP_TYPE_RINGBUF); \ + __uint(max_entries, size); \ + } name SEC(".maps"); \ + const void *rover_data_queue_##name __attribute__((unused)); + +struct { + __uint(type, BPF_MAP_TYPE_PERCPU_ARRAY); + __uint(max_entries, 1); + __uint(key_size, sizeof(__u32)); + __uint(value_size, 10240); // all events are less than 10KB +} rover_data_heap SEC(".maps"); + +static __always_inline void *rover_reserve_buf(void *map, __u64 size) { + static const int zero = 0; + + if (bpf_core_enum_value_exists(enum bpf_func_id, + BPF_FUNC_ringbuf_reserve)) + return bpf_ringbuf_reserve(map, size, 0); + + return bpf_map_lookup_elem(&rover_data_heap, &zero); +} + +static __always_inline void rover_discard_buf(void *buf) +{ + if (bpf_core_enum_value_exists(enum bpf_func_id, + BPF_FUNC_ringbuf_discard)) + bpf_ringbuf_discard(buf, 0); +} + +static __always_inline long rover_submit_buf(void *ctx, void *map, void *buf, __u64 size) { + if (bpf_core_enum_value_exists(enum bpf_func_id, + BPF_FUNC_ringbuf_submit)) { + bpf_ringbuf_submit(buf, 0); + return 0; + } + + return bpf_perf_event_output(ctx, map, BPF_F_CURRENT_CPU, buf, size); +} \ No newline at end of file diff --git a/bpf/include/socket_data.h b/bpf/include/socket_data.h index cae2622b..0d62ccbd 100644 --- a/bpf/include/socket_data.h +++ b/bpf/include/socket_data.h @@ -19,6 +19,7 @@ #include "socket_opts.h" #include "protocol_analyzer.h" +#include "queue.h" #define SOCKET_UPLOAD_CHUNK_LIMIT 12 @@ -43,9 +44,7 @@ struct { __type(value, struct socket_data_upload_event); __uint(max_entries, 1); } socket_data_upload_event_per_cpu_map SEC(".maps"); -struct { - __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY); -} socket_data_upload_event_queue SEC(".maps"); +DATA_QUEUE(socket_data_upload_queue, 1024 * 1024); struct socket_data_sequence_t { __u64 data_id; @@ -106,23 +105,40 @@ static __always_inline struct upload_data_args* generate_socket_upload_args() { return bpf_map_lookup_elem(&socket_data_upload_args_per_cpu_map, &kZero); } -static __always_inline void __upload_socket_data_with_buffer(void *ctx, __u8 index, char* buf, size_t size, __u32 is_finished, __u8 have_reduce_after_chunk, struct socket_data_upload_event *event) { - if (size <= 0) { +static __always_inline void __upload_socket_data_with_buffer(void *ctx, __u8 index, char* buf, size_t size, __u32 is_finished, __u8 have_reduce_after_chunk, struct upload_data_args *args) { + struct socket_data_upload_event *socket_data_event; + socket_data_event = rover_reserve_buf(&socket_data_upload_queue, sizeof(*socket_data_event)); + if (socket_data_event == NULL) { return; } - if (size > sizeof(event->buffer)) { - size = sizeof(event->buffer); + + if (size > sizeof(socket_data_event->buffer)) { + size = sizeof(socket_data_event->buffer); + } + if (size <= 0) { + rover_discard_buf(socket_data_event); + return; } - event->sequence = index; - event->data_len = size; - event->finished = is_finished; - event->have_reduce_after_chunk = have_reduce_after_chunk; - bpf_probe_read(&event->buffer, size, buf); - bpf_perf_event_output(ctx, &socket_data_upload_event_queue, BPF_F_CURRENT_CPU, event, sizeof(*event)); + // basic data + socket_data_event->start_time = args->start_time; + socket_data_event->end_time = args->end_time; + socket_data_event->protocol = args->connection_protocol; + socket_data_event->direction = args->data_direction; + socket_data_event->conid = args->con_id; + socket_data_event->randomid = args->random_id; + socket_data_event->total_size = args->bytes_count; + socket_data_event->data_id = args->socket_data_id; + + socket_data_event->sequence = index; + socket_data_event->data_len = size; + socket_data_event->finished = is_finished; + socket_data_event->have_reduce_after_chunk = have_reduce_after_chunk; + bpf_probe_read(&socket_data_event->buffer, size, buf); + rover_submit_buf(ctx, &socket_data_upload_queue, socket_data_event, sizeof(*socket_data_event)); } -static __always_inline void upload_socket_data_buf(void *ctx, char* buf, ssize_t size, struct socket_data_upload_event *event, __u8 force_unfinished) { +static __always_inline void upload_socket_data_buf(void *ctx, char* buf, ssize_t size, struct upload_data_args *args, __u8 force_unfinished) { ssize_t already_send = 0; #pragma unroll for (__u8 index = 0; index < SOCKET_UPLOAD_CHUNK_LIMIT; index++) { @@ -141,9 +157,9 @@ static __always_inline void upload_socket_data_buf(void *ctx, char* buf, ssize_t __u8 sequence = index; if (force_unfinished == 1 && need_send_in_chunk > 0) { is_finished = 0; - sequence = generate_socket_sequence(event->conid, event->data_id); + sequence = generate_socket_sequence(args->con_id, args->socket_data_id); } - __upload_socket_data_with_buffer(ctx, sequence, buf + already_send, need_send_in_chunk, is_finished, have_reduce_after_chunk, event); + __upload_socket_data_with_buffer(ctx, sequence, buf + already_send, need_send_in_chunk, is_finished, have_reduce_after_chunk, args); already_send += need_send_in_chunk; } @@ -170,12 +186,12 @@ if (iov_index < iovlen) { \ have_reduce_after_chunk = 1; \ } \ __u32 is_finished = (need_send_in_chunk + already_send) >= size || loop_count == (SOCKET_UPLOAD_CHUNK_LIMIT - 1) ? true : false; \ - __upload_socket_data_with_buffer(ctx, loop_count, cur_iov.iov_base + cur_iov_sended, need_send_in_chunk, is_finished, have_reduce_after_chunk, event); \ + __upload_socket_data_with_buffer(ctx, loop_count, cur_iov.iov_base + cur_iov_sended, need_send_in_chunk, is_finished, have_reduce_after_chunk, args); \ already_send += need_send_in_chunk; \ loop_count++; \ } -static __always_inline void upload_socket_data_iov(void *ctx, struct iovec* iov, const size_t iovlen, ssize_t size, struct socket_data_upload_event *event) { +static __always_inline void upload_socket_data_iov(void *ctx, struct iovec* iov, const size_t iovlen, ssize_t size, struct upload_data_args *args) { ssize_t already_send = 0; ssize_t cur_iov_sended = 0; __u8 iov_index = 0; @@ -198,26 +214,9 @@ static __inline void upload_socket_data(void *ctx, struct upload_data_args *args if (args->connection_protocol == CONNECTION_PROTOCOL_UNKNOWN || args->connection_ssl != args->socket_data_ssl || args->connection_skip_data_upload == 1) { return; } - // generate event - __u32 kZero = 0; - struct socket_data_upload_event *event = bpf_map_lookup_elem(&socket_data_upload_event_per_cpu_map, &kZero); - if (event == NULL) { - return; - } - - // basic data - event->start_time = args->start_time; - event->end_time = args->end_time; - event->protocol = args->connection_protocol; - event->direction = args->data_direction; - event->conid = args->con_id; - event->randomid = args->random_id; - event->total_size = args->bytes_count; - event->data_id = args->socket_data_id; - if (args->socket_data_buf != NULL) { - upload_socket_data_buf(ctx, args->socket_data_buf, args->bytes_count, event, args->socket_ssl_buffer_force_unfinished); + upload_socket_data_buf(ctx, args->socket_data_buf, args->bytes_count, args, args->socket_ssl_buffer_force_unfinished); } else if (args->socket_data_iovec != NULL) { - upload_socket_data_iov(ctx, args->socket_data_iovec, args->socket_data_iovlen, args->bytes_count, event); + upload_socket_data_iov(ctx, args->socket_data_iovec, args->socket_data_iovlen, args->bytes_count, args); } } \ No newline at end of file diff --git a/pkg/accesslog/bpf/loader.go b/pkg/accesslog/bpf/loader.go index 5a4f827b..5058913a 100644 --- a/pkg/accesslog/bpf/loader.go +++ b/pkg/accesslog/bpf/loader.go @@ -34,7 +34,7 @@ type Loader struct { func NewLoader() (*Loader, error) { objs := bpfObjects{} - if err := loadBpfObjects(&objs, btf.GetEBPFCollectionOptionsIfNeed()); err != nil { + if err := btf.LoadBPFAndAssign(loadBpf, &objs); err != nil { return nil, err } diff --git a/pkg/accesslog/collector/protocols/queue.go b/pkg/accesslog/collector/protocols/queue.go index 833d323f..7e07eb38 100644 --- a/pkg/accesslog/collector/protocols/queue.go +++ b/pkg/accesslog/collector/protocols/queue.go @@ -92,13 +92,13 @@ func (q *AnalyzeQueue) Start(ctx context.Context) { func(num int) btf.PartitionContext { return NewPartitionContext(q.context, num, q.supportAnalyzers(q.context)) }) - q.eventQueue.RegisterReceiver(q.context.BPF.SocketDetailDataQueue, int(q.perCPUBuffer), + q.eventQueue.RegisterReceiver(q.context.BPF.SocketDetailQueue, int(q.perCPUBuffer), q.context.Config.ProtocolAnalyze.ParseParallels, func() interface{} { return q.detailSupplier() }, func(data interface{}) string { return fmt.Sprintf("%d", data.(events.SocketDetail).GetConnectionID()) }) - q.eventQueue.RegisterReceiver(q.context.BPF.SocketDataUploadEventQueue, int(q.perCPUBuffer), + q.eventQueue.RegisterReceiver(q.context.BPF.SocketDataUploadQueue, int(q.perCPUBuffer), q.context.Config.ProtocolAnalyze.ParseParallels, func() interface{} { return &events.SocketDataUploadEvent{} }, func(data interface{}) string { diff --git a/pkg/profiling/continuous/checker/bpf/network/network.go b/pkg/profiling/continuous/checker/bpf/network/network.go index ba03f211..a3cb6448 100644 --- a/pkg/profiling/continuous/checker/bpf/network/network.go +++ b/pkg/profiling/continuous/checker/bpf/network/network.go @@ -134,9 +134,10 @@ func startBPFIfNeed() error { } bpf = &bpfObjects{} - if err := loadBpfObjects(bpf, btf.GetEBPFCollectionOptionsIfNeed()); err != nil { + if err := btf.LoadBPFAndAssign(loadBpf, bpf); err != nil { return err } + bpfLinker = btf.NewLinker() bpfLinker.AddLink(link.Kprobe, map[string]*ebpf.Program{"tcp_sendmsg": bpf.TcpSendmsg}) bpfLinker.AddLink(link.Kprobe, map[string]*ebpf.Program{"tcp_recvmsg": bpf.TcpRecvmsg}) diff --git a/pkg/profiling/task/network/analyze/layer7/events.go b/pkg/profiling/task/network/analyze/layer7/events.go index dab74d19..707aaa84 100644 --- a/pkg/profiling/task/network/analyze/layer7/events.go +++ b/pkg/profiling/task/network/analyze/layer7/events.go @@ -36,7 +36,7 @@ func (l *Listener) initSocketDataQueue(parallels, queueSize int, config *profili func (l *Listener) startSocketData(ctx context.Context, bpfLoader *bpf.Loader) { // socket buffer data - l.socketDataQueue.RegisterReceiver(bpfLoader.SocketDataUploadEventQueue, l.protocolPerCPUBuffer, 1, func() interface{} { + l.socketDataQueue.RegisterReceiver(bpfLoader.SocketDataUploadQueue, l.protocolPerCPUBuffer, 1, func() interface{} { return &analyzeBase.SocketDataUploadEvent{} }, func(data interface{}) string { return data.(*analyzeBase.SocketDataUploadEvent).GenerateConnectionID() diff --git a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/reader.go b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/reader.go index 8cc8c854..151e563c 100644 --- a/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/reader.go +++ b/pkg/profiling/task/network/analyze/layer7/protocols/http1/reader/reader.go @@ -104,11 +104,19 @@ func (m *MessageOpt) ContentTotalSize() int { } func (m *MessageOpt) StartTime() uint64 { - return m.HeaderBuffer().FirstSocketBuffer().StartTime() + socketBuffer := m.HeaderBuffer().FirstSocketBuffer() + if socketBuffer == nil { + return 0 + } + return socketBuffer.StartTime() } func (m *MessageOpt) EndTime() uint64 { - return m.BodyBuffer().LastSocketBuffer().EndTime() + socketBuffer := m.BodyBuffer().LastSocketBuffer() + if socketBuffer == nil { + return m.StartTime() + } + return socketBuffer.EndTime() } func (m *MessageOpt) Direction() enums.SocketDataDirection { diff --git a/pkg/profiling/task/network/bpf/bpf.go b/pkg/profiling/task/network/bpf/bpf.go index 083935c5..67a875fb 100644 --- a/pkg/profiling/task/network/bpf/bpf.go +++ b/pkg/profiling/task/network/bpf/bpf.go @@ -34,7 +34,7 @@ type Loader struct { func NewLoader() (*Loader, error) { objs := bpfObjects{} - if err := loadBpfObjects(&objs, btf.GetEBPFCollectionOptionsIfNeed()); err != nil { + if err := btf.LoadBPFAndAssign(loadBpf, &objs); err != nil { return nil, err } diff --git a/pkg/profiling/task/offcpu/runner.go b/pkg/profiling/task/offcpu/runner.go index 492d9968..8ce3ee6f 100644 --- a/pkg/profiling/task/offcpu/runner.go +++ b/pkg/profiling/task/offcpu/runner.go @@ -113,7 +113,7 @@ func (r *Runner) Run(ctx context.Context, notify base.ProfilingRunningSuccessNot if !replacedPid { return fmt.Errorf("replace the monitor pid failure") } - if err1 := spec.LoadAndAssign(&objs, btf.GetEBPFCollectionOptionsIfNeed()); err1 != nil { + if err1 := spec.LoadAndAssign(&objs, btf.GetEBPFCollectionOptionsIfNeed(spec)); err1 != nil { return err1 } r.bpf = &objs diff --git a/pkg/tools/btf/ebpf.go b/pkg/tools/btf/ebpf.go index 71e20f30..307f29f5 100644 --- a/pkg/tools/btf/ebpf.go +++ b/pkg/tools/btf/ebpf.go @@ -20,6 +20,7 @@ package btf import ( "bytes" "embed" + "errors" "fmt" "path/filepath" "sync" @@ -41,7 +42,16 @@ var ( log = logger.GetLogger("tools", "btf") ) -func GetEBPFCollectionOptionsIfNeed() *ebpf.CollectionOptions { +func LoadBPFAndAssign(loadBPF func() (*ebpf.CollectionSpec, error), objs interface{}) error { + bpf, err := loadBPF() + if err != nil { + return err + } + + return bpf.LoadAndAssign(objs, GetEBPFCollectionOptionsIfNeed(bpf)) +} + +func GetEBPFCollectionOptionsIfNeed(bpfSpec *ebpf.CollectionSpec) *ebpf.CollectionOptions { findBTFOnce.Do(func() { readSpec, err := getKernelBTFAddress() if err != nil { @@ -52,6 +62,7 @@ func GetEBPFCollectionOptionsIfNeed() *ebpf.CollectionOptions { spec = readSpec }) + enhanceDataQueueOpts(bpfSpec) return &ebpf.CollectionOptions{Programs: ebpf.ProgramOptions{KernelTypes: spec}} } @@ -87,3 +98,31 @@ func getKernelBTFAddress() (spec *btf.Spec, err error) { func asset(file string) ([]byte, error) { return assets.ReadFile(filepath.ToSlash(file)) } + +func validateGlobalConstVoidPtrVar(t btf.Type) error { + btfVar, ok := t.(*btf.Var) + if !ok { + return errors.New("not of type btf.Var") + } + + if btfVar.Linkage != btf.GlobalVar { + return fmt.Errorf("%q is not a global variable", btfVar.Name) + } + + btfPtr, ok := btfVar.Type.(*btf.Pointer) + if !ok { + return fmt.Errorf("%q is not a pointer", btfVar.Name) + } + + btfConst, ok := btfPtr.Target.(*btf.Const) + if !ok { + return fmt.Errorf("%q is not const", btfVar.Name) + } + + _, ok = btfConst.Type.(*btf.Void) + if !ok { + return fmt.Errorf("%q is not a const void pointer", btfVar.Name) + } + + return nil +} diff --git a/pkg/tools/btf/linker.go b/pkg/tools/btf/linker.go index f663737a..867d84e9 100644 --- a/pkg/tools/btf/linker.go +++ b/pkg/tools/btf/linker.go @@ -161,7 +161,7 @@ func (m *Linker) ReadEventAsync(emap *ebpf.Map, bufReader RingBufferReader, data func (m *Linker) ReadEventAsyncWithBufferSize(emap *ebpf.Map, bufReader RingBufferReader, perCPUBuffer, parallels int, dataSupplier func() interface{}) { - rd, err := perf.NewReader(emap, perCPUBuffer) + rd, err := newQueueReader(emap, perCPUBuffer) if err != nil { m.errors = multierror.Append(m.errors, fmt.Errorf("open ring buffer error: %v", err)) return @@ -177,10 +177,10 @@ func (m *Linker) ReadEventAsyncWithBufferSize(emap *ebpf.Map, bufReader RingBuff } } -func (m *Linker) asyncReadEvent(rd *perf.Reader, emap *ebpf.Map, dataSupplier func() interface{}, bufReader RingBufferReader) { +func (m *Linker) asyncReadEvent(rd queueReader, emap *ebpf.Map, dataSupplier func() interface{}, bufReader RingBufferReader) { go func() { for { - record, err := rd.Read() + sample, err := rd.Read() if err != nil { if errors.Is(err, perf.ErrClosed) { return @@ -188,23 +188,21 @@ func (m *Linker) asyncReadEvent(rd *perf.Reader, emap *ebpf.Map, dataSupplier fu log.Warnf("read from %s ringbuffer error: %v", emap.String(), err) continue } - - if record.LostSamples != 0 { - log.Warnf("perf event queue(%s) full, dropped %d samples", emap.String(), record.LostSamples) + if len(sample) == 0 { continue } data := dataSupplier() if r, ok := data.(reader.EventReader); ok { - sampleReader := reader.NewReader(record.RawSample) + sampleReader := reader.NewReader(sample) r.ReadFrom(sampleReader) if readErr := sampleReader.HasError(); readErr != nil { - log.Warnf("parsing data from %s, raw size: %d, ringbuffer error: %v", emap.String(), len(record.RawSample), err) + log.Warnf("parsing data from %s, raw size: %d, ringbuffer error: %v", emap.String(), len(sample), err) continue } } else { - if err := binary.Read(bytes.NewBuffer(record.RawSample), binary.LittleEndian, data); err != nil { - log.Warnf("parsing data from %s, raw size: %d, ringbuffer error: %v", emap.String(), len(record.RawSample), err) + if err := binary.Read(bytes.NewBuffer(sample), binary.LittleEndian, data); err != nil { + log.Warnf("parsing data from %s, raw size: %d, ringbuffer error: %v", emap.String(), len(sample), err) continue } } diff --git a/pkg/tools/btf/queue.go b/pkg/tools/btf/queue.go index 813979d5..290f7170 100644 --- a/pkg/tools/btf/queue.go +++ b/pkg/tools/btf/queue.go @@ -19,12 +19,134 @@ package btf import ( "context" + "fmt" "hash/fnv" + "os" + "strings" "sync" "github.com/cilium/ebpf" + "github.com/cilium/ebpf/perf" + "github.com/cilium/ebpf/ringbuf" ) +const dataQueuePrefix = "rover_data_queue_" + +var ( + ringbufChecker sync.Once + ringbufAvailable bool +) + +func isRingbufAvailable() bool { + ringbufChecker.Do(func() { + buf, err := ebpf.NewMap(&ebpf.MapSpec{ + Type: ebpf.RingBuf, + MaxEntries: uint32(os.Getpagesize()), + }) + + buf.Close() + + ringbufAvailable = err == nil + + if ringbufAvailable { + log.Infof("detect the ring buffer is available in current system for enhancement of data queue") + } + }) + + return ringbufAvailable +} + +func enhanceDataQueueOpts(bpfSpec *ebpf.CollectionSpec) { + it := bpfSpec.Types.Iterate() + for it.Next() { + if !strings.HasPrefix(it.Type.TypeName(), dataQueuePrefix) { + continue + } + if err := validateGlobalConstVoidPtrVar(it.Type); err != nil { + panic(fmt.Errorf("invalid global const void ptr var %s: %v", it.Type.TypeName(), err)) + } + + // if the ringbuf not available, use perf event array + if !isRingbufAvailable() { + mapName := strings.TrimPrefix(it.Type.TypeName(), dataQueuePrefix) + mapSpec := bpfSpec.Maps[mapName] + mapSpec.Type = ebpf.PerfEventArray + mapSpec.KeySize = 4 + mapSpec.ValueSize = 4 + } + } +} + +type queueReader interface { + Read() ([]byte, error) + Close() error +} + +func newQueueReader(emap *ebpf.Map, perCPUBuffer int) (queueReader, error) { + switch emap.Type() { + case ebpf.RingBuf: + return newRingBufReader(emap) + case ebpf.PerfEventArray: + return newPerfQueueReader(emap, perCPUBuffer) + } + return nil, fmt.Errorf("unsupported map type: %s", emap.Type().String()) +} + +type perfQueueReader struct { + name string + reader *perf.Reader +} + +func newPerfQueueReader(emap *ebpf.Map, perCPUBuffer int) (*perfQueueReader, error) { + reader, err := perf.NewReader(emap, perCPUBuffer) + if err != nil { + return nil, err + } + return &perfQueueReader{reader: reader, name: emap.String()}, nil +} + +func (p *perfQueueReader) Read() ([]byte, error) { + read, err := p.reader.Read() + if err != nil { + return nil, err + } + + if read.LostSamples != 0 { + log.Warnf("perf event queue(%s) full, dropped %d samples", p.name, read.LostSamples) + return nil, nil + } + + return read.RawSample, nil +} + +func (p *perfQueueReader) Close() error { + return p.reader.Close() +} + +type ringBufReader struct { + reader *ringbuf.Reader +} + +func newRingBufReader(emap *ebpf.Map) (*ringBufReader, error) { + reader, err := ringbuf.NewReader(emap) + if err != nil { + return nil, err + } + return &ringBufReader{reader: reader}, nil +} + +func (r *ringBufReader) Read() ([]byte, error) { + read, err := r.reader.Read() + if err != nil { + return nil, err + } + return read.RawSample, nil +} + +func (r *ringBufReader) Close() error { + return r.reader.Close() +} + type PartitionContext interface { Start(ctx context.Context) Consume(data interface{}) diff --git a/pkg/tools/buffer/buffer.go b/pkg/tools/buffer/buffer.go index 9993f474..c16a9fd8 100644 --- a/pkg/tools/buffer/buffer.go +++ b/pkg/tools/buffer/buffer.go @@ -367,14 +367,14 @@ func (r *Buffer) DataSize() int64 { } func (r *Buffer) FirstSocketBuffer() SocketDataBuffer { - if r.dataEvents == nil || r.dataEvents.Len() == 0 { + if r == nil || r.dataEvents == nil || r.dataEvents.Len() == 0 { return nil } return r.dataEvents.Front().Value.(SocketDataBuffer) } func (r *Buffer) LastSocketBuffer() SocketDataBuffer { - if r.dataEvents == nil || r.dataEvents.Len() == 0 { + if r == nil || r.dataEvents == nil || r.dataEvents.Len() == 0 { return nil } return r.dataEvents.Back().Value.(SocketDataBuffer) @@ -382,7 +382,7 @@ func (r *Buffer) LastSocketBuffer() SocketDataBuffer { // DetectNotSendingLastPosition detect the buffer contains not sending data: the BPF limited socket data count func (r *Buffer) DetectNotSendingLastPosition() *Position { - if r.dataEvents.Len() == 0 { + if r == nil || r.dataEvents.Len() == 0 { return nil }