瀏覽代碼

eBPF tracer: HTTP/2 protocol support

Anton Petruhin 2 年之前
父節點
當前提交
739dc94854

+ 75 - 83
containers/container.go

@@ -5,6 +5,7 @@ import (
 	"github.com/coroot/coroot-node-agent/cgroup"
 	"github.com/coroot/coroot-node-agent/common"
 	"github.com/coroot/coroot-node-agent/ebpftracer"
+	"github.com/coroot/coroot-node-agent/ebpftracer/l7"
 	"github.com/coroot/coroot-node-agent/flags"
 	"github.com/coroot/coroot-node-agent/logs"
 	"github.com/coroot/coroot-node-agent/node"
@@ -73,12 +74,9 @@ type ActiveConnection struct {
 	Timestamp  uint64
 	Closed     time.Time
 
-	PreparedStatements map[string]string
-}
-
-type L7Stats struct {
-	Requests *prometheus.CounterVec
-	Latency  prometheus.Histogram
+	http2Parser    *l7.Http2Parser
+	postgresParser *l7.PostgresParser
+	mysqlParser    *l7.MysqlParser
 }
 
 type ListenDetails struct {
@@ -123,7 +121,7 @@ type Container struct {
 	connectionsActive  map[AddrPair]*ActiveConnection
 	retransmits        map[AddrPair]int64 // dst:actual_dst -> count
 
-	l7Stats map[ebpftracer.L7Protocol]map[AddrPair]*L7Stats // protocol -> dst:actual_dst -> stats
+	l7Stats L7Stats
 
 	oomKills int
 
@@ -162,7 +160,7 @@ func NewContainer(id ContainerID, cg *cgroup.Cgroup, md *ContainerMetadata, host
 		connectLastAttempt: map[netaddr.IPPort]time.Time{},
 		connectionsActive:  map[AddrPair]*ActiveConnection{},
 		retransmits:        map[AddrPair]int64{},
-		l7Stats:            map[ebpftracer.L7Protocol]map[AddrPair]*L7Stats{},
+		l7Stats:            L7Stats{},
 
 		mounts: map[string]proc.MountInfo{},
 
@@ -343,16 +341,7 @@ func (c *Container) Collect(ch chan<- prometheus.Metric) {
 		ch <- gauge(metrics.ApplicationType, 1, appType)
 	}
 
-	for _, protoStats := range c.l7Stats {
-		for _, s := range protoStats {
-			if s.Requests != nil {
-				s.Requests.Collect(ch)
-			}
-			if s.Latency != nil {
-				s.Latency.Collect(ch)
-			}
-		}
-	}
+	c.l7Stats.collect(ch)
 
 	if !*flags.DisablePinger {
 		for ip, rtt := range c.ping() {
@@ -510,11 +499,10 @@ func (c *Container) onConnectionOpen(pid uint32, fd uint64, src, dst netaddr.IPP
 	} else {
 		c.connectsSuccessful[AddrPair{src: dst, dst: *actualDst}]++
 		c.connectionsActive[AddrPair{src: src, dst: dst}] = &ActiveConnection{
-			ActualDest:         *actualDst,
-			Pid:                pid,
-			Fd:                 fd,
-			Timestamp:          timestamp,
-			PreparedStatements: map[string]string{},
+			ActualDest: *actualDst,
+			Pid:        pid,
+			Fd:         fd,
+			Timestamp:  timestamp,
 		}
 	}
 	c.connectLastAttempt[dst] = time.Now()
@@ -561,64 +549,74 @@ func (c *Container) onConnectionClose(srcDst AddrPair) bool {
 	return true
 }
 
-func (c *Container) onL7Request(pid uint32, fd uint64, timestamp uint64, r *ebpftracer.L7Request) {
+func (c *Container) onL7Request(pid uint32, fd uint64, timestamp uint64, r *l7.RequestData) {
 	c.lock.Lock()
 	defer c.lock.Unlock()
-	for dest, conn := range c.connectionsActive {
+
+	var dest AddrPair
+	var conn *ActiveConnection
+	var found bool
+	for dest, conn = range c.connectionsActive {
 		if conn.Pid == pid && conn.Fd == fd && (timestamp == 0 || conn.Timestamp == timestamp) {
-			key := AddrPair{src: dest.dst, dst: conn.ActualDest}
-			stats := c.l7Stats[r.Protocol]
-			if stats == nil {
-				stats = map[AddrPair]*L7Stats{}
-				c.l7Stats[r.Protocol] = stats
-			}
-			tracing.HandleL7Request(string(c.id), conn.ActualDest, r, conn.PreparedStatements)
-			if r.Method == ebpftracer.L7MethodStatementClose {
-				return
-			}
-			s := stats[key]
-			if s == nil {
-				constLabels := map[string]string{"destination": key.src.String(), "actual_destination": key.dst.String()}
-				s = &L7Stats{}
-				cOpts, ok := L7Requests[r.Protocol]
-				if !ok {
-					klog.Warningln("cannot find metric description for L7 protocol: %s", r.Protocol.String())
-					return
-				}
-				if cOpts.Name != "" {
-					labels := []string{"status"}
-					if r.Protocol == ebpftracer.L7ProtocolRabbitmq || r.Protocol == ebpftracer.L7ProtocolNats {
-						labels = append(labels, "method")
-					}
-					s.Requests = prometheus.NewCounterVec(
-						prometheus.CounterOpts{Name: cOpts.Name, Help: cOpts.Help, ConstLabels: constLabels}, labels,
-					)
-				}
-				hOpts, ok := L7Latency[r.Protocol]
-				if !ok {
-					klog.Warningln("cannot find metric description for L7 protocol: %s", r.Protocol.String())
-					return
-				}
-				if hOpts.Name != "" {
-					s.Latency = prometheus.NewHistogram(
-						prometheus.HistogramOpts{Name: hOpts.Name, Help: hOpts.Help, ConstLabels: constLabels},
-					)
-				}
-				stats[key] = s
-			}
-			if s.Requests != nil {
-				if r.Protocol == ebpftracer.L7ProtocolRabbitmq || r.Protocol == ebpftracer.L7ProtocolNats {
-					s.Requests.WithLabelValues(r.StatusString(), r.Method.String()).Inc()
-				} else {
-					s.Requests.WithLabelValues(r.StatusString()).Inc()
-				}
-			}
-			if s.Latency != nil {
-				s.Latency.Observe(r.Duration.Seconds())
-			}
-			return
+			found = true
+			break
 		}
 	}
+	if !found {
+		return
+	}
+
+	stats := c.l7Stats.get(r.Protocol, dest.dst, conn.ActualDest)
+	trace := tracing.NewTrace(string(c.id), conn.ActualDest)
+	switch r.Protocol {
+	case l7.ProtocolHTTP:
+		stats.observe(r.Status.Http(), "", r.Duration)
+		method, path := l7.ParseHttp(r.Payload)
+		trace.HttpRequest(method, path, r.Status, r.Duration)
+	case l7.ProtocolHTTP2:
+		if conn.http2Parser == nil {
+			conn.http2Parser = l7.NewHttp2Parser()
+		}
+		requests := conn.http2Parser.Parse(r.Method, r.Payload, uint64(r.Duration))
+		for _, req := range requests {
+			stats.observe(req.Status.Http(), "", req.Duration)
+			trace.Http2Request(req.Method, req.Path, req.Scheme, req.Status, req.Duration)
+		}
+	case l7.ProtocolPostgres:
+		if r.Method != l7.MethodStatementClose {
+			stats.observe(r.Status.String(), "", r.Duration)
+		}
+		if conn.postgresParser == nil {
+			conn.postgresParser = l7.NewPostgresParser()
+		}
+		query := conn.postgresParser.Parse(r.Payload)
+		trace.PostgresQuery(query, r.Status.Error(), r.Duration)
+	case l7.ProtocolMysql:
+		if r.Method != l7.MethodStatementClose {
+			stats.observe(r.Status.String(), "", r.Duration)
+		}
+		if conn.mysqlParser == nil {
+			conn.mysqlParser = l7.NewMysqlParser()
+		}
+		query := conn.mysqlParser.Parse(r.Payload, r.StatementId)
+		trace.MysqlQuery(query, r.Status.Error(), r.Duration)
+	case l7.ProtocolMemcached:
+		stats.observe(r.Status.String(), "", r.Duration)
+		cmd, items := l7.ParseMemcached(r.Payload)
+		trace.MemcachedQuery(cmd, items, r.Status.Error(), r.Duration)
+	case l7.ProtocolRedis:
+		stats.observe(r.Status.String(), "", r.Duration)
+		cmd, args := l7.ParseRedis(r.Payload)
+		trace.RedisQuery(cmd, args, r.Status.Error(), r.Duration)
+	case l7.ProtocolMongo:
+		stats.observe(r.Status.String(), "", r.Duration)
+		query := l7.ParseMongo(r.Payload)
+		trace.MongoQuery(query, r.Status.Error(), r.Duration)
+	case l7.ProtocolKafka, l7.ProtocolCassandra:
+		stats.observe(r.Status.String(), "", r.Duration)
+	case l7.ProtocolRabbitmq, l7.ProtocolNats:
+		stats.observe(r.Status.String(), r.Method.String(), 0)
+	}
 }
 
 func (c *Container) onRetransmit(srcDst AddrPair) bool {
@@ -910,13 +908,7 @@ func (c *Container) gc(now time.Time) {
 					delete(c.retransmits, d)
 				}
 			}
-			for _, protoStats := range c.l7Stats {
-				for d := range protoStats {
-					if d.src == dst {
-						delete(protoStats, d)
-					}
-				}
-			}
+			c.l7Stats.delete(dst)
 		}
 	}
 }

+ 92 - 0
containers/l7.go

@@ -0,0 +1,92 @@
+package containers
+
+import (
+	"github.com/coroot/coroot-node-agent/ebpftracer/l7"
+	"github.com/prometheus/client_golang/prometheus"
+	"inet.af/netaddr"
+	"k8s.io/klog/v2"
+	"time"
+)
+
+type L7Metrics struct {
+	Requests *prometheus.CounterVec
+	Latency  prometheus.Histogram
+}
+
+func (m *L7Metrics) observe(status, method string, duration time.Duration) {
+	if m.Requests != nil {
+		var err error
+		var c prometheus.Counter
+		if method != "" {
+			c, err = m.Requests.GetMetricWithLabelValues(status, method)
+		} else {
+			c, err = m.Requests.GetMetricWithLabelValues(status)
+		}
+		if err != nil {
+			klog.Warningln(err)
+		} else {
+			c.Inc()
+		}
+	}
+	if m.Latency != nil && duration != 0 {
+		m.Latency.Observe(duration.Seconds())
+	}
+}
+
+type L7Stats map[l7.Protocol]map[AddrPair]*L7Metrics // protocol -> dst:actual_dst -> metrics
+
+func (s L7Stats) get(protocol l7.Protocol, destination, actualDestination netaddr.IPPort) *L7Metrics {
+	if protocol == l7.ProtocolHTTP2 {
+		protocol = l7.ProtocolHTTP
+	}
+	protoStats := s[protocol]
+	if protoStats == nil {
+		protoStats = map[AddrPair]*L7Metrics{}
+		s[protocol] = protoStats
+	}
+	dest := AddrPair{src: destination, dst: actualDestination}
+	m := protoStats[dest]
+	if m == nil {
+		m = &L7Metrics{}
+		protoStats[dest] = m
+		constLabels := map[string]string{"destination": destination.String(), "actual_destination": actualDestination.String()}
+		labels := []string{"status"}
+		switch protocol {
+		case l7.ProtocolRabbitmq, l7.ProtocolNats:
+			labels = append(labels, "method")
+		default:
+			hOpts := L7Latency[protocol]
+			m.Latency = prometheus.NewHistogram(
+				prometheus.HistogramOpts{Name: hOpts.Name, Help: hOpts.Help, ConstLabels: constLabels},
+			)
+		}
+		cOpts := L7Requests[protocol]
+		m.Requests = prometheus.NewCounterVec(
+			prometheus.CounterOpts{Name: cOpts.Name, Help: cOpts.Help, ConstLabels: constLabels}, labels,
+		)
+	}
+	return m
+}
+
+func (s L7Stats) collect(ch chan<- prometheus.Metric) {
+	for _, protoStats := range s {
+		for _, m := range protoStats {
+			if m.Requests != nil {
+				m.Requests.Collect(ch)
+			}
+			if m.Latency != nil {
+				m.Latency.Collect(ch)
+			}
+		}
+	}
+}
+
+func (s L7Stats) delete(dst netaddr.IPPort) {
+	for _, protoStats := range s {
+		for d := range protoStats {
+			if d.src == dst {
+				delete(protoStats, d)
+			}
+		}
+	}
+}

+ 21 - 23
containers/metrics.go

@@ -1,7 +1,7 @@
 package containers
 
 import (
-	"github.com/coroot/coroot-node-agent/ebpftracer"
+	"github.com/coroot/coroot-node-agent/ebpftracer/l7"
 	"github.com/prometheus/client_golang/prometheus"
 )
 
@@ -89,29 +89,27 @@ var metrics = struct {
 }
 
 var (
-	L7Requests = map[ebpftracer.L7Protocol]prometheus.CounterOpts{
-		ebpftracer.L7ProtocolHTTP:      {Name: "container_http_requests_total", Help: "Total number of outbound HTTP requests"},
-		ebpftracer.L7ProtocolPostgres:  {Name: "container_postgres_queries_total", Help: "Total number of outbound Postgres queries"},
-		ebpftracer.L7ProtocolRedis:     {Name: "container_redis_queries_total", Help: "Total number of outbound Redis queries"},
-		ebpftracer.L7ProtocolMemcached: {Name: "container_memcached_queries_total", Help: "Total number of outbound Memcached queries"},
-		ebpftracer.L7ProtocolMysql:     {Name: "container_mysql_queries_total", Help: "Total number of outbound Mysql queries"},
-		ebpftracer.L7ProtocolMongo:     {Name: "container_mongo_queries_total", Help: "Total number of outbound Mongo queries"},
-		ebpftracer.L7ProtocolKafka:     {Name: "container_kafka_requests_total", Help: "Total number of outbound Kafka requests"},
-		ebpftracer.L7ProtocolCassandra: {Name: "container_cassandra_queries_total", Help: "Total number of outbound Cassandra requests"},
-		ebpftracer.L7ProtocolRabbitmq:  {Name: "container_rabbitmq_messages_total", Help: "Total number of Rabbitmq messages produced or consumed by the container"},
-		ebpftracer.L7ProtocolNats:      {Name: "container_nats_messages_total", Help: "Total number of NATS messages produced or consumed by the container"},
+	L7Requests = map[l7.Protocol]prometheus.CounterOpts{
+		l7.ProtocolHTTP:      {Name: "container_http_requests_total", Help: "Total number of outbound HTTP requests"},
+		l7.ProtocolPostgres:  {Name: "container_postgres_queries_total", Help: "Total number of outbound Postgres queries"},
+		l7.ProtocolRedis:     {Name: "container_redis_queries_total", Help: "Total number of outbound Redis queries"},
+		l7.ProtocolMemcached: {Name: "container_memcached_queries_total", Help: "Total number of outbound Memcached queries"},
+		l7.ProtocolMysql:     {Name: "container_mysql_queries_total", Help: "Total number of outbound Mysql queries"},
+		l7.ProtocolMongo:     {Name: "container_mongo_queries_total", Help: "Total number of outbound Mongo queries"},
+		l7.ProtocolKafka:     {Name: "container_kafka_requests_total", Help: "Total number of outbound Kafka requests"},
+		l7.ProtocolCassandra: {Name: "container_cassandra_queries_total", Help: "Total number of outbound Cassandra requests"},
+		l7.ProtocolRabbitmq:  {Name: "container_rabbitmq_messages_total", Help: "Total number of Rabbitmq messages produced or consumed by the container"},
+		l7.ProtocolNats:      {Name: "container_nats_messages_total", Help: "Total number of NATS messages produced or consumed by the container"},
 	}
-	L7Latency = map[ebpftracer.L7Protocol]prometheus.HistogramOpts{
-		ebpftracer.L7ProtocolHTTP:      {Name: "container_http_requests_duration_seconds_total", Help: "Histogram of the response time for each outbound HTTP request"},
-		ebpftracer.L7ProtocolPostgres:  {Name: "container_postgres_queries_duration_seconds_total", Help: "Histogram of the execution time for each outbound Postgres query"},
-		ebpftracer.L7ProtocolRedis:     {Name: "container_redis_queries_duration_seconds_total", Help: "Histogram of the execution time for each outbound Redis query"},
-		ebpftracer.L7ProtocolMemcached: {Name: "container_memcached_queries_duration_seconds_total", Help: "Histogram of the execution time for each outbound Memcached query"},
-		ebpftracer.L7ProtocolMysql:     {Name: "container_mysql_queries_duration_seconds_total", Help: "Histogram of the execution time for each outbound Mysql query"},
-		ebpftracer.L7ProtocolMongo:     {Name: "container_mongo_queries_duration_seconds_total", Help: "Histogram of the execution time for each outbound Mongo query"},
-		ebpftracer.L7ProtocolKafka:     {Name: "container_kafka_requests_duration_seconds_total", Help: "Histogram of the execution time for each outbound Kafka request"},
-		ebpftracer.L7ProtocolCassandra: {Name: "container_cassandra_queries_duration_seconds_total", Help: "Histogram of the execution time for each outbound Cassandra request"},
-		ebpftracer.L7ProtocolRabbitmq:  {Name: "", Help: ""},
-		ebpftracer.L7ProtocolNats:      {Name: "", Help: ""},
+	L7Latency = map[l7.Protocol]prometheus.HistogramOpts{
+		l7.ProtocolHTTP:      {Name: "container_http_requests_duration_seconds_total", Help: "Histogram of the response time for each outbound HTTP request"},
+		l7.ProtocolPostgres:  {Name: "container_postgres_queries_duration_seconds_total", Help: "Histogram of the execution time for each outbound Postgres query"},
+		l7.ProtocolRedis:     {Name: "container_redis_queries_duration_seconds_total", Help: "Histogram of the execution time for each outbound Redis query"},
+		l7.ProtocolMemcached: {Name: "container_memcached_queries_duration_seconds_total", Help: "Histogram of the execution time for each outbound Memcached query"},
+		l7.ProtocolMysql:     {Name: "container_mysql_queries_duration_seconds_total", Help: "Histogram of the execution time for each outbound Mysql query"},
+		l7.ProtocolMongo:     {Name: "container_mongo_queries_duration_seconds_total", Help: "Histogram of the execution time for each outbound Mongo query"},
+		l7.ProtocolKafka:     {Name: "container_kafka_requests_duration_seconds_total", Help: "Histogram of the execution time for each outbound Kafka request"},
+		l7.ProtocolCassandra: {Name: "container_cassandra_queries_duration_seconds_total", Help: "Histogram of the execution time for each outbound Cassandra request"},
 	}
 )
 

+ 9 - 6
ebpftracer/Makefile

@@ -5,26 +5,29 @@ build:
 	docker run --rm --name ebpftracer ebpftracer cat /tmp/ebpf/ebpf.go > ./ebpf.go
 	@echo
 
-test: test_vm1 test_vm2 test_vm3 test_vm4
+test: test_vm1 test_vm2 test_vm3 test_vm4 test_vm5
 
 define test_in_vm
 	@echo ===TESTING IN $(1)===
-	vagrant ssh $(1) -c "uname -r && cd /tmp/src && VM=$(1) sudo go test -p 1 -count 1 -v ./ebpftracer/..."
+	vagrant ssh $(1) -c "uname -r && cd /tmp/src && sudo VM=$(1) go test -p 1 -count 1 -v ./ebpftracer/..."
 	@echo
 endef
 
-test_vm1: build
+test_vm1:
 	$(call test_in_vm,ubuntu1810)
 
-test_vm2: build
+test_vm2:
 	$(call test_in_vm,ubuntu2004)
 
-test_vm3: build
+test_vm3:
 	$(call test_in_vm,ubuntu2010)
 
-test_vm4: build
+test_vm4:
 	$(call test_in_vm,ubuntu2110)
 
+test_vm5:
+	$(call test_in_vm,ubuntu2204)
+
 vms_start:
 	vagrant up
 

+ 4 - 1
ebpftracer/Vagrantfile

@@ -1,6 +1,6 @@
 $script = <<-SCRIPT
 sudo apt update && sudo apt -y install gcc cgroup-tools
-sudo snap install go --classic
+sudo snap install go --channel=1.19/stable --classic
 
 # prevent unexpected `systemd-udevd` processes from appearing
 systemctl stop systemd-udevd.service systemd-udevd-control.socket systemd-udevd-kernel.socket
@@ -31,4 +31,7 @@ Vagrant.configure("2") do |config|
     config.vm.define "ubuntu2110" do |c|
         c.vm.box = "generic/ubuntu2110"
     end
+    config.vm.define "ubuntu2204" do |c|
+        c.vm.box = "generic/ubuntu2204"
+    end
 end

File diff suppressed because it is too large
+ 0 - 0
ebpftracer/ebpf.go


+ 2 - 0
ebpftracer/ebpf/ebpf.c

@@ -17,6 +17,8 @@
 
 #define EVENT_REASON_OOM_KILL		1
 
+#define MIN(a,b) (((a)<(b))?(a):(b))
+
 #define bpf_read(src, dst)                            \
 ({                                                    \
     if (bpf_probe_read(&dst, sizeof(dst), src) < 0) { \

+ 29 - 15
ebpftracer/ebpf/l7/cassandra.c

@@ -3,8 +3,11 @@
 #define CASSANDRA_REQUEST_FRAME  0x04
 #define CASSANDRA_RESPONSE_FRAME 0x84
 
-#define CASSANDRA_OPCODE_ERROR  0x00
-#define CASSANDRA_OPCODE_RESULT 0x08
+#define CASSANDRA_OPCODE_ERROR      0x00
+#define CASSANDRA_OPCODE_QUERY      0x07
+#define CASSANDRA_OPCODE_RESULT     0x08
+#define CASSANDRA_OPCODE_EXECUTE    0x0A
+#define CASSANDRA_OPCODE_BATCH      0x0D
 
 struct cassandra_header {
     __u8 version;
@@ -14,30 +17,41 @@ struct cassandra_header {
 };
 
 static __always_inline
-__s16 is_cassandra_request(char *buf, int buf_size) {
-    if (buf_size < 1) {
-        return -1;
-    }
+int is_cassandra_request(char *buf, __u64 buf_size, __s16 *stream_id) {
     struct cassandra_header h = {};
-    if (bpf_probe_read(&h, sizeof(h), (void *)buf) < 0) {
-        return -1;
+    if (buf_size < sizeof(h)) {
+        return 0;
+    }
+    bpf_read(buf, h);
+    if (h.version != CASSANDRA_REQUEST_FRAME) {
+        return 0;
     }
-    if (h.version == CASSANDRA_REQUEST_FRAME && h.stream_id >= 0) {
-        return h.stream_id;
+    if (h.opcode == CASSANDRA_OPCODE_QUERY || h.opcode == CASSANDRA_OPCODE_EXECUTE || h.opcode == CASSANDRA_OPCODE_BATCH) {
+        *stream_id = h.stream_id;
+        return 1;
     }
-    return -1;
+    return 0;
 }
 
 static __always_inline
-__u32 cassandra_status(struct cassandra_header h) {
-    if (h.version != CASSANDRA_RESPONSE_FRAME || h.stream_id == -1) {
+int is_cassandra_response(char *buf, __u64 buf_size, __s16 *stream_id, __u32 *status) {
+    struct cassandra_header h = {};
+    if (buf_size < sizeof(h)) {
+        return 0;
+    }
+    bpf_read(buf, h);
+    if (h.version != CASSANDRA_RESPONSE_FRAME) {
         return 0;
     }
     if (h.opcode == CASSANDRA_OPCODE_RESULT) {
-        return 200;
+        *stream_id = h.stream_id;
+        *status = STATUS_OK;
+        return 1;
     }
     if (h.opcode == CASSANDRA_OPCODE_ERROR) {
-        return 500;
+        *stream_id = h.stream_id;
+        *status = STATUS_FAILED;
+        return 1;
     }
     return 0;
 }

+ 2 - 2
ebpftracer/ebpf/l7/gotls.c

@@ -42,7 +42,7 @@ int go_crypto_tls_write_enter(struct pt_regs *ctx) {
     }
     char *buf_ptr = (char*)GO_PARAM2(ctx);
     __u64 buf_size = GO_PARAM3(ctx);
-    return trace_enter_write(ctx, fd, 1, buf_ptr, buf_size);
+    return trace_enter_write(ctx, fd, 1, buf_ptr, buf_size, 0);
 }
 
 SEC("uprobe/go_crypto_tls_read_enter")
@@ -56,7 +56,7 @@ int go_crypto_tls_read_enter(struct pt_regs *ctx) {
     __u64 goroutine_id = GOROUTINE(ctx);
     __u64 pid = pid_tgid >> 32;
     __u64 id = pid << 32 | goroutine_id | IS_TLS_READ_ID;
-    return trace_enter_read(id, fd, buf_ptr, 0);
+    return trace_enter_read(id, fd, buf_ptr, 0, 0);
 }
 
 SEC("uprobe/go_crypto_tls_read_exit")

+ 3 - 2
ebpftracer/ebpf/l7/http.c

@@ -33,7 +33,7 @@ int is_http_request(char *buf) {
 }
 
 static __always_inline
-__u32 parse_http_status(char *buf) {
+int is_http_response(char *buf, __u32 *status) {
     char b[16];
     if (bpf_probe_read_str(&b, sizeof(b), (void *)buf) < 16) {
         return 0;
@@ -56,5 +56,6 @@ __u32 parse_http_status(char *buf) {
     if (b[9] < '0' || b[9] > '9' || b[10] < '0' || b[10] > '9' || b[11] < '0' || b[11] > '9') {
         return 0;
     }
-    return (b[9]-'0')*100 + (b[10]-'0')*10 + (b[11]-'0');
+    *status = (b[9]-'0')*100 + (b[10]-'0')*10 + (b[11]-'0');
+    return 1;
 }

+ 41 - 0
ebpftracer/ebpf/l7/http2.c

@@ -0,0 +1,41 @@
+#define HTTP2_CLIENT_INITIATED_STREAM(stream_id) (stream_id & 0x01000000) // big-endian (network byte order) odd number
+#define HTTP2_SETTINGS_FRAME 0x4
+
+static __always_inline
+int is_client_preface(char *buf, __u64 size, __u8 method) {
+    if (method != METHOD_HTTP2_CLIENT_FRAMES || size < 24) {
+        return 0;
+    }
+    char b[5];
+    bpf_read(buf, b);
+    if (b[0] == 'P' && b[1]=='R' && b[2]=='I' && b[3]==' ' && b[4]=='*') {
+        return 1;
+    }
+    return 0;
+}
+
+static __always_inline
+int is_server_preface(__u8 frame_type, __u32 stream_id, __u8 method) {
+    return method == METHOD_HTTP2_SERVER_FRAMES && frame_type == HTTP2_SETTINGS_FRAME && stream_id == 0;
+}
+
+static __always_inline
+int looks_like_http2_frame(char *buf, __u64 size, __u8 method) {
+    __u32 frame_length;
+    bpf_read(buf, frame_length);
+    frame_length = bpf_htonl(frame_length) >> 8;
+    if (frame_length + 9 > size) {
+        return is_client_preface(buf, size, method);
+    }
+    __u8 frame_type;
+    bpf_read(buf+3, frame_type);
+    if (frame_type > 0x9) {
+        return is_client_preface(buf, size, method);
+    }
+    __u32 stream_id;
+    bpf_read(buf+5, stream_id);
+    if (!HTTP2_CLIENT_INITIATED_STREAM(stream_id)) {
+        return is_server_preface(frame_type, stream_id, method);
+    }
+    return 1;
+}

+ 12 - 15
ebpftracer/ebpf/l7/kafka.c

@@ -13,36 +13,33 @@ struct kafka_response_header {
 };
 
 static __always_inline
-__s32 is_kafka_request(char *buf, int buf_size) {
-    if (buf_size < 1) {
-        return 0;
-    }
+int is_kafka_request(char *buf, __u64 buf_size, __s32 *request_id) {
     struct kafka_request_header h = {};
-    if (bpf_probe_read(&h, sizeof(h), (void *)((char *)buf)) < 0) {
+    if (buf_size < sizeof(h)) {
         return 0;
     }
-    h.length = bpf_htonl(h.length);
-    h.api_key = bpf_htons(h.api_key);
-    h.api_version = bpf_htons(h.api_version);
-    h.correlation_id = bpf_htonl(h.correlation_id);
+    bpf_read(buf, h);
 
+    h.length = bpf_htonl(h.length);
     if (h.length+4 != buf_size) {
         return 0;
     }
+    h.api_key = bpf_htons(h.api_key);
+//    h.api_version = bpf_htons(h.api_version);
+    h.correlation_id = bpf_htonl(h.correlation_id);
     if (h.correlation_id > 0 && (h.api_key >= 0 && h.api_key <= 67)) {
-        return h.correlation_id;
+        *request_id = h.correlation_id;
+        return 1;
     }
     return 0;
 }
 
 static __always_inline
-__u32 parse_kafka_status(__s32 request_id, char *buf, int buf_size, __u8 partial) {
+int is_kafka_response(char *buf, __s32 request_id) {
     struct kafka_response_header h = {};
-    if (bpf_probe_read(&h, sizeof(h), (void *)((char *)buf)) < 0) {
-        return 0;
-    }
+    bpf_read(buf, h);
     if (bpf_htonl(h.correlation_id) == request_id) {
-        return 200;
+        return 1;
     }
     return 0;
 }

+ 223 - 96
ebpftracer/ebpf/l7/l7.c

@@ -9,12 +9,34 @@
 #define PROTOCOL_CASSANDRA  8
 #define PROTOCOL_RABBITMQ   9
 #define PROTOCOL_NATS      10
-
-#define METHOD_UNKNOWN           0
-#define METHOD_PRODUCE           1
-#define METHOD_CONSUME           2
-#define METHOD_STATEMENT_PREPARE 3
-#define METHOD_STATEMENT_CLOSE   4
+#define PROTOCOL_HTTP2	   11
+
+#define STATUS_UNKNOWN  0
+#define STATUS_OK       200
+#define STATUS_FAILED   500
+
+#define METHOD_UNKNOWN              0
+#define METHOD_PRODUCE              1
+#define METHOD_CONSUME              2
+#define METHOD_STATEMENT_PREPARE    3
+#define METHOD_STATEMENT_CLOSE      4
+#define METHOD_HTTP2_CLIENT_FRAMES  5
+#define METHOD_HTTP2_SERVER_FRAMES  6
+
+#define MAX_PAYLOAD_SIZE 1024 // must be power of 2
+#define TRUNCATE_PAYLOAD_SIZE(size) ({                                  \
+    size = MIN(size, MAX_PAYLOAD_SIZE-1);                               \
+    asm volatile ("%0 &= %1" : "+r"(size) : "i"(MAX_PAYLOAD_SIZE-1));   \
+})
+#define COPY_PAYLOAD(dst, size, src) ({     \
+    TRUNCATE_PAYLOAD_SIZE(size);            \
+    if (bpf_probe_read(dst, size, src)) {   \
+        return 0;                           \
+    }                                       \
+})
+
+#define IOVEC_BUF_SIZE MAX_PAYLOAD_SIZE * 2  // must be double of MAX_PAYLOAD_SIZE
+#define MAX_IOVEC_SIZE 32
 
 #include "http.c"
 #include "postgres.c"
@@ -26,8 +48,7 @@
 #include "cassandra.c"
 #include "rabbitmq.c"
 #include "nats.c"
-
-#define MAX_PAYLOAD_SIZE 512
+#include "http2.c"
 
 struct l7_event {
     __u64 fd;
@@ -39,12 +60,13 @@ struct l7_event {
     __u8 method;
     __u16 padding;
     __u32 statement_id;
+    __u64 payload_size;
     char payload[MAX_PAYLOAD_SIZE];
 };
 
 struct {
      __uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
-     __type(key, __u32);
+     __type(key, int);
      __type(value, struct l7_event);
      __uint(max_entries, 1);
 } l7_event_heap SEC(".maps");
@@ -59,6 +81,7 @@ struct read_args {
     __u64 fd;
     char* buf;
     __u64* ret;
+    __u64 iovlen;
 };
 
 struct {
@@ -68,7 +91,7 @@ struct {
     __uint(max_entries, 10240);
 } active_reads SEC(".maps");
 
-struct socket_key {
+struct l7_request_key {
     __u64 fd;
     __u32 pid;
     __u16 is_tls;
@@ -81,23 +104,31 @@ struct l7_request {
     __u8 partial;
     __u8 request_type;
     __s32 request_id;
+    __u64 payload_size;
     char payload[MAX_PAYLOAD_SIZE];
 };
 
 struct {
      __uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
-     __type(key, __u32);
+     __type(key, int);
      __type(value, struct l7_request);
      __uint(max_entries, 1);
 } l7_request_heap SEC(".maps");
 
 struct {
     __uint(type, BPF_MAP_TYPE_LRU_HASH);
-    __uint(key_size, sizeof(struct socket_key));
+    __uint(key_size, sizeof(struct l7_request_key));
     __uint(value_size, sizeof(struct l7_request));
     __uint(max_entries, 32768);
 } active_l7_requests SEC(".maps");
 
+struct {
+     __uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
+     __type(key, int);
+     __type(value, char[IOVEC_BUF_SIZE]);
+     __uint(max_entries, 1);
+} iovec_buf_heap SEC(".maps");
+
 struct trace_event_raw_sys_enter_rw__stub {
     __u64 unused;
     long int id;
@@ -117,6 +148,13 @@ struct iovec {
     __u64 size;
 };
 
+struct user_msghdr {
+	void *msg_name;
+	int msg_namelen;
+	struct iovec *msg_iov;
+	__u64 msg_iovlen;
+};
+
 static inline __attribute__((__always_inline__))
 __u64 get_connection_timestamp(__u32 pid, __u64 fd) {
     struct sk_info sk = {};
@@ -130,9 +168,53 @@ __u64 get_connection_timestamp(__u32 pid, __u64 fd) {
 }
 
 static inline __attribute__((__always_inline__))
-int trace_enter_write(void *ctx, __u64 fd, __u16 is_tls, char *buf, __u64 size) {
+__u64 read_iovec(char *iovec, __u64 iovlen, __u64 ret, char *buf) {
+    struct iovec iov = {};
+    __u64 max = (ret) ? MIN(ret, MAX_PAYLOAD_SIZE) : MAX_PAYLOAD_SIZE;
+    __u64 offset = 0;
+    __u64 size = 0;
+    #pragma unroll
+    for (int i = 0; i < MAX_IOVEC_SIZE; i++) {
+        if (i >= iovlen) {
+            break;
+        }
+        if (bpf_probe_read(&iov, sizeof(iov), (void *)(iovec+i*sizeof(iov)))) {
+            return 0;
+        }
+        if (iov.size <= 0) {
+            continue;
+        }
+        size = MIN(iov.size, max-offset);
+        TRUNCATE_PAYLOAD_SIZE(size);
+        TRUNCATE_PAYLOAD_SIZE(offset);
+        if (bpf_probe_read(buf + offset, size, (void *)iov.buf)) {
+            return 0;
+        }
+        offset += size;
+        if (offset >= max) {
+            break;
+        }
+    }
+    return offset;
+}
+
+static inline __attribute__((__always_inline__))
+int trace_enter_write(void *ctx, __u64 fd, __u16 is_tls, char *buf, __u64 size, __u64 iovlen) {
     __u64 id = bpf_get_current_pid_tgid();
-    int zero = 0;
+    __u32 zero = 0;
+
+    char* payload = buf;
+    if (iovlen) {
+        payload = bpf_map_lookup_elem(&iovec_buf_heap, &zero);
+        if (!payload) {
+            return 0;
+        }
+        size = read_iovec(buf, iovlen, 0, payload);
+    }
+    if (!size) {
+        return 0;
+    }
+
     struct l7_request *req = bpf_map_lookup_elem(&l7_request_heap, &zero);
     if (!req) {
         return 0;
@@ -141,15 +223,16 @@ int trace_enter_write(void *ctx, __u64 fd, __u16 is_tls, char *buf, __u64 size)
     req->partial = 0;
     req->request_id = 0;
     req->ns = 0;
-    struct socket_key k = {};
+    req->payload_size = size;
+    struct l7_request_key k = {};
     k.pid = id >> 32;
     k.fd = fd;
     k.is_tls = is_tls;
     k.stream_id = -1;
 
-    if (is_http_request(buf)) {
+    if (is_http_request(payload)) {
         req->protocol = PROTOCOL_HTTP;
-    } else if (is_postgres_query(buf, size, &req->request_type)) {
+    } else if (is_postgres_query(payload, size, &req->request_type)) {
         if (req->request_type == POSTGRES_FRAME_CLOSE) {
             struct l7_event *e = bpf_map_lookup_elem(&l7_event_heap, &zero);
             if (!e) {
@@ -159,18 +242,18 @@ int trace_enter_write(void *ctx, __u64 fd, __u16 is_tls, char *buf, __u64 size)
             e->fd = k.fd;
             e->pid = k.pid;
             e->method = METHOD_STATEMENT_CLOSE;
-            e->status = 200;
             e->connection_timestamp = get_connection_timestamp(k.pid, k.fd);
-            bpf_probe_read(e->payload, MAX_PAYLOAD_SIZE, (void *)buf);
+            e->payload_size = size;
+            COPY_PAYLOAD(e->payload, size, payload);
             bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
             return 0;
         }
         req->protocol = PROTOCOL_POSTGRES;
-    } else if (is_redis_query(buf)) {
+    } else if (is_redis_query(payload, size)) {
         req->protocol = PROTOCOL_REDIS;
-    } else if (is_memcached_query(buf, size)) {
+    } else if (is_memcached_query(payload, size)) {
         req->protocol = PROTOCOL_MEMCACHED;
-    } else if (is_mysql_query(buf, size, &req->request_type)) {
+    } else if (is_mysql_query(payload, size, &req->request_type)) {
         if (req->request_type == MYSQL_COM_STMT_CLOSE) {
             struct l7_event *e = bpf_map_lookup_elem(&l7_event_heap, &zero);
             if (!e) {
@@ -180,16 +263,16 @@ int trace_enter_write(void *ctx, __u64 fd, __u16 is_tls, char *buf, __u64 size)
             e->fd = k.fd;
             e->pid = k.pid;
             e->method = METHOD_STATEMENT_CLOSE;
-            e->status = 200;
             e->connection_timestamp = get_connection_timestamp(k.pid, k.fd);
-            bpf_probe_read(e->payload, MAX_PAYLOAD_SIZE, (void *)buf);
+            e->payload_size = size;
+            COPY_PAYLOAD(e->payload, size, payload);
             bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
             return 0;
         }
         req->protocol = PROTOCOL_MYSQL;
-    } else if (is_mongo_query(buf, size)) {
+    } else if (is_mongo_query(payload, size)) {
         req->protocol = PROTOCOL_MONGO;
-    } else if (is_rabbitmq_produce(buf, size)) {
+    } else if (is_rabbitmq_produce(payload, size)) {
         struct l7_event *e = bpf_map_lookup_elem(&l7_event_heap, &zero);
         if (!e) {
             return 0;
@@ -197,12 +280,11 @@ int trace_enter_write(void *ctx, __u64 fd, __u16 is_tls, char *buf, __u64 size)
         e->protocol = PROTOCOL_RABBITMQ;
         e->fd = k.fd;
         e->pid = k.pid;
-        e->status = 200;
         e->method = METHOD_PRODUCE;
         e->connection_timestamp = get_connection_timestamp(k.pid, k.fd);
         bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
         return 0;
-    } else if (nats_method(buf, size) == METHOD_PRODUCE) {
+    } else if (nats_method(payload, size) == METHOD_PRODUCE) {
         struct l7_event *e = bpf_map_lookup_elem(&l7_event_heap, &zero);
         if (!e) {
             return 0;
@@ -210,45 +292,53 @@ int trace_enter_write(void *ctx, __u64 fd, __u16 is_tls, char *buf, __u64 size)
         e->protocol = PROTOCOL_NATS;
         e->fd = k.fd;
         e->pid = k.pid;
-        e->status = 200;
         e->method = METHOD_PRODUCE;
         e->connection_timestamp = get_connection_timestamp(k.pid, k.fd);
         bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
         return 0;
-    } else {
-        __s32 request_id = is_kafka_request(buf, size);
-        if  (request_id > 0) {
-            req->request_id = request_id;
-            req->protocol = PROTOCOL_KAFKA;
-            struct l7_request *prev_req = bpf_map_lookup_elem(&active_l7_requests, &k);
-            if (prev_req && prev_req->protocol == PROTOCOL_KAFKA) {
-                req->ns = prev_req->ns;
-            }
-        } else {
-            __s16 stream_id = is_cassandra_request(buf, size);
-            if  (stream_id != -1) {
-                k.stream_id = stream_id;
-                req->protocol = PROTOCOL_CASSANDRA;
-            }
+    } else if (is_cassandra_request(payload, size, &k.stream_id)) {
+        req->protocol = PROTOCOL_CASSANDRA;
+    } else if (is_kafka_request(payload, size, &req->request_id)) {
+        req->protocol = PROTOCOL_KAFKA;
+        struct l7_request *prev_req = bpf_map_lookup_elem(&active_l7_requests, &k);
+        if (prev_req && prev_req->protocol == PROTOCOL_KAFKA) {
+            req->ns = prev_req->ns;
         }
+    } else if (looks_like_http2_frame(payload, size, METHOD_HTTP2_CLIENT_FRAMES)) {
+        struct l7_event *e = bpf_map_lookup_elem(&l7_event_heap, &zero);
+        if (!e) {
+            return 0;
+        }
+        e->protocol = PROTOCOL_HTTP2;
+        e->fd = k.fd;
+        e->pid = k.pid;
+        e->method = METHOD_HTTP2_CLIENT_FRAMES;
+        e->duration = bpf_ktime_get_ns();
+        e->connection_timestamp = get_connection_timestamp(k.pid, k.fd);
+        e->payload_size = size;
+        COPY_PAYLOAD(e->payload, size, payload);
+        bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
+        return 0;
     }
+
     if (req->protocol == PROTOCOL_UNKNOWN) {
         return 0;
     }
     if (req->ns == 0) {
         req->ns = bpf_ktime_get_ns();
     }
-    bpf_probe_read(req->payload, MAX_PAYLOAD_SIZE, (void *)buf);
-    bpf_map_update_elem(&active_l7_requests, &k, req, BPF_ANY);
+    COPY_PAYLOAD(req->payload, size, payload);
+    bpf_map_update_elem(&active_l7_requests, &k, req, BPF_NOEXIST);
     return 0;
 }
 
 static inline __attribute__((__always_inline__))
-int trace_enter_read(__u64 id, __u64 fd, char *buf, __u64 *ret) {
+int trace_enter_read(__u64 id, __u64 fd, char *buf, __u64 *ret, __u64 iovlen) {
     struct read_args args = {};
     args.fd = fd;
     args.buf = buf;
     args.ret = ret;
+    args.iovlen = iovlen;
     bpf_map_update_elem(&active_reads, &id, &args, BPF_ANY);
     return 0;
 }
@@ -259,7 +349,8 @@ int trace_exit_read(void *ctx, __u64 id, __u32 pid, __u16 is_tls, long int ret)
     if (!args) {
         return 0;
     }
-    struct socket_key k = {};
+
+    struct l7_request_key k = {};
     k.pid = pid;
     k.fd = args->fd;
     k.is_tls = is_tls;
@@ -280,139 +371,168 @@ int trace_exit_read(void *ctx, __u64 id, __u32 pid, __u16 is_tls, long int ret)
     }
 
     int zero = 0;
+    char* payload = args->buf;
+    if (args->iovlen) {
+        payload = bpf_map_lookup_elem(&iovec_buf_heap, &zero);
+        if (!payload) {
+            return 0;
+        }
+        ret = read_iovec(args->buf, args->iovlen, ret, payload);
+        if (!ret) {
+            return 0;
+        }
+    }
+
     struct l7_event *e = bpf_map_lookup_elem(&l7_event_heap, &zero);
     if (!e) {
         return 0;
     }
     e->fd = k.fd;
     e->pid = k.pid;
+    e->protocol = PROTOCOL_UNKNOWN;
     e->connection_timestamp = 0;
-    e->status = 0;
+    e->status = STATUS_UNKNOWN;
     e->method = METHOD_UNKNOWN;
     e->statement_id = 0;
+    e->payload_size = 0;
 
-    if (is_rabbitmq_consume(args->buf, ret)) {
+    if (is_rabbitmq_consume(payload, ret)) {
         e->protocol = PROTOCOL_RABBITMQ;
-        e->status = 200;
         e->method = METHOD_CONSUME;
         e->connection_timestamp = get_connection_timestamp(k.pid, k.fd);
         bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
         return 0;
     }
-    if (nats_method(args->buf, ret) == METHOD_CONSUME) {
+    if (nats_method(payload, ret) == METHOD_CONSUME) {
         e->protocol = PROTOCOL_NATS;
-        e->status = 200;
         e->method = METHOD_CONSUME;
         e->connection_timestamp = get_connection_timestamp(k.pid, k.fd);
         bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
         return 0;
     }
 
-
-    struct cassandra_header cassandra_response = {};
-    cassandra_response.stream_id = -1;
     struct l7_request *req = bpf_map_lookup_elem(&active_l7_requests, &k);
+    int response = 0;
     if (!req) {
-        if (bpf_probe_read(&cassandra_response, sizeof(cassandra_response), (void *)(args->buf)) < 0) {
+        if (is_cassandra_response(payload, ret, &k.stream_id, &e->status)) {
+            req = bpf_map_lookup_elem(&active_l7_requests, &k);
+            if (!req) {
+                return 0;
+            }
+            response = 1;
+        } else if (looks_like_http2_frame(payload, ret, METHOD_HTTP2_SERVER_FRAMES)) {
+            e->protocol = PROTOCOL_HTTP2;
+            e->method = METHOD_HTTP2_SERVER_FRAMES;
+            e->duration = bpf_ktime_get_ns();
+            e->connection_timestamp = get_connection_timestamp(k.pid, k.fd);
+            e->payload_size = ret;
+            COPY_PAYLOAD(e->payload, ret, payload);
+            bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
             return 0;
-        }
-        k.stream_id = cassandra_response.stream_id;
-        req = bpf_map_lookup_elem(&active_l7_requests, &k);
-        if (!req) {
+        } else {
             return 0;
         }
     }
 
-    bpf_probe_read(e->payload, MAX_PAYLOAD_SIZE, req->payload);
-    __s32 request_id = req->request_id;
     e->protocol = req->protocol;
-    __u64 ns = req->ns;
-    __u8 partial = req->partial;
-    __u8 request_type = req->request_type;
+    e->payload_size = req->payload_size;
+    COPY_PAYLOAD(e->payload, req->payload_size, req->payload);
+
     bpf_map_delete_elem(&active_l7_requests, &k);
     if (e->protocol == PROTOCOL_HTTP) {
-        e->status = parse_http_status(args->buf);
+        response = is_http_response(payload, &e->status);
     } else if (e->protocol == PROTOCOL_POSTGRES) {
-        e->status = parse_postgres_status(args->buf, ret);
-        if (request_type == POSTGRES_FRAME_PARSE) {
+        response = is_postgres_response(payload, ret, &e->status);
+        if (req->request_type == POSTGRES_FRAME_PARSE) {
             e->method = METHOD_STATEMENT_PREPARE;
         }
     } else if (e->protocol == PROTOCOL_REDIS) {
-        e->status = parse_redis_status(args->buf, ret);
+        response = is_redis_response(payload, ret, &e->status);
     } else if (e->protocol == PROTOCOL_MEMCACHED) {
-        e->status = parse_memcached_status(args->buf, ret);
+        response = is_memcached_response(payload, ret, &e->status);
     } else if (e->protocol == PROTOCOL_MYSQL) {
-        e->status = parse_mysql_response(args->buf, ret, request_type, &e->statement_id);
-        if (request_type == MYSQL_COM_STMT_PREPARE) {
+        response = is_mysql_response(payload, ret, req->request_type, &e->statement_id, &e->status);
+        if (req->request_type == MYSQL_COM_STMT_PREPARE) {
             e->method = METHOD_STATEMENT_PREPARE;
         }
     } else if (e->protocol == PROTOCOL_MONGO) {
-        e->status = parse_mongo_status(args->buf, ret, partial);
-        if (e->status == 1) {
+        response = is_mongo_response(payload, ret, req->partial);
+        if (response == 2) { // partial
             struct l7_request *r = bpf_map_lookup_elem(&l7_request_heap, &zero);
             if (!r) {
                 return 0;
             }
             r->partial = 1;
             r->protocol = e->protocol;
-            r->ns = ns;
-            bpf_probe_read(r->payload, MAX_PAYLOAD_SIZE, e->payload);
+            r->ns = req->ns;
+            r->payload_size = req->payload_size;
+            COPY_PAYLOAD(r->payload, req->payload_size, req->payload);
             bpf_map_update_elem(&active_l7_requests, &k, r, BPF_ANY);
             return 0;
         }
     } else if (e->protocol == PROTOCOL_KAFKA) {
-        e->status = parse_kafka_status(request_id, args->buf, ret, partial);
-    } else if (e->protocol == PROTOCOL_CASSANDRA) {
-        e->status = cassandra_status(cassandra_response);
+        response = is_kafka_response(payload, req->request_id);
     }
-    if (e->status == 0) {
+
+    if (!response) {
         return 0;
     }
-    e->duration = bpf_ktime_get_ns() - ns;
+    e->duration = bpf_ktime_get_ns() - req->ns;
     e->connection_timestamp = get_connection_timestamp(k.pid, k.fd);
     bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
     return 0;
 }
 
+SEC("tracepoint/syscalls/sys_enter_write")
+int sys_enter_write(struct trace_event_raw_sys_enter_rw__stub* ctx) {
+    return trace_enter_write(ctx, ctx->fd, 0, ctx->buf, ctx->size, 0);
+}
+
 SEC("tracepoint/syscalls/sys_enter_writev")
 int sys_enter_writev(struct trace_event_raw_sys_enter_rw__stub* ctx) {
-    struct iovec iovec0 = {};
-    if (bpf_probe_read(&iovec0, sizeof(struct iovec), (void *)ctx->buf) < 0) {
-        return 0;
-    }
-    return trace_enter_write(ctx, ctx->fd, 0, iovec0.buf, iovec0.size);
+    return trace_enter_write(ctx, ctx->fd, 0, ctx->buf, 0, ctx->size);
 }
 
-SEC("tracepoint/syscalls/sys_enter_write")
-int sys_enter_write(struct trace_event_raw_sys_enter_rw__stub* ctx) {
-    return trace_enter_write(ctx, ctx->fd, 0, ctx->buf, ctx->size);
+SEC("tracepoint/syscalls/sys_enter_sendmsg")
+int sys_enter_sendmsg(struct trace_event_raw_sys_enter_rw__stub* ctx) {
+    struct user_msghdr msghdr = {};
+    if (bpf_probe_read(&msghdr, sizeof(msghdr), (void *)ctx->buf)) {
+        return 0;
+    }
+    return trace_enter_write(ctx, ctx->fd, 0, (char*)msghdr.msg_iov, 0, msghdr.msg_iovlen);
 }
 
 SEC("tracepoint/syscalls/sys_enter_sendto")
 int sys_enter_sendto(struct trace_event_raw_sys_enter_rw__stub* ctx) {
-    return trace_enter_write(ctx, ctx->fd, 0, ctx->buf, ctx->size);
+    return trace_enter_write(ctx, ctx->fd, 0, ctx->buf, ctx->size, 0);
 }
 
 SEC("tracepoint/syscalls/sys_enter_read")
 int sys_enter_read(struct trace_event_raw_sys_enter_rw__stub* ctx) {
     __u64 id = bpf_get_current_pid_tgid();
-    return trace_enter_read(id, ctx->fd, ctx->buf, 0);
+    return trace_enter_read(id, ctx->fd, ctx->buf, 0, 0);
 }
 
 SEC("tracepoint/syscalls/sys_enter_readv")
 int sys_enter_readv(struct trace_event_raw_sys_enter_rw__stub* ctx) {
     __u64 id = bpf_get_current_pid_tgid();
-    struct iovec iovec0 = {};
-    if (bpf_probe_read(&iovec0, sizeof(struct iovec), (void *)ctx->buf) < 0) {
+    return trace_enter_read(id, ctx->fd, ctx->buf, 0, ctx->size);
+}
+
+SEC("tracepoint/syscalls/sys_enter_recvmsg")
+int sys_enter_recvmsg(struct trace_event_raw_sys_enter_rw__stub* ctx) {
+    __u64 id = bpf_get_current_pid_tgid();
+    struct user_msghdr msghdr = {};
+    if (bpf_probe_read(&msghdr, sizeof(msghdr), (void *)ctx->buf)) {
         return 0;
     }
-    return trace_enter_read(id, ctx->fd, iovec0.buf, 0);
+    return trace_enter_read(id, ctx->fd, (char*)msghdr.msg_iov, 0, msghdr.msg_iovlen);
 }
 
 SEC("tracepoint/syscalls/sys_enter_recvfrom")
 int sys_enter_recvfrom(struct trace_event_raw_sys_enter_rw__stub* ctx) {
     __u64 id = bpf_get_current_pid_tgid();
-    return trace_enter_read(id, ctx->fd, ctx->buf, 0);
+    return trace_enter_read(id, ctx->fd, ctx->buf, 0, 0);
 }
 
 SEC("tracepoint/syscalls/sys_exit_read")
@@ -429,6 +549,13 @@ int sys_exit_readv(struct trace_event_raw_sys_exit_rw__stub* ctx) {
     return trace_exit_read(ctx, pid_tgid, pid, 0, ctx->ret);
 }
 
+SEC("tracepoint/syscalls/sys_exit_recvmsg")
+int sys_exit_recvmsg(struct trace_event_raw_sys_exit_rw__stub* ctx) {
+    __u64 pid_tgid = bpf_get_current_pid_tgid();
+    __u32 pid = pid_tgid >> 32;
+    return trace_exit_read(ctx, pid_tgid, pid, 0, ctx->ret);
+}
+
 SEC("tracepoint/syscalls/sys_exit_recvfrom")
 int sys_exit_recvfrom(struct trace_event_raw_sys_exit_rw__stub* ctx) {
     __u64 pid_tgid = bpf_get_current_pid_tgid();

+ 29 - 25
ebpftracer/ebpf/l7/memcached.c

@@ -1,17 +1,14 @@
 // https://github.com/memcached/memcached/blob/master/doc/protocol.txt
 static __always_inline
-int is_memcached_query(char *buf, int buf_size) {
-    if (buf_size < 1) {
+int is_memcached_query(char *buf, __u64 buf_size) {
+    if (buf_size < 9) {
         return 0;
     }
     char b[7];
+    bpf_read(buf, b);
     char end[2];
-    if (bpf_probe_read(&b, sizeof(b), (void *)((char *)buf)) < 0) {
-        return 0;
-    }
-    if (bpf_probe_read(&end, sizeof(end), (void *)((char *)buf+buf_size-2)) < 0) {
-        return 0;
-    }
+    TRUNCATE_PAYLOAD_SIZE(buf_size);
+    bpf_read(buf+buf_size-2, end);
     if (end[0] != '\r' || end[1] != '\n') {
         return 0;
     }
@@ -52,47 +49,54 @@ int is_memcached_query(char *buf, int buf_size) {
 }
 
 static __always_inline
-__u32 parse_memcached_status(char *buf, int buf_size) {
+int is_memcached_response(char *buf, __u64 buf_size, __u32 *status) {
     char r[3];
+    bpf_read(buf, r);
     char end[2];
-    if (bpf_probe_read(&r, sizeof(r), (void *)((char *)buf)) < 0) {
-        return 0;
-    }
-    if (bpf_probe_read(&end, sizeof(end), (void *)((char *)buf+buf_size-2)) < 0) {
-        return 0;
-    }
+    TRUNCATE_PAYLOAD_SIZE(buf_size);
+    bpf_read(buf+buf_size-2, end);
     if (end[0] != '\r' || end[1] != '\n') {
         return 0;
     }
     if (r[0] == 'V' && r[1] == 'A' && r[2] == 'L') { //VALUE
-        return 200;
+        *status = STATUS_OK;
+        return 1;
     }
     if (r[0] == 'S' && r[1] == 'T' && r[2] == 'O') { //STORED
-        return 200;
+        *status = STATUS_OK;
+        return 1;
     }
     if (r[0] == 'D' && r[1] == 'E' && r[2] == 'L') { //DELETED
-        return 200;
+        *status = STATUS_OK;
+        return 1;
     }
     if (r[0] == 'T' && r[1] == 'O' && r[2] == 'C') { //TOUCHED
-        return 200;
+        *status = STATUS_OK;
+        return 1;
     }
     if (r[0] == 'N' && r[1] == 'O' && r[2] == 'T') { //NOT_STORED || NOT_FOUND
-        return 200;
+        *status = STATUS_OK;
+        return 1;
     }
     if (r[0] == 'E' && r[1] == 'X' && r[2] == 'I') { //EXISTS
-        return 200;
+        *status = STATUS_OK;
+        return 1;
     }
     if (r[0] == 'E' && r[1] == 'R' && r[2] == 'R') { //ERROR
-        return 500;
+        *status = STATUS_FAILED;
+        return 1;
     }
     if (r[0] == 'C' && r[1] == 'L' && r[2] == 'I') { //CLIENT_ERROR
-        return 500;
+        *status = STATUS_FAILED;
+        return 1;
     }
     if (r[0] == 'S' && r[1] == 'E' && r[2] == 'R') { //SERVER_ERROR
-        return 500;
+        *status = STATUS_FAILED;
+        return 1;
     }
     if (r[0] >= '0' && r[0] <= '9') { // incr/decr response: <value>\r\n
-        return 200;
+        *status = STATUS_OK;
+        return 1;
     }
     return 0;
 }

+ 9 - 17
ebpftracer/ebpf/l7/mongo.c

@@ -11,14 +11,12 @@ struct mongo_header {
 };
 
 static __always_inline
-int is_mongo_query(char *buf, int buf_size) {
-    if (buf_size < 1) {
-        return 0;
-    }
+int is_mongo_query(char *buf, __u64 buf_size) {
     struct mongo_header h = {};
-    if (bpf_probe_read(&h, sizeof(h), (void *)((char *)buf)) < 0) {
+    if (buf_size < sizeof(h)) {
         return 0;
     }
+    bpf_read(buf, h);
     if (h.response_to == 0 && (h.op_code == MONGO_OP_MSG || h.op_code == MONGO_OP_COMPRESSED)) {
         return 1;
     }
@@ -26,28 +24,22 @@ int is_mongo_query(char *buf, int buf_size) {
 }
 
 static __always_inline
-__u32 parse_mongo_status(char *buf, int buf_size, __u8 partial) {
+int is_mongo_response(char *buf, __u64 buf_size, __u8 partial) {
     if (partial == 0 && buf_size == 4) { //partial read
-        return 1;
+        return 2;
     }
     struct mongo_header h = {};
     if (partial) {
-        if (bpf_probe_read(&h.response_to, sizeof(h.response_to), (void *)((char *)buf+4)) < 0) {
-            return 0;
-        }
-        if (bpf_probe_read(&h.op_code, sizeof(h.op_code), (void *)((char *)buf+8)) < 0) {
-            return 0;
-        }
+        bpf_read(buf+4, h.response_to);
+        bpf_read(buf+8, h.op_code);
     } else {
-        if (bpf_probe_read(&h, sizeof(h), (void *)((char *)buf)) < 0) {
-            return 0;
-        }
+        bpf_read(buf, h);
     }
     if (h.response_to == 0) {
         return 0;
     }
     if (h.op_code == MONGO_OP_MSG || h.op_code == MONGO_OP_COMPRESSED) {
-        return 200;
+        return 1;
     }
     return 0;
 }

+ 12 - 15
ebpftracer/ebpf/l7/mysql.c

@@ -10,14 +10,12 @@
 
 
 static __always_inline
-int is_mysql_query(char *buf, int buf_size, __u8 *request_type) {
-    if (buf_size < 1) {
+int is_mysql_query(char *buf, __u64 buf_size, __u8 *request_type) {
+    if (buf_size < 5) {
         return 0;
     }
     __u8 b[5];
-    if (bpf_probe_read(&b, sizeof(b), (void *)((char *)buf)) < 0) {
-        return 0;
-    }
+    bpf_read(buf, b);
     int length = (int)b[0] | (int)b[1] << 8 | (int)b[2] << 16;
     if (length+4 != buf_size || b[3] != 0) { // sequence must be 0
         return 0;
@@ -37,28 +35,27 @@ int is_mysql_query(char *buf, int buf_size, __u8 *request_type) {
 }
 
 static __always_inline
-__u32 parse_mysql_response(char *buf, int buf_size, __u8 request_type, __u32 *statement_id) {
+int is_mysql_response(char *buf, __u64 buf_size, __u8 request_type, __u32 *statement_id, __u32 *status) {
     __u8 b[5];
-    if (bpf_probe_read(&b, sizeof(b), (void *)((char *)buf)) < 0) {
-        return 0;
-    }
+    bpf_read(buf, b);
     if (b[3] < 1) { // sequence must be > 0
         return 0;
     }
     int length = (int)b[0] | (int)b[1] << 8 | (int)b[2] << 16;
     if (length == 1 || b[4] == MYSQL_RESPONSE_EOF) {
-        return 200;
+        *status = STATUS_OK;
+        return 1;
     }
     if (b[4] == MYSQL_RESPONSE_OK ) {
         if (request_type == MYSQL_COM_STMT_PREPARE) {
-            if (bpf_probe_read(statement_id, sizeof(*statement_id), (void *)((char *)buf+5)) < 0) {
-                return 0;
-            }
+            bpf_read(buf+5, *statement_id);
         }
-        return 200;
+        *status = STATUS_OK;
+        return 1;
     }
     if (b[4] == MYSQL_RESPONSE_ERROR) {
-        return 500;
+        *status = STATUS_FAILED;
+        return 1;
     }
     return 0;
 }

+ 3 - 6
ebpftracer/ebpf/l7/nats.c

@@ -6,13 +6,10 @@ int nats_method(char *buf, __u64 buf_size) {
         return 0;
     }
     char b[5];
+    bpf_read(buf, b);
     char end[2];
-    if (bpf_probe_read(&b, sizeof(b), (void *)buf) < 0) {
-        return 0;
-    }
-    if (bpf_probe_read(&end, sizeof(end), (void *)(buf+buf_size-2)) < 0) {
-        return 0;
-    }
+    TRUNCATE_PAYLOAD_SIZE(buf_size);
+    bpf_read(buf+buf_size-2, end);
     if (end[0] != '\r' || end[1] != '\n') {
         return 0;
     }

+ 21 - 21
ebpftracer/ebpf/l7/openssl.c

@@ -61,31 +61,31 @@ struct ssl_st {
     fd;                                                                 \
 })
 
-#define WRITE_ENTER(ctx, bio_t)                              \
-({                                                           \
-    __u32 fd = GET_FD(ctx, bio_t, wbio);                     \
-    char* buf_ptr = (char*)PT_REGS_PARM2(ctx);               \
-    __u64 buf_size = PT_REGS_PARM3(ctx);                     \
-    return trace_enter_write(ctx, fd, 1, buf_ptr, buf_size); \
+#define WRITE_ENTER(ctx, bio_t)                                 \
+({                                                              \
+    __u32 fd = GET_FD(ctx, bio_t, wbio);                        \
+    char* buf_ptr = (char*)PT_REGS_PARM2(ctx);                  \
+    __u64 buf_size = PT_REGS_PARM3(ctx);                        \
+    return trace_enter_write(ctx, fd, 1, buf_ptr, buf_size, 0); \
 })
 
-#define READ_ENTER(ctx, bio_t)                   \
-({                                               \
-    __u32 fd = GET_FD(ctx, bio_t, rbio);         \
-    char* buf_ptr = (char*)PT_REGS_PARM2(ctx);   \
-    __u64 pid_tgid = bpf_get_current_pid_tgid(); \
-    __u64 id = pid_tgid | IS_TLS_READ_ID;        \
-    return trace_enter_read(id, fd, buf_ptr, 0); \
+#define READ_ENTER(ctx, bio_t)                      \
+({                                                  \
+    __u32 fd = GET_FD(ctx, bio_t, rbio);            \
+    char* buf_ptr = (char*)PT_REGS_PARM2(ctx);      \
+    __u64 pid_tgid = bpf_get_current_pid_tgid();    \
+    __u64 id = pid_tgid | IS_TLS_READ_ID;           \
+    return trace_enter_read(id, fd, buf_ptr, 0, 0); \
 })
 
-#define READ_EX_ENTER(ctx, bio_t)                      \
-({                                                     \
-    __u32 fd = GET_FD(ctx, bio_t, rbio);               \
-    char* buf_ptr = (char*)PT_REGS_PARM2(ctx);         \
-    __u64 pid_tgid = bpf_get_current_pid_tgid();       \
-    __u64 id = pid_tgid | IS_TLS_READ_ID;              \
-    __u64* ret_ptr = (__u64*)PT_REGS_PARM4(ctx);       \
-    return trace_enter_read(id, fd, buf_ptr, ret_ptr); \
+#define READ_EX_ENTER(ctx, bio_t)                           \
+({                                                          \
+    __u32 fd = GET_FD(ctx, bio_t, rbio);                    \
+    char* buf_ptr = (char*)PT_REGS_PARM2(ctx);              \
+    __u64 pid_tgid = bpf_get_current_pid_tgid();            \
+    __u64 id = pid_tgid | IS_TLS_READ_ID;                   \
+    __u64* ret_ptr = (__u64*)PT_REGS_PARM4(ctx);            \
+    return trace_enter_read(id, fd, buf_ptr, ret_ptr, 0);   \
 })
 
 SEC("uprobe/openssl_SSL_write_enter")

+ 15 - 26
ebpftracer/ebpf/l7/postgres.c

@@ -7,18 +7,14 @@
 #define POSTGRES_FRAME_CLOSE 'C'
 
 static __always_inline
-int is_postgres_query(char *buf, int buf_size, __u8 *request_type) {
-    if (buf_size < 1) {
-        return 0;
-    }
+int is_postgres_query(char *buf, __u64 buf_size, __u8 *request_type) {
     char f_cmd;
     int f_length;
-    if (bpf_probe_read(&f_cmd, sizeof(f_cmd), (void *)((char *)buf)) < 0) {
-        return 0;
-    }
-    if (bpf_probe_read(&f_length, sizeof(f_length), (void *)((char *)buf+1)) < 0) {
+    if (buf_size < sizeof(f_cmd)+sizeof(f_length)) {
         return 0;
     }
+    bpf_read(buf, f_cmd);
+    bpf_read(buf+1, f_length);
     f_length = bpf_htonl(f_length);
 
     *request_type = f_cmd;
@@ -26,9 +22,8 @@ int is_postgres_query(char *buf, int buf_size, __u8 *request_type) {
         return 1;
     }
     char sync[5];
-    if (bpf_probe_read(&sync, sizeof(sync), (void *)((char *)buf+buf_size-5)) < 0) {
-        return 0;
-    }
+    TRUNCATE_PAYLOAD_SIZE(buf_size);
+    bpf_read(buf+buf_size-5, sync);
     if (sync[0] == 'S' && sync[1] == 0 && sync[2] == 0 && sync[3] == 0 && sync[4] == 4) {
         return 1;
     }
@@ -36,33 +31,27 @@ int is_postgres_query(char *buf, int buf_size, __u8 *request_type) {
 }
 
 static __always_inline
-__u32 parse_postgres_status(char *buf, int buf_size) {
+int is_postgres_response(char *buf, __u64 buf_size, __u32 *status) {
     char cmd;
     int length;
-    if (bpf_probe_read(&cmd, sizeof(cmd), (void *)((char *)buf)) < 0) {
-        return 0;
-    }
-    if (bpf_probe_read(&length, sizeof(length), (void *)((char *)buf+1)) < 0) {
-        return 0;
-    }
+    bpf_read(buf, cmd);
+    bpf_read(buf+1, length);
     length = bpf_htonl(length);
 
     if (length+1 > buf_size) {
         return 0;
     }
     if ((cmd == '1' || cmd == '2') && length == 4 && buf_size >= 10) {
-        if (bpf_probe_read(&cmd, sizeof(cmd), (void *)((char *)buf+5)) < 0) {
-            return 0;
-        }
-        if (bpf_probe_read(&length, sizeof(length), (void *)((char *)buf+5+1)) < 0) {
-            return 0;
-        }
+        bpf_read(buf+5, cmd);
+        bpf_read(buf+5+1, length);
     }
     if (cmd == 'E') {
-        return 500;
+        *status = STATUS_FAILED;
+        return 1;
     }
     if (cmd == 't' || cmd == 'T' || cmd == 'D' || cmd == 'C') {
-        return 200;
+        *status = STATUS_OK;
+        return 1;
     }
     return 0;
 }

+ 1 - 0
ebpftracer/ebpf/l7/rabbitmq.c

@@ -26,6 +26,7 @@ int rabbitmq_method_is(char *buf, __u64 buf_size, __u16 expected_method) {
         return 0;
     }
     __u8 end = 0;
+    TRUNCATE_PAYLOAD_SIZE(size);
     bpf_read(buf+7+size, end);
     if (end != RABBITMQ_FRAME_END) {
         return 0;

+ 12 - 12
ebpftracer/ebpf/l7/redis.c

@@ -2,11 +2,12 @@
 // https://redis.io/docs/reference/protocol-spec/
 
 static __always_inline
-int is_redis_query(char *buf) {
-    char b[5];
-    if (bpf_probe_read(&b, sizeof(b), (void *)((char *)buf)) < 0) {
+int is_redis_query(char *buf, __u64 buf_size) {
+    if (buf_size < 5) {
         return 0;
     }
+    char b[5];
+    bpf_read(buf, b);
     if (b[0] != '*' || b[1] < '0' || b[1] > '9') {
         return 0;
     }
@@ -22,23 +23,22 @@ int is_redis_query(char *buf) {
 }
 
 static __always_inline
-__u32 parse_redis_status(char *buf, int buf_size) {
+int is_redis_response(char *buf, __u64 buf_size, __u32 *status) {
     char type;
+    bpf_read(buf, type);
     char end[2];
-    if (bpf_probe_read(&type, sizeof(type), (void *)((char *)buf)) < 0) {
-        return 0;
-    }
-    if (bpf_probe_read(&end, sizeof(end), (void *)((char *)buf+buf_size-2)) < 0) {
-        return 0;
-    }
+    TRUNCATE_PAYLOAD_SIZE(buf_size);
+    bpf_read(buf+buf_size-2, end);
     if (end[0] != '\r' || end[1] != '\n') {
         return 0;
     }
     if (type == '*' || type == ':' || type == '$' || type == '+') {
-        return 200;
+        *status = STATUS_OK;
+        return 1;
     }
     if (type == '-') {
-        return 500;
+        *status = STATUS_FAILED;
+        return 1;
     }
     return 0;
 }

+ 20 - 0
ebpftracer/l7/http.go

@@ -0,0 +1,20 @@
+package l7
+
+import (
+	"bytes"
+)
+
+func ParseHttp(payload []byte) (string, string) {
+	method, rest, ok := bytes.Cut(payload, space)
+	if !ok {
+		return "", ""
+	}
+	if !isHttpMethod(string(method)) {
+		return "", ""
+	}
+	uri, _, ok := bytes.Cut(rest, space)
+	if !ok {
+		uri = append(uri, []byte("...")...)
+	}
+	return string(method), string(uri)
+}

+ 176 - 0
ebpftracer/l7/http2.go

@@ -0,0 +1,176 @@
+package l7
+
+import (
+	"encoding/binary"
+	"golang.org/x/net/http2"
+	"golang.org/x/net/http2/hpack"
+	"net/http"
+	"strconv"
+	"strings"
+	"time"
+)
+
+const (
+	http2FrameHeaderLength = 9
+	http2DecoderGcInterval = uint64(10 * time.Minute)
+)
+
+type Http2FrameHeader struct {
+	Type     http2.FrameType
+	Flags    http2.Flags
+	Length   int
+	StreamId uint32
+}
+
+type Http2Request struct {
+	Method   string
+	Path     string
+	Scheme   string
+	Status   Status
+	Duration time.Duration
+
+	kernelTime uint64
+}
+
+type Http2Parser struct {
+	clientDecoder  *hpack.Decoder
+	serverDecoder  *hpack.Decoder
+	activeRequests map[uint32]*Http2Request
+	lastGcTime     uint64
+}
+
+func NewHttp2Parser() *Http2Parser {
+	return &Http2Parser{
+		clientDecoder:  hpack.NewDecoder(4096, nil),
+		serverDecoder:  hpack.NewDecoder(4096, nil),
+		activeRequests: map[uint32]*Http2Request{},
+	}
+}
+
+func (p *Http2Parser) Parse(method Method, payload []byte, kernelTime uint64) []Http2Request {
+	if method == MethodHttp2ClientFrames {
+		l := len(http2.ClientPreface)
+		if len(payload) >= l && string(payload[:l]) == http2.ClientPreface {
+			payload = payload[l:]
+		}
+	}
+	if len(payload) == 0 {
+		return nil
+	}
+
+	var decoder *hpack.Decoder
+	statuses := map[uint32]Status{}
+	offset := 0
+	for {
+		if len(payload)-offset < http2FrameHeaderLength {
+			break
+		}
+		h := Http2FrameHeader{
+			Length:   int(binary.BigEndian.Uint32(payload[offset:]) >> 8),
+			Type:     http2.FrameType(payload[offset+3]),
+			Flags:    http2.Flags(payload[offset+4]),
+			StreamId: binary.BigEndian.Uint32(payload[offset+5:]) & (1<<31 - 1),
+		}
+		offset += http2FrameHeaderLength
+		if h.Type != http2.FrameHeaders {
+			if len(payload)-offset < h.Length {
+				break
+			}
+			offset += h.Length
+			continue
+		}
+		switch method {
+		case MethodHttp2ClientFrames:
+			req := p.activeRequests[h.StreamId]
+			if req == nil {
+				req = &Http2Request{kernelTime: kernelTime}
+				p.activeRequests[h.StreamId] = req
+			}
+			decoder = p.clientDecoder
+			decoder.SetEmitFunc(func(hf hpack.HeaderField) {
+				switch hf.Name {
+				case ":method":
+					if req.Method == "" && isHttpMethod(hf.Value) {
+						req.Method = hf.Value
+					}
+				case ":path":
+					if req.Path == "" && isHttpPath(hf.Value) {
+						req.Path = hf.Value
+					}
+				case ":scheme":
+					if req.Scheme == "" && isHttpScheme(hf.Value) {
+						req.Scheme = hf.Value
+					}
+				}
+			})
+		case MethodHttp2ServerFrames:
+			if _, ok := statuses[h.StreamId]; !ok {
+				statuses[h.StreamId] = 0
+			}
+			decoder = p.serverDecoder
+			decoder.SetEmitFunc(func(hf hpack.HeaderField) {
+				if hf.Name == ":status" {
+					s, _ := strconv.Atoi(hf.Value)
+					statuses[h.StreamId] = Status(s)
+				}
+			})
+		}
+		next := offset + h.Length
+		if next > len(payload) {
+			next = len(payload)
+		}
+		if _, err := decoder.Write(payload[offset:next]); err != nil {
+			continue
+		}
+		offset = next
+	}
+	var res []Http2Request
+	for streamId, status := range statuses {
+		r := p.activeRequests[streamId]
+		if r == nil {
+			continue
+		}
+		r.Status = status
+		r.Duration = time.Duration(kernelTime - r.kernelTime)
+		res = append(res, *r)
+		delete(p.activeRequests, streamId)
+	}
+
+	// GC
+	if kernelTime-p.lastGcTime > http2DecoderGcInterval {
+		if p.lastGcTime > 0 {
+			for streamId, r := range p.activeRequests {
+				if kernelTime-r.kernelTime > http2DecoderGcInterval {
+					delete(p.activeRequests, streamId)
+				}
+			}
+		}
+		p.lastGcTime = kernelTime
+	}
+
+	return res
+}
+
+func isHttpMethod(s string) bool {
+	switch s {
+	case http.MethodGet,
+		http.MethodHead,
+		http.MethodPost,
+		http.MethodPut,
+		http.MethodPatch,
+		http.MethodDelete,
+		http.MethodConnect,
+		http.MethodOptions,
+		http.MethodTrace:
+		return true
+	}
+	return false
+}
+
+func isHttpPath(s string) bool {
+	return strings.HasPrefix(s, "/") || s == "*"
+}
+
+func isHttpScheme(s string) bool {
+	return s == "http" || s == "https"
+}

+ 119 - 0
ebpftracer/l7/l7.go

@@ -0,0 +1,119 @@
+package l7
+
+import (
+	"strconv"
+	"time"
+)
+
+type Protocol uint8
+
+const (
+	ProtocolHTTP      Protocol = 1
+	ProtocolPostgres  Protocol = 2
+	ProtocolRedis     Protocol = 3
+	ProtocolMemcached Protocol = 4
+	ProtocolMysql     Protocol = 5
+	ProtocolMongo     Protocol = 6
+	ProtocolKafka     Protocol = 7
+	ProtocolCassandra Protocol = 8
+	ProtocolRabbitmq  Protocol = 9
+	ProtocolNats      Protocol = 10
+	ProtocolHTTP2     Protocol = 11
+)
+
+func (p Protocol) String() string {
+	switch p {
+	case ProtocolHTTP:
+		return "HTTP"
+	case ProtocolPostgres:
+		return "Postgres"
+	case ProtocolRedis:
+		return "Redis"
+	case ProtocolMemcached:
+		return "Memcached"
+	case ProtocolMysql:
+		return "Mysql"
+	case ProtocolMongo:
+		return "Mongo"
+	case ProtocolKafka:
+		return "Kafka"
+	case ProtocolCassandra:
+		return "Cassandra"
+	case ProtocolRabbitmq:
+		return "Rabbitmq"
+	case ProtocolNats:
+		return "NATS"
+	case ProtocolHTTP2:
+		return "HTTP2"
+	}
+	return "UNKNOWN:" + strconv.Itoa(int(p))
+}
+
+type Method uint8
+
+const (
+	MethodUnknown           Method = 0
+	MethodProduce           Method = 1
+	MethodConsume           Method = 2
+	MethodStatementPrepare  Method = 3
+	MethodStatementClose    Method = 4
+	MethodHttp2ClientFrames Method = 5
+	MethodHttp2ServerFrames Method = 6
+)
+
+func (m Method) String() string {
+	switch m {
+	case MethodUnknown:
+		return "unknown"
+	case MethodProduce:
+		return "produce"
+	case MethodConsume:
+		return "consume"
+	case MethodStatementPrepare:
+		return "statement_prepare"
+	case MethodStatementClose:
+		return "statement_close"
+	case MethodHttp2ClientFrames:
+		return "http2_client_frames"
+	case MethodHttp2ServerFrames:
+		return "http2_server_frames"
+	}
+	return "UNKNOWN:" + strconv.Itoa(int(m))
+}
+
+type Status int
+
+const (
+	StatusUnknown Status = 0
+	StatusOk      Status = 200
+	StatusFailed  Status = 500
+)
+
+func (s Status) String() string {
+	switch s {
+	case StatusUnknown:
+		return "unknown"
+	case StatusOk:
+		return "ok"
+	case StatusFailed:
+		return "failed"
+	}
+	return strconv.Itoa(int(s))
+}
+
+func (s Status) Http() string {
+	return strconv.Itoa(int(s))
+}
+
+func (s Status) Error() bool {
+	return s == StatusFailed
+}
+
+type RequestData struct {
+	Protocol    Protocol
+	Status      Status
+	Duration    time.Duration
+	Method      Method
+	StatementId uint32
+	Payload     []byte
+}

+ 13 - 13
tracing/tracing_test.go → ebpftracer/l7/l7_test.go

@@ -1,4 +1,4 @@
-package tracing
+package l7
 
 import (
 	"bytes"
@@ -8,28 +8,28 @@ import (
 	"testing"
 )
 
-func Test_parseHttp(t *testing.T) {
-	m, p := parseHttp([]byte(`HEAD /1 HTTP/1.1\nHost: 127.0.0.1\nUser-Agent: curl/8.0.1\nAccept: */*\n\nxzxxxxxxzx`))
+func TestParseHttp(t *testing.T) {
+	m, p := ParseHttp([]byte(`HEAD /1 HTTP/1.1\nHost: 127.0.0.1\nUser-Agent: curl/8.0.1\nAccept: */*\n\nxzxxxxxxzx`))
 	assert.Equal(t, "HEAD", m)
 	assert.Equal(t, "/1", p)
 
-	m, p = parseHttp([]byte(`GET /too-long-uri`))
+	m, p = ParseHttp([]byte(`GET /too-long-uri`))
 	assert.Equal(t, "GET", m)
 	assert.Equal(t, "/too-long-uri...", p)
 }
 
 func Test_parseMemcached(t *testing.T) {
-	cmd, items := parseMemcached(append([]byte(`incr 1111 2222`), '\r', '\n'))
+	cmd, items := ParseMemcached(append([]byte(`incr 1111 2222`), '\r', '\n'))
 	assert.Equal(t, "incr", cmd)
 	assert.Equal(t, []string{"1111"}, items)
 
-	cmd, items = parseMemcached(append([]byte(`gets 1111 2222 3333`), '\r', '\n'))
+	cmd, items = ParseMemcached(append([]byte(`gets 1111 2222 3333`), '\r', '\n'))
 	assert.Equal(t, "gets", cmd)
 	assert.Equal(t, []string{"1111", "2222", "3333"}, items)
 }
 
-func Test_parseRedis(t *testing.T) {
-	cmd, args := parseRedis([]byte{
+func TestParseRedis(t *testing.T) {
+	cmd, args := ParseRedis([]byte{
 		'*', '3', '\r', '\n',
 		'$', '4', '\r', '\n',
 		'L', 'L', 'E', 'N', '\r', '\n',
@@ -41,7 +41,7 @@ func Test_parseRedis(t *testing.T) {
 	assert.Equal(t, "LLEN", cmd)
 	assert.Equal(t, "mylist ...", args)
 
-	cmd, args = parseRedis([]byte{
+	cmd, args = ParseRedis([]byte{
 		'*', '2', '\r', '\n',
 		'$', '8', '\r', '\n',
 		'S', 'M', 'E', 'M', 'B', 'E', 'R', 'S', '\r', '\n',
@@ -62,7 +62,7 @@ type mongoHeader struct {
 	SectionKind   uint8
 }
 
-func Test_parseMongo(t *testing.T) {
+func TestParseMongo(t *testing.T) {
 	buf := bytes.NewBuffer(nil)
 	v := bson.M{"a": "bssssssssssssssssssssssssssssssssssssssssss"}
 	data, err := bson.Marshal(v)
@@ -78,11 +78,11 @@ func Test_parseMongo(t *testing.T) {
 
 	payload := buf.Bytes()
 
-	assert.Equal(t, `{"a": "bssssssssssssssssssssssssssssssssssssssssss"}`, parseMongo(payload))
-	assert.Equal(t, `<truncated>`, parseMongo(payload[:20]))
+	assert.Equal(t, `{"a": "bssssssssssssssssssssssssssssssssssssssssss"}`, ParseMongo(payload))
+	assert.Equal(t, `<truncated>`, ParseMongo(payload[:20]))
 
 	dataSize := binary.LittleEndian.Uint32(data)
 
 	binary.LittleEndian.PutUint32(payload[mongoHeaderLength+mongoSectionKindLength:], dataSize+1)
-	assert.Equal(t, `<truncated>`, parseMongo(payload))
+	assert.Equal(t, `<truncated>`, ParseMongo(payload))
 }

+ 39 - 0
ebpftracer/l7/memcached.go

@@ -0,0 +1,39 @@
+package l7
+
+import (
+	"bytes"
+	"strings"
+)
+
+var (
+	space = []byte{' '}
+	crlf  = []byte{'\r', '\n'}
+)
+
+func ParseMemcached(payload []byte) (string, []string) {
+	cmd, rest, ok := bytes.Cut(payload, space)
+	if !ok {
+		return "", nil
+	}
+	command := string(cmd)
+	switch command {
+	case "set", "add", "cas", "append", "prepend", "replace", "delete", "incr", "decr", "touch":
+		if key, _, ok := bytes.Cut(rest, space); ok {
+			return command, []string{string(key)}
+		}
+	case "gat", "gats":
+		_, rest, ok = bytes.Cut(rest, space)
+		if ok {
+			keys, _, ok := bytes.Cut(rest, crlf)
+			if ok {
+				return command, strings.Split(string(keys), " ")
+			}
+		}
+	case "get", "gets":
+		keys, _, ok := bytes.Cut(rest, crlf)
+		if ok {
+			return command, strings.Split(string(keys), " ")
+		}
+	}
+	return "", nil
+}

+ 2 - 23
tracing/mongo.go → ebpftracer/l7/mongo.go

@@ -1,34 +1,13 @@
-package tracing
+package l7
 
 import (
 	"encoding/binary"
-	"github.com/coroot/coroot-node-agent/ebpftracer"
 	"go.mongodb.org/mongo-driver/bson"
-	"go.opentelemetry.io/otel/attribute"
-	"go.opentelemetry.io/otel/codes"
-	semconv "go.opentelemetry.io/otel/semconv/v1.18.0"
-	"go.opentelemetry.io/otel/trace"
-	"time"
 )
 
 const (
 	MongoOpMSG = 2013
-)
 
-func handleMongoQuery(start, end time.Time, r *ebpftracer.L7Request, attrs []attribute.KeyValue) {
-	query := parseMongo(r.Payload[:])
-	if query == "" {
-		return
-	}
-	_, span := tracer.Start(nil, "query", trace.WithTimestamp(start), trace.WithSpanKind(trace.SpanKindClient))
-	span.SetAttributes(append(attrs, semconv.DBSystemMongoDB, semconv.DBStatement(query))...)
-	if r.Status == 500 {
-		span.SetStatus(codes.Error, "")
-	}
-	span.End(trace.WithTimestamp(end))
-}
-
-const (
 	mongoHeaderLength      = 20
 	mongoOpCodeOffset      = 12
 	mongoSectionKindLength = 1
@@ -36,7 +15,7 @@ const (
 	mongoSectionKindBody   = 0
 )
 
-func parseMongo(payload []byte) (res string) {
+func ParseMongo(payload []byte) (res string) {
 	res = "<truncated>"
 	if len(payload) < mongoHeaderLength+mongoSectionKindLength+mongoSectionSizeLength {
 		return

+ 14 - 25
tracing/mysql.go → ebpftracer/l7/mysql.go

@@ -1,15 +1,9 @@
-package tracing
+package l7
 
 import (
 	"encoding/binary"
 	"fmt"
-	"github.com/coroot/coroot-node-agent/ebpftracer"
-	"go.opentelemetry.io/otel/attribute"
-	"go.opentelemetry.io/otel/codes"
-	semconv "go.opentelemetry.io/otel/semconv/v1.18.0"
-	"go.opentelemetry.io/otel/trace"
 	"strconv"
-	"time"
 )
 
 const (
@@ -17,24 +11,19 @@ const (
 	MysqlComStmtPrepare = 0x16
 	MysqlComStmtExecute = 0x17
 	MysqlComStmtClose   = 0x19
-	mysqlMsgHeaderSize  = 4
+
+	mysqlMsgHeaderSize = 4
 )
 
-func handleMysqlQuery(start, end time.Time, r *ebpftracer.L7Request, attrs []attribute.KeyValue, preparedStatements map[string]string) {
-	query := parseMysql(r.Payload[:], r.StatementId, preparedStatements)
-	if query == "" {
-		return
-	}
+type MysqlParser struct {
+	preparedStatements map[string]string
+}
 
-	_, span := tracer.Start(nil, "query", trace.WithTimestamp(start), trace.WithSpanKind(trace.SpanKindClient))
-	span.SetAttributes(append(attrs, semconv.DBSystemMySQL, semconv.DBStatement(query))...)
-	if r.Status == 500 {
-		span.SetStatus(codes.Error, "")
-	}
-	span.End(trace.WithTimestamp(end))
+func NewMysqlParser() *MysqlParser {
+	return &MysqlParser{preparedStatements: map[string]string{}}
 }
 
-func parseMysql(payload []byte, statementId uint32, preparedStatements map[string]string) string {
+func (p *MysqlParser) Parse(payload []byte, statementId uint32) string {
 	payloadSize := len(payload)
 	if payloadSize < mysqlMsgHeaderSize+5 {
 		return ""
@@ -44,8 +33,8 @@ func parseMysql(payload []byte, statementId uint32, preparedStatements map[strin
 	readQuery := func() (query string) {
 		to := mysqlMsgHeaderSize + msgSize
 		partial := false
-		if to > payloadSize-1 {
-			to = payloadSize - 1
+		if to > payloadSize {
+			to = payloadSize
 			partial = true
 		}
 		query = string(payload[mysqlMsgHeaderSize+1 : to])
@@ -63,7 +52,7 @@ func parseMysql(payload []byte, statementId uint32, preparedStatements map[strin
 		return readQuery()
 	case MysqlComStmtExecute:
 		statementIdStr := readStatementId()
-		statement, ok := preparedStatements[statementIdStr]
+		statement, ok := p.preparedStatements[statementIdStr]
 		if !ok {
 			statement = fmt.Sprintf(`EXECUTE %s /* unknown */`, statementIdStr)
 		}
@@ -71,11 +60,11 @@ func parseMysql(payload []byte, statementId uint32, preparedStatements map[strin
 	case MysqlComStmtPrepare:
 		query := readQuery()
 		statementIdStr := strconv.FormatUint(uint64(statementId), 10)
-		preparedStatements[statementIdStr] = query
+		p.preparedStatements[statementIdStr] = query
 		return fmt.Sprintf("PREPARE %s FROM %s", statementIdStr, query)
 	case MysqlComStmtClose:
 		statementIdStr := readStatementId()
-		delete(preparedStatements, statementIdStr)
+		delete(p.preparedStatements, statementIdStr)
 	}
 	return ""
 }

+ 11 - 22
tracing/postgres.go → ebpftracer/l7/postgres.go

@@ -1,14 +1,8 @@
-package tracing
+package l7
 
 import (
 	"bytes"
 	"fmt"
-	"github.com/coroot/coroot-node-agent/ebpftracer"
-	"go.opentelemetry.io/otel/attribute"
-	"go.opentelemetry.io/otel/codes"
-	semconv "go.opentelemetry.io/otel/semconv/v1.18.0"
-	"go.opentelemetry.io/otel/trace"
-	"time"
 )
 
 const (
@@ -18,20 +12,15 @@ const (
 	PostgresFrameClose byte = 'C'
 )
 
-func handlePostgresQuery(start, end time.Time, r *ebpftracer.L7Request, attrs []attribute.KeyValue, preparedStatements map[string]string) {
-	query := parsePostgres(r.Payload[:], preparedStatements)
-	if query == "" {
-		return
-	}
-	_, span := tracer.Start(nil, "query", trace.WithTimestamp(start), trace.WithSpanKind(trace.SpanKindClient))
-	span.SetAttributes(append(attrs, semconv.DBSystemPostgreSQL, semconv.DBStatement(query))...)
-	if r.Status == 500 {
-		span.SetStatus(codes.Error, "")
-	}
-	span.End(trace.WithTimestamp(end))
+type PostgresParser struct {
+	preparedStatements map[string]string
+}
+
+func NewPostgresParser() *PostgresParser {
+	return &PostgresParser{preparedStatements: map[string]string{}}
 }
 
-func parsePostgres(payload []byte, preparedStatements map[string]string) string {
+func (p *PostgresParser) Parse(payload []byte) string {
 	l := len(payload)
 	if l < 5 {
 		return ""
@@ -56,7 +45,7 @@ func parsePostgres(payload []byte, preparedStatements map[string]string) string
 			return ""
 		}
 		preparedStatementNameStr := string(preparedStatementName)
-		statement, ok := preparedStatements[preparedStatementNameStr]
+		statement, ok := p.preparedStatements[preparedStatementNameStr]
 		if !ok {
 			statement = fmt.Sprintf(`EXECUTE %s /* unknown */`, preparedStatementNameStr)
 		}
@@ -74,7 +63,7 @@ func parsePostgres(payload []byte, preparedStatements map[string]string) string
 			query = string(q) + "..."
 		}
 		preparedStatementNameStr := string(preparedStatementName)
-		preparedStatements[preparedStatementNameStr] = query
+		p.preparedStatements[preparedStatementNameStr] = query
 		return fmt.Sprintf("PREPARE %s AS %s", preparedStatementNameStr, query)
 	case PostgresFrameClose:
 		if l < 7 {
@@ -87,7 +76,7 @@ func parsePostgres(payload []byte, preparedStatements map[string]string) string
 		if !ok {
 			return ""
 		}
-		delete(preparedStatements, string(preparedStatementName))
+		delete(p.preparedStatements, string(preparedStatementName))
 	}
 	return ""
 }

+ 41 - 0
ebpftracer/l7/redis.go

@@ -0,0 +1,41 @@
+package l7
+
+import (
+	"bytes"
+	"strconv"
+)
+
+func ParseRedis(payload []byte) (cmd string, args string) {
+	var v, rest []byte
+	var ok bool
+	v, rest, ok = bytes.Cut(payload, crlf)
+	if !ok || !bytes.HasPrefix(v, []byte("*")) {
+		return
+	}
+	arrayLen, err := strconv.ParseUint(string(v[1:]), 10, 32)
+	if err != nil {
+		return
+	}
+	readString := func() string {
+		v, rest, ok = bytes.Cut(rest, crlf)
+		if !ok || !bytes.HasPrefix(v, []byte("$")) {
+			return ""
+		}
+		v, rest, ok = bytes.Cut(rest, crlf)
+		if ok {
+			return string(v)
+		}
+		return ""
+	}
+	cmd = readString()
+	if cmd == "" {
+		return
+	}
+	if arrayLen > 1 {
+		args = readString()
+		if arrayLen > 2 {
+			args += " ..."
+		}
+	}
+	return
+}

+ 29 - 99
ebpftracer/tracer.go

@@ -9,6 +9,7 @@ import (
 	"github.com/cilium/ebpf/link"
 	"github.com/cilium/ebpf/perf"
 	"github.com/coroot/coroot-node-agent/common"
+	"github.com/coroot/coroot-node-agent/ebpftracer/l7"
 	"github.com/coroot/coroot-node-agent/proc"
 	"golang.org/x/mod/semver"
 	"golang.org/x/sys/unix"
@@ -21,7 +22,7 @@ import (
 	"time"
 )
 
-const PayloadSize = 512
+const MaxPayloadSize = 1024
 
 type EventType uint32
 type EventReason uint32
@@ -42,91 +43,6 @@ const (
 	EventReasonOOMKill EventReason = 1
 )
 
-type L7Protocol uint8
-
-const (
-	L7ProtocolHTTP      L7Protocol = 1
-	L7ProtocolPostgres  L7Protocol = 2
-	L7ProtocolRedis     L7Protocol = 3
-	L7ProtocolMemcached L7Protocol = 4
-	L7ProtocolMysql     L7Protocol = 5
-	L7ProtocolMongo     L7Protocol = 6
-	L7ProtocolKafka     L7Protocol = 7
-	L7ProtocolCassandra L7Protocol = 8
-	L7ProtocolRabbitmq  L7Protocol = 9
-	L7ProtocolNats      L7Protocol = 10
-)
-
-func (p L7Protocol) String() string {
-	switch p {
-	case L7ProtocolHTTP:
-		return "HTTP"
-	case L7ProtocolPostgres:
-		return "Postgres"
-	case L7ProtocolRedis:
-		return "Redis"
-	case L7ProtocolMemcached:
-		return "Memcached"
-	case L7ProtocolMysql:
-		return "Mysql"
-	case L7ProtocolMongo:
-		return "Mongo"
-	case L7ProtocolKafka:
-		return "Kafka"
-	case L7ProtocolCassandra:
-		return "Cassandra"
-	case L7ProtocolRabbitmq:
-		return "Rabbitmq"
-	case L7ProtocolNats:
-		return "NATS"
-	}
-	return "UNKNOWN:" + strconv.Itoa(int(p))
-}
-
-type L7Method uint8
-
-const (
-	L7MethodUnknown          L7Method = 0
-	L7MethodProduce          L7Method = 1
-	L7MethodConsume          L7Method = 2
-	L7MethodStatementPrepare L7Method = 3
-	L7MethodStatementClose   L7Method = 4
-)
-
-func (m L7Method) String() string {
-	switch m {
-	case L7MethodUnknown:
-		return "unknown"
-	case L7MethodProduce:
-		return "produce"
-	case L7MethodConsume:
-		return "consume"
-	}
-	return "UNKNOWN:" + strconv.Itoa(int(m))
-}
-
-type L7Request struct {
-	Protocol    L7Protocol
-	Status      int
-	Duration    time.Duration
-	Method      L7Method
-	StatementId uint32
-	Payload     [PayloadSize]byte
-}
-
-func (r *L7Request) StatusString() string {
-	switch r.Protocol {
-	case L7ProtocolHTTP:
-		return strconv.Itoa(r.Status)
-	case L7ProtocolMongo, L7ProtocolKafka, L7ProtocolRabbitmq, L7ProtocolNats:
-		return "unknown"
-	}
-	if r.Status == 500 {
-		return "failed"
-	}
-	return "ok"
-}
-
 type Event struct {
 	Type      EventType
 	Reason    EventReason
@@ -135,7 +51,7 @@ type Event struct {
 	DstAddr   netaddr.IPPort
 	Fd        uint64
 	Timestamp uint64
-	L7Request *L7Request
+	L7Request *l7.RequestData
 }
 
 type Tracer struct {
@@ -254,8 +170,14 @@ func (t *Tracer) ebpf(ch chan<- Event) error {
 		return fmt.Errorf("failed to load collection spec: %w", err)
 	}
 	_ = unix.Setrlimit(unix.RLIMIT_MEMLOCK, &unix.Rlimit{Cur: unix.RLIM_INFINITY, Max: unix.RLIM_INFINITY})
-	c, err := ebpf.NewCollection(collectionSpec)
+	c, err := ebpf.NewCollectionWithOptions(collectionSpec, ebpf.CollectionOptions{
+		//Programs: ebpf.ProgramOptions{LogLevel: 2, LogSize: 20 * 1024 * 1024},
+	})
 	if err != nil {
+		var verr *ebpf.VerifierError
+		if errors.As(err, &verr) {
+			klog.Errorf("%+v", verr)
+		}
 		return fmt.Errorf("failed to load collection: %w", err)
 	}
 	t.collection = c
@@ -269,7 +191,7 @@ func (t *Tracer) ebpf(ch chan<- Event) error {
 	}
 
 	if !t.disableL7Tracing {
-		perfMaps = append(perfMaps, perfMap{name: "l7_events", event: &l7Event{}, perCPUBufferSizePages: 16})
+		perfMaps = append(perfMaps, perfMap{name: "l7_events", event: &l7Event{}, perCPUBufferSizePages: 32})
 	}
 
 	for _, pm := range perfMaps {
@@ -286,11 +208,11 @@ func (t *Tracer) ebpf(ch chan<- Event) error {
 		program := t.collection.Programs[programSpec.Name]
 		if t.disableL7Tracing {
 			switch programSpec.Name {
-			case "sys_enter_writev", "sys_enter_write", "sys_enter_sendto":
+			case "sys_enter_writev", "sys_enter_write", "sys_enter_sendto", "sys_enter_sendmsg":
 				continue
-			case "sys_enter_read", "sys_enter_readv", "sys_enter_recvfrom":
+			case "sys_enter_read", "sys_enter_readv", "sys_enter_recvfrom", "sys_enter_recvmsg":
 				continue
-			case "sys_exit_read", "sys_exit_readv", "sys_exit_recvfrom":
+			case "sys_exit_read", "sys_exit_readv", "sys_exit_recvfrom", "sys_exit_recvmsg":
 				continue
 			}
 		}
@@ -408,18 +330,26 @@ type l7Event struct {
 	Method              uint8
 	Padding             uint16
 	StatementId         uint32
-	Payload             [PayloadSize]byte
+	PayloadSize         uint64
+	Payload             [MaxPayloadSize]byte
 }
 
 func (e l7Event) Event() Event {
-	return Event{Type: EventTypeL7Request, Pid: e.Pid, Fd: e.Fd, Timestamp: e.ConnectionTimestamp, L7Request: &L7Request{
-		Protocol:    L7Protocol(e.Protocol),
-		Status:      int(e.Status),
+	r := &l7.RequestData{
+		Protocol:    l7.Protocol(e.Protocol),
+		Status:      l7.Status(e.Status),
 		Duration:    time.Duration(e.Duration),
-		Method:      L7Method(e.Method),
+		Method:      l7.Method(e.Method),
 		StatementId: e.StatementId,
-		Payload:     e.Payload,
-	}}
+	}
+	switch {
+	case e.PayloadSize == 0:
+	case e.PayloadSize > MaxPayloadSize:
+		r.Payload = e.Payload[:MaxPayloadSize]
+	default:
+		r.Payload = e.Payload[:e.PayloadSize]
+	}
+	return Event{Type: EventTypeL7Request, Pid: e.Pid, Fd: e.Fd, Timestamp: e.ConnectionTimestamp, L7Request: r}
 }
 
 func runEventsReader(name string, r *perf.Reader, ch chan<- Event, e rawEvent) {

+ 3 - 3
go.mod

@@ -27,8 +27,8 @@ require (
 	go.opentelemetry.io/otel/trace v1.14.0
 	golang.org/x/arch v0.4.0
 	golang.org/x/mod v0.11.0
-	golang.org/x/net v0.7.0
-	golang.org/x/sys v0.10.0
+	golang.org/x/net v0.14.0
+	golang.org/x/sys v0.11.0
 	golang.org/x/time v0.3.0
 	gopkg.in/alecthomas/kingpin.v2 v2.2.6
 	gopkg.in/yaml.v2 v2.4.0
@@ -129,7 +129,7 @@ require (
 	go4.org/unsafe/assume-no-moving-gc v0.0.0-20220617031537-928513b29760 // indirect
 	golang.org/x/exp v0.0.0-20230801115018-d63ba01acd4b // indirect
 	golang.org/x/sync v0.1.0 // indirect
-	golang.org/x/text v0.7.0 // indirect
+	golang.org/x/text v0.12.0 // indirect
 	google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f // indirect
 	google.golang.org/grpc v1.53.0 // indirect
 	google.golang.org/protobuf v1.28.1 // indirect

+ 7 - 7
go.sum

@@ -753,8 +753,8 @@ golang.org/x/net v0.0.0-20210928044308-7d9f5e0b762b/go.mod h1:9nx3DQGgdP8bBQD5qx
 golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
 golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
 golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
-golang.org/x/net v0.7.0 h1:rJrUqqhjsgNp7KqAIc25s9pZnjU7TUcSY7HcVZjdn1g=
-golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
+golang.org/x/net v0.14.0 h1:BONx9s002vGdD9umnlX1Po8vOZmrgH34qlHcD1MfK14=
+golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI=
 golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
 golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
 golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
@@ -862,11 +862,11 @@ golang.org/x/sys v0.0.0-20220128215802-99c3d69c2c27/go.mod h1:oPkhp1MJrh7nUepCBc
 golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/sys v0.10.0 h1:SqMFp9UcQJZa+pmYuAKjd9xq1f0j5rLcDIk0mj4qAsA=
-golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM=
+golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
 golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
-golang.org/x/term v0.5.0 h1:n2a8QNdAb0sZNpU9R1ALUXBbY+w51fCQDN+7EdxNBsY=
+golang.org/x/term v0.11.0 h1:F9tnn/DA/Im8nCwm+fX+1/eBwi4qFjRT++MhtVC4ZX0=
 golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
 golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
 golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@@ -876,8 +876,8 @@ golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
 golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
 golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
 golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
-golang.org/x/text v0.7.0 h1:4BRB4x83lYWy72KwLD/qYDuTu7q9PjSagHvijDw7cLo=
-golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
+golang.org/x/text v0.12.0 h1:k+n5B8goJNdU7hSvEtMUz3d1Q6D/XW4COJSJR6fN0mc=
+golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
 golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
 golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
 golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=

+ 0 - 45
tracing/http.go

@@ -1,45 +0,0 @@
-package tracing
-
-import (
-	"bytes"
-	"fmt"
-	"github.com/coroot/coroot-node-agent/ebpftracer"
-	"go.opentelemetry.io/otel/attribute"
-	"go.opentelemetry.io/otel/codes"
-	semconv "go.opentelemetry.io/otel/semconv/v1.18.0"
-	"go.opentelemetry.io/otel/trace"
-	"inet.af/netaddr"
-	"time"
-)
-
-func handleHttpRequest(start, end time.Time, r *ebpftracer.L7Request, dest netaddr.IPPort, attrs []attribute.KeyValue) {
-	method, path := parseHttp(r.Payload[:])
-	if method == "" {
-		return
-	}
-	_, span := tracer.Start(nil, method, trace.WithTimestamp(start), trace.WithSpanKind(trace.SpanKindClient))
-	span.SetAttributes(append(
-		attrs,
-		semconv.HTTPURL(fmt.Sprintf("http://%s%s", dest.String(), path)),
-		semconv.HTTPMethod(method),
-		semconv.HTTPSchemeHTTP,
-		semconv.HTTPStatusCode(r.Status),
-	)...)
-	if r.Status >= 400 {
-		span.SetStatus(codes.Error, "")
-	}
-	span.End(trace.WithTimestamp(end))
-}
-
-func parseHttp(payload []byte) (string, string) {
-	// the HTTP method is being validated in the eBPF code, confirming that the request is an HTTP request
-	method, rest, ok := bytes.Cut(payload, space)
-	if !ok {
-		return "", ""
-	}
-	uri, _, ok := bytes.Cut(rest, space)
-	if !ok {
-		uri = append(uri, []byte("...")...)
-	}
-	return string(method), string(uri)
-}

+ 0 - 62
tracing/memcached.go

@@ -1,62 +0,0 @@
-package tracing
-
-import (
-	"bytes"
-	"github.com/coroot/coroot-node-agent/ebpftracer"
-	"go.opentelemetry.io/otel/attribute"
-	"go.opentelemetry.io/otel/codes"
-	semconv "go.opentelemetry.io/otel/semconv/v1.18.0"
-	"go.opentelemetry.io/otel/trace"
-	"strings"
-	"time"
-)
-
-const (
-	MemcacheDBItemKeyName attribute.Key = "db.memcached.item"
-)
-
-func handleMemcachedQuery(start, end time.Time, r *ebpftracer.L7Request, attrs []attribute.KeyValue) {
-	cmd, items := parseMemcached(r.Payload[:])
-	if cmd == "" {
-		return
-	}
-	_, span := tracer.Start(nil, cmd, trace.WithTimestamp(start), trace.WithSpanKind(trace.SpanKindClient))
-	if len(items) == 1 {
-		attrs = append(attrs, MemcacheDBItemKeyName.String(items[0]))
-	} else if len(items) > 1 {
-		attrs = append(attrs, MemcacheDBItemKeyName.StringSlice(items))
-	}
-	span.SetAttributes(append(attrs, semconv.DBSystemMemcached, semconv.DBOperation(cmd))...)
-	if r.Status == 500 {
-		span.SetStatus(codes.Error, "")
-	}
-	span.End(trace.WithTimestamp(end))
-}
-
-func parseMemcached(payload []byte) (string, []string) {
-	cmd, rest, ok := bytes.Cut(payload, space)
-	if !ok {
-		return "", nil
-	}
-	command := string(cmd)
-	switch command {
-	case "set", "add", "cas", "append", "prepend", "replace", "delete", "incr", "decr", "touch":
-		if key, _, ok := bytes.Cut(rest, space); ok {
-			return command, []string{string(key)}
-		}
-	case "gat", "gats":
-		_, rest, ok = bytes.Cut(rest, space)
-		if ok {
-			keys, _, ok := bytes.Cut(rest, crlf)
-			if ok {
-				return command, strings.Split(string(keys), " ")
-			}
-		}
-	case "get", "gets":
-		keys, _, ok := bytes.Cut(rest, crlf)
-		if ok {
-			return command, strings.Split(string(keys), " ")
-		}
-	}
-	return "", nil
-}

+ 0 - 64
tracing/redis.go

@@ -1,64 +0,0 @@
-package tracing
-
-import (
-	"bytes"
-	"github.com/coroot/coroot-node-agent/ebpftracer"
-	"go.opentelemetry.io/otel/attribute"
-	"go.opentelemetry.io/otel/codes"
-	semconv "go.opentelemetry.io/otel/semconv/v1.18.0"
-	"go.opentelemetry.io/otel/trace"
-	"strconv"
-	"time"
-)
-
-func handleRedisQuery(start, end time.Time, r *ebpftracer.L7Request, attrs []attribute.KeyValue) {
-	cmd, args := parseRedis(r.Payload[:])
-	if cmd == "" {
-		return
-	}
-	_, span := tracer.Start(nil, cmd, trace.WithTimestamp(start), trace.WithSpanKind(trace.SpanKindClient))
-	statement := cmd
-	if args != "" {
-		statement += " " + args
-	}
-	span.SetAttributes(append(attrs, semconv.DBSystemRedis, semconv.DBOperation(cmd), semconv.DBStatement(statement))...)
-	if r.Status == 500 {
-		span.SetStatus(codes.Error, "")
-	}
-	span.End(trace.WithTimestamp(end))
-}
-
-func parseRedis(payload []byte) (cmd string, args string) {
-	var v, rest []byte
-	var ok bool
-	v, rest, ok = bytes.Cut(payload, crlf)
-	if !ok || !bytes.HasPrefix(v, []byte("*")) {
-		return
-	}
-	arrayLen, err := strconv.ParseUint(string(v[1:]), 10, 32)
-	if err != nil {
-		return
-	}
-	readString := func() string {
-		v, rest, ok = bytes.Cut(rest, crlf)
-		if !ok || !bytes.HasPrefix(v, []byte("$")) {
-			return ""
-		}
-		v, rest, ok = bytes.Cut(rest, crlf)
-		if ok {
-			return string(v)
-		}
-		return ""
-	}
-	cmd = readString()
-	if cmd == "" {
-		return
-	}
-	if arrayLen > 1 {
-		args = readString()
-		if arrayLen > 2 {
-			args += " ..."
-		}
-	}
-	return
-}

+ 119 - 24
tracing/tracing.go

@@ -2,9 +2,11 @@ package tracing
 
 import (
 	"context"
-	"github.com/coroot/coroot-node-agent/ebpftracer"
+	"fmt"
+	"github.com/coroot/coroot-node-agent/ebpftracer/l7"
 	"go.opentelemetry.io/otel"
 	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/codes"
 	"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
 	"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
 	"go.opentelemetry.io/otel/sdk/resource"
@@ -17,10 +19,12 @@ import (
 	"time"
 )
 
+const (
+	MemcacheDBItemKeyName attribute.Key = "db.memcached.item"
+)
+
 var (
 	tracer trace.Tracer
-	space  = []byte{' '}
-	crlf   = []byte{'\r', '\n'}
 )
 
 func Init(machineId, hostname, version string) {
@@ -49,32 +53,123 @@ func Init(machineId, hostname, version string) {
 	tracer = tracerProvider.Tracer("coroot-node-agent", trace.WithInstrumentationVersion(version))
 }
 
-func HandleL7Request(containerId string, dest netaddr.IPPort, r *ebpftracer.L7Request, preparedStatements map[string]string) {
+type Trace struct {
+	containerId string
+	destination netaddr.IPPort
+	commonAttrs []attribute.KeyValue
+}
+
+func NewTrace(containerId string, destination netaddr.IPPort) *Trace {
 	if tracer == nil {
-		return
+		return nil
 	}
+	return &Trace{containerId: containerId, destination: destination, commonAttrs: []attribute.KeyValue{
+		semconv.ContainerID(containerId),
+		semconv.NetPeerName(destination.IP().String()),
+		semconv.NetPeerPort(int(destination.Port())),
+	}}
+}
+
+func (t *Trace) createSpan(name string, duration time.Duration, error bool, attrs ...attribute.KeyValue) {
 	end := time.Now()
-	start := end.Add(-r.Duration)
+	start := end.Add(-duration)
+	_, span := tracer.Start(nil, name, trace.WithTimestamp(start), trace.WithSpanKind(trace.SpanKindClient))
+	span.SetAttributes(attrs...)
+	span.SetAttributes(t.commonAttrs...)
+	if error {
+		span.SetStatus(codes.Error, "")
+	}
+	span.End(trace.WithTimestamp(end))
+}
+
+func (t *Trace) HttpRequest(method, path string, status l7.Status, duration time.Duration) {
+	if t == nil || method == "" {
+		return
+	}
+	t.createSpan(method, duration, status >= 400,
+		semconv.HTTPURL(fmt.Sprintf("http://%s%s", t.destination.String(), path)),
+		semconv.HTTPMethod(method),
+		semconv.HTTPStatusCode(int(status)),
+	)
+}
+
+func (t *Trace) Http2Request(method, path, scheme string, status l7.Status, duration time.Duration) {
+	if t == nil {
+		return
+	}
+	if method == "" {
+		method = "unknown"
+	}
+	if path == "" {
+		path = "/unknown"
+	}
+	if scheme == "" {
+		scheme = "unknown"
+	}
+	t.createSpan(method, duration, status > 400,
+		semconv.HTTPURL(fmt.Sprintf("%s://%s%s", scheme, t.destination.String(), path)),
+		semconv.HTTPMethod(method),
+		semconv.HTTPStatusCode(int(status)),
+	)
+}
 
+func (t *Trace) PostgresQuery(query string, error bool, duration time.Duration) {
+	if t == nil || query == "" {
+		return
+	}
+	t.createSpan("query", duration, error,
+		semconv.DBSystemPostgreSQL,
+		semconv.DBStatement(query),
+	)
+}
+
+func (t *Trace) MysqlQuery(query string, error bool, duration time.Duration) {
+	if t == nil || query == "" {
+		return
+	}
+	t.createSpan("query", duration, error,
+		semconv.DBSystemMySQL,
+		semconv.DBStatement(query),
+	)
+}
+
+func (t *Trace) MongoQuery(query string, error bool, duration time.Duration) {
+	if t == nil || query == "" {
+		return
+	}
+	t.createSpan("query", duration, error,
+		semconv.DBSystemMongoDB,
+		semconv.DBStatement(query),
+	)
+}
+
+func (t *Trace) MemcachedQuery(cmd string, items []string, error bool, duration time.Duration) {
+	if t == nil || cmd == "" {
+		return
+	}
 	attrs := []attribute.KeyValue{
-		semconv.ContainerID(containerId),
-		semconv.NetPeerName(dest.IP().String()),
-		semconv.NetPeerPort(int(dest.Port())),
-	}
-	switch r.Protocol {
-	case ebpftracer.L7ProtocolHTTP:
-		handleHttpRequest(start, end, r, dest, attrs)
-	case ebpftracer.L7ProtocolMemcached:
-		handleMemcachedQuery(start, end, r, attrs)
-	case ebpftracer.L7ProtocolRedis:
-		handleRedisQuery(start, end, r, attrs)
-	case ebpftracer.L7ProtocolPostgres:
-		handlePostgresQuery(start, end, r, attrs, preparedStatements)
-	case ebpftracer.L7ProtocolMysql:
-		handleMysqlQuery(start, end, r, attrs, preparedStatements)
-	case ebpftracer.L7ProtocolMongo:
-		handleMongoQuery(start, end, r, attrs)
-	default:
+		semconv.DBSystemMemcached,
+		semconv.DBOperation(cmd),
+	}
+	if len(items) == 1 {
+		attrs = append(attrs, MemcacheDBItemKeyName.String(items[0]))
+	} else if len(items) > 1 {
+		attrs = append(attrs, MemcacheDBItemKeyName.StringSlice(items))
+	}
+	t.createSpan(cmd, duration, error, attrs...)
+}
+
+func (t *Trace) RedisQuery(cmd, args string, error bool, duration time.Duration) {
+	if t == nil || cmd == "" {
 		return
 	}
+	statement := cmd
+	if args != "" {
+		statement += " " + args
+	}
+	t.createSpan(cmd, duration, error,
+		semconv.DBSystemRedis,
+		semconv.DBOperation(cmd),
+		semconv.DBStatement(statement),
+	)
 }

Some files were not shown because too many files changed in this diff