Sfoglia il codice sorgente

Feature #TASK_QT-22513 euspace适配kafka

rock 1 anno fa
parent
commit
82448d0b12

+ 2 - 2
Makefile2

@@ -17,12 +17,12 @@ build:
 	CGO_ENABLED=1 go build -gcflags="all=-N -l" -buildvcs=false -o euspace
 c:
 	#docker exec -it 9d928d96d4d0 sh -c 'cd /opt/github/euspace/ebpftracer && sh build.sh${PARAMS}'
-	docker exec -it 432002584cbf sh -c 'cd /opt/github/euspace/ebpftracer && make all ${PARAMS}'
+	docker exec -it 889965bb1d4a sh -c 'cd /opt/github/euspace/ebpftracer && make all ${PARAMS}'
 c-build: c
 
 go-build:
 	#ssh [email protected] 'export https_proxy=http://10.0.22.50:4780 && source ~/.g/env && cd /opt/github/euspace && make -f Makefile2 build'
-	docker exec -it 432002584cbf bash -c 'cd /opt/github/euspace && source ~/.g/env && make -f Makefile2 build'
+	docker exec -it 889965bb1d4a bash -c 'cd /opt/github/euspace && source ~/.g/env && make -f Makefile2 build'
 go: go-build
 
 run:

+ 1 - 16
containers/container.go

@@ -979,22 +979,7 @@ func (c *Container) onL7Request(pid uint32, fd uint64, timestamp uint64, r *l7.R
 		stats.observe(r.Status.String(), "", r.Duration)
 		query := l7.ParseMongo(r.Payload)
 		trace.MongoQuery(query, r.Status.Error(), r.Duration)
-	case l7.ProtocolKafka:
-		stats.observe(r.Status.String(), "", r.Duration)
-		// 获取了kafka的请求类型和topic
-		kafkaInfo, err := l7.ParseKafkaProtocol(r.Payload)
-		if err != nil {
-			return err
-		}
-		if kafkaInfo != nil {
-			if kafkaInfo.RequestType == "kafka_produce" {
-				trace.KafkaProduceRequest(kafkaInfo.Topic, r.Status.Error(), r.Duration)
-			} 
-			// else if kafkaInfo.RequestType == "kafka_consume" {
-			// 	trace.KafkaConsumeRequest(kafkaInfo.Topic, r.Status.Error(), r.Duration)
-			// }
-		}
-	case l7.ProtocolCassandra:
+	case l7.ProtocolKafka,l7.ProtocolCassandra:
 		stats.observe(r.Status.String(), "", r.Duration)
 	case l7.ProtocolRabbitmq, l7.ProtocolNats:
 		stats.observe(r.Status.String(), r.Method.String(), 0)

+ 4 - 2
containers/container_apm.go

@@ -269,19 +269,21 @@ func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l
 		//query := l7.ParseMongo(r.Payload)
 		//trace.MongoQuery(query, r.Status.Error(), r.Duration)
 	case l7.ProtocolKafka:
+		klog.Info("enter case l7.ProtocolKafka")
 		stats.observe(r.Status.String(), "", r.Duration)
 		if c.l7Attach && c.valuableTrace(r.TraceId) {
 			// 获取了kafka的请求类型和topic
 			kafkaInfo, err := l7.ParseKafkaProtocol(r.Payload)
 			if err != nil {
-				return err
+				return nil
 			}
 			if kafkaInfo != nil {
 				apmTrace, err := c.getOrInitTrace(r.TraceId)
 				if err == nil {
 					if kafkaInfo.RequestType == "kafka_produce" {
+						klog.Info("enter the kafkaInfo.RequestType = kafka_produce")
 						// apmTrace.RedisTraceQueryEvent(cmd, args, r, conn.ActualDest)
-						apmTrace.KafkaProduceRequestEvent(kafkaInfo.Topic,conn.ActualDest)
+						apmTrace.KafkaProduceRequestEvent(kafkaInfo.Topic,r,conn.ActualDest)
 					} 
 					// else if kafkaInfo.RequestType == "kafka_consume" {
 					// 	trace.KafkaConsumeRequest(kafkaInfo.Topic, r.Status.Error(), r.Duration)

+ 15 - 5
ebpftracer/ebpf/l7/kafka.c

@@ -28,14 +28,19 @@ int is_kafka_request(char *buf, __u64 buf_size, __s32 *request_id) {
     }
     bpf_read(buf, h);
 
-    h.length = bpf_htonl(h.length);
+    // h.length = bpf_htonl(h.length);
+    h.length = bpf_ntohl(h.length);
     if (h.length+4 != buf_size) {
         return 0;
     }
-    h.api_key = bpf_htons(h.api_key);
+    // h.api_key = bpf_htons(h.api_key);
+    h.api_key = bpf_ntohs(h.api_key);
 //    h.api_version = bpf_htons(h.api_version);
-    h.correlation_id = bpf_htonl(h.correlation_id);
+    // h.correlation_id = bpf_htonl(h.correlation_id);
+    h.correlation_id = bpf_ntohl(h.correlation_id);
+    // cw_bpf_debug("[Request][KAFKA] correlation_id  start is %d -------->", h.correlation_id);
     if (h.correlation_id > 0 && (h.api_key >= 0 && h.api_key <= 67)) {
+    // if (h.correlation_id > 0 && (h.api_key == 0 || h.api_key == 67)) {
         *request_id = h.correlation_id;
         return 1;
     }
@@ -75,10 +80,15 @@ int is_kafka_request(char *buf, __u64 buf_size, __s32 *request_id) {
 // }
 
 static __always_inline
-int is_kafka_response(char *buf, __s32 request_id) {
+int is_kafka_response(char *buf, __s32 request_id, __s32 *real_request_id) {
     struct kafka_response_header h = {};
     bpf_read(buf, h);
-    if (bpf_htonl(h.correlation_id) == request_id) {
+    h.correlation_id = bpf_htonl(h.correlation_id);
+    // h.correlation_id = bpf_ntohl(h.correlation_id);
+    *real_request_id = h.correlation_id;
+    // cw_bpf_debug("[Request][KAFKA] correlation_id  end is %d -------->", h.correlation_id);
+    bpf_printk("[Request][KAFKA] correlation_id  end is %d -------->", h.correlation_id);
+    if (h.correlation_id == request_id) {
         return 1;
     }
     return 0;

+ 14 - 1
ebpftracer/ebpf/l7/l7.c

@@ -281,6 +281,10 @@ void send_event(void *ctx, struct l7_event *e, struct connection_id cid, struct
     e->connection_timestamp = conn->timestamp;
     e->fd = cid.fd;
     e->pid = cid.pid;
+    if (e->protocol == PROTOCOL_KAFKA)
+    {
+        cw_bpf_debug("[Request][KAFKA] begin send data -------->");
+    }
     long error = bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
 	if (error ==0){
 	        cw_add_event_count(e->trace_id);
@@ -687,6 +691,10 @@ int trace_enter_write(void *ctx, __u64 fd, __u16 is_tls, char *buf, __u64 size,
         if (prev_req && prev_req->protocol == PROTOCOL_KAFKA) {
             req->ns = prev_req->ns;
         }
+        cw_bpf_debug("[Request][KAFKA] start -------->");
+        bpf_printk("[Request][KAFKA] start -------->");
+        cw_bpf_debug("[Request][KAFKA] correlation_id  start is %d -------->", req->request_id);
+        bpf_printk("[Request][KAFKA] correlation_id  start is %d -------->", req->request_id);
         // __u64 before_topic_length;
         // __s16 s16_method_type; 
         // __s16 topic_length = kafka_request_topic_length_data(payload, size, &s16_method_type, &before_topic_length);
@@ -1238,7 +1246,12 @@ int trace_exit_read(void *ctx, __u64 id, __u32 pid, __u16 is_tls, long int ret)
         e->component_dport = conn->dport;
         __builtin_memcpy(&e->component_saddr, &conn->saddr, sizeof(e->component_saddr));
         __builtin_memcpy(&e->component_daddr, &conn->daddr, sizeof(e->component_daddr));
-        response = is_kafka_response(payload, req->request_id);
+        __s32 real_request_id;
+        response = is_kafka_response(payload, req->request_id, &real_request_id);
+        cw_bpf_debug("[Request][KAFKA] end -------->");
+        bpf_printk("[Request][KAFKA] end -------->");
+        // cw_bpf_debug("[Request][KAFKA] correlation_id  end is %d, real is %d -------->", req->request_id, real_request_id);
+        
         // if(response)
         // {
         //     struct mq_data *mq_data_ptr = bpf_map_lookup_elem(&apm_l7_request_mq_data_map, &req->request_id);

+ 0 - 2
ebpftracer/l7/l7.go

@@ -99,8 +99,6 @@ func (p Protocol) ServiceNameString() string {
 		return "DNS"
 	case ProtocolDM:
 		return "DM"
-	case ProtocolKafka:
-		return "KAFKA"
 	}
 	return "UNKNOWN:" + strconv.Itoa(int(p))
 }

+ 5 - 1
ebpftracer/tracer.go

@@ -761,7 +761,11 @@ func runEventsReader(name string, r *perf.Reader, ch chan<- Event, typ perfMapTy
 				ComponentSAddr: ipPort(v.ComponentSAddr, v.ComponentSport),
 				ComponentDAddr: ipPort(v.ComponentDAddr, v.ComponentDport),
 			}
-			if req.Protocol == l7.ProtocolHTTP {
+			// if req.Protocol == l7.ProtocolHTTP {
+			// 	klog.Infof("runEventsReader ComponentSAddr.String %s", req.ComponentSAddr.String())
+			// 	klog.Infof("runEventsReader ComponentDAddr.String %s", req.ComponentDAddr.String())
+			// }
+			if req.Protocol == l7.ProtocolKafka {
 				klog.Infof("runEventsReader ComponentSAddr.String %s", req.ComponentSAddr.String())
 				klog.Infof("runEventsReader ComponentDAddr.String %s", req.ComponentDAddr.String())
 			}

+ 1 - 1
main.go

@@ -322,7 +322,7 @@ func main() {
 		if err != nil {
 			return
 		}
-		log.Infoln("netdata is:", string(jsonData))
+		// log.Infoln("netdata is:", string(jsonData))
 		// 创建请求
 		urlRoute := "/api/v2/ebpf/receive"
 

+ 2 - 2
pkg/go.opentelemetry.io/otel/exporters/otlp/otlptrace/apm_exporter.go

@@ -231,7 +231,7 @@ func tracetransformData(sdl []tracesdk.ReadOnlySpan) map[int][]RootDataT {
 
 	//Transform the categorized map into a slice
 	data, _ := json.Marshal(sendDataMap)
-	klog.Debug(string(data))
+	klog.Info(string(data))
 	//fmt.Println(len(sendData))
 	//fmt.Println("sdl len:", len(sdl))
 	return sendDataMap
@@ -926,7 +926,7 @@ func buildMQMapEvent(mNode *MapInfoT, event tracesdk.Event){
 		}
 	}
 	mNode.Flow = 0
-	mNode.Scheme = ""
+	mNode.Schema = ""
 }
 
 func buildDNSMapEvent(mNode *MapInfoT, event tracesdk.Event) {

+ 1 - 1
tracing/apm_tracing.go

@@ -473,7 +473,7 @@ func (t *Trace) RedisTraceQueryEvent(cmd, args string, r *l7.RequestData, destin
 	t.createTraceEvent(l7.ProtocolRedis.String(), ebpftracer.EventTypeL7Request.Int(), l7.ProtocolRedis.Int(), attr...)
 }
 
-func (t *Trace) KafkaProduceRequestEvent(topic string,destination netaddr.IPPort) {
+func (t *Trace) KafkaProduceRequestEvent(topic string, r *l7.RequestData,destination netaddr.IPPort) {
 	if t == nil {
 		return
 	}