Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve eBPF capture backend for Cgroup V1 #127

Merged
merged 1 commit into from
Dec 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
166 changes: 95 additions & 71 deletions bpf/packet_sniffer_v1.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@
#include "include/common.h"
#include "include/cgroups.h"

#define PF_INET 2
#define PF_INET6 10
#define IPPROTO_ICMPV6 58
#define PF_INET 2
#define PF_INET6 10
#define IPPROTO_ICMPV6 58

typedef union iphdrs_t
{
Expand Down Expand Up @@ -72,44 +72,45 @@ struct

static __always_inline bool is_family_supported(struct socket *sock)
{
struct sock *sk = (void *) BPF_CORE_READ(sock, sk);
struct sock_common *common = (void *) sk;
struct sock *sk = (void *)BPF_CORE_READ(sock, sk);
struct sock_common *common = (void *)sk;
u8 family = BPF_CORE_READ(common, skc_family);

switch (family) {
case PF_INET:
case PF_INET6:
break;
// case PF_UNSPEC:
// case PF_LOCAL: // PF_UNIX or PF_FILE
// case PF_NETLINK:
// case PF_VSOCK:
// case PF_XDP:
// case PF_BRIDGE:
// case PF_PACKET:
// case PF_MPLS:
// case PF_BLUETOOTH:
// case PF_IB:
// ...
default:
return 0; // not supported
switch (family)
{
case PF_INET:
case PF_INET6:
break;
// case PF_UNSPEC:
// case PF_LOCAL: // PF_UNIX or PF_FILE
// case PF_NETLINK:
// case PF_VSOCK:
// case PF_XDP:
// case PF_BRIDGE:
// case PF_PACKET:
// case PF_MPLS:
// case PF_BLUETOOTH:
// case PF_IB:
// ...
default:
return 0; // not supported
}

return 1; // supported
}


struct sock___old {
struct sock_common __sk_common;
unsigned int __sk_flags_offset[0];
unsigned int sk_padding : 1,
sk_kern_sock : 1,
sk_no_check_tx : 1,
sk_no_check_rx : 1,
sk_userlocks : 4,
sk_protocol : 8,
sk_type : 16;
u16 sk_gso_max_segs;
struct sock___old
{
struct sock_common __sk_common;
unsigned int __sk_flags_offset[0];
unsigned int sk_padding : 1,
sk_kern_sock : 1,
sk_no_check_tx : 1,
sk_no_check_rx : 1,
sk_userlocks : 4,
sk_protocol : 8,
sk_type : 16;
u16 sk_gso_max_segs;
};

static __always_inline u16 get_sock_protocol(struct sock *sock)
Expand All @@ -118,10 +119,13 @@ static __always_inline u16 get_sock_protocol(struct sock *sock)

// commit bf9765145b85 ("sock: Make sk_protocol a 16-bit value")
struct sock___old *check = NULL;
if (bpf_core_field_exists(check->__sk_flags_offset)) {
check = (struct sock___old *) sock;
bpf_core_read(&protocol, 1, (void *) (&check->sk_gso_max_segs) - 3);
} else {
if (bpf_core_field_exists(check->__sk_flags_offset))
{
check = (struct sock___old *)sock;
bpf_core_read(&protocol, 1, (void *)(&check->sk_gso_max_segs) - 3);
}
else
{
protocol = BPF_CORE_READ(sock, sk_protocol);
}

Expand All @@ -130,22 +134,23 @@ static __always_inline u16 get_sock_protocol(struct sock *sock)

static __always_inline bool is_socket_supported(struct socket *sock)
{
struct sock *sk = (void *) BPF_CORE_READ(sock, sk);
struct sock *sk = (void *)BPF_CORE_READ(sock, sk);
u16 protocol = get_sock_protocol(sk);
switch (protocol) {
// case IPPROTO_IPIP:
// case IPPROTO_DCCP:
// case IPPROTO_SCTP:
// case IPPROTO_UDPLITE:
case IPPROTO_IP:
case IPPROTO_IPV6:
case IPPROTO_TCP:
case IPPROTO_UDP:
case IPPROTO_ICMP:
case IPPROTO_ICMPV6:
break;
default:
return 0; // not supported
switch (protocol)
{
// case IPPROTO_IPIP:
// case IPPROTO_DCCP:
// case IPPROTO_SCTP:
// case IPPROTO_UDPLITE:
case IPPROTO_IP:
case IPPROTO_IPV6:
case IPPROTO_TCP:
case IPPROTO_UDP:
case IPPROTO_ICMP:
case IPPROTO_ICMPV6:
break;
default:
return 0; // not supported
}

return 1; // supported
Expand Down Expand Up @@ -276,12 +281,28 @@ int BPF_KPROBE(cgroup_bpf_run_filter_skb)
u8 proto = 0;

// Parse the packet layer 3 headers.
__u8 ip_version = 0;
switch (family)
{
case PF_INET:
if (nethdrs->iphdrs.iphdr.version != 4) // IPv4
if (nethdrs->iphdrs.iphdr.version != 4)
{
return 1;
}
ip_version = nethdrs->iphdrs.iphdr.version;
break;

case PF_INET6:
ip_version = nethdrs->iphdrs.ipv6hdr.version;
break;

default:
return 1;
}

switch (ip_version)
{
case 4:
if (nethdrs->iphdrs.iphdr.ihl > 5)
{ // re-read IP header if needed
l3_size -= bpf_core_type_size(struct iphdr);
Expand All @@ -306,12 +327,7 @@ int BPF_KPROBE(cgroup_bpf_run_filter_skb)
indexer.dst.in6_u.u6_addr32[0] = nethdrs->iphdrs.iphdr.daddr;
break;

case PF_INET6:
// TODO: dual-stack IP implementation unsupported for now
// https://en.wikipedia.org/wiki/IPv6_transition_mechanism
if (nethdrs->iphdrs.ipv6hdr.version != 6) // IPv6
return 1;

case 6:
proto = nethdrs->iphdrs.ipv6hdr.nexthdr;
switch (proto)
{
Expand Down Expand Up @@ -485,7 +501,6 @@ int BPF_KPROBE(security_sk_clone)
return 0;
}


// implementation originally borrowed from tracee
static __always_inline __u64 get_packet_cgroup(struct __sk_buff *ctx, void *cgrpctxmap)
{
Expand All @@ -496,7 +511,7 @@ static __always_inline __u64 get_packet_cgroup(struct __sk_buff *ctx, void *cgrp
case PF_INET6:
break;
default:
return 1;
return cgroup_id;
}

struct bpf_sock *sk = ctx->sk;
Expand Down Expand Up @@ -537,12 +552,27 @@ static __always_inline __u64 get_packet_cgroup(struct __sk_buff *ctx, void *cgrp
indexer.ts = ctx->tstamp;

u32 ihl = 0;
__u8 ip_version = 0;
switch (family)
{
case PF_INET:
if (nethdrs->iphdrs.iphdr.version != 4)
return cgroup_id;
if (nethdrs->iphdrs.iphdr.version == 4)
{
ip_version = nethdrs->iphdrs.iphdr.version;
}
break;

case PF_INET6:
ip_version = nethdrs->iphdrs.ipv6hdr.version;
break;

default:
return cgroup_id;
}

switch (ip_version)
{
case 4:
ihl = nethdrs->iphdrs.iphdr.ihl;
if (ihl > 5)
{ // re-read IPv4 header if needed
Expand All @@ -567,12 +597,7 @@ static __always_inline __u64 get_packet_cgroup(struct __sk_buff *ctx, void *cgrp
indexer.dst.in6_u.u6_addr32[0] = nethdrs->iphdrs.iphdr.daddr;
break;

case PF_INET6:
// TODO: dual-stack IP implementation unsupported for now
// https://en.wikipedia.org/wiki/IPv6_transition_mechanism
if (nethdrs->iphdrs.ipv6hdr.version != 6)
return cgroup_id;

case 6:
switch (nethdrs->iphdrs.ipv6hdr.nexthdr)
{
case IPPROTO_TCP:
Expand Down Expand Up @@ -608,4 +633,3 @@ static __always_inline __u64 get_packet_cgroup(struct __sk_buff *ctx, void *cgrp

return cgroup_id;
}

Binary file modified pkg/bpf/tracer46_bpfel_arm64.o
Binary file not shown.
Binary file modified pkg/bpf/tracer46_bpfel_x86.o
Binary file not shown.
Binary file modified pkg/bpf/tracer_bpfel_arm64.o
Binary file not shown.
Binary file modified pkg/bpf/tracer_bpfel_x86.o
Binary file not shown.
4 changes: 4 additions & 0 deletions pkg/discoverer/discoverer.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ func (e *InternalEventsDiscovererImpl) scanExistingCgroups(isCgroupsV2 bool) {
return nil
}

if !isCgroupsV2 && !strings.HasPrefix(s, "/sys/fs/cgroup/cpuset") {
return nil
}

if cgroupID, contId, ok := e.cgroupsController.AddCgroupPath(s); ok {
log.Debug().Uint64("Cgroup ID", cgroupID).Str("Container ID", contId).Msg("Initial cgroup is detected")
}
Expand Down
34 changes: 22 additions & 12 deletions pkg/poller/packets/packets_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/cilium/ebpf/perf"
"github.com/go-errors/errors"

"encoding/binary"
"github.com/kubeshark/gopacket"
"github.com/kubeshark/gopacket/layers"
"github.com/kubeshark/tracer/misc/ethernet"
Expand Down Expand Up @@ -45,8 +46,9 @@ type pktBuffer struct {
}

type PacketsPoller struct {
ipv4Decoder gopacket.Decoder
ethernetDecoder gopacket.Decoder
ethhdr *layers.Ethernet
ethhdrContent []byte
mtx sync.Mutex
chunksReader *perf.Reader
rawWriter bpf.RawWriter
Expand All @@ -63,17 +65,21 @@ func NewPacketsPoller(
) (*PacketsPoller, error) {
var err error

ipv4Decoder := gopacket.DecodersByLayerName["IPv4"]
if ipv4Decoder == nil {
return nil, errors.New("Failed to get IPv4 decoder")
ethernetDecoder := gopacket.DecodersByLayerName["Ethernet"]
if ethernetDecoder == nil {
return nil, errors.New("Failed to get Ethernet decoder")
}

ethhdrContent := make([]byte, 14)
binary.BigEndian.PutUint16(ethhdrContent[12:14], uint16(layers.EthernetTypeIPv4))

poller := &PacketsPoller{
ipv4Decoder: ipv4Decoder,
ethhdr: ethernet.NewEthernetLayer(layers.EthernetTypeIPv4),
rawWriter: rawWriter,
gopacketWriter: gopacketWriter,
pktsMap: make(map[uint64]*pktBuffer),
ethernetDecoder: ethernetDecoder,
ethhdr: ethernet.NewEthernetLayer(layers.EthernetTypeIPv4),
ethhdrContent: ethhdrContent,
rawWriter: rawWriter,
gopacketWriter: gopacketWriter,
pktsMap: make(map[uint64]*pktBuffer),
}

poller.chunksReader, err = perf.NewReader(perfBuffer, os.Getpagesize()*10000)
Expand Down Expand Up @@ -148,11 +154,15 @@ func (p *PacketsPoller) handlePktChunk(chunk tracerPktChunk) error {
}

if p.gopacketWriter != nil {
pktBuf := append(p.ethhdr.Contents, pkts.buf[:pkts.len]...)
pkt := gopacket.NewPacket(pktBuf, p.ipv4Decoder, gopacket.NoCopy, ptr.CgroupID, unixpacket.PacketDirection(ptr.Direction))
pktBuf := append(p.ethhdrContent, pkts.buf[:pkts.len]...)
pkt := gopacket.NewPacket(pktBuf, p.ethernetDecoder, gopacket.NoCopy, ptr.CgroupID, unixpacket.PacketDirection(ptr.Direction))
m := pkt.Metadata()
ci := &m.CaptureInfo
ci.Timestamp = time.Unix(0, int64(ptr.Timestamp))
if ptr.Timestamp != 0 {
ci.Timestamp = time.Unix(0, int64(ptr.Timestamp))
} else {
ci.Timestamp = time.Now()
}
ci.CaptureLength = len(pktBuf)
ci.Length = len(pktBuf)
ci.CaptureBackend = gopacket.CaptureBackendEbpf
Expand Down
Loading