Просмотр исходного кода

Feature #TASK_QT-18250 根据不同的go-grpc版本,挂载不同的uprobe点。

rock 7 месяцев назад
Родитель
Сommit
6994bcf41d
2 измененных файлов с 140 добавлено и 80 удалено
  1. 64 64
      ebpftracer/ebpf/utrace/go/net/grpc.server.probe.bpf.c
  2. 76 16
      ebpftracer/tls.go

+ 64 - 64
ebpftracer/ebpf/utrace/go/net/grpc.server.probe.bpf.c

@@ -261,38 +261,38 @@ handleStream(struct pt_regs *ctx, void *stream_ptr, struct go_iface *go_context)
 //   - status_ptr: pointer to the status.Stream holding the status info
 //
 // Returns 0 on success, otherwise a negative error value in case of failure.
-static __always_inline int writeStatus(struct pt_regs *ctx, void *status_ptr) {
-    if (status_ptr == NULL) {
-        bpf_printk("grpc:server:writeStatus: NULL status_ptr");
-        return -1;
-    }
-
-    void *key = (void *)GOROUTINE(ctx);
-
-    struct grpc_request_t *req_ptr = bpf_map_lookup_elem(&grpc_events, &key);
-    if (req_ptr == NULL) {
-        bpf_printk("grpc:server:handleStream: failed to lookup grpc request");
-        return -2;
-    }
-
-    void *s_ptr = 0;
-    long rc = bpf_probe_read_user(&s_ptr, sizeof(s_ptr), (void *)(status_ptr + status_s_pos));
-    if (rc != 0) {
-        bpf_printk("grpc:server:handleStream: failed to read Status.s");
-        return -3;
-    }
-
-    // Get status code from Status.s pointer
-    rc = bpf_probe_read_user(
-        &req_ptr->status_code, sizeof(req_ptr->status_code), (void *)(s_ptr + status_code_pos));
-    if (rc != 0) {
-        bpf_printk("grpc:server:handleStream: failed to read status code");
-        return -4;
-    }
-    req_ptr->has_status = true;
-
-    return 0;
-}
+// static __always_inline int writeStatus(struct pt_regs *ctx, void *status_ptr) {
+//     if (status_ptr == NULL) {
+//         bpf_printk("grpc:server:writeStatus: NULL status_ptr");
+//         return -1;
+//     }
+
+//     void *key = (void *)GOROUTINE(ctx);
+
+//     struct grpc_request_t *req_ptr = bpf_map_lookup_elem(&grpc_events, &key);
+//     if (req_ptr == NULL) {
+//         bpf_printk("grpc:server:handleStream: failed to lookup grpc request");
+//         return -2;
+//     }
+
+//     void *s_ptr = 0;
+//     long rc = bpf_probe_read_user(&s_ptr, sizeof(s_ptr), (void *)(status_ptr + status_s_pos));
+//     if (rc != 0) {
+//         bpf_printk("grpc:server:handleStream: failed to read Status.s");
+//         return -3;
+//     }
+
+//     // Get status code from Status.s pointer
+//     rc = bpf_probe_read_user(
+//         &req_ptr->status_code, sizeof(req_ptr->status_code), (void *)(s_ptr + status_code_pos));
+//     if (rc != 0) {
+//         bpf_printk("grpc:server:handleStream: failed to read status code");
+//         return -4;
+//     }
+//     req_ptr->has_status = true;
+
+//     return 0;
+// }
 
 // This instrumentation attaches uprobe to the following function:
 // func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo)
@@ -664,35 +664,35 @@ int uprobe_http2Server_operateHeader(struct pt_regs *ctx) {
 // https://github.com/grpc/grpc-go/blob/bcf9171a20e44ed81a6eb152e3ca9e35b2c02c5d/internal/transport/http2_server.go#L1049
 //
 // This is only compatible with versions > 1.40 and < 1.69.0 of the Server.
-SEC("uprobe/http2Server_WriteStatus")
-int uprobe_http2Server_WriteStatus(struct pt_regs *ctx) {
-    // bpf_printk("enter uprobe_http2Server_WriteStatus\n");
-    void *status_ptr = get_argument(ctx, 3);
-    return writeStatus(ctx, status_ptr);
-}
-
-// func (ht *http2Server) writeStatus(s *Stream, st *status.Status)
-// https://github.com/grpc/grpc-go/blob/317271b232677b7869576a49855b01b9f4775d67/internal/transport/http2_server.go#L1045
-//
-// This is only compatible with versions > 1.69.0 of the Server.
-SEC("uprobe/http2Server_WriteStatus2")
-int uprobe_http2Server_WriteStatus2(struct pt_regs *ctx) {
-    // bpf_printk("enter uprobe_http2Server_WriteStatus2\n");
-    u64 server_stream_pos = 2;
-    void *server_stream_ptr = get_argument(ctx, server_stream_pos);
-    if (server_stream_ptr == NULL) {
-        bpf_printk("grpc:server:uprobe/http2Server_WriteStatus2: failed to get ServerStream arg");
-        return -1;
-    }
-
-    void *stream_ptr;
-    long rc = bpf_probe_read_user(
-        &stream_ptr, sizeof(stream_ptr), (void *)(server_stream_ptr + server_stream_stream_pos));
-    if (rc != 0) {
-        bpf_printk("grpc:server:uprobe/http2Server_WriteStatus2: failed to read stream_ptr");
-        return -2;
-    }
-
-    void *status_ptr = get_argument(ctx, 3);
-    return writeStatus(ctx, status_ptr);
-}
+// SEC("uprobe/http2Server_WriteStatus")
+// int uprobe_http2Server_WriteStatus(struct pt_regs *ctx) {
+//     // bpf_printk("enter uprobe_http2Server_WriteStatus\n");
+//     void *status_ptr = get_argument(ctx, 3);
+//     return writeStatus(ctx, status_ptr);
+// }
+
+// // func (ht *http2Server) writeStatus(s *Stream, st *status.Status)
+// // https://github.com/grpc/grpc-go/blob/317271b232677b7869576a49855b01b9f4775d67/internal/transport/http2_server.go#L1045
+// //
+// // This is only compatible with versions > 1.69.0 of the Server.
+// SEC("uprobe/http2Server_WriteStatus2")
+// int uprobe_http2Server_WriteStatus2(struct pt_regs *ctx) {
+//     // bpf_printk("enter uprobe_http2Server_WriteStatus2\n");
+//     u64 server_stream_pos = 2;
+//     void *server_stream_ptr = get_argument(ctx, server_stream_pos);
+//     if (server_stream_ptr == NULL) {
+//         bpf_printk("grpc:server:uprobe/http2Server_WriteStatus2: failed to get ServerStream arg");
+//         return -1;
+//     }
+
+//     void *stream_ptr;
+//     long rc = bpf_probe_read_user(
+//         &stream_ptr, sizeof(stream_ptr), (void *)(server_stream_ptr + server_stream_stream_pos));
+//     if (rc != 0) {
+//         bpf_printk("grpc:server:uprobe/http2Server_WriteStatus2: failed to read stream_ptr");
+//         return -2;
+//     }
+
+//     void *status_ptr = get_argument(ctx, 3);
+//     return writeStatus(ctx, status_ptr);
+// }

+ 76 - 16
ebpftracer/tls.go

@@ -206,6 +206,46 @@ func (t *Tracer) AttachGoTlsUprobes(pid uint32, appInfo *AppInfo, codeType uint1
 		return nil, err
 	}
 
+	// 检测 gRPC 版本
+	var grpcMajorVersion, grpcMinorVersion int
+	for _, dep := range bi.Deps {
+		if strings.Contains(dep.Path, "grpc") {
+			klog.Infoln("Found gRPC dependency:", dep.Path, "version:", dep.Version)
+			
+			// 解析版本号
+			version := dep.Version
+			if version != "" {
+				// 移除可能的 "v" 前缀
+				version = strings.TrimPrefix(version, "v")
+				parts := strings.Split(version, ".")
+				
+				if len(parts) >= 2 {
+					major, err := strconv.Atoi(parts[0])
+					if err != nil {
+						klog.WithError(err).Warnf("Error parsing major version from %s", parts[0])
+						continue
+					}
+					
+					minor, err := strconv.Atoi(parts[1])
+					if err != nil {
+						klog.WithError(err).Warnf("Error parsing minor version from %s", parts[1])
+						continue
+					}
+					
+					klog.Infof("Detected gRPC version: %d.%d for PID %d", major, minor, pid)
+					grpcMajorVersion = major
+					grpcMinorVersion = minor
+					// // 根据版本选择相应的探针策略
+					// if major == 1 && minor >= 69 {
+					// 	klog.Infof("Using modern gRPC handler for version %d.%d", major, minor)
+					// } else {
+					// 	klog.Infof("Using legacy gRPC handler for version %d.%d", major, minor)
+					// }
+				}
+			}
+		}
+	}
+
 	offset, ok := tracer.GetOffset(tracer.NewID("std", "runtime", "g", "goid"), path)
 	bucketsOff, ok2 := tracer.GetOffset(tracer.NewID("std", "runtime", "hmap", "buckets"), path)
 
@@ -383,9 +423,10 @@ func (t *Tracer) AttachGoTlsUprobes(pid uint32, appInfo *AppInfo, codeType uint1
 			//	links = append(links, l)
 			//}
 		case goGrpcClientConnInvoke:
+			// 根据 gRPC 版本选择相应的探针
 			l, err := exe.Uprobe(s.Name, t.uprobes["uprobe_ClientConn_Invoke"], &link.UprobeOptions{Address: address})
 			if err != nil {
-				klog.WithError(err).Errorln("failed to attach uprobe_ClientConn_Invoke uprobe")
+				klog.WithError(err).Errorf("failed to attach uprobe_ClientConn_Invoke uprobe")
 				continue
 			}
 			klog.Infoln("uprobe_ClientConn_Invoke ok")
@@ -414,44 +455,49 @@ func (t *Tracer) AttachGoTlsUprobes(pid uint32, appInfo *AppInfo, codeType uint1
 				links = append(links, l)
 			}
 		case goGrpcClientLoopyHeaderHandler:
+			// 根据 gRPC 版本选择相应的探针
 			l, err := exe.Uprobe(s.Name, t.uprobes["uprobe_LoopyWriter_HeaderHandler"], &link.UprobeOptions{Address: address})
 			if err != nil {
-				klog.WithError(err).Errorln("failed to attach uprobe_LoopyWriter_HeaderHandler uprobe")
+				klog.WithError(err).Errorf("failed to attach uprobe_LoopyWriter_HeaderHandler uprobe")
 				continue
 			}
 			klog.Infoln("uprobe_LoopyWriter_HeaderHandler ok")
 			links = append(links, l)
 		case goGrpcHttp2ClientNewStream:
+			// 根据 gRPC 版本选择相应的探针
 			l, err := exe.Uprobe(s.Name, t.uprobes["uprobe_http2Client_NewStream"], &link.UprobeOptions{Address: address})
 			if err != nil {
-				klog.WithError(err).Errorln("failed to attach uprobe_http2Client_NewStream uprobe")
+				klog.WithError(err).Errorf("failed to attach uprobe_http2Client_NewStream uprobe")
 				continue
 			}
 			klog.Infoln("uprobe_http2Client_NewStream ok")
 			links = append(links, l)
 		case goGrpcHttp2OperateHeader:
+			// 根据 gRPC 版本选择相应的探针
 			l, err := exe.Uprobe(s.Name, t.uprobes["uprobe_http2Server_operateHeader"], &link.UprobeOptions{Address: address})
 			if err != nil {
-				klog.WithError(err).Errorln("failed to attach uprobe_http2Server_operateHeader uprobe")
-				continue
-			}
-			klog.Infoln("goGrpcHttp2OperateHeader ok")
-			links = append(links, l)
-		case goGrpcServerWritestatus:
-			l, err := exe.Uprobe(s.Name, t.uprobes["uprobe_http2Server_WriteStatus"], &link.UprobeOptions{Address: address})
-			if err != nil {
-				klog.WithError(err).Errorln("failed to attach uprobe_http2Server_WriteStatus uprobe")
+				klog.WithError(err).Errorf("failed to attach uprobe_http2Server_operateHeader uprobe")
 				continue
 			}
-			klog.Infoln("google.golang.org/grpc/internal/transport.(*http2Server).writeStatus ok")
+			klog.Infoln("uprobe_http2Server_operateHeader ok")
 			links = append(links, l)
+		// case goGrpcServerWritestatus:
+		// 	// 根据 gRPC 版本选择相应的 WriteStatus 探针
+		// 	l, err := exe.Uprobe(s.Name, t.uprobes["uprobe_http2Server_WriteStatus"], &link.UprobeOptions{Address: address})
+		// 	if err != nil {
+		// 		klog.WithError(err).Errorf("failed to attach uprobe_http2Server_WriteStatus uprobe")
+		// 		continue
+		// 	}
+		// 	links = append(links, l)
 		case goGrpcServerHandleStream:
-			l, err := exe.Uprobe(s.Name, t.uprobes["uprobe_server_handleStream"], &link.UprobeOptions{Address: address})
+			// 根据 gRPC 版本选择相应的探针
+			probeName := t.selectGRPCServerProbe(grpcMajorVersion, grpcMinorVersion)
+			l, err := exe.Uprobe(s.Name, t.uprobes[probeName], &link.UprobeOptions{Address: address})
 			if err != nil {
-				klog.WithError(err).Errorln("failed to attach uprobe_server_handleStream uprobe")
+				klog.WithError(err).Errorf("failed to attach %s uprobe", probeName)
 				continue
 			}
-			klog.Infoln("google.golang.org/grpc.(*Server).handleStream ok")
+			klog.Infof("%s ok (gRPC v%d.%d)", probeName, grpcMajorVersion, grpcMinorVersion)
 			links = append(links, l)
 			sStart := s.Value - textSection.Addr
 			sEnd := sStart + s.Size
@@ -651,6 +697,20 @@ func getSslLibPathAndVersion(pid uint32) (string, string) {
 	return libsslPath, "v" + version
 }
 
+// selectGRPCServerProbe 根据 gRPC 版本选择服务端探针
+func (t *Tracer) selectGRPCServerProbe(major, minor int) string {
+	// 根据 gRPC 版本选择相应的探针
+	if major == 1 && minor >= 69 {
+		// 现代版本 (>= 1.69.0) 使用新的探针
+		klog.Infof("Selecting modern gRPC server probe for version %d.%d", major, minor)
+		return "uprobe_server_handleStream2"
+	} else {
+		// 传统版本 (< 1.69.0) 使用旧的探针
+		klog.Infof("Selecting legacy gRPC server probe for version %d.%d", major, minor)
+		return "uprobe_server_handleStream"
+	}
+}
+
 func getReturnOffsets(machine elf.Machine, instructions []byte) []int {
 	var res []int
 	switch machine {