mongo.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204
  1. package l7
  2. import (
  3. "encoding/binary"
  4. "fmt"
  5. "strings"
  6. "go.mongodb.org/mongo-driver/bson"
  7. )
  8. const (
  9. MongoOpMSG = 2013
  10. // MongoDB Wire Protocol OP_MSG Structure:
  11. // [16 bytes: header] + [4 bytes: flag_bits] + [sections...]
  12. mongoHeaderLength = 16 // length(4) + request_id(4) + response_to(4) + op_code(4)
  13. mongoFlagBitsLength = 4
  14. mongoOpCodeOffset = 12
  15. mongoSectionKindLength = 1
  16. mongoSectionKindCmd = 0 // Section 0: Command document (metadata)
  17. mongoSectionKindDocs = 1 // Section 1: Document sequence (actual data)
  18. // 默认操作类型
  19. invalidDataResult = "<truncated>"
  20. )
  21. // ParseMongo 解析 MongoDB Wire Protocol (OP_MSG) 的 payload
  22. // 返回格式:opType|section0 或 opType|section0@[section1]
  23. func ParseMongo(payload []byte) string {
  24. minLength := mongoHeaderLength + mongoFlagBitsLength + mongoSectionKindLength + 4 // +4 for BSON length
  25. if len(payload) < minLength {
  26. return invalidDataResult
  27. }
  28. // 验证是否为 OP_MSG
  29. opCode := binary.LittleEndian.Uint32(payload[mongoOpCodeOffset:])
  30. if opCode != MongoOpMSG {
  31. return invalidDataResult
  32. }
  33. // 跳过 header (16 bytes) + flag_bits (4 bytes)
  34. offset := mongoHeaderLength + mongoFlagBitsLength
  35. var section0 string // 命令元数据
  36. var section0Raw []byte // Section 0 原始 BSON 数据(用于提取操作类型)
  37. var section1 string // 文档数据
  38. // 解析所有 sections
  39. for offset < len(payload) {
  40. if offset+mongoSectionKindLength >= len(payload) {
  41. break
  42. }
  43. sectionKind := payload[offset]
  44. offset += mongoSectionKindLength
  45. if sectionKind == mongoSectionKindCmd {
  46. // Section 0: 命令文档(元数据)
  47. if offset+4 > len(payload) {
  48. break
  49. }
  50. bsonLen := int(binary.LittleEndian.Uint32(payload[offset:]))
  51. if bsonLen < 5 || offset+bsonLen > len(payload) {
  52. break
  53. }
  54. section0Raw = payload[offset : offset+bsonLen]
  55. section0 = bson.Raw(section0Raw).String()
  56. // Section 0 是必需的,解析失败则提前返回
  57. if section0 == "" {
  58. return invalidDataResult
  59. }
  60. /*// 提取操作类型(BSON 文档的第一个字段名)
  61. cmd = extractMongoOpType(section0Raw)*/
  62. offset += bsonLen
  63. } else if sectionKind == mongoSectionKindDocs {
  64. // Section 1: 文档序列(真正的数据)
  65. if offset+4 > len(payload) {
  66. break
  67. }
  68. // Section 1 格式: [4 bytes size] + [identifier C-string] + [BSON documents]
  69. // 注意:size 包含了 size 字段本身的 4 字节
  70. section1Size := int(binary.LittleEndian.Uint32(payload[offset:]))
  71. // 验证:从 offset 开始需要 section1Size 字节(包含 size 字段)
  72. if section1Size < 5 || offset+section1Size > len(payload) {
  73. break
  74. }
  75. // 提取数据:跳过 size 字段的 4 字节
  76. section1Data := payload[offset+4 : offset+section1Size]
  77. // 跳过 identifier(null-terminated string)
  78. identifierEnd := 0
  79. for i, b := range section1Data {
  80. if b == 0 {
  81. identifierEnd = i + 1
  82. break
  83. }
  84. if i > 20 { // 防止无限循环,identifier一般不会超过20个字节
  85. break
  86. }
  87. }
  88. if identifierEnd > 0 && identifierEnd < len(section1Data) {
  89. // 解析 BSON 文档数组
  90. docsData := section1Data[identifierEnd:]
  91. docs := parseBSONDocuments(docsData)
  92. if len(docs) > 0 {
  93. section1 = strings.Join(docs, ", ")
  94. }
  95. }
  96. offset += section1Size
  97. } else {
  98. // 未知的 section kind,停止解析
  99. break
  100. }
  101. }
  102. // 构建返回结果 格式:
  103. // - 有 section1: "section0@[section1]" (例: {"insert":"users"}@[{"name":"Alice"}, ...])
  104. // - 无 section1: "section0" (例: {"find":"users","filter":{...}})
  105. var baseResult string
  106. if section1 != "" {
  107. // 同时包含命令和文档(批量操作)
  108. baseResult = fmt.Sprintf("%s@[%s]", section0, section1)
  109. } else {
  110. // 只有命令(查询、单文档操作)
  111. baseResult = section0
  112. }
  113. return baseResult
  114. }
  115. // extractMongoOpType 从 Section 0 的 BSON 文档中提取操作类型
  116. // BSON 格式:[4 bytes: length] + [1 byte: type] + [cstring: field name] + [value] + ... + [0x00]
  117. /*func extractMongoOpType(bsonData []byte) string {
  118. if len(bsonData) < 5 {
  119. return ""
  120. }
  121. // 跳过文档长度(前 4 字节)
  122. offset := 4
  123. // 读取第一个元素的 type(1 字节)
  124. if offset >= len(bsonData) {
  125. return ""
  126. }
  127. // elementType := bsonData[offset] // 暂不需要
  128. offset += 1
  129. // 读取第一个元素的 field name(null-terminated string)
  130. fieldNameStart := offset
  131. for offset < len(bsonData) && bsonData[offset] != 0 {
  132. offset++
  133. if offset-fieldNameStart > 50 { // 防止无限循环
  134. return ""
  135. }
  136. }
  137. if offset >= len(bsonData) {
  138. return ""
  139. }
  140. // 提取操作类型(字段名),操作类型总是 BSON 文档的第一个字段,所以这里直接提取
  141. opType := string(bsonData[fieldNameStart:offset])
  142. // 过滤掉内部字段(以 $ 开头的)(理论上不会,容错)
  143. if strings.HasPrefix(opType, "$") {
  144. return ""
  145. }
  146. return opType
  147. }*/
  148. // parseBSONDocuments 解析连续的 BSON 文档
  149. func parseBSONDocuments(data []byte) []string {
  150. var docs []string
  151. offset := 0
  152. for offset < len(data) {
  153. if offset+4 > len(data) {
  154. break
  155. }
  156. // 读取 BSON 文档长度
  157. bsonLen := int(binary.LittleEndian.Uint32(data[offset:]))
  158. if bsonLen < 5 || offset+bsonLen > len(data) {
  159. break
  160. }
  161. // 解析 BSON 文档
  162. doc := bson.Raw(data[offset : offset+bsonLen])
  163. docs = append(docs, doc.String())
  164. offset += bsonLen
  165. // 限制解析数量,避免过多文档, 最多解析前1个文档
  166. if len(docs) > 1 {
  167. docs = append(docs, "...")
  168. break
  169. }
  170. }
  171. return docs
  172. }