소스 검색

Merge pull request #8 from coroot/ebpf_l7

Capturing L7 protocols at the eBPF level
Anton Petruhin 3 년 전
부모
커밋
02681c1abf

+ 2 - 0
.dockerignore

@@ -0,0 +1,2 @@
+**/.git/
+**/.idea/

+ 109 - 18
containers/container.go

@@ -3,6 +3,7 @@ package containers
 import (
 	"github.com/coroot/coroot-node-agent/cgroup"
 	"github.com/coroot/coroot-node-agent/common"
+	"github.com/coroot/coroot-node-agent/ebpftracer"
 	"github.com/coroot/coroot-node-agent/flags"
 	"github.com/coroot/coroot-node-agent/logs"
 	"github.com/coroot/coroot-node-agent/node"
@@ -14,6 +15,7 @@ import (
 	"inet.af/netaddr"
 	"k8s.io/klog/v2"
 	"os"
+	"strconv"
 	"strings"
 	"sync"
 	"time"
@@ -57,6 +59,17 @@ type AddrPair struct {
 	dst netaddr.IPPort
 }
 
+type ActiveConnection struct {
+	ActualDest netaddr.IPPort
+	Pid        uint32
+	Fd         uint64
+}
+
+type L7Stats struct {
+	Requests *prometheus.CounterVec
+	Latency  prometheus.Histogram
+}
+
 type Container struct {
 	cgroup   *cgroup.Cgroup
 	metadata *ContainerMetadata
@@ -73,11 +86,13 @@ type Container struct {
 
 	listens map[netaddr.IPPort]map[uint32]time.Time // listen addr -> pid -> close time
 
-	connectsSuccessful map[AddrPair]int             // dst:actual_dst -> count
-	connectsFailed     map[netaddr.IPPort]int       // dst -> count
+	connectsSuccessful map[AddrPair]int64           // dst:actual_dst -> count
+	connectsFailed     map[netaddr.IPPort]int64     // dst -> count
 	connectLastAttempt map[netaddr.IPPort]time.Time // dst -> time
-	connectionsActive  map[AddrPair]netaddr.IPPort  // src:dst -> actual_dst
-	retransmits        map[AddrPair]int             // dst:actual_dst -> count
+	connectionsActive  map[AddrPair]ActiveConnection
+	retransmits        map[AddrPair]int64 // dst:actual_dst -> count
+
+	l7Stats map[ebpftracer.L7Protocol]map[AddrPair]*L7Stats // protocol -> dst:actual_dst -> stats
 
 	oomKills int
 
@@ -101,11 +116,12 @@ func NewContainer(cg *cgroup.Cgroup, md *ContainerMetadata) *Container {
 
 		listens: map[netaddr.IPPort]map[uint32]time.Time{},
 
-		connectsSuccessful: map[AddrPair]int{},
-		connectsFailed:     map[netaddr.IPPort]int{},
+		connectsSuccessful: map[AddrPair]int64{},
+		connectsFailed:     map[netaddr.IPPort]int64{},
 		connectLastAttempt: map[netaddr.IPPort]time.Time{},
-		connectionsActive:  map[AddrPair]netaddr.IPPort{},
-		retransmits:        map[AddrPair]int{},
+		connectionsActive:  map[AddrPair]ActiveConnection{},
+		retransmits:        map[AddrPair]int64{},
+		l7Stats:            map[ebpftracer.L7Protocol]map[AddrPair]*L7Stats{},
 
 		mountIds: map[string]struct{}{},
 
@@ -147,6 +163,12 @@ func (c *Container) Describe(ch chan<- *prometheus.Desc) {
 	for _, m := range metricsList {
 		ch <- m
 	}
+	for _, protoStats := range c.l7Stats {
+		for _, s := range protoStats {
+			s.Requests.Describe(ch)
+			s.Latency.Describe(ch)
+		}
+	}
 }
 
 func (c *Container) Collect(ch chan<- prometheus.Metric) {
@@ -242,8 +264,8 @@ func (c *Container) Collect(ch chan<- prometheus.Metric) {
 	}
 
 	connections := map[AddrPair]int{}
-	for c, actualDst := range c.connectionsActive {
-		connections[AddrPair{src: c.dst, dst: actualDst}]++
+	for c, conn := range c.connectionsActive {
+		connections[AddrPair{src: c.dst, dst: conn.ActualDest}]++
 	}
 	for d, count := range connections {
 		ch <- gauge(metrics.NetConnectionsActive, float64(count), d.src.String(), d.dst.String())
@@ -271,7 +293,14 @@ func (c *Container) Collect(ch chan<- prometheus.Metric) {
 		ch <- gauge(metrics.ApplicationType, 1, appType)
 	}
 
-	if !*flags.NoPingUpstreams {
+	for _, protoStats := range c.l7Stats {
+		for _, s := range protoStats {
+			s.Requests.Collect(ch)
+			s.Latency.Collect(ch)
+		}
+	}
+
+	if !*flags.DisablePinger {
 		for ip, rtt := range c.ping(netNs) {
 			ch <- gauge(metrics.NetLatency, rtt, ip.String())
 		}
@@ -316,7 +345,7 @@ func (c *Container) onProcessExit(pid uint32, oomKill bool) {
 	}
 }
 
-func (c *Container) onFileOpen(pid uint32, fd uint32) {
+func (c *Container) onFileOpen(pid uint32, fd uint64) {
 	mntId, logPath := resolveFd(pid, fd)
 	c.lock.Lock()
 	defer c.lock.Unlock()
@@ -349,7 +378,7 @@ func (c *Container) onListenClose(pid uint32, addr netaddr.IPPort) {
 	}
 }
 
-func (c *Container) onConnectionOpen(pid uint32, src, dst netaddr.IPPort, failed bool) {
+func (c *Container) onConnectionOpen(pid uint32, fd uint64, src, dst netaddr.IPPort, failed bool) {
 	if dst.IP().IsLoopback() {
 		netNs, err := proc.GetNetNs(pid)
 		isHostNs := err == nil && hostNetNsId == netNs.UniqueId()
@@ -376,7 +405,11 @@ func (c *Container) onConnectionOpen(pid uint32, src, dst netaddr.IPPort, failed
 	} else {
 		actualDst := ConntrackGetActualDestination(src, dst)
 		c.connectsSuccessful[AddrPair{src: dst, dst: actualDst}]++
-		c.connectionsActive[AddrPair{src: src, dst: dst}] = actualDst
+		c.connectionsActive[AddrPair{src: src, dst: dst}] = ActiveConnection{
+			ActualDest: actualDst,
+			Pid:        pid,
+			Fd:         fd,
+		}
 	}
 	c.connectLastAttempt[dst] = time.Now()
 }
@@ -391,14 +424,65 @@ func (c *Container) onConnectionClose(srcDst AddrPair) bool {
 	return true
 }
 
+func (c *Container) onL7Request(pid uint32, fd uint64, r *ebpftracer.L7Request) {
+	for dest, conn := range c.connectionsActive {
+		if conn.Pid == pid && conn.Fd == fd {
+			key := AddrPair{src: dest.dst, dst: conn.ActualDest}
+			stats := c.l7Stats[r.Protocol]
+			if stats == nil {
+				stats = map[AddrPair]*L7Stats{}
+				c.l7Stats[r.Protocol] = stats
+			}
+			s := stats[key]
+			if s == nil {
+				constLabels := map[string]string{"destination": key.src.String(), "actual_destination": key.dst.String()}
+				cOpts, ok := L7Requests[r.Protocol]
+				if !ok {
+					klog.Warningln("cannot find metric description for L7 protocol: %s", r.Protocol.String())
+					return
+				}
+				hOpts, ok := L7Latency[r.Protocol]
+				if !ok {
+					klog.Warningln("cannot find metric description for L7 protocol: %s", r.Protocol.String())
+					return
+				}
+				s = &L7Stats{
+					Requests: prometheus.NewCounterVec(
+						prometheus.CounterOpts{Name: cOpts.Name, Help: cOpts.Help, ConstLabels: constLabels},
+						[]string{"status"},
+					),
+					Latency: prometheus.NewHistogram(
+						prometheus.HistogramOpts{Name: hOpts.Name, Help: hOpts.Help, ConstLabels: constLabels},
+					),
+				}
+				stats[key] = s
+			}
+			status := ""
+			switch r.Protocol {
+			case ebpftracer.L7ProtocolHTTP:
+				status = strconv.Itoa(r.Status)
+			default:
+				if r.Status == 500 {
+					status = "failed"
+				} else {
+					status = "ok"
+				}
+			}
+			s.Requests.WithLabelValues(status).Inc()
+			s.Latency.Observe(r.Duration.Seconds())
+			return
+		}
+	}
+}
+
 func (c *Container) onRetransmit(srcDst AddrPair) bool {
 	c.lock.Lock()
 	defer c.lock.Unlock()
-	actualDst, ok := c.connectionsActive[srcDst]
+	conn, ok := c.connectionsActive[srcDst]
 	if !ok {
 		return false
 	}
-	c.retransmits[AddrPair{src: srcDst.dst, dst: actualDst}]++
+	c.retransmits[AddrPair{src: srcDst.dst, dst: conn.ActualDest}]++
 	return true
 }
 
@@ -566,7 +650,7 @@ func (c *Container) ping(netNs netns.NsHandle) map[netaddr.IP]float64 {
 }
 
 func (c *Container) runLogParser(logPath string) {
-	if *flags.NoParseLogs {
+	if *flags.DisableLogParsing {
 		return
 	}
 
@@ -667,6 +751,13 @@ func (c *Container) gc(now time.Time) {
 					delete(c.retransmits, d)
 				}
 			}
+			for _, protoStats := range c.l7Stats {
+				for d := range protoStats {
+					if d.src == dst {
+						delete(protoStats, d)
+					}
+				}
+			}
 		}
 	}
 }
@@ -738,7 +829,7 @@ func (c *Container) revalidateListens(now time.Time, actualListens map[netaddr.I
 	}
 }
 
-func resolveFd(pid uint32, fd uint32) (mntId string, logPath string) {
+func resolveFd(pid uint32, fd uint64) (mntId string, logPath string) {
 	info := proc.GetFdInfo(pid, fd)
 	if info == nil {
 		return

+ 20 - 0
containers/metrics.go

@@ -1,6 +1,7 @@
 package containers
 
 import (
+	"github.com/coroot/coroot-node-agent/ebpftracer"
 	"github.com/prometheus/client_golang/prometheus"
 	"reflect"
 )
@@ -71,6 +72,25 @@ var metrics = struct {
 	ApplicationType: metric("container_application_type", "Type of the application running in the container (e.g. memcached, postgres, mysql)", "application_type"),
 }
 
+var (
+	L7Requests = map[ebpftracer.L7Protocol]prometheus.CounterOpts{
+		ebpftracer.L7ProtocolHTTP:      {Name: "container_http_requests_total", Help: "Total number of outbound HTTP requests"},
+		ebpftracer.L7ProtocolPostgres:  {Name: "container_postgres_queries_total", Help: "Total number of outbound Postgres queries"},
+		ebpftracer.L7ProtocolRedis:     {Name: "container_redis_queries_total", Help: "Total number of outbound Redis queries"},
+		ebpftracer.L7ProtocolMemcached: {Name: "container_memcached_queries_total", Help: "Total number of outbound Memcached queries"},
+		ebpftracer.L7ProtocolMysql:     {Name: "container_mysql_queries_total", Help: "Total number of outbound Mysql queries"},
+		ebpftracer.L7ProtocolMongo:     {Name: "container_mongo_queries_total", Help: "Total number of outbound Mongo queries"},
+	}
+	L7Latency = map[ebpftracer.L7Protocol]prometheus.HistogramOpts{
+		ebpftracer.L7ProtocolHTTP:      {Name: "container_http_request_duration_seconds_total", Help: "Histogram of the response time for each outbound HTTP request"},
+		ebpftracer.L7ProtocolPostgres:  {Name: "container_postgres_queries_duration_seconds_total", Help: "Histogram of the execution time for each outbound Postgres query"},
+		ebpftracer.L7ProtocolRedis:     {Name: "container_redis_queries_duration_seconds_total", Help: "Histogram of the execution time for each outbound Redis query"},
+		ebpftracer.L7ProtocolMemcached: {Name: "container_memcached_queries_duration_seconds_total", Help: "Histogram of the execution time for each outbound Memcached query"},
+		ebpftracer.L7ProtocolMysql:     {Name: "container_mysql_queries_duration_seconds_total", Help: "Histogram of the execution time for each outbound Mysql query"},
+		ebpftracer.L7ProtocolMongo:     {Name: "container_mongo_queries_duration_seconds_total", Help: "Histogram of the execution time for each outbound Mongo query"},
+	}
+)
+
 func metric(name, help string, labels ...string) *prometheus.Desc {
 	return prometheus.NewDesc(name, help, labels, nil)
 }

+ 11 - 3
containers/registry.go

@@ -5,6 +5,7 @@ import (
 	"github.com/coroot/coroot-node-agent/cgroup"
 	"github.com/coroot/coroot-node-agent/common"
 	"github.com/coroot/coroot-node-agent/ebpftracer"
+	"github.com/coroot/coroot-node-agent/flags"
 	"github.com/coroot/coroot-node-agent/proc"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/vishvananda/netns"
@@ -78,7 +79,7 @@ func NewRegistry(reg prometheus.Registerer, kernelVersion string) (*Registry, er
 	}
 
 	go cs.handleEvents(cs.events)
-	t, err := ebpftracer.NewTracer(cs.events, kernelVersion)
+	t, err := ebpftracer.NewTracer(cs.events, kernelVersion, *flags.DisableL7Tracing)
 	if err != nil {
 		close(cs.events)
 		return nil, err
@@ -177,13 +178,13 @@ func (r *Registry) handleEvents(ch <-chan ebpftracer.Event) {
 
 			case ebpftracer.EventTypeConnectionOpen:
 				if c := r.getOrCreateContainer(e.Pid); c != nil {
-					c.onConnectionOpen(e.Pid, e.SrcAddr, e.DstAddr, false)
+					c.onConnectionOpen(e.Pid, e.Fd, e.SrcAddr, e.DstAddr, false)
 				} else {
 					klog.Infoln("TCP connection from unknown container", e)
 				}
 			case ebpftracer.EventTypeConnectionError:
 				if c := r.getOrCreateContainer(e.Pid); c != nil {
-					c.onConnectionOpen(e.Pid, e.SrcAddr, e.DstAddr, true)
+					c.onConnectionOpen(e.Pid, e.Fd, e.SrcAddr, e.DstAddr, true)
 				} else {
 					klog.Infoln("TCP connection error from unknown container", e)
 				}
@@ -201,6 +202,13 @@ func (r *Registry) handleEvents(ch <-chan ebpftracer.Event) {
 						break
 					}
 				}
+			case ebpftracer.EventTypeL7Request:
+				if e.L7Request == nil {
+					continue
+				}
+				if c := r.containersByPid[e.Pid]; c != nil {
+					c.onL7Request(e.Pid, e.Fd, e.L7Request)
+				}
 			}
 		}
 	}

+ 2 - 2
ebpftracer/Dockerfile

@@ -2,8 +2,8 @@ FROM alpine:3.13
 
 RUN apk add llvm clang libbpf-dev linux-headers
 
-COPY ebpf/* /tmp/
-WORKDIR /tmp
+COPY ebpf /tmp/ebpf
+WORKDIR /tmp/ebpf
 
 RUN clang -g -O2 -target bpf -D__KERNEL=416 -c ebpf.c -o ebpf416.o && llvm-strip --strip-debug ebpf416.o
 RUN clang -g -O2 -target bpf -D__KERNEL=420 -c ebpf.c -o ebpf420.o && llvm-strip --strip-debug ebpf420.o

+ 3 - 2
ebpftracer/Makefile

@@ -1,7 +1,8 @@
 build:
 	@echo ===BUILDING===
-	docker build -t ebpftracer .
-	docker cp $(shell docker create --rm ebpftracer):/tmp/ebpf.go ./ebpf.go
+	docker rmi -f ebpftracer
+	docker build -t ebpftracer --progress plain .
+	docker run --rm --name ebpftracer ebpftracer cat /tmp/ebpf/ebpf.go > ./ebpf.go
 	@echo
 
 test: test_vm1 test_vm2 test_vm3 test_vm4

파일 크기가 너무 크기때문에 변경 상태를 표시하지 않습니다.
+ 0 - 1
ebpftracer/ebpf.go


+ 4 - 2
ebpftracer/ebpf/ebpf.c

@@ -2,6 +2,7 @@
 #include <bpf/bpf_helpers.h>
 #include <bpf/bpf_core_read.h>
 #include <bpf/bpf_tracing.h>
+#include <bpf/bpf_endian.h>
 
 #define EVENT_TYPE_PROCESS_START	1
 #define EVENT_TYPE_PROCESS_EXIT		2
@@ -17,7 +18,8 @@
 
 #include "proc.c"
 #include "file.c"
-#include "tcp_state.c"
-#include "tcp_retransmit.c"
+#include "tcp/state.c"
+#include "tcp/retransmit.c"
+#include "l7/l7.c"
 
 char _license[] SEC("license") = "GPL";

+ 1 - 1
ebpftracer/ebpf/file.c

@@ -3,7 +3,7 @@
 struct file_event {
 	__u32 type;
 	__u32 pid;
-	__u32 fd;
+	__u64 fd;
 };
 
 struct {

+ 60 - 0
ebpftracer/ebpf/l7/http.c

@@ -0,0 +1,60 @@
+
+static __always_inline
+int is_http_request(char *buf) {
+    char b[16];
+    if (bpf_probe_read_str(&b, sizeof(b), (void *)buf) < 16) {
+        return 0;
+    }
+    if (b[0] == 'G' && b[1] == 'E' && b[2] == 'T') {
+        return 1;
+    }
+    if (b[0] == 'P' && b[1] == 'O' && b[2] == 'S' && b[3] == 'T') {
+        return 1;
+    }
+    if (b[0] == 'H' && b[1] == 'E' && b[2] == 'A' && b[3] == 'D') {
+        return 1;
+    }
+    if (b[0] == 'P' && b[1] == 'U' && b[2] == 'T') {
+        return 1;
+    }
+    if (b[0] == 'D' && b[1] == 'E' && b[2] == 'L' && b[3] == 'E' && b[4] == 'T' && b[5] == 'E') {
+        return 1;
+    }
+    if (b[0] == 'C' && b[1] == 'O' && b[2] == 'N' && b[3] == 'N' && b[4] == 'E' && b[5] == 'C' && b[6] == 'T') {
+        return 1;
+    }
+    if (b[0] == 'O' && b[1] == 'P' && b[2] == 'T' && b[3] == 'I' && b[4] == 'O' && b[5] == 'N' && b[6] == 'S') {
+        return 1;
+    }
+    if (b[0] == 'P' && b[1] == 'A' && b[2] == 'T' && b[3] == 'C' && b[4] == 'H') {
+        return 1;
+    }
+    return 0;
+}
+
+static __always_inline
+__u32 parse_http_status(char *buf) {
+    char b[16];
+    if (bpf_probe_read_str(&b, sizeof(b), (void *)buf) < 16) {
+        return 0;
+    }
+    if (b[0] != 'H' || b[1] != 'T' || b[2] != 'T' || b[3] != 'P' || b[4] != '/') {
+        return 0;
+    }
+    if (b[5] < '0' || b[5] > '9') {
+        return 0;
+    }
+    if (b[6] != '.') {
+        return 0;
+    }
+    if (b[7] < '0' || b[7] > '9') {
+        return 0;
+    }
+    if (b[8] != ' ') {
+        return 0;
+    }
+    if (b[9] < '0' || b[9] > '9' || b[10] < '0' || b[10] > '9' || b[11] < '0' || b[11] > '9') {
+        return 0;
+    }
+    return (b[9]-'0')*100 + (b[10]-'0')*10 + (b[11]-'0');
+}

+ 220 - 0
ebpftracer/ebpf/l7/l7.c

@@ -0,0 +1,220 @@
+#include "http.c"
+#include "postgres.c"
+#include "redis.c"
+#include "memcached.c"
+#include "mysql.c"
+#include "mongo.c"
+
+#define PROTOCOL_UNKNOWN    0
+#define PROTOCOL_HTTP	    1
+#define PROTOCOL_POSTGRES	2
+#define PROTOCOL_REDIS	    3
+#define PROTOCOL_MEMCACHED  4
+#define PROTOCOL_MYSQL      5
+#define PROTOCOL_MONGO      6
+
+struct l7_event {
+    __u64 fd;
+    __u32 pid;
+    __u32 status;
+    __u64 duration;
+    __u8 protocol;
+};
+
+struct {
+    __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
+    __uint(key_size, sizeof(int));
+    __uint(value_size, sizeof(int));
+} l7_events SEC(".maps");
+
+struct rw_args_t {
+    __u64 fd;
+    char* buf;
+};
+
+struct {
+    __uint(type, BPF_MAP_TYPE_HASH);
+    __uint(key_size, sizeof(__u64));
+    __uint(value_size, sizeof(struct rw_args_t));
+    __uint(max_entries, 10240);
+} active_reads SEC(".maps");
+
+struct socket_key {
+    __u64 fd;
+    __u32 pid;
+};
+
+struct l7_request {
+    __u64 ns;
+    __u8 protocol;
+    __u8 partial;
+};
+
+struct {
+    __uint(type, BPF_MAP_TYPE_LRU_HASH);
+    __uint(key_size, sizeof(struct socket_key));
+    __uint(value_size, sizeof(struct l7_request));
+    __uint(max_entries, 32768);
+} active_l7_requests SEC(".maps");
+
+struct trace_event_raw_sys_enter_rw__stub {
+    __u64 unused;
+    long int id;
+    __u64 fd;
+    char* buf;
+    __u64 size;
+};
+
+struct trace_event_raw_sys_exit_rw__stub {
+    __u64 unused;
+    long int id;
+    long int ret;
+};
+
+static inline __attribute__((__always_inline__))
+int trace_enter_write(__u64 fd, char *buf, __u64 size) {
+    __u32 pid = bpf_get_current_pid_tgid() >> 32;
+    struct l7_request req = {};
+    req.partial = 0;
+    if (is_http_request(buf)) {
+        req.protocol = PROTOCOL_HTTP;
+    } else if (is_postgres_query(buf, size)) {
+        req.protocol = PROTOCOL_POSTGRES;
+    } else if (is_redis_query(buf)) {
+        req.protocol = PROTOCOL_REDIS;
+    } else if (is_memcached_query(buf, size)) {
+        req.protocol = PROTOCOL_MEMCACHED;
+    } else if (is_mysql_query(buf, size)) {
+        req.protocol = PROTOCOL_MYSQL;
+    } else if (is_mongo_query(buf, size)) {
+        req.protocol = PROTOCOL_MONGO;
+    } else {
+        return 0;
+    }
+    req.ns = bpf_ktime_get_ns();
+    struct socket_key k = {};
+    k.pid = pid;
+    k.fd = fd;
+    bpf_map_update_elem(&active_l7_requests, &k, &req, BPF_ANY);
+    return 0;
+}
+
+static inline __attribute__((__always_inline__))
+int trace_enter_read(struct trace_event_raw_sys_enter_rw__stub* ctx) {
+    __u64 id = bpf_get_current_pid_tgid();
+    struct rw_args_t args = {};
+    args.fd = ctx->fd;
+    args.buf = ctx->buf;
+    bpf_map_update_elem(&active_reads, &id, &args, BPF_ANY);
+    return 0;
+}
+
+static inline __attribute__((__always_inline__))
+int trace_exit_read(struct trace_event_raw_sys_exit_rw__stub* ctx) {
+    __u64 id = bpf_get_current_pid_tgid();
+
+    struct rw_args_t *args = bpf_map_lookup_elem(&active_reads, &id);
+    if (!args) {
+        return 0;
+    }
+    bpf_map_delete_elem(&active_reads, &id);
+    if (ctx->ret <= 0) {
+        return 0;
+    }
+    struct socket_key k = {};
+    k.pid = id >> 32;
+    k.fd = args->fd;
+
+    struct l7_request *req = bpf_map_lookup_elem(&active_l7_requests, &k);
+    if (!req) {
+        return 0;
+    }
+    bpf_map_delete_elem(&active_l7_requests, &k);
+
+    struct l7_event e = {};
+    if (req->protocol == PROTOCOL_HTTP) {
+        e.status = parse_http_status(args->buf);
+    } else if (req->protocol == PROTOCOL_POSTGRES) {
+        e.status = parse_postgres_status(args->buf, ctx->ret);
+    } else if (req->protocol == PROTOCOL_REDIS) {
+        e.status = parse_redis_status(args->buf, ctx->ret);
+    } else if (req->protocol == PROTOCOL_MEMCACHED) {
+        e.status = parse_memcached_status(args->buf, ctx->ret);
+    } else if (req->protocol == PROTOCOL_MYSQL) {
+        e.status = parse_mysql_status(args->buf, ctx->ret);
+    } else if (req->protocol == PROTOCOL_MONGO) {
+        e.status = parse_mongo_status(args->buf, ctx->ret, req->partial);
+        if (e.status == 1) {
+            req->partial = 1;
+            bpf_map_update_elem(&active_l7_requests, &k, req, BPF_ANY);
+            return 0;
+        }
+    }
+    if (e.status == 0) {
+        return 0;
+    }
+    e.protocol = req->protocol;
+    e.fd = k.fd;
+    e.pid = k.pid;
+    e.duration = bpf_ktime_get_ns() - req->ns;
+    bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, &e, sizeof(e));
+    return 0;
+}
+
+SEC("tracepoint/syscalls/sys_enter_writev")
+int sys_enter_writev(struct trace_event_raw_sys_enter_rw__stub* ctx) {
+    void *vec;
+    if (bpf_probe_read(&vec, sizeof(void*), (void *)ctx->buf) < 0) {
+        return 0;
+    }
+    return trace_enter_write(ctx->fd, vec, 0);
+}
+
+SEC("tracepoint/syscalls/sys_enter_write")
+int sys_enter_write(struct trace_event_raw_sys_enter_rw__stub* ctx) {
+    return trace_enter_write(ctx->fd, ctx->buf, ctx->size);
+}
+
+SEC("tracepoint/syscalls/sys_enter_sendto")
+int sys_enter_sendto(struct trace_event_raw_sys_enter_rw__stub* ctx) {
+    return trace_enter_write(ctx->fd, ctx->buf, ctx->size);
+}
+
+SEC("tracepoint/syscalls/sys_enter_read")
+int sys_enter_read(struct trace_event_raw_sys_enter_rw__stub* ctx) {
+    return trace_enter_read(ctx);
+}
+
+SEC("tracepoint/syscalls/sys_enter_readv")
+int sys_enter_readv(struct trace_event_raw_sys_enter_rw__stub* ctx) {
+    __u64 id = bpf_get_current_pid_tgid();
+    void *vec;
+    if (bpf_probe_read(&vec, sizeof(void*), (void *)ctx->buf) < 0) {
+        return 0;
+    }
+    struct rw_args_t args = {};
+    args.fd = ctx->fd;
+    args.buf = vec;
+    bpf_map_update_elem(&active_reads, &id, &args, BPF_ANY);
+    return 0;
+}
+
+SEC("tracepoint/syscalls/sys_enter_recvfrom")
+int sys_enter_recvfrom(struct trace_event_raw_sys_enter_rw__stub* ctx) {
+    return trace_enter_read(ctx);
+}
+
+SEC("tracepoint/syscalls/sys_exit_read")
+int sys_exit_read(struct trace_event_raw_sys_exit_rw__stub* ctx) {
+    return trace_exit_read(ctx);
+}
+
+SEC("tracepoint/syscalls/sys_exit_readv")
+int sys_exit_readv(struct trace_event_raw_sys_exit_rw__stub* ctx) {
+    return trace_exit_read(ctx);
+}
+
+SEC("tracepoint/syscalls/sys_exit_recvfrom")
+int sys_exit_recvfrom(struct trace_event_raw_sys_exit_rw__stub* ctx) {
+    return trace_exit_read(ctx);
+}

+ 98 - 0
ebpftracer/ebpf/l7/memcached.c

@@ -0,0 +1,98 @@
+// https://github.com/memcached/memcached/blob/master/doc/protocol.txt
+static __always_inline
+int is_memcached_query(char *buf, int buf_size) {
+    if (buf_size < 1) {
+        return 0;
+    }
+    char b[7];
+    char end[2];
+    if (bpf_probe_read(&b, sizeof(b), (void *)((char *)buf)) < 0) {
+        return 0;
+    }
+    if (bpf_probe_read(&end, sizeof(end), (void *)((char *)buf+buf_size-2)) < 0) {
+        return 0;
+    }
+    if (end[0] != '\r' || end[1] != '\n') {
+        return 0;
+    }
+    if (b[0] == 's' && b[1] == 'e' && b[2] == 't' && b[3] == ' ') {
+        return 1;
+    }
+    if (b[0] == 'a' && b[1] == 'd' && b[2] == 'd' && b[3] == ' ') {
+        return 1;
+    }
+    if (b[0] == 'c' && b[1] == 'a' && b[2] == 's' && b[3] == ' ') {
+        return 1;
+    }
+    if (b[0] == 'g' && (b[1] == 'a' || b[1] == 'e') && b[2] == 't' && (b[3] == ' ' || b[3] == 's')) { // get/gets/gat/gats
+        return 1;
+    }
+    if (b[0] == 'i' && b[1] == 'n' && b[2] == 'c' && b[3] == 'r' && b[4] == ' ') {
+        return 1;
+    }
+    if (b[0] == 'd' && b[1] == 'e' && b[2] == 'c' && b[3] == 'r' && b[4] == ' ') {
+        return 1;
+    }
+    if (b[0] == 't' && b[1] == 'o' && b[2] == 'u' && b[3] == 'c' && b[4] == 'h' && b[5] == ' ') {
+        return 1;
+    }
+    if (b[0] == 'd' && b[1] == 'e' && b[2] == 'l' && b[3] == 'e' && b[4] == 't' && b[5] == 'e' && b[6] == ' ') {
+        return 1;
+    }
+    if (b[0] == 'a' && b[1] == 'p' && b[2] == 'p' && b[3] == 'e' && b[4] == 'n' && b[5] == 'd' && b[6] == ' ') {
+        return 1;
+    }
+    if (b[0] == 'p' && b[1] == 'r' && b[2] == 'e' && b[3] == 'p' && b[4] == 'e' && b[5] == 'n' && b[6] == 'd') {
+        return 1;
+    }
+    if (b[0] == 'r' && b[1] == 'e' && b[2] == 'p' && b[3] == 'l' && b[4] == 'a' && b[5] == 'c' && b[6] == 'e') {
+        return 1;
+    }
+    return 0;
+}
+
+static __always_inline
+__u32 parse_memcached_status(char *buf, int buf_size) {
+    char r[3];
+    char end[2];
+    if (bpf_probe_read(&r, sizeof(r), (void *)((char *)buf)) < 0) {
+        return 0;
+    }
+    if (bpf_probe_read(&end, sizeof(end), (void *)((char *)buf+buf_size-2)) < 0) {
+        return 0;
+    }
+    if (end[0] != '\r' || end[1] != '\n') {
+        return 0;
+    }
+    if (r[0] == 'V' && r[1] == 'A' && r[2] == 'L') { //VALUE
+        return 200;
+    }
+    if (r[0] == 'S' && r[1] == 'T' && r[2] == 'O') { //STORED
+        return 200;
+    }
+    if (r[0] == 'D' && r[1] == 'E' && r[2] == 'L') { //DELETED
+        return 200;
+    }
+    if (r[0] == 'T' && r[1] == 'O' && r[2] == 'C') { //TOUCHED
+        return 200;
+    }
+    if (r[0] == 'N' && r[1] == 'O' && r[2] == 'T') { //NOT_STORED || NOT_FOUND
+        return 200;
+    }
+    if (r[0] == 'E' && r[1] == 'X' && r[2] == 'I') { //EXISTS
+        return 200;
+    }
+    if (r[0] == 'E' && r[1] == 'R' && r[2] == 'R') { //ERROR
+        return 500;
+    }
+    if (r[0] == 'C' && r[1] == 'L' && r[2] == 'I') { //CLIENT_ERROR
+        return 500;
+    }
+    if (r[0] == 'S' && r[1] == 'E' && r[2] == 'R') { //SERVER_ERROR
+        return 500;
+    }
+    if (r[0] >= '0' && r[0] <= '9') { // incr/decr response: <value>\r\n
+        return 200;
+    }
+    return 0;
+}

+ 55 - 0
ebpftracer/ebpf/l7/mongo.c

@@ -0,0 +1,55 @@
+// https://www.mongodb.com/docs/manual/reference/mongodb-wire-protocol/
+
+#define MONGO_OP_COMPRESSED 2012
+#define MONGO_OP_MSG        2013
+
+struct mongo_header {
+    __s32 length;
+    __s32 request_id;
+    __s32 response_to;
+    __s32 op_code;
+};
+
+static __always_inline
+int is_mongo_query(char *buf, int buf_size) {
+    if (buf_size < 1) {
+        return 0;
+    }
+    struct mongo_header h = {};
+    if (bpf_probe_read(&h, sizeof(h), (void *)((char *)buf)) < 0) {
+        return 0;
+    }
+    if (h.response_to == 0 && (h.op_code == MONGO_OP_MSG || h.op_code == MONGO_OP_COMPRESSED)) {
+        return 1;
+    }
+    return 0;
+}
+
+static __always_inline
+__u32 parse_mongo_status(char *buf, int buf_size, __u8 partial) {
+    if (partial == 0 && buf_size == 4) { //partial read
+        return 1;
+    }
+    struct mongo_header h = {};
+    if (partial) {
+        if (bpf_probe_read(&h.response_to, sizeof(h.response_to), (void *)((char *)buf+4)) < 0) {
+            return 0;
+        }
+        if (bpf_probe_read(&h.op_code, sizeof(h.op_code), (void *)((char *)buf+8)) < 0) {
+            return 0;
+        }
+    } else {
+        if (bpf_probe_read(&h, sizeof(h), (void *)((char *)buf)) < 0) {
+            return 0;
+        }
+    }
+    if (h.response_to == 0) {
+        return 0;
+    }
+    if (h.op_code == MONGO_OP_MSG || h.op_code == MONGO_OP_COMPRESSED) {
+        return 200;
+    }
+    return 0;
+}
+
+

+ 45 - 0
ebpftracer/ebpf/l7/mysql.c

@@ -0,0 +1,45 @@
+// https://dev.mysql.com/doc/dev/mysql-server/latest/PAGE_PROTOCOL.html
+#define MYSQL_COM_QUERY		    3
+#define MYSQL_COM_STMT_EXECUTE  23
+
+#define MYSQL_RESPONSE_OK    0x00
+#define MYSQL_RESPONSE_EOF   0xfe
+#define MYSQL_RESPONSE_ERROR 0xff
+
+static __always_inline
+int is_mysql_query(char *buf, int buf_size) {
+    if (buf_size < 1) {
+        return 0;
+    }
+    __u8 b[5];
+    if (bpf_probe_read(&b, sizeof(b), (void *)((char *)buf)) < 0) {
+        return 0;
+    }
+    int length = (int)b[0] | (int)b[1] << 8 | (int)b[2] << 16;
+    if (length+4 != buf_size || b[3] != 0) { // sequence must be 0
+        return 0;
+    }
+    if (b[4] ==  MYSQL_COM_QUERY || b[4] == MYSQL_COM_STMT_EXECUTE) {
+        return 1;
+    }
+    return 0;
+}
+
+static __always_inline
+__u32 parse_mysql_status(char *buf, int buf_size) {
+    __u8 b[5];
+    if (bpf_probe_read(&b, sizeof(b), (void *)((char *)buf)) < 0) {
+        return 0;
+    }
+    if (b[3] < 1) { // sequence must be > 0
+        return 0;
+    }
+    int length = (int)b[0] | (int)b[1] << 8 | (int)b[2] << 16;
+    if (length == 1 || b[4] == MYSQL_RESPONSE_OK || b[4] == MYSQL_RESPONSE_EOF) {
+        return 200;
+    }
+    if (b[4] == MYSQL_RESPONSE_ERROR) {
+        return 500;
+    }
+    return 0;
+}

+ 62 - 0
ebpftracer/ebpf/l7/postgres.c

@@ -0,0 +1,62 @@
+// Postgres wire protocol
+// https://www.pgcon.org/2014/schedule/attachments/330_postgres-for-the-wire.pdf
+// https://www.postgresql.org/docs/current/protocol-message-formats.html
+
+static __always_inline
+int is_postgres_query(char *buf, int buf_size) {
+    if (buf_size < 1) {
+        return 0;
+    }
+    char f_cmd;
+    int f_length;
+    if (bpf_probe_read(&f_cmd, sizeof(f_cmd), (void *)((char *)buf)) < 0) {
+        return 0;
+    }
+    if (bpf_probe_read(&f_length, sizeof(f_length), (void *)((char *)buf+1)) < 0) {
+        return 0;
+    }
+    f_length = bpf_ntohl(f_length);
+    if (f_cmd == 'Q' && f_length+1 == buf_size) {
+        return 1;
+    }
+    char sync[5];
+    if (bpf_probe_read(&sync, sizeof(sync), (void *)((char *)buf+buf_size-5)) < 0) {
+        return 0;
+    }
+    if (sync[0] == 'S' && sync[1] == 0 && sync[2] == 0 && sync[3] == 0 && sync[4] == 4) {
+        return 1;
+    }
+    return 0;
+}
+
+static __always_inline
+__u32 parse_postgres_status(char *buf, int buf_size) {
+    char cmd;
+    int length;
+    if (bpf_probe_read(&cmd, sizeof(cmd), (void *)((char *)buf)) < 0) {
+        return 0;
+    }
+    if (bpf_probe_read(&length, sizeof(length), (void *)((char *)buf+1)) < 0) {
+        return 0;
+    }
+    length = bpf_ntohl(length);
+
+    if (length+1 > buf_size) {
+        return 0;
+    }
+    if (cmd == '2' && length == 4 && buf_size >= 10) {
+        if (bpf_probe_read(&cmd, sizeof(cmd), (void *)((char *)buf+5)) < 0) {
+            return 0;
+        }
+        if (bpf_probe_read(&length, sizeof(length), (void *)((char *)buf+5+1)) < 0) {
+            return 0;
+        }
+    }
+    if (cmd == 'E') {
+        return 500;
+    }
+    if (cmd == 'T' || cmd == 'D' || cmd == 'C') {
+        return 200;
+    }
+    return 0;
+}

+ 44 - 0
ebpftracer/ebpf/l7/redis.c

@@ -0,0 +1,44 @@
+// Redis serialization protocol (RESP) specification
+// https://redis.io/docs/reference/protocol-spec/
+
+static __always_inline
+int is_redis_query(char *buf) {
+    char b[5];
+    if (bpf_probe_read(&b, sizeof(b), (void *)((char *)buf)) < 0) {
+        return 0;
+    }
+    if (b[0] != '*' || b[1] < '0' || b[1] > '9') {
+        return 0;
+    }
+    // *3\r\n...
+    if (b[2] == '\r' && b[3] == '\n') {
+        return 1;
+    }
+    // *12\r\n...
+    if (b[2] >= '0' && b[2] <= '9' && b[3] == '\r' && b[4] == '\n') {
+        return 1;
+    }
+    return 0;
+}
+
+static __always_inline
+__u32 parse_redis_status(char *buf, int buf_size) {
+    char type;
+    char end[2];
+    if (bpf_probe_read(&type, sizeof(type), (void *)((char *)buf)) < 0) {
+        return 0;
+    }
+    if (bpf_probe_read(&end, sizeof(end), (void *)((char *)buf+buf_size-2)) < 0) {
+        return 0;
+    }
+    if (end[0] != '\r' || end[1] != '\n') {
+        return 0;
+    }
+    if (type == '*' || type == ':' || type == '$' || type == '+') {
+        return 200;
+    }
+    if (type == '-') {
+        return 500;
+    }
+    return 0;
+}

+ 45 - 45
ebpftracer/ebpf/proc.c

@@ -2,79 +2,79 @@
 #define CLONE_THREAD 	0x00010000
 
 struct proc_event {
-	__u32 type;
-	__u32 pid;
-	__u32 reason;
+    __u32 type;
+    __u32 pid;
+    __u32 reason;
 };
 
 struct {
-	__uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
-	__uint(key_size, sizeof(int));
-	__uint(value_size, sizeof(int));
+    __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
+    __uint(key_size, sizeof(int));
+    __uint(value_size, sizeof(int));
 } proc_events SEC(".maps");
 
 struct {
-	__uint(type, BPF_MAP_TYPE_HASH);
-	__uint(key_size, sizeof(__u32));
-	__uint(value_size, sizeof(__u32));
-	__uint(max_entries, 10240);
+    __uint(type, BPF_MAP_TYPE_HASH);
+    __uint(key_size, sizeof(__u32));
+    __uint(value_size, sizeof(__u32));
+    __uint(max_entries, 10240);
 } oom_info SEC(".maps");
 
 struct trace_event_raw_task_newtask__stub {
-	__u64 unused;
-	__u32 pid;
-	char comm[TASK_COMM_LEN];
-	long unsigned int clone_flags;
+    __u64 unused;
+    __u32 pid;
+    char comm[TASK_COMM_LEN];
+    long unsigned int clone_flags;
 };
 
 SEC("tracepoint/task/task_newtask")
 int task_newtask(struct trace_event_raw_task_newtask__stub *args)
 {
-	if (args->clone_flags & CLONE_THREAD) { // skipping threads
-		return 0;
-	}
-	struct proc_event e = {
-		.type = EVENT_TYPE_PROCESS_START,
-		.pid = args->pid,
-	};
-	bpf_perf_event_output(args, &proc_events, BPF_F_CURRENT_CPU, &e, sizeof(e));
-	return 0;
+    if (args->clone_flags & CLONE_THREAD) { // skipping threads
+        return 0;
+    }
+    struct proc_event e = {
+        .type = EVENT_TYPE_PROCESS_START,
+        .pid = args->pid,
+    };
+    bpf_perf_event_output(args, &proc_events, BPF_F_CURRENT_CPU, &e, sizeof(e));
+    return 0;
 }
 
 struct trace_event_raw_sched_process_template__stub {
-	__u64 unused;
-	char comm[TASK_COMM_LEN];
-	__u32 pid;
+    __u64 unused;
+    char comm[TASK_COMM_LEN];
+    __u32 pid;
 };
 
 SEC("tracepoint/sched/sched_process_exit")
 int sched_process_exit(struct trace_event_raw_sched_process_template__stub *args)
 {
-	__u64 id = bpf_get_current_pid_tgid();
-	if (id >> 32 != (__u32)id) { // skipping threads
-		return 0;
-	}
-	struct proc_event e = {
-		.type = EVENT_TYPE_PROCESS_EXIT,
-		.pid = args->pid,
-	};
-	if (bpf_map_lookup_elem(&oom_info, &e.pid)) {
-		e.reason = EVENT_REASON_OOM_KILL;
-		bpf_map_delete_elem(&oom_info, &e.pid);
-	}
-	bpf_perf_event_output(args, &proc_events, BPF_F_CURRENT_CPU, &e, sizeof(e));
-	return 0;
+    __u64 id = bpf_get_current_pid_tgid();
+    if (id >> 32 != (__u32)id) { // skipping threads
+        return 0;
+    }
+    struct proc_event e = {
+        .type = EVENT_TYPE_PROCESS_EXIT,
+        .pid = args->pid,
+    };
+    if (bpf_map_lookup_elem(&oom_info, &e.pid)) {
+        e.reason = EVENT_REASON_OOM_KILL;
+        bpf_map_delete_elem(&oom_info, &e.pid);
+    }
+    bpf_perf_event_output(args, &proc_events, BPF_F_CURRENT_CPU, &e, sizeof(e));
+    return 0;
 }
 
 struct trace_event_raw_mark_victim__stub {
-	__u64 unused;
-	int pid;
+    __u64 unused;
+    int pid;
 };
 
 SEC("tracepoint/oom/mark_victim")
 int oom_mark_victim(struct trace_event_raw_mark_victim__stub *args)
 {
-	__u32 pid = args->pid;
-	bpf_map_update_elem(&oom_info, &pid, &pid, BPF_ANY);
-	return 0;
+    __u32 pid = args->pid;
+    bpf_map_update_elem(&oom_info, &pid, &pid, BPF_ANY);
+    return 0;
 }

+ 39 - 0
ebpftracer/ebpf/tcp/retransmit.c

@@ -0,0 +1,39 @@
+struct {
+    __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
+    __uint(key_size, sizeof(int));
+    __uint(value_size, sizeof(int));
+} tcp_retransmit_events SEC(".maps");
+
+struct trace_event_raw_tcp_event_sk_skb__stub {
+    __u64 unused;
+    void *sbkaddr;
+    void *skaddr;
+#if __KERNEL >= 420
+    int state;
+#endif
+    __u16 sport;
+    __u16 dport;
+#if __KERNEL >= 512
+    __u16 family;
+#endif
+    __u8 saddr[4];
+    __u8 daddr[4];
+    __u8 saddr_v6[16];
+    __u8 daddr_v6[16];
+};
+
+SEC("tracepoint/tcp/tcp_retransmit_skb")
+int tcp_retransmit_skb(struct trace_event_raw_tcp_event_sk_skb__stub *args)
+{
+    struct tcp_event e = {
+        .type = EVENT_TYPE_TCP_RETRANSMIT,
+        .sport = args->sport,
+        .dport = args->dport,
+    };
+    __builtin_memcpy(&e.saddr, &args->saddr_v6, sizeof(e.saddr));
+    __builtin_memcpy(&e.daddr, &args->daddr_v6, sizeof(e.daddr));
+
+    bpf_perf_event_output(args, &tcp_retransmit_events, BPF_F_CURRENT_CPU, &e, sizeof(e));
+
+    return 0;
+}

+ 162 - 0
ebpftracer/ebpf/tcp/state.c

@@ -0,0 +1,162 @@
+#define IPPROTO_TCP 6
+
+struct tcp_event {
+    __u64 fd;
+    __u32 type;
+    __u32 pid;
+    __u16 sport;
+    __u16 dport;
+    __u8 saddr[16];
+    __u8 daddr[16];
+};
+
+struct {
+    __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
+    __uint(key_size, sizeof(int));
+    __uint(value_size, sizeof(int));
+} tcp_listen_events SEC(".maps");
+
+struct {
+    __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
+    __uint(key_size, sizeof(int));
+    __uint(value_size, sizeof(int));
+} tcp_connect_events SEC(".maps");
+
+struct trace_event_raw_inet_sock_set_state__stub {
+    __u64 unused;
+    void *skaddr;
+    int oldstate;
+    int newstate;
+    __u16 sport;
+    __u16 dport;
+    __u16 family;
+#if __KERNEL >= 506
+    __u16 protocol;
+#else
+    __u8 protocol;
+#endif
+    __u8 saddr[4];
+    __u8 daddr[4];
+    __u8 saddr_v6[16];
+    __u8 daddr_v6[16];
+};
+
+struct {
+    __uint(type, BPF_MAP_TYPE_HASH);
+    __uint(key_size, sizeof(__u64));
+    __uint(value_size, sizeof(__u64));
+    __uint(max_entries, 10240);
+} fd_by_pid_tgid SEC(".maps");
+
+struct sk_info {
+    __u64 fd;
+    __u32 pid;
+};
+struct {
+    __uint(type, BPF_MAP_TYPE_HASH);
+    __uint(key_size, sizeof(void *));
+    __uint(value_size, sizeof(struct sk_info));
+    __uint(max_entries, 10240);
+} sk_info SEC(".maps");
+
+SEC("tracepoint/sock/inet_sock_set_state")
+int inet_sock_set_state(void *ctx)
+{
+    struct trace_event_raw_inet_sock_set_state__stub args = {};
+    if (bpf_probe_read(&args, sizeof(args), ctx) < 0) {
+        return 0;
+    }
+    if (args.protocol != IPPROTO_TCP) {
+        return 0;
+    }
+    __u64 id = bpf_get_current_pid_tgid();
+    __u32 pid = id >> 32;
+
+    if (args.oldstate == BPF_TCP_CLOSE && args.newstate == BPF_TCP_SYN_SENT) {
+        __u64 *fdp = bpf_map_lookup_elem(&fd_by_pid_tgid, &id);
+
+        if (!fdp) {
+            return 0;
+        }
+        bpf_map_delete_elem(&fd_by_pid_tgid, &id);
+        struct sk_info i = {};
+        i.pid = pid;
+        i.fd = *fdp;
+        bpf_map_update_elem(&sk_info, &args.skaddr, &i, BPF_ANY);
+        return 0;
+    }
+
+    __u64 fd = 0;
+    __u32 type = 0;
+    void *map = &tcp_connect_events;
+    if (args.oldstate == BPF_TCP_SYN_SENT) {
+        if (args.newstate == BPF_TCP_ESTABLISHED) {
+            type = EVENT_TYPE_CONNECTION_OPEN;
+        } else if (args.newstate == BPF_TCP_CLOSE) {
+            type = EVENT_TYPE_CONNECTION_ERROR;
+        } else {
+            return 0;
+        }
+        struct sk_info *i = bpf_map_lookup_elem(&sk_info, &args.skaddr);
+        if (!i) {
+            return 0;
+        }
+        pid = i->pid;
+        fd = i->fd;
+        bpf_map_delete_elem(&sk_info, &args.skaddr);
+    }
+    if (args.oldstate == BPF_TCP_ESTABLISHED && (args.newstate == BPF_TCP_FIN_WAIT1 || args.newstate == BPF_TCP_CLOSE_WAIT)) {
+        pid = 0;
+        type = EVENT_TYPE_CONNECTION_CLOSE;
+    }
+    if (args.oldstate == BPF_TCP_CLOSE && args.newstate == BPF_TCP_LISTEN) {
+        type = EVENT_TYPE_LISTEN_OPEN;
+        map = &tcp_listen_events;
+    }
+    if (args.oldstate == BPF_TCP_LISTEN && args.newstate == BPF_TCP_CLOSE) {
+        type = EVENT_TYPE_LISTEN_CLOSE;
+        map = &tcp_listen_events;
+    }
+
+    if (type == 0) {
+        return 0;
+    }
+
+    struct tcp_event e = {};
+    e.type = type;
+    e.pid = pid;
+    e.sport = args.sport;
+    e.dport = args.dport;
+    e.fd = fd;
+    __builtin_memcpy(&e.saddr, &args.saddr_v6, sizeof(e.saddr));
+    __builtin_memcpy(&e.daddr, &args.daddr_v6, sizeof(e.saddr));
+
+    bpf_perf_event_output(ctx, map, BPF_F_CURRENT_CPU, &e, sizeof(e));
+
+    return 0;
+}
+
+struct trace_event_raw_sys_enter_connect__stub {
+    __u64 unused;
+    long int id;
+    __u64 fd;
+};
+
+SEC("tracepoint/syscalls/sys_enter_connect")
+int sys_enter_connect(void *ctx) {
+    struct trace_event_raw_sys_enter_connect__stub args = {};
+    if (bpf_probe_read(&args, sizeof(args), ctx) < 0) {
+        return 0;
+    }
+    __u64 id = bpf_get_current_pid_tgid();
+    bpf_map_update_elem(&fd_by_pid_tgid, &id, &args.fd, BPF_ANY);
+    return 0;
+}
+
+SEC("tracepoint/syscalls/sys_exit_connect")
+int sys_exit_connect(void *ctx) {
+    __u64 id = bpf_get_current_pid_tgid();
+    bpf_map_delete_elem(&fd_by_pid_tgid, &id);
+    return 0;
+}
+

+ 0 - 39
ebpftracer/ebpf/tcp_retransmit.c

@@ -1,39 +0,0 @@
-struct {
-	__uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
-	__uint(key_size, sizeof(int));
-	__uint(value_size, sizeof(int));
-} tcp_retransmit_events SEC(".maps");
-
-struct trace_event_raw_tcp_event_sk_skb__stub {
-	__u64 unused;
-	void *sbkaddr;
-	void *skaddr;
-#if __KERNEL >= 420
-	int state;
-#endif
-	__u16 sport;
-	__u16 dport;
-#if __KERNEL >= 512
-	__u16 family;
-#endif
-	__u8 saddr[4];
-	__u8 daddr[4];
-	__u8 saddr_v6[16];
-	__u8 daddr_v6[16];
-};
-
-SEC("tracepoint/tcp/tcp_retransmit_skb")
-int tcp_retransmit_skb(struct trace_event_raw_tcp_event_sk_skb__stub *args)
-{
-	struct tcp_event e = {
-		.type = EVENT_TYPE_TCP_RETRANSMIT,
-		.sport = args->sport,
-		.dport = args->dport,
-	};
-	__builtin_memcpy(&e.saddr, &args->saddr_v6, sizeof(e.saddr));
-	__builtin_memcpy(&e.daddr, &args->daddr_v6, sizeof(e.daddr));
-
-	bpf_perf_event_output(args, &tcp_retransmit_events, BPF_F_CURRENT_CPU, &e, sizeof(e));
-
-	return 0;
-}

+ 0 - 118
ebpftracer/ebpf/tcp_state.c

@@ -1,118 +0,0 @@
-#define IPPROTO_TCP 6
-
-struct tcp_event {
-	__u32 type;
-	__u32 pid;
-	__u16 sport;
-	__u16 dport;
-	__u8 saddr[16];
-	__u8 daddr[16];
-};
-
-struct {
-	__uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
-	__uint(key_size, sizeof(int));
-	__uint(value_size, sizeof(int));
-} tcp_listen_events SEC(".maps");
-
-struct {
-	__uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
-	__uint(key_size, sizeof(int));
-	__uint(value_size, sizeof(int));
-} tcp_connect_events SEC(".maps");
-
-struct trace_event_raw_inet_sock_set_state__stub {
-	__u64 unused;
-	void *skaddr;
-	int oldstate;
-	int newstate;
-	__u16 sport;
-	__u16 dport;
-	__u16 family;
-#if __KERNEL >= 506
-	__u16 protocol;
-#else
-	__u8 protocol;
-#endif
-	__u8 saddr[4];
-	__u8 daddr[4];
-	__u8 saddr_v6[16];
-	__u8 daddr_v6[16];
-};
-
-struct sk_info {
-	__u32 pid;
-//	__u64 ts;
-};
-struct {
-	__uint(type, BPF_MAP_TYPE_HASH);
-	__uint(key_size, sizeof(void *));
-	__uint(value_size, sizeof(struct sk_info));
-	__uint(max_entries, 10240);
-} sk_info SEC(".maps");
-
-SEC("tracepoint/sock/inet_sock_set_state")
-int inet_sock_set_state(void *ctx)
-{
-	struct trace_event_raw_inet_sock_set_state__stub args = {};
-	if (bpf_probe_read(&args, sizeof(args), ctx) < 0) {
-		return 0;
-	}
-	if (args.protocol != IPPROTO_TCP) {
-		return 0;
-	}
-	if (args.oldstate == BPF_TCP_CLOSE && args.newstate == BPF_TCP_SYN_SENT) {
-		struct sk_info i = {};
-		i.pid = bpf_get_current_pid_tgid() >> 32;
-		bpf_map_update_elem(&sk_info, &args.skaddr, &i, BPF_ANY);
-		return 0;
-	}
-
-	__u32 pid = bpf_get_current_pid_tgid() >> 32;
-	__u32 type = 0;
-	void *map = &tcp_connect_events;
-	if (args.oldstate == BPF_TCP_SYN_SENT) {
-		if (args.newstate == BPF_TCP_ESTABLISHED) {
-			type = EVENT_TYPE_CONNECTION_OPEN;
-		} else if (args.newstate == BPF_TCP_CLOSE) {
-			type = EVENT_TYPE_CONNECTION_ERROR;
-		} else {
-			return 0;
-		}
-		struct sk_info *i = bpf_map_lookup_elem(&sk_info, &args.skaddr);
-		if (!i) {
-			return 0;
-		}
-		pid = i->pid;
-		bpf_map_delete_elem(&sk_info, &args.skaddr);
-	}
-	if (args.oldstate == BPF_TCP_ESTABLISHED && (args.newstate == BPF_TCP_FIN_WAIT1 || args.newstate == BPF_TCP_CLOSE_WAIT)) {
-		pid = 0;
-		type = EVENT_TYPE_CONNECTION_CLOSE;
-	}
-	if (args.oldstate == BPF_TCP_CLOSE && args.newstate == BPF_TCP_LISTEN) {
-		type = EVENT_TYPE_LISTEN_OPEN;
-		map = &tcp_listen_events;
-	}
-	if (args.oldstate == BPF_TCP_LISTEN && args.newstate == BPF_TCP_CLOSE) {
-		type = EVENT_TYPE_LISTEN_CLOSE;
-		map = &tcp_listen_events;
-	}
-
-	if (type == 0) {
-		return 0;
-	}
-
-	struct tcp_event e = {
-		.type = type,
-		.pid = pid,
-		.sport = args.sport,
-		.dport = args.dport,
-	};
-	__builtin_memcpy(&e.saddr, &args.saddr_v6, sizeof(e.saddr));
-	__builtin_memcpy(&e.daddr, &args.daddr_v6, sizeof(e.saddr));
-
-	bpf_perf_event_output(ctx, map, BPF_F_CURRENT_CPU, &e, sizeof(e));
-
-	return 0;
-}

+ 3 - 1
ebpftracer/init.go

@@ -8,11 +8,12 @@ import (
 
 type file struct {
 	pid uint32
-	fd  uint32
+	fd  uint64
 }
 
 type sock struct {
 	pid uint32
+	fd  uint64
 	proc.Sock
 }
 
@@ -46,6 +47,7 @@ func readFds(pids []uint32) (files []file, socks []sock) {
 			switch {
 			case fd.SocketInode != "":
 				if s, ok := sockets[fd.SocketInode]; ok {
+					s.fd = fd.Fd
 					s.pid = pid
 					socks = append(socks, s)
 				}

+ 86 - 11
ebpftracer/tracer.go

@@ -17,6 +17,7 @@ import (
 	"runtime"
 	"strconv"
 	"strings"
+	"time"
 )
 
 type EventType uint32
@@ -32,18 +33,55 @@ const (
 	EventTypeListenClose     EventType = 7
 	EventTypeFileOpen        EventType = 8
 	EventTypeTCPRetransmit   EventType = 9
+	EventTypeL7Request       EventType = 10
 
 	EventReasonNone    EventReason = 0
 	EventReasonOOMKill EventReason = 1
 )
 
+type L7Protocol uint8
+
+const (
+	L7ProtocolHTTP      L7Protocol = 1
+	L7ProtocolPostgres  L7Protocol = 2
+	L7ProtocolRedis     L7Protocol = 3
+	L7ProtocolMemcached L7Protocol = 4
+	L7ProtocolMysql     L7Protocol = 5
+	L7ProtocolMongo     L7Protocol = 6
+)
+
+func (p L7Protocol) String() string {
+	switch p {
+	case L7ProtocolHTTP:
+		return "HTTP"
+	case L7ProtocolPostgres:
+		return "Postgres"
+	case L7ProtocolRedis:
+		return "Redis"
+	case L7ProtocolMemcached:
+		return "Memcached"
+	case L7ProtocolMysql:
+		return "Mysql"
+	case L7ProtocolMongo:
+		return "Mongo"
+	}
+	return "UNKNOWN:" + strconv.Itoa(int(p))
+}
+
+type L7Request struct {
+	Protocol L7Protocol
+	Status   int
+	Duration time.Duration
+}
+
 type Event struct {
-	Type    EventType
-	Reason  EventReason
-	Pid     uint32
-	SrcAddr netaddr.IPPort
-	DstAddr netaddr.IPPort
-	Fd      uint32
+	Type      EventType
+	Reason    EventReason
+	Pid       uint32
+	SrcAddr   netaddr.IPPort
+	DstAddr   netaddr.IPPort
+	Fd        uint64
+	L7Request *L7Request
 }
 
 type Tracer struct {
@@ -52,11 +90,14 @@ type Tracer struct {
 	links      []link.Link
 }
 
-func NewTracer(events chan<- Event, kernelVersion string) (*Tracer, error) {
+func NewTracer(events chan<- Event, kernelVersion string, disableL7Tracing bool) (*Tracer, error) {
 	t := &Tracer{readers: map[string]*perf.Reader{}}
-	if err := t.ebpf(events, kernelVersion); err != nil {
+	if err := t.ebpf(events, kernelVersion, disableL7Tracing); err != nil {
 		return nil, err
 	}
+	if disableL7Tracing {
+		klog.Infoln("L7 tracing is disabled")
+	}
 	if err := t.init(events); err != nil {
 		return nil, err
 	}
@@ -104,6 +145,7 @@ func (t *Tracer) init(ch chan<- Event) error {
 		ch <- Event{
 			Type:    typ,
 			Pid:     s.pid,
+			Fd:      s.fd,
 			SrcAddr: s.SAddr,
 			DstAddr: s.DAddr,
 		}
@@ -111,7 +153,7 @@ func (t *Tracer) init(ch chan<- Event) error {
 	return nil
 }
 
-func (t *Tracer) ebpf(ch chan<- Event, kernelVersion string) error {
+func (t *Tracer) ebpf(ch chan<- Event, kernelVersion string, disableL7Tracing bool) error {
 	kv := "v" + common.KernelMajorMinor(kernelVersion)
 	var prg []byte
 	for _, p := range ebpfProg {
@@ -146,6 +188,10 @@ func (t *Tracer) ebpf(ch chan<- Event, kernelVersion string) error {
 		"tcp_retransmit_events": &tcpEvent{},
 		"file_events":           &fileEvent{},
 	}
+	if !disableL7Tracing {
+		events["l7_events"] = &l7Event{}
+	}
+
 	for name, typ := range events {
 		r, err := perf.NewReader(t.collection.Maps[name], os.Getpagesize())
 		if err != nil {
@@ -161,6 +207,16 @@ func (t *Tracer) ebpf(ch chan<- Event, kernelVersion string) error {
 		if runtime.GOARCH == "arm64" && (spec.Name == "sys_enter_open" || spec.Name == "sys_exit_open") {
 			continue
 		}
+		if disableL7Tracing {
+			switch spec.Name {
+			case "sys_enter_writev", "sys_enter_write", "sys_enter_sendto":
+				continue
+			case "sys_enter_read", "sys_enter_readv", "sys_enter_recvfrom":
+				continue
+			case "sys_exit_read", "sys_exit_readv", "sys_exit_recvfrom":
+				continue
+			}
+		}
 		var err error
 		var l link.Link
 		switch spec.Type {
@@ -200,6 +256,8 @@ func (t EventType) String() string {
 		return "file-open"
 	case EventTypeTCPRetransmit:
 		return "tcp-retransmit"
+	case EventTypeL7Request:
+		return "l7-request"
 	}
 	return "unknown: " + strconv.Itoa(int(t))
 }
@@ -229,6 +287,7 @@ func (e procEvent) Event() Event {
 }
 
 type tcpEvent struct {
+	Fd    uint64
 	Type  uint32
 	Pid   uint32
 	SPort uint16
@@ -238,19 +297,35 @@ type tcpEvent struct {
 }
 
 func (e tcpEvent) Event() Event {
-	return Event{Type: EventType(e.Type), Pid: e.Pid, SrcAddr: ipPort(e.SAddr, e.SPort), DstAddr: ipPort(e.DAddr, e.DPort)}
+	return Event{Type: EventType(e.Type), Pid: e.Pid, SrcAddr: ipPort(e.SAddr, e.SPort), DstAddr: ipPort(e.DAddr, e.DPort), Fd: e.Fd}
 }
 
 type fileEvent struct {
 	Type uint32
 	Pid  uint32
-	Fd   uint32
+	Fd   uint64
 }
 
 func (e fileEvent) Event() Event {
 	return Event{Type: EventType(e.Type), Pid: e.Pid, Fd: e.Fd}
 }
 
+type l7Event struct {
+	Fd       uint64
+	Pid      uint32
+	Status   uint32
+	Duration uint64
+	Protocol uint8
+}
+
+func (e l7Event) Event() Event {
+	return Event{Type: EventTypeL7Request, Pid: e.Pid, Fd: e.Fd, L7Request: &L7Request{
+		Protocol: L7Protocol(e.Protocol),
+		Status:   int(e.Status),
+		Duration: time.Duration(e.Duration),
+	}}
+}
+
 func runEventsReader(name string, r *perf.Reader, ch chan<- Event, e rawEvent) {
 	for {
 		rec, err := r.Read()

+ 1 - 1
ebpftracer/tracer_test.go

@@ -323,7 +323,7 @@ func runTracer(t *testing.T, verbose bool) (func() *Event, func()) {
 	assert.NoError(t, unix.Uname(&uname))
 
 	go func() {
-		tt, err := NewTracer(events, string(bytes.Split(uname.Release[:], []byte{0})[0]))
+		tt, err := NewTracer(events, string(bytes.Split(uname.Release[:], []byte{0})[0]), false)
 		require.NoError(t, err)
 		<-done
 		tt.Close()

+ 6 - 4
flags/flags.go

@@ -9,10 +9,12 @@ import (
 )
 
 var (
-	ListenAddress             = kingpin.Flag("listen", "Listen address - ip:port or :port").Default("0.0.0.0:80").String()
-	CgroupRoot                = kingpin.Flag("cgroupfs-root", "The mount point of the host cgroupfs root").Default("/sys/fs/cgroup").String()
-	NoParseLogs               = kingpin.Flag("no-parse-logs", "Disable container logs parsing").Default("false").Bool()
-	NoPingUpstreams           = kingpin.Flag("no-ping-upstreams", "Disable container upstreams ping").Default("false").Bool()
+	ListenAddress     = kingpin.Flag("listen", "Listen address - ip:port or :port").Default("0.0.0.0:80").String()
+	CgroupRoot        = kingpin.Flag("cgroupfs-root", "The mount point of the host cgroupfs root").Default("/sys/fs/cgroup").String()
+	DisableLogParsing = kingpin.Flag("disable-log-parsing", "Disable container log parsing").Default("false").Bool()
+	DisablePinger     = kingpin.Flag("disable-pinger", "Don't ping upstreams").Default("false").Bool()
+	DisableL7Tracing  = kingpin.Flag("disable-l7-tracing", "Disable L7 tracing").Default("false").Bool()
+
 	externalNetworksWhitelist = kingpin.Flag("track-public-network", "Allow track connections to the specified IP networks, all private networks are allowed by default (e.g., Y.Y.Y.Y/mask)").Strings()
 	ExternalNetworksWhitelist []netaddr.IPPrefix
 

+ 5 - 5
proc/fd.go

@@ -8,7 +8,7 @@ import (
 )
 
 type Fd struct {
-	Fd   uint32
+	Fd   uint64
 	Dest string
 
 	SocketInode string
@@ -22,7 +22,7 @@ func ReadFds(pid uint32) ([]Fd, error) {
 	}
 	res := make([]Fd, 0, len(entries))
 	for _, entry := range entries {
-		fd, err := strconv.Atoi(entry.Name())
+		fd, err := strconv.ParseUint(entry.Name(), 10, 64)
 		if err != nil {
 			continue
 		}
@@ -34,7 +34,7 @@ func ReadFds(pid uint32) ([]Fd, error) {
 		if strings.HasPrefix(dest, "socket:[") && strings.HasSuffix(dest, "]") {
 			socketInode = dest[len("socket:[") : len(dest)-1]
 		}
-		res = append(res, Fd{Fd: uint32(fd), Dest: dest, SocketInode: socketInode})
+		res = append(res, Fd{Fd: fd, Dest: dest, SocketInode: socketInode})
 	}
 	return res, nil
 }
@@ -45,8 +45,8 @@ type FdInfo struct {
 	Dest  string
 }
 
-func GetFdInfo(pid uint32, fd uint32) *FdInfo {
-	fds := strconv.Itoa(int(fd))
+func GetFdInfo(pid uint32, fd uint64) *FdInfo {
+	fds := strconv.FormatUint(fd, 10)
 	data, err := os.ReadFile(Path(pid, "fdinfo", fds))
 	if err != nil {
 		return nil

이 변경점에서 너무 많은 파일들이 변경되어 몇몇 파일들은 표시되지 않았습니다.