Parcourir la source

Feature #TASK_QT-29889 【印尼-Telkom-POC】可观测-完善MongoDB抓取查询语句的功能

Tom il y a 7 mois
Parent
commit
61046fbf1f

+ 189 - 16
ebpftracer/l7/mongo.go

@@ -2,6 +2,8 @@ package l7
 
 import (
 	"encoding/binary"
+	"fmt"
+	"strings"
 
 	"go.mongodb.org/mongo-driver/bson"
 )
@@ -9,30 +11,201 @@ import (
 const (
 	MongoOpMSG = 2013
 
-	mongoHeaderLength      = 20
+	// MongoDB Wire Protocol OP_MSG Structure:
+	// [16 bytes: header] + [4 bytes: flag_bits] + [sections...]
+	mongoHeaderLength      = 16 // length(4) + request_id(4) + response_to(4) + op_code(4)
+	mongoFlagBitsLength    = 4
 	mongoOpCodeOffset      = 12
 	mongoSectionKindLength = 1
-	mongoSectionSizeLength = 4
-	mongoSectionKindBody   = 0
+	mongoSectionKindCmd    = 0 // Section 0: Command document (metadata)
+	mongoSectionKindDocs   = 1 // Section 1: Document sequence (actual data)
+
+	// Fallback operation type, placeholder content for unparseable payloads, and the opType/content separator.
+	unknownOpType     = "unknown"
+	invalidDataResult = "<truncated>"
+	separator         = "|"
 )
 
-func ParseMongo(payload []byte) (res string) {
-	res = "<truncated>"
-	if len(payload) < mongoHeaderLength+mongoSectionKindLength+mongoSectionSizeLength {
-		return
+// ParseMongo parses a MongoDB Wire Protocol (OP_MSG) payload.
+// Return format: "opType|section0" or "opType|section0@[section1]"; "unknown|<truncated>" when the payload is too short, is not OP_MSG, or Section 0 renders as an empty string.
+func ParseMongo(payload []byte) string {
+
+	// invalidResult is "unknown|<truncated>".
+	invalidResult := fmt.Sprintf("%s%s%s", unknownOpType, separator, invalidDataResult)
+
+	minLength := mongoHeaderLength + mongoFlagBitsLength + mongoSectionKindLength + 4 // +4 for BSON length
+	if len(payload) < minLength {
+		return invalidResult
 	}
+
+	// Verify that the message is OP_MSG.
 	opCode := binary.LittleEndian.Uint32(payload[mongoOpCodeOffset:])
 	if opCode != MongoOpMSG {
-		return
+		return invalidResult
+	}
+
+	// Skip the header (16 bytes) and flag_bits (4 bytes).
+	offset := mongoHeaderLength + mongoFlagBitsLength
+
+	var section0 string     // command metadata
+	var section0Raw []byte  // raw Section 0 BSON bytes (used to extract the operation type)
+	var section1 string     // document data
+	opType := unknownOpType // operation type (insert/update/find, ...); defaults to unknown
+
+	// Parse every section; stop when no byte follows the section kind byte.
+	for offset < len(payload) {
+		if offset+mongoSectionKindLength >= len(payload) {
+			break
+		}
+		sectionKind := payload[offset]
+		offset += mongoSectionKindLength
+
+		if sectionKind == mongoSectionKindCmd {
+			// Section 0: command document (metadata).
+			if offset+4 > len(payload) {
+				break
+			}
+			bsonLen := int(binary.LittleEndian.Uint32(payload[offset:]))
+			if bsonLen < 5 || offset+bsonLen > len(payload) {
+				break
+			}
+			section0Raw = payload[offset : offset+bsonLen]
+			section0 = bson.Raw(section0Raw).String()
+
+			// Section 0 is required; return early when its string form is empty. NOTE(review): the truncated-section break paths above skip this return, so a cut-off Section 0 yields "unknown|" rather than "unknown|<truncated>".
+			if section0 == "" {
+				return invalidResult
+			}
+
+			// Extract the operation type (the first field name of the BSON document). NOTE(review): extractMongoOpType returns "" on failure, which replaces the "unknown" default here.
+			opType = extractMongoOpType(section0Raw)
+
+			offset += bsonLen
+
+		} else if sectionKind == mongoSectionKindDocs {
+			// Section 1: document sequence (the actual data).
+			if offset+4 > len(payload) {
+				break
+			}
+			// Section 1 layout: [4 bytes size] + [identifier C-string] + [BSON documents]
+			// Note: size includes the 4 bytes of the size field itself.
+			section1Size := int(binary.LittleEndian.Uint32(payload[offset:]))
+
+			// Validate: section1Size bytes (size field included) must be available starting at offset.
+			if section1Size < 5 || offset+section1Size > len(payload) {
+				break
+			}
+
+			// Take the section data, skipping the 4-byte size field.
+			section1Data := payload[offset+4 : offset+section1Size]
+
+			// Skip the identifier (null-terminated string).
+			identifierEnd := 0
+			for i, b := range section1Data {
+				if b == 0 {
+					identifierEnd = i + 1
+					break
+				}
+				if i > 100 { // give up on over-long identifiers (the range loop itself is already bounded by the slice)
+					break
+				}
+			}
+
+			if identifierEnd > 0 && identifierEnd < len(section1Data) {
+				// Parse the sequence of BSON documents that follows the identifier.
+				docsData := section1Data[identifierEnd:]
+				docs := parseBSONDocuments(docsData)
+				if len(docs) > 0 {
+					section1 = strings.Join(docs, ", ")
+				}
+			}
+
+			offset += section1Size
+		} else {
+			// Unknown section kind: stop parsing.
+			break
+		}
 	}
-	sectionKind := payload[mongoHeaderLength]
-	if sectionKind != mongoSectionKindBody {
-		return
+
+	// Build the result. Format:
+	//   - with section1: "opType|section0@[section1]"  (e.g. insert|{"insert":"users"}@[{"name":"Alice"}, ...])
+	//   - without section1: "opType|section0"  (e.g. find|{"find":"users","filter":{...}})
+	var baseResult string
+	if section1 != "" {
+		// Command plus a document sequence.
+		baseResult = fmt.Sprintf("%s@[%s]", section0, section1)
+	} else {
+		// Command only.
+		baseResult = section0
+	}
+
+	// Prepend the operation type and the separator, then return.
+	return fmt.Sprintf("%s%s%s", opType, separator, baseResult)
+}
+
+// extractMongoOpType extracts the operation type from the Section 0 BSON document; it returns "" when no usable field name is found.
+// BSON layout: [4 bytes: length] + [1 byte: type] + [cstring: field name] + [value] + ... + [0x00]
+func extractMongoOpType(bsonData []byte) string {
+	if len(bsonData) < 5 {
+		return ""
+	}
+	// Skip the document length (first 4 bytes).
+	offset := 4
+
+	// Step over the first element's type (1 byte).
+	if offset >= len(bsonData) {
+		return ""
+	}
+	// elementType := bsonData[offset] // not needed for now
+	offset += 1
+
+	// Read the first element's field name (null-terminated string).
+	fieldNameStart := offset
+	for offset < len(bsonData) && bsonData[offset] != 0 {
+		offset++
+		if offset-fieldNameStart > 50 { // reject field names longer than 50 bytes
+			return ""
+		}
 	}
-	sectionData := payload[mongoHeaderLength+mongoSectionKindLength:]
-	sectionLength := binary.LittleEndian.Uint32(sectionData)
-	if sectionLength < 1 || int(sectionLength) > len(sectionData) {
-		return
+
+	if offset >= len(bsonData) {
+		return ""
+	}
+	// Use the field name as the operation type; assumes the command name is always the first field of the document — TODO confirm.
+	opType := string(bsonData[fieldNameStart:offset])
+
+	// Filter out internal fields (those starting with $); not expected in practice, kept as a safeguard.
+	if strings.HasPrefix(opType, "$") {
+		return ""
+	}
+	return opType
+}
+
+// parseBSONDocuments walks consecutive length-prefixed BSON documents in data and returns their string forms.
+func parseBSONDocuments(data []byte) []string {
+	var docs []string
+	offset := 0
+
+	for offset < len(data) {
+		if offset+4 > len(data) {
+			break
+		}
+		// Read the BSON document length.
+		bsonLen := int(binary.LittleEndian.Uint32(data[offset:]))
+		if bsonLen < 5 || offset+bsonLen > len(data) {
+			break
+		}
+		// Wrap the document bytes as bson.Raw and collect its string form.
+		doc := bson.Raw(data[offset : offset+bsonLen])
+		docs = append(docs, doc.String())
+
+		offset += bsonLen
+
+		// Cap the output: once more than one document has been collected (i.e. after the second), append "..." and stop, even if no further documents follow.
+		if len(docs) > 1 {
+			docs = append(docs, "...")
+			break
+		}
 	}
-	return bson.Raw(sectionData).String()
+	return docs
 }

+ 13 - 2
pkg/go.opentelemetry.io/otel/exporters/otlp/otlptrace/apm_exporter.go

@@ -12,6 +12,7 @@ import (
 	"net/url"
 	"sort"
 	"strconv"
+	"strings"
 	"sync"
 
 	"go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/tracetransform"
@@ -1073,8 +1074,18 @@ func buildMongoMapEvent(mNode *MapInfoT, event tracesdk.Event) {
 			mNode.Port = attr.Value.AsInt64()
 		case "db.statement":
 			query := attr.Value.AsString()
-			mNode.MethodName = query
-			mNode.Ps = []string{query}
+			// 解析格式:
+			//1.opType|content(如:insert|{...}@[...])
+			//2.unknown|<truncated>
+			parts := strings.SplitN(query, "|", 2)
+			if len(parts) == 2 {
+				mNode.MethodName = strings.ToUpper(parts[0]) // 操作类型(insert/find/update等)
+				mNode.Ps = []string{parts[1]}                // 命令和文档数据
+			} else {
+				// 兜底:如果格式不符合预期,保持原样
+				mNode.MethodName = query
+				mNode.Ps = []string{query}
+			}
 		case "nosql.src_addr":
 			mNode.SrcAddr = attr.Value.AsString()
 		case "nosql.destination_addr":

+ 1 - 1
tracing/apm_tracing.go

@@ -561,7 +561,7 @@ func (t *Trace) MongoTraceQueryEvent(query string, r *l7.RequestData, destinatio
 
 	var attr []attribute.KeyValue
 	attr = append(attr,
-		semconv.DBSystemRedis,
+		semconv.DBSystemMongoDB,
 		semconv.DBStatement(query),
 		semconv.NetPeerName(destination.IP().String()),
 		semconv.NetPeerPort(int(destination.Port())),