ソースを参照

Feature #TASK_QT-31498 es适配

Carl 6 ヶ月 前
コミット
be54ab7bdd

+ 39 - 6
containers/container_apm.go

@@ -213,12 +213,45 @@ func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l
 	 */
 	if r.Protocol == l7.ProtocolHTTP {
 		if c.l7Attach && c.valuableTrace(r.TraceId) {
-			method, requestURI, sn, sport := l7.ParseHttpHost(r.Payload, r.IsTls)
-			apmTrace, err := c.getOrInitTrace(r.TraceId)
-			//fmt.Println("ProtocolHTTP-----", r.TraceId, err)
-			if err == nil {
-				apmTrace.HttpTraceRequestEvent(method, requestURI, sn, sport, r)
-				c.SendEvent(apmTrace, r.TraceId)
+			// 检查是否启用 Elasticsearch 检测
+			if *flags.EnableElasticsearchDetection {
+				// 解析 User-Agent 以检测 Elasticsearch 请求
+				method, requestURI, sn, sport, userAgent := l7.ParseHttpHostWithUserAgent(r.Payload, r.IsTls)
+				// 检查是否是 Elasticsearch 请求(通过 User-Agent)
+				isElasticsearch := strings.Contains(strings.ToLower(userAgent), "elasticsearch")
+				apmTrace, err := c.getOrInitTrace(r.TraceId)
+				//fmt.Println("ProtocolHTTP-----", r.TraceId, err)
+				if err == nil {
+					if isElasticsearch {
+						r.Protocol = l7.ProtocolES
+						// Elasticsearch 请求,按照 NoSQL 方式处理
+						query := l7.ParseElasticsearch(r.Payload)
+						if c.AppInfo.AppName != "" {
+							klog.Debugf("[%s] ->>>>> Elasticsearch -> %s:%d Query:[%s]", c.AppInfo.AppName, sn, sport, query)
+						}
+						conn := c.connectionsByPidFd[PidFd{Pid: pid, Fd: fd}]
+						if conn == nil {
+							conn = &ActiveConnection{
+								Dest:       r.ComponentDAddr,
+								ActualDest: r.ComponentDAddr,
+								Timestamp:  timestamp,
+							}
+						}
+						apmTrace.NoSQLTraceQueryEvent(r.Protocol, semconv.DBSystemElasticsearch, method, query, r, conn.Src, conn.ActualDest)
+					} else {
+						// 普通 HTTP 请求
+						apmTrace.HttpTraceRequestEvent(method, requestURI, sn, sport, r)
+					}
+					c.SendEvent(apmTrace, r.TraceId)
+				}
+			} else {
+				// Elasticsearch 检测未启用,使用普通 HTTP 处理
+				method, requestURI, sn, sport := l7.ParseHttpHost(r.Payload, r.IsTls)
+				apmTrace, err := c.getOrInitTrace(r.TraceId)
+				if err == nil {
+					apmTrace.HttpTraceRequestEvent(method, requestURI, sn, sport, r)
+					c.SendEvent(apmTrace, r.TraceId)
+				}
 			}
 		}
 		//return nil

+ 1 - 0
ebpftracer/ebpf/include/socket_trace_common.h

@@ -201,6 +201,7 @@ struct ebpf_proc_info {
 	__u64 path_ptr_pos;
 	__u64 status_code_pos;
 	__u64 request_host_pos;
+	__u64 url_host_pos;  // url.URL.Host field offset
 	__u64 request_proto_pos;
 	__u64 ctx_ptr_pos;
 	__u64 headers_ptr_pos;

+ 15 - 5
ebpftracer/ebpf/utrace/go/net/client.probe.bpf.c

@@ -318,12 +318,22 @@ int uprobe_Transport_roundTrip(struct pt_regs *ctx) {
 //        return 0;
 //    }
 
-    // get host from Request
-    if (!get_go_string_from_user_ptr((void *)(req_ptr+proc_info->request_host_pos), httpReq->host, sizeof(httpReq->host))) {
-        cw_bpf_debug("uprobe_Transport_roundTrip: Failed to get host from Request");
+    // get host from Request.Host first
+    bool host_read_success = get_go_string_from_user_ptr((void *)(req_ptr+proc_info->request_host_pos), httpReq->host, sizeof(httpReq->host));
+    
+    // If Request.Host is empty or failed, try to get from Request.URL.Host
+    if (!host_read_success || httpReq->host[0] == '\0') {
+        // Get Request.URL pointer
+        void *url_ptr = 0;
+        if (!bpf_probe_read(&url_ptr, sizeof(url_ptr), (void *)(req_ptr+proc_info->url_ptr_pos))) {
+            // Use runtime-detected offset for url.URL.Host field
+            if (proc_info->url_host_pos > 0) {
+                if (!get_go_string_from_user_ptr((void *)(url_ptr + proc_info->url_host_pos), httpReq->host, sizeof(httpReq->host))) {
+                    cw_bpf_debug("[Client] Failed to get host from Request.URL.Host");
+                }
+            }
+        }
     }
-	// TODO set request_host_pos
-	cw_bpf_debug("[Client] httpReq->host:%llu",proc_info->request_host_pos);
 //	for (int i = 0; i < MAX_HOSTNAME_SIZE; ++i) {
 //		if (httpReq->host[i] == '\0') {
 //			break;

+ 25 - 0
ebpftracer/l7/http.go

@@ -108,3 +108,28 @@ func ParseHttpHostWithUserAgent(payload []byte, isTls bool) (string, string, str
 	userAgent := parseHttpHeader(rest, "User-Agent:")
 	return method, uri, host, port, userAgent
 }
+
+// ParseElasticsearch 解析 Elasticsearch HTTP 请求体
+// 只返回请求体(Query)部分
+func ParseElasticsearch(payload []byte) string {
+	_, _, _, _, rest := parseHttpHostCommon(payload, false)
+
+	// 查找请求体的开始位置(\r\n\r\n 之后)
+	bodyStart := bytes.Index(rest, []byte("\r\n\r\n"))
+	if bodyStart == -1 {
+		// 如果没有找到完整的头部结束标记,返回空字符串
+		return ""
+	}
+	bodyStart += 4
+
+	// 提取请求体(限制长度,避免过长)
+	body := rest[bodyStart:]
+	maxBodyLen := 200
+	if len(body) > maxBodyLen {
+		body = body[:maxBodyLen]
+	}
+
+	// 只返回请求体内容
+	bodyStr := strings.TrimSpace(string(body))
+	return bodyStr
+}

+ 5 - 0
ebpftracer/l7/l7.go

@@ -28,6 +28,7 @@ const (
 	ProtocolDM        Protocol = 14
 	ProtocolMariaDB   Protocol = 15
 	ProtocolGrpc      Protocol = 16
+	ProtocolES        Protocol = 17
 )
 
 func (p Protocol) Int() int {
@@ -70,6 +71,8 @@ func (p Protocol) String() string {
 		return "Mariadb"
 	case ProtocolGrpc:
 		return "GRPC"
+	case ProtocolES:
+		return "Elasticsearch"
 	}
 	return "UNKNOWN:" + strconv.Itoa(int(p))
 }
@@ -110,6 +113,8 @@ func (p Protocol) ServiceNameString() string {
 		return "MARIA"
 	case ProtocolGrpc:
 		return "GRPC"
+	case ProtocolES:
+		return "ELASTICSEARCH"
 	}
 	return "UNKNOWN:" + strconv.Itoa(int(p))
 }

+ 1 - 0
ebpftracer/tls.go

@@ -440,6 +440,7 @@ func (t *Tracer) AttachGoTlsUprobes(pid uint32, appInfo *AppInfo, codeType uint1
 				&info.PathPtrPos:          tracer.NewID("std", "net/url", "URL", "Path"),
 				&info.StatusCodePos:       tracer.NewID("std", "net/http", "response", "status"),
 				&info.RequestHostPos:      tracer.NewID("std", "net/http", "Request", "Host"),
+				&info.UrlHostPos:          tracer.NewID("std", "net/url", "URL", "Host"),
 				&info.ProtoPos:            tracer.NewID("std", "net/http", "Request", "Proto"),
 				&info.CtxPtrPos:           tracer.NewID("std", "net/http", "Request", "ctx"),
 				&info.HeadersPtrPos:       tracer.NewID("std", "net/http", "Request", "Header"),

+ 10 - 9
flags/flags.go

@@ -32,17 +32,18 @@ var (
 	RunInContainer      = kingpin.Flag("run-in-container", "run in container").Default("false").Envar("RUN_IN_CONTAINER").Bool()
 	RunInOmniagent      = kingpin.Flag("run-in-omniagent", "run in omniagent").Default("false").Envar("RUN_IN_OMNIAGENT").Bool()
 
-	ListenAddress     = kingpin.Flag("listen", "Listen address - ip:port or :port").Default("0.0.0.0:8123").Envar("LISTEN").String()
-	CgroupRoot        = kingpin.Flag("cgroupfs-root", "The mount point of the host cgroupfs root").Default("/sys/fs/cgroup").Envar("CGROUPFS_ROOT").String()
-	DisableLogParsing = kingpin.Flag("disable-log-parsing", "Disable container log parsing").Default("true").Envar("DISABLE_LOG_PARSING").Bool()
-	DisablePinger     = kingpin.Flag("disable-pinger", "Don't ping upstreams").Default("false").Envar("DISABLE_PINGER").Bool()
-	DisableL7Tracing  = kingpin.Flag("disable-l7-tracing", "Disable L7 tracing").Default("false").Envar("DISABLE_L7_TRACING").Bool()
+	ListenAddress                = kingpin.Flag("listen", "Listen address - ip:port or :port").Default("0.0.0.0:8123").Envar("LISTEN").String()
+	CgroupRoot                   = kingpin.Flag("cgroupfs-root", "The mount point of the host cgroupfs root").Default("/sys/fs/cgroup").Envar("CGROUPFS_ROOT").String()
+	DisableLogParsing            = kingpin.Flag("disable-log-parsing", "Disable container log parsing").Default("true").Envar("DISABLE_LOG_PARSING").Bool()
+	DisablePinger                = kingpin.Flag("disable-pinger", "Don't ping upstreams").Default("false").Envar("DISABLE_PINGER").Bool()
+	DisableL7Tracing             = kingpin.Flag("disable-l7-tracing", "Disable L7 tracing").Default("false").Envar("DISABLE_L7_TRACING").Bool()
+	EnableElasticsearchDetection = kingpin.Flag("enable-es", "Enable Elasticsearch detection in HTTP requests").Default("false").Envar("ENABLE_ES").Bool()
 
 	ExternalNetworksWhitelist = kingpin.
-		Flag("track-public-network", "Allow track connections to the specified IP networks, all private networks are allowed by default (e.g., Y.Y.Y.Y/mask)").
-		Envar("TRACK_PUBLIC_NETWORK").
-		Default("0.0.0.0/0").
-		Strings()
+					Flag("track-public-network", "Allow track connections to the specified IP networks, all private networks are allowed by default (e.g., Y.Y.Y.Y/mask)").
+					Envar("TRACK_PUBLIC_NETWORK").
+					Default("0.0.0.0/0").
+					Strings()
 	EphemeralPortRange = kingpin.Flag("ephemeral-port-range", "Destination and Listen TCP ports from this range will be skipped").Default("").Envar("EPHEMERAL_PORT_RANGE").String()
 
 	Provider          = kingpin.Flag("provider", "`provider` label for `node_cloud_info` metric").Envar("PROVIDER").String()

+ 1 - 1
pkg/go.opentelemetry.io/otel/exporters/otlp/otlptrace/apm_exporter.go

@@ -193,7 +193,7 @@ func tracetransformData(sdl []tracesdk.ReadOnlySpan) map[int][]RootDataT {
 					buildDNSMapEvent(&mNode, event)
 				case l7.ProtocolMysql, l7.ProtocolMariaDB, l7.ProtocolPostgres, l7.ProtocolDM:
 					buildSQLMapEvent(&mNode, event)
-				case l7.ProtocolRedis, l7.ProtocolMemcached, l7.ProtocolCassandra:
+				case l7.ProtocolRedis, l7.ProtocolMemcached, l7.ProtocolCassandra, l7.ProtocolES:
 					buildNoSqlMapEvent(&mNode, event)
 				case l7.ProtocolMongo:
 					buildMongoMapEvent(&mNode, event)

+ 1 - 0
utils/modelse/bpf_struct.go

@@ -203,6 +203,7 @@ type EbpfProcInfo struct {
 	PathPtrPos     uint64
 	StatusCodePos  uint64
 	RequestHostPos uint64
+	UrlHostPos     uint64 // url.URL.Host field offset
 	ProtoPos       uint64
 	CtxPtrPos      uint64
 	HeadersPtrPos  uint64