Kaynağa Gözat

Feature #TASK_QT-22513 euspace适配kafka代码暂存

rock 11 ay önce
ebeveyn
işleme
bc0d176e1f

+ 1 - 0
containers/container_apm.go

@@ -271,6 +271,7 @@ func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l
 	case l7.ProtocolKafka:
 		klog.Info("enter case l7.ProtocolKafka")
 		stats.observe(r.Status.String(), "", r.Duration)
+		klog.Infof("enter case l7.ProtocolKafka. r.traceID is %d", r.TraceId)
 		if c.l7Attach && c.valuableTrace(r.TraceId) {
 			// 获取了kafka的请求类型和topic
 			kafkaInfo, err := l7.ParseKafkaProtocol(r.Payload)

+ 40 - 69
ebpftracer/ebpf/l7/kafka.c

@@ -7,16 +7,7 @@ struct kafka_request_header {
     __s32 correlation_id;
 };
 
-// struct kafka_request_data {
-//     __s32 length;
-//     __s16 api_key;
-//     __s16 api_version;
-//     __s32 correlation_id;
-//     __s16 clientID_length;
-// };
-
 struct kafka_response_header {
-    __s32 length;
     __s32 correlation_id;
 };
 
@@ -30,87 +21,67 @@ int is_kafka_request(char *buf, __u64 buf_size, __s32 *request_id) {
 
     // h.length = bpf_htonl(h.length);
     h.length = bpf_ntohl(h.length);
-    if (h.length+4 != buf_size) {
+    if (h.length <= 0 || h.length + 4 != buf_size) {
         return 0;
     }
     // h.api_key = bpf_htons(h.api_key);
     h.api_key = bpf_ntohs(h.api_key);
-//    h.api_version = bpf_htons(h.api_version);
+   h.api_version = bpf_ntohs(h.api_version);
     // h.correlation_id = bpf_htonl(h.correlation_id);
     h.correlation_id = bpf_ntohl(h.correlation_id);
     // cw_bpf_debug("[Request][KAFKA] correlation_id  start is %d -------->", h.correlation_id);
-    if (h.correlation_id > 0 && (h.api_key >= 0 && h.api_key <= 67)) {
+    if (h.correlation_id > 0 && 
+        h.api_key >= 0 && h.api_key <= 67 && 
+        h.api_version >= 0 && h.api_version <= 12 &&
+        h.length >= 8 && h.length <= 104857600) {
     // if (h.correlation_id > 0 && (h.api_key == 0 || h.api_key == 67)) {
+        // bpf_printk("[Request][KAFKA] start h.length is %d",h.length);
         *request_id = h.correlation_id;
-        return 1;
+        if(h.api_key == 0)
+        {
+            __s16 clientID_length;
+            bpf_read(buf + sizeof(h), clientID_length);
+            clientID_length = bpf_ntohs(clientID_length);
+            bpf_printk("[Request][KAFKA] start clientid length is %d",clientID_length);
+            return 1;
+
+        }
+        return 0;
     }
     return 0;
 }
 
-// static __always_inline
-// __s16 kafka_request_topic_length_data(char *buf, __u64 buf_size, __s16 *method_type ,__u64 *before_topic_length) {
-//     struct kafka_request_data h = {};
-//     if (buf_size < sizeof(h)) {
-//         return 0;
-//     }
-//     bpf_read(buf, h);
-//     h.api_key = bpf_htons(h.api_key);
-//     if(h.api_key == 0)
-//     {
-//         *method_type = 1;
-//     }
-//     else if(h.api_key == 1)
-//     {
-//         *method_type = 2;
-//     }
-//     h.clientID_length = bpf_htons(h.clientID_length);
-//     if (h.clientID_length > 0 ) {
-//         //buf + 4 + 2 + 2 + 4 + 2 + h.clientID_length = begin of topic name 
-//         __s16 topic_length;
-//         bpf_probe_read(topic_length,2, buf + 4 + 2 + 2 + 4 + 2 + h.clientID_length);
-//         topic_length = bpf_htons(topic_length);
-//         if(topic_length > 0)
-//         { 
-//             *before_topic_length = 4 + 2 + 2 + 4 + 2 + h.clientID_length;
-//             return topic_length;
-//         }
-//         return 0;
-//     }
-//     return 0;
-// }
-
 static __always_inline
-int is_kafka_response(char *buf, __s32 request_id, __s32 *real_request_id) {
+int is_kafka_response(char *buf,__u64 buf_size, __s32 request_id, __s32 *real_request_id) {
     struct kafka_response_header h = {};
+    bpf_printk("[Request][KAFKA] end bufsize is %d", buf_size);
+    // if(buf_size > 4 && buf)
+    // {
+    //     bpf_printk("[Request][KAFKA] end buf0:[0x%x]",buf[0]);
+    //     bpf_printk("[Request][KAFKA] end buf1:[0x%x]",buf[1]);
+    //     bpf_printk("[Request][KAFKA] end buf2:[0x%x]",buf[2]);
+    //     bpf_printk("[Request][KAFKA] end buf3:[0x%x]",buf[3]);
+    // }
+    if (buf_size < sizeof(h)) {
+        return 0;
+    }
     bpf_read(buf, h);
-    h.correlation_id = bpf_htonl(h.correlation_id);
-    // h.correlation_id = bpf_ntohl(h.correlation_id);
+
+    // 正确解析 correlation_id (使用 ntohl 而不是 htonl)
+    h.correlation_id = bpf_ntohl(h.correlation_id);
     *real_request_id = h.correlation_id;
-    // cw_bpf_debug("[Request][KAFKA] correlation_id  end is %d -------->", h.correlation_id);
-    bpf_printk("[Request][KAFKA] correlation_id  end is %d -------->", h.correlation_id);
+
+    // 基本验证
+    if (h.correlation_id <= 0) {
+        // bpf_printk("[Request][KAFKA] end h.correlation_id <= 0");
+        return 0;
+    }
+    // bpf_printk("[Request][KAFKA] end h.length is %d, bufsize is %d", h.length, buf_size);
+    // 检查是否匹配请求的 correlation_id
     if (h.correlation_id == request_id) {
         return 1;
     }
     return 0;
 }
 
-// static __always_inline
-// int is_kafka_cw_response(char *buf, __s32 request_id) {
-//     struct kafka_response_header h = {};
-//     bpf_read(buf, h);
-//     h.api_key = bpf_htons(h.api_key);
-//     if (bpf_htonl(h.correlation_id) == request_id) {
-//         if(h.api_key == 0)
-//         {
-//             return 1;
-//         }
-//         else if(h.api_key == 1)
-//         {
-//             return 2;
-//         }
-//         return 0;
-//     }
-//     return 0;
-// }
-
 

+ 49 - 8
ebpftracer/ebpf/l7/l7.c

@@ -207,6 +207,13 @@ struct {
     __uint(max_entries, 32768);
 } active_l7_requests_mysql_resp_header_ctx SEC(".maps");
 
+struct {
+    __uint(type, BPF_MAP_TYPE_LRU_HASH);
+    __uint(key_size, sizeof(struct l7_request_key));
+    __uint(value_size, 4);
+    __uint(max_entries, 32768);
+} active_l7_requests_kafka_resp_header_ctx SEC(".maps");
+
 struct {
      __uint(type, BPF_MAP_TYPE_PERCPU_ARRAY);
      __type(key, int);
@@ -558,8 +565,9 @@ int trace_enter_write(void *ctx, __u64 fd, __u16 is_tls, char *buf, __u64 size,
 
     if (is_http_request(payload)) {
 	    cw_bpf_debug("");
-	    cw_bpf_debug("-----[Kernel HTTP Enter]:pid:[%d]|CURRENT-GOID:[%llu]|FD:[%d]", tid, get_current_goroutine(), k.fd);
-	    __u8 type =  0;
+	    // cw_bpf_debug("-----[Kernel HTTP Enter]:pid:[%d]|CURRENT-GOID:[%llu]|FD:[%d]", tid, get_current_goroutine(), k.fd);
+	    bpf_printk("-----[Kernel HTTP Enter]:pid:[%d]|CURRENT-GOID:[%llu]|FD:[%d]", tid, get_current_goroutine(), k.fd);
+        __u8 type =  0;
 	    __u64 trace_id = 0;
 	    struct apm_trace_key_t trace_key = get_apm_trace_key(120 * NS_PER_SEC, true);
 	    struct apm_trace_info_t * trace_info = get_apm_trace_info_by_trace_key(trace_key);
@@ -688,13 +696,13 @@ int trace_enter_write(void *ctx, __u64 fd, __u16 is_tls, char *buf, __u64 size,
     } else if (is_kafka_request(payload, size, &req->request_id)) {
         req->protocol = PROTOCOL_KAFKA;
         struct l7_request *prev_req = bpf_map_lookup_elem(&active_l7_requests, &k);
+        bpf_printk("-----[kafka HTTP Enter]:pid:[%d]|CURRENT-GOID:[%llu]|FD:[%d]", tid, get_current_goroutine(), k.fd);
         if (prev_req && prev_req->protocol == PROTOCOL_KAFKA) {
             req->ns = prev_req->ns;
         }
         cw_bpf_debug("[Request][KAFKA] start -------->");
-        bpf_printk("[Request][KAFKA] start -------->");
         cw_bpf_debug("[Request][KAFKA] correlation_id  start is %d -------->", req->request_id);
-        bpf_printk("[Request][KAFKA] correlation_id  start is %d -------->", req->request_id);
+        bpf_printk("[Request][KAFKA] correlation_id  start is %d, k.pid is %d, k.fd is %d -------->", req->request_id, k.pid, k.fd);
         // __u64 before_topic_length;
         // __s16 s16_method_type; 
         // __s16 topic_length = kafka_request_topic_length_data(payload, size, &s16_method_type, &before_topic_length);
@@ -1242,14 +1250,43 @@ int trace_exit_read(void *ctx, __u64 id, __u32 pid, __u16 is_tls, long int ret)
 			return 0;
 		}
 	} else if (e->protocol == PROTOCOL_KAFKA) {
+        if(ret == 4) {
+              //sava header to ctx and return
+             char resp_packet_header[4];
+             bpf_probe_read(resp_packet_header,4, payload);
+             bpf_map_update_elem(&active_l7_requests_kafka_resp_header_ctx, &k, resp_packet_header, BPF_ANY);
+             bpf_map_update_elem(&active_l7_requests, &k, req, BPF_ANY);
+             return 0;
+        }
         e->component_sport = conn->sport;
         e->component_dport = conn->dport;
         __builtin_memcpy(&e->component_saddr, &conn->saddr, sizeof(e->component_saddr));
         __builtin_memcpy(&e->component_daddr, &conn->daddr, sizeof(e->component_daddr));
-        __s32 real_request_id;
-        response = is_kafka_response(payload, req->request_id, &real_request_id);
-        cw_bpf_debug("[Request][KAFKA] end -------->");
-        bpf_printk("[Request][KAFKA] end -------->");
+        char* resp_packet_header = bpf_map_lookup_elem(&active_l7_requests_kafka_resp_header_ctx, &k);
+        if(resp_packet_header) {
+            __s32 kafka_response_length;
+            bpf_read(resp_packet_header, kafka_response_length);
+            kafka_response_length = bpf_ntohl(kafka_response_length);
+            if (kafka_response_length == ret)
+            {
+                __s32 real_request_id = 0;
+                response = is_kafka_response(payload, ret, req->request_id, &real_request_id);
+                // cw_bpf_debug("[Request][KAFKA] end -------->");
+                // bpf_printk("[Request][KAFKA] end k.pid is %d k.fd is %d ",k.pid, k.fd);
+                // bpf_printk("[Request][KAFKA] end req->request_id is %d, real_request_id is %d>", req->request_id, real_request_id);
+                cw_bpf_debug("[Request][KAFKA] end -------->");
+                bpf_printk("[Request][KAFKA] end k.pid is %d k.fd is %d ",k.pid, k.fd);
+                bpf_printk("[Request][KAFKA] end sport is %d, dport is %d>", e->component_sport, e->component_dport);
+                bpf_printk("[Request][KAFKA] end req->request_id is %d, real_request_id is %d>", req->request_id, real_request_id);
+            }
+            if(response) {
+                 bpf_map_delete_elem(&active_l7_requests_kafka_resp_header_ctx, &k);
+            }
+        } 
+        // else {
+        //     __s32 real_request_id = 0;
+        //     response = is_kafka_response(payload, ret, req->request_id, &real_request_id);
+        // }
         // cw_bpf_debug("[Request][KAFKA] correlation_id  end is %d, real is %d -------->", req->request_id, real_request_id);
         
         // if(response)
@@ -1268,6 +1305,10 @@ int trace_exit_read(void *ctx, __u64 id, __u32 pid, __u16 is_tls, long int ret)
     }
 	if (e->trace_id == 0){
 		e->trace_id = get_apm_trace_id(pid,tid);
+        if (e->protocol == PROTOCOL_KAFKA)
+        {
+            bpf_printk("[Request][KAFKA] end e->trace_id is %d ",e->trace_id);
+        }
 	}
 	e->end_at = bpf_ktime_get_ns();
 	e->start_at = req->ns;

+ 5 - 0
ebpftracer/l7/kafka.go

@@ -2,6 +2,7 @@ package l7
 
 import (
 	"encoding/binary"
+	klog "github.com/sirupsen/logrus"
 )
 
 // KafkaInfo contains parsed Kafka request information
@@ -18,6 +19,7 @@ const (
 
 // ParseKafkaProtocol analyzes Kafka protocol to determine request type and topic
 func ParseKafkaProtocol(payload []byte) (*KafkaInfo, error) {
+	klog.Info("enter case ParseKafkaProtocol")
 	// Kafka request should be at least 8 bytes
 	// 4 bytes for length + 2 bytes for API key + 2 bytes for API version
 	if len(payload) < 8 {
@@ -35,6 +37,7 @@ func ParseKafkaProtocol(payload []byte) (*KafkaInfo, error) {
 	if len(payload) < currentOffset+2 {
 		return nil, nil
 	}
+	klog.Info("enter case ParseKafkaProtocol")
 	clientIDLen := binary.BigEndian.Uint16(payload[currentOffset:currentOffset+2])
 	currentOffset += 2 + int(clientIDLen)
 	
@@ -73,8 +76,10 @@ func ParseKafkaProtocol(payload []byte) (*KafkaInfo, error) {
 		switch apiKey {
 		case ProduceRequest:
 			info.RequestType = "kafka_produce"
+			klog.Info("enter case ParseKafkaProtocol kafka_produce")
 		case FetchRequest:
 			info.RequestType = "kafka_consume"
+			klog.Info("enter case ParseKafkaProtocol kafka_consume")
 		default:
 			return nil, nil
 		}

+ 2 - 2
ebpftracer/tracer.go

@@ -777,8 +777,8 @@ func runEventsReader(name string, r *perf.Reader, ch chan<- Event, typ perfMapTy
 				req.ParentSpanContext.SpanIdFrom = hex.EncodeToString(v.SpanIdFrom[:])
 				req.SAddr = ipPort(v.SAddr, v.Sport)
 				req.DAddr = ipPort(v.DAddr, v.Dport)
-				klog.Infof("runEventsReader SAddr.String %s", req.SAddr.String())
-				klog.Infof("runEventsReader DAddr.String %s", req.DAddr.String())
+				// klog.Infof("runEventsReader SAddr.String %s", req.SAddr.String())
+				// klog.Infof("runEventsReader DAddr.String %s", req.DAddr.String())
 			}
 			switch {
 			case v.PayloadSize == 0: