فهرست منبع

Feature #TASK_QT-18250 中央国债MySQL WITH

Carl 10 ماه پیش
والد
کامیت
6f3383a5c1

+ 14 - 2
containers/container_apm.go

@@ -192,7 +192,9 @@ func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l
 			}
 			query := conn.postgresParser.Parse(r.Payload)
 			//trace.MysqlQuery(query, r.Status.Error(), r.Duration)
-
+			if c.AppInfo.AppName != "" {
+				klog.Debugf("[%s] ->>>>> Postgres -> %s payload:[%s]", c.AppInfo.AppName, conn.ActualDest, query)
+			}
 			//apmTrace, ok := c.getTrace(r.TraceId)
 			apmTrace, err := c.getOrInitTrace(r.TraceId)
 			//fmt.Println("mysql r.TraceId:", r.TraceId)
@@ -215,7 +217,9 @@ func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l
 			}
 			query := conn.mysqlParser.Parse(r.Payload, r.StatementId)
 			//trace.MysqlQuery(query, r.Status.Error(), r.Duration)
-
+			if c.AppInfo.AppName != "" {
+				klog.Debugf("[%s] ->>>>> Mysql -> %s payload:[%s]", c.AppInfo.AppName, conn.ActualDest, query)
+			}
 			//apmTrace, ok := c.getTrace(r.TraceId)
 			apmTrace, err := c.getOrInitTrace(r.TraceId)
 			//fmt.Println("mysql r.TraceId:", r.TraceId)
@@ -238,6 +242,10 @@ func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l
 				conn.dmParser = l7.NewDmParser()
 			}
 			query := conn.dmParser.Parse(r.Payload, r.StatementId)
+			if c.AppInfo.AppName != "" {
+				klog.Debugf("[%s] ->>>>> Mysql -> %s DMSQL:[%s]", c.AppInfo.AppName, conn.ActualDest, query)
+			}
+
 			apmTrace, err := c.getOrInitTrace(r.TraceId)
 			if err == nil {
 				//apmTrace.DmTraceQueryEvent(query, r, conn.ActualDest)
@@ -260,6 +268,10 @@ func (c *Container) onL7RequestApm(pid uint32, fd uint64, timestamp uint64, r *l
 			//fmt.Println("cmd", cmd)
 			//fmt.Println("args", args)
 			//apmTrace, ok := c.getTrace(r.TraceId)
+			if c.AppInfo.AppName != "" {
+				klog.Debugf("[%s] ->>>>> Redis -> %s DMSQL:[%s]", c.AppInfo.AppName, conn.ActualDest, cmd)
+			}
+
 			apmTrace, err := c.getOrInitTrace(r.TraceId)
 			if err == nil {
 				//apmTrace.RedisTraceQuery(cmd, args, r.Status.Error(), r.Duration)

+ 1 - 1
containers/registry.go

@@ -446,7 +446,7 @@ func (r *Registry) handleEvents(ch <-chan ebpftracer.Event) {
 				}
 			case ebpftracer.EventTypeL7Request:
 
-				fmt.Println("e.L7Request Payload:", string(e.L7Request.Payload))
+				klog.Debugln("e.L7Request Payload:", string(e.L7Request.Payload))
 				if e.L7Request == nil {
 					continue
 				}

+ 21 - 9
ebpftracer/tracer/inject/inject_linux_amd64.go

@@ -879,14 +879,16 @@ func JvmInject(jvmInjector *JvmInjector) error {
 	klog.Infof("modifyIoFdTargetAddr")
 	err = modifyIoFdTargetAddr(pid, debugIoFdAddr, ioFdReleaseTargetAddr)
 	if err != nil {
-		fmt.Println(err)
+		klog.Error(err)
+		PtraceDetach(pid)
 		return err
 	}
 
 	klog.Infof("modifyNetSetTargetAddr")
 	err = modifyNetSetTargetAddr(pid, debugNetSendAddr, netSendReleaseTargetAddr)
 	if err != nil {
-		fmt.Println(err)
+		klog.Error(err)
+		PtraceDetach(pid)
 		return err
 	}
 	// 二次效验 读取并验证地址
@@ -896,33 +898,43 @@ func JvmInject(jvmInjector *JvmInjector) error {
 	printCodeData(jvmInjector.DebugLibNetInfo)
 	// 效验目标函数内地址是否与预期一致
 	if !jvmInjector.validateAllModifyCheck() && err == nil {
-		klog.Errorf("[inject] failed validateAllModifyCheck")
+		klog.WithError(err).Errorf("[inject] failed validateAllModifyCheck")
+		PtraceDetach(pid)
 		return err
 	}
 	// 更新函数入口
 	klog.Infof("modifyReleaseFuncEnter")
 	err = modifyReleaseFuncEnter(pid, originFuncEnterAddr, debugFuncEnterAddr)
 	if err != nil {
-		klog.Errorf("[inject] failed modifyReleaseFuncEnter")
+		klog.WithError(err).Errorf("[inject] failed modifyReleaseFuncEnter")
+		PtraceDetach(pid)
 		return err
 	}
 	// 校验jmp地址修改正确
 	klog.Infof("checkReleaseFuncSymAfterChange")
-	err = jvmInjector.checkReleaseFuncSymAfterChange()
-	if err != nil {
-		klog.Errorf("[inject] failed checkReleaseFuncSymAfterChange")
+	errReleaseFuncSymAfterChange := jvmInjector.checkReleaseFuncSymAfterChange()
+	if errReleaseFuncSymAfterChange != nil {
+		klog.WithError(errReleaseFuncSymAfterChange).Errorf("[inject] failed checkReleaseFuncSymAfterChange")
+		// 回滚
 		if len(jvmInjector.ReleaseLibNetInfo.FuncSymbol.OriginCode) == 5 {
 			err = restoreOriginalInstructions(pid, originFuncEnterAddr, jvmInjector.ReleaseLibNetInfo.FuncSymbol.OriginCode)
 			if err != nil {
-				fmt.Println(err)
+				klog.WithError(err).Errorf("[inject] failed restoreOriginalInstructions")
+				PtraceDetach(pid)
 				return err
 			}
 		}
+		PtraceDetach(pid)
+		return errReleaseFuncSymAfterChange
 	}
 
+	return PtraceDetach(pid)
+}
+
+func PtraceDetach(pid int) error {
 	// 恢复执行
 	klog.Infof("Detach")
-	if err = syscall.PtraceDetach(pid); err != nil {
+	if err := syscall.PtraceDetach(pid); err != nil {
 		klog.Errorf("ptrace DETACH: %v", err)
 		return err
 	}

+ 20 - 21
pkg/go.opentelemetry.io/otel/exporters/otlp/otlptrace/apm_exporter.go

@@ -12,7 +12,6 @@ import (
 	"net/url"
 	"sort"
 	"strconv"
-	"strings"
 	"sync"
 
 	"go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/tracetransform"
@@ -881,10 +880,10 @@ func buildSQLMapEvent(mNode *MapInfoT, event tracesdk.Event) {
 			query := attr.Value.AsString()
 			mNode.MethodName = query
 			mNode.Ps = []string{query}
-			words := strings.Fields(query)
-			if len(words) > 0 {
-				mNode.OperType = strings.ToUpper(words[0])
-			}
+			//words := strings.Fields(query)
+			//if len(words) > 0 {
+			//	mNode.OperType = strings.ToUpper(words[0])
+			//}
 		case "sql.exception":
 			if attr.Value.AsBool() {
 				mNode.Exception = 1
@@ -938,10 +937,10 @@ func buildPostGreSqlMapEvent(mNode *MapInfoT, event tracesdk.Event) {
 		case "db.statement":
 			query := attr.Value.AsString()
 			mNode.Ps = []string{query}
-			words := strings.Fields(query)
-			if len(words) > 0 {
-				mNode.OperType = strings.ToUpper(words[0])
-			}
+			//words := strings.Fields(query)
+			//if len(words) > 0 {
+			//	mNode.OperType = strings.ToUpper(words[0])
+			//}
 		case "sql.exception":
 			if attr.Value.AsBool() {
 				mNode.Exception = 1
@@ -972,10 +971,10 @@ func buildMysqlMapEvent(mNode *MapInfoT, event tracesdk.Event) {
 			query := attr.Value.AsString()
 			mNode.MethodName = query
 			mNode.Ps = []string{query}
-			words := strings.Fields(query)
-			if len(words) > 0 {
-				mNode.OperType = strings.ToUpper(words[0])
-			}
+			//words := strings.Fields(query)
+			//if len(words) > 0 {
+			//	mNode.OperType = strings.ToUpper(words[0])
+			//}
 		case "sql.exception":
 			if attr.Value.AsBool() {
 				mNode.Exception = 1
@@ -1005,10 +1004,10 @@ func buildDMMapEvent(mNode *MapInfoT, event tracesdk.Event) {
 		case "db.statement":
 			query := attr.Value.AsString()
 			mNode.Ps = []string{query}
-			words := strings.Fields(query)
-			if len(words) > 0 {
-				mNode.OperType = strings.ToUpper(words[0])
-			}
+			//words := strings.Fields(query)
+			//if len(words) > 0 {
+			//	mNode.OperType = strings.ToUpper(words[0])
+			//}
 		case "sql.exception":
 			if attr.Value.AsBool() {
 				mNode.Exception = 1
@@ -1039,10 +1038,10 @@ func buildRedisMapEvent(mNode *MapInfoT, event tracesdk.Event) {
 			query := attr.Value.AsString()
 			mNode.MethodName = query
 			mNode.Ps = []string{query}
-			words := strings.Fields(query)
-			if len(words) > 0 {
-				mNode.OperType = strings.ToUpper(words[0])
-			}
+			//words := strings.Fields(query)
+			//if len(words) > 0 {
+			//	mNode.OperType = strings.ToUpper(words[0])
+			//}
 		case "nosql.src_addr":
 			mNode.SrcAddr = attr.Value.AsString()
 		case "nosql.destination_addr":

+ 51 - 2
tracing/apm_tracing.go

@@ -4,8 +4,10 @@ import (
 	"context"
 	"fmt"
 	klog "github.com/sirupsen/logrus"
+	"os"
 	"strconv"
 	"strings"
+	"sync"
 	"sync/atomic"
 	"time"
 
@@ -315,12 +317,59 @@ func (t *Trace) MysqlTraceQuery(query string, error bool, duration time.Duration
 	)
 }
 
+// 内置允许操作(始终放行)
+var builtinOps = map[string]struct{}{
+	"SELECT": {},
+	"INSERT": {},
+	"UPDATE": {},
+	"DELETE": {},
+	"WITH":   {},
+}
+
+var (
+	allowedOps map[string]struct{}
+	initOnce   sync.Once
+)
+
+// 解析并缓存环境变量中的前缀
+func initAllowedOps() {
+	allowedOps = make(map[string]struct{})
+	env := os.Getenv("ALLOWED_SQL_PREFIXES")
+	for _, s := range strings.Split(env, ",") {
+		if trimmed := strings.ToUpper(strings.TrimSpace(s)); trimmed != "" {
+			allowedOps[trimmed] = struct{}{}
+		}
+	}
+}
+
 func isCURDOperation(q string) bool {
 	if len(q) < 6 {
 		return false
 	}
-	q = strings.ToUpper(q[:6])
-	return q == "SELECT" || q == "INSERT" || q == "UPDATE" || q == "DELETE" || q == "PREPAR"
+
+	// 提取前缀(考虑最长10个字符)
+	prefix := strings.ToUpper(strings.TrimSpace(q))
+	if len(prefix) > 10 {
+		prefix = prefix[:10]
+	}
+
+	// 提取第一个单词作为操作关键字
+	spaceIdx := strings.IndexAny(prefix, " \t\n")
+	if spaceIdx > 0 {
+		prefix = prefix[:spaceIdx]
+	}
+
+	// 先查固定允许的操作
+	if _, ok := builtinOps[prefix]; ok {
+		return true
+	}
+
+	// 加载环境变量控制的操作(只执行一次)
+	initOnce.Do(initAllowedOps)
+
+	// 查额外允许操作
+	_, ok := allowedOps[prefix]
+	return ok
 }
 
 func (t *Trace) SQLTraceQueryEvent(l7Type l7.Protocol, semconvVal attribute.KeyValue, query string, r *l7.RequestData, destination netaddr.IPPort) {