cassandra.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427
  1. package l7
  import (
  	"encoding/binary"
  	"fmt"
  	"unicode"
  	"unicode/utf8"
  )
  // Frame-level constants of the CQL native protocol (v4) used by the parsers
  // in this file.
  const (
  	// CassandraHeaderSize is the fixed frame header length:
  	// version(1) + flags(1) + stream_id(2) + opcode(1) + length(4).
  	CassandraHeaderSize = 9

  	// Request opcodes, listed in numeric order.

  	// CassandraOpcodeClose is routed to the CLOSE handler by
  	// (*CassandraParser).Parse.
  	// NOTE(review): the v4 protocol specification defines no CLOSE message
  	// (0x04 was CREDENTIALS in protocol v1); confirm which peer emits it.
  	CassandraOpcodeClose   = 0x04
  	CassandraOpcodeQuery   = 0x07
  	CassandraOpcodePrepare = 0x09
  	CassandraOpcodeExecute = 0x0A
  	CassandraOpcodeBatch   = 0x0D
  )
  // CassandraParser keeps the state needed to show the CQL text behind
  // prepared statements. A PREPARE request's query is parked under the
  // request's stream id until ParseResponse sees the PREPARED result that
  // carries the statement id; EXECUTE requests are then rendered by looking
  // that id up.
  //
  // NOTE(review): the maps are not guarded by a mutex; confirm each parser is
  // used from a single goroutine.
  type CassandraParser struct {
  	// preparedStatements maps a hex-encoded statement id to its query text.
  	preparedStatements map[string]string
  	// pendingPrepares maps a request stream id to the query of a PREPARE that
  	// has not been answered yet.
  	pendingPrepares map[int16]string
  	// lastPrepareQuery is the query of the most recent PREPARE, used as a
  	// fallback when an EXECUTE names an unknown statement id.
  	lastPrepareQuery string
  }
  19. func NewCassandraParser() *CassandraParser {
  20. return &CassandraParser{
  21. preparedStatements: make(map[string]string),
  22. pendingPrepares: make(map[int16]string),
  23. lastPrepareQuery: "",
  24. }
  25. }
  // ParseCassandra extracts a readable statement from one CQL native protocol
  // v4 request frame without keeping any state between calls.
  //
  // It handles three opcodes:
  //   - QUERY frames yield the CQL text.
  //   - BATCH frames yield a summary of the batch.
  //   - EXECUTE frames yield only the hex statement id, because the query text
  //     needs state. Use (*CassandraParser).Parse for that and for PREPARE.
  //
  // It returns "" for any other opcode, a version byte other than a v4
  // request, a payload shorter than the header, or a compressed body.
  //
  // The body is limited to the length declared in the header. When the
  // payload holds fewer bytes (a truncated capture), the available bytes are
  // parsed instead.
  func ParseCassandra(payload []byte) string {
  	if len(payload) < CassandraHeaderSize {
  		return ""
  	}
  	// 0x04 is a protocol v4 frame sent client -> server (direction bit 0x80 clear).
  	if payload[0] != 0x04 {
  		return ""
  	}
  	// Header flag 0x01: the body is compressed with the algorithm negotiated
  	// at STARTUP. Those bytes are not CQL and decoding them as such would
  	// yield garbage statements.
  	if payload[1]&0x01 != 0 {
  		return ""
  	}
  	// NOTE(review): header flag 0x04 (custom payload) prefixes QUERY/EXECUTE/
  	// BATCH bodies with a [bytes map] that is not skipped here.
  	opcode := payload[4]
  	body := payload[CassandraHeaderSize:]
  	// Compare unsigned so a declared length >= 2^31 cannot turn negative on
  	// 32-bit platforms and panic in the slice expression.
  	if declared := binary.BigEndian.Uint32(payload[5:9]); uint64(declared) < uint64(len(body)) {
  		body = body[:declared]
  	}
  	switch opcode {
  	case CassandraOpcodeQuery:
  		return parseQuery(body)
  	case CassandraOpcodeExecute:
  		return parseExecute(body)
  	case CassandraOpcodeBatch:
  		return parseBatch(body)
  	default:
  		return ""
  	}
  }
  // parseQuery decodes the statement at the start of a QUERY request body.
  // PREPARE bodies begin the same way and are decoded with it too.
  //
  // The statement is a [long string]: a 4-byte big-endian length followed by
  // that many bytes, with no NUL terminator in the protocol. The consistency,
  // flags and values that follow are ignored.
  //
  // Return values:
  //   - "" when the body is too short, the length is zero, or the bytes do
  //     not look like text (see isPrintableString).
  //   - If the body ends before the declared length, the available bytes plus
  //     a "..." suffix.
  //   - For a complete string, the text itself. A single trailing NUL byte is
  //     dropped, since some implementations may append one.
  func parseQuery(body []byte) string {
  	if len(body) < 4 {
  		return ""
  	}
  	declared := int(binary.BigEndian.Uint32(body[:4]))
  	if declared <= 0 {
  		return ""
  	}
  	text := body[4:]
  	if declared > len(text) {
  		// Truncated capture: keep the visible prefix and mark the cut.
  		if len(text) == 0 || !isPrintableString(text) {
  			return ""
  		}
  		return string(text) + "..."
  	}
  	text = text[:declared]
  	if last := len(text) - 1; last >= 0 && text[last] == 0 {
  		text = text[:last]
  	}
  	if !isPrintableString(text) {
  		return ""
  	}
  	return string(text)
  }
  // isPrintableString reports whether b looks like text: at least 80% of its
  // bytes must belong to printable characters or to '\n', '\r', '\t'.
  //
  // b is decoded as UTF-8, so multi-byte characters (for example CJK string
  // literals inside a CQL statement) count as printable. Counting only ASCII
  // bytes would reject such queries outright. Bytes that are not valid UTF-8
  // count as non-printable. For pure ASCII input the result is the same as a
  // byte-wise check of 32..126 plus the three whitespace controls.
  //
  // An empty slice is not considered text.
  func isPrintableString(b []byte) bool {
  	if len(b) == 0 {
  		return false
  	}
  	printable := 0 // bytes belonging to printable characters
  	for i := 0; i < len(b); {
  		r, size := utf8.DecodeRune(b[i:])
  		switch {
  		case r == utf8.RuneError && size == 1:
  			// Invalid UTF-8 byte: counts against the ratio.
  		case r == '\n' || r == '\r' || r == '\t' || unicode.IsPrint(r):
  			printable += size
  		}
  		i += size
  	}
  	return printable*100/len(b) >= 80
  }
  // parseExecute renders an EXECUTE request body without any prepared-statement
  // state. The body starts with the statement id as [short bytes]: a 2-byte
  // big-endian length followed by that many bytes. The result is
  // "EXECUTE <hex id> /* unknown */", or "" when the id is empty or runs past
  // the end of body.
  func parseExecute(body []byte) string {
  	if len(body) < 2 {
  		return ""
  	}
  	idLen := int(binary.BigEndian.Uint16(body))
  	rest := body[2:]
  	if idLen == 0 || idLen > len(rest) {
  		return ""
  	}
  	return fmt.Sprintf("EXECUTE %x /* unknown */", rest[:idLen])
  }
  // parseBatch renders the body of a BATCH request (opcode 0x0D).
  //
  // Body layout (CQL native protocol v4):
  //
  //	<type:byte><n:short><query_1>...<query_n><consistency:short><flags:byte>...
  //	<query_i> = <kind:byte><string_or_id><n:short><value_1>...<value_n>
  //
  // kind 0 carries the statement as a [long string] and kind 1 carries a
  // prepared statement id as [short bytes]. Each value is a [bytes]: a signed
  // 4-byte length followed by that many bytes, where a negative length
  // (null/unset) carries no bytes. The value list must be skipped to reach
  // the next entry.
  //
  // The label is "BATCH LOGGED", "BATCH UNLOGGED" or "BATCH COUNTER" for
  // types 0, 1 and 2, otherwise "BATCH". The result is:
  //   - "" if body is shorter than 3 bytes.
  //   - "BATCH (empty)" if the batch declares no entries.
  //   - "<label>: <statement>" if exactly one entry was decoded.
  //   - "<label>: [N queries]" if N > 1 entries were decoded.
  //   - "<label> (empty)" if none were decoded.
  //
  // Decoding stops at the first truncated or malformed entry or an unknown
  // kind, keeping the entries decoded so far. A statement cut off by the end
  // of body is reported with a "..." suffix when it looks like text.
  func parseBatch(body []byte) string {
  	if len(body) < 3 {
  		return ""
  	}
  	batchType := body[0]
  	queriesCount := int(binary.BigEndian.Uint16(body[1:3]))
  	if queriesCount == 0 {
  		return "BATCH (empty)"
  	}

  	offset := 3
  	var queries []string
  entries:
  	for i := 0; i < queriesCount && offset < len(body); i++ {
  		kind := body[offset]
  		offset++
  		switch kind {
  		case 0: // inline statement: [long string]
  			if offset+4 > len(body) {
  				break entries
  			}
  			queryLen := binary.BigEndian.Uint32(body[offset : offset+4])
  			offset += 4
  			remaining := len(body) - offset
  			if uint64(queryLen) > uint64(remaining) {
  				// The statement runs past the captured bytes: report the
  				// visible prefix; nothing after it can be located.
  				if remaining > 0 && isPrintableString(body[offset:]) {
  					queries = append(queries, string(body[offset:])+"...")
  				}
  				break entries
  			}
  			if queryLen > 0 {
  				queryBytes := body[offset : offset+int(queryLen)]
  				// Tolerate a trailing NUL that some clients may append.
  				if queryBytes[len(queryBytes)-1] == 0 {
  					queryBytes = queryBytes[:len(queryBytes)-1]
  				}
  				if isPrintableString(queryBytes) {
  					queries = append(queries, string(queryBytes))
  				} else {
  					queries = append(queries, "<binary>")
  				}
  				offset += int(queryLen)
  			}
  		case 1: // prepared statement: [short bytes] id
  			if offset+2 > len(body) {
  				break entries
  			}
  			idLen := int(binary.BigEndian.Uint16(body[offset : offset+2]))
  			offset += 2
  			if offset+idLen > len(body) {
  				break entries
  			}
  			if idLen > 0 {
  				queries = append(queries, fmt.Sprintf("EXECUTE %x", body[offset:offset+idLen]))
  			}
  			offset += idLen
  		default:
  			// Unknown kind: the entry's extent cannot be determined.
  			break entries
  		}
  		var ok bool
  		if offset, ok = cassandraSkipBatchValues(body, offset); !ok {
  			break entries
  		}
  	}

  	batchTypeStr := "BATCH"
  	switch batchType {
  	case 0:
  		batchTypeStr = "BATCH LOGGED"
  	case 1:
  		batchTypeStr = "BATCH UNLOGGED"
  	case 2:
  		batchTypeStr = "BATCH COUNTER"
  	}
  	switch len(queries) {
  	case 0:
  		return batchTypeStr + " (empty)"
  	case 1:
  		return batchTypeStr + ": " + queries[0]
  	default:
  		return fmt.Sprintf("%s: [%d queries]", batchTypeStr, len(queries))
  	}
  }

  // cassandraSkipBatchValues advances past the "<n:short><value_1>...<value_n>"
  // list that follows each BATCH entry, starting at offset. It returns the
  // offset just past the list, and false if the list runs beyond body.
  func cassandraSkipBatchValues(body []byte, offset int) (int, bool) {
  	if offset+2 > len(body) {
  		return offset, false
  	}
  	count := int(binary.BigEndian.Uint16(body[offset : offset+2]))
  	offset += 2
  	for j := 0; j < count; j++ {
  		if offset+4 > len(body) {
  			return offset, false
  		}
  		valueLen := int32(binary.BigEndian.Uint32(body[offset : offset+4]))
  		offset += 4
  		if valueLen < 0 { // null (-1) or unset (-2): no payload bytes
  			continue
  		}
  		if int64(valueLen) > int64(len(body)-offset) {
  			return offset, false
  		}
  		offset += int(valueLen)
  	}
  	return offset, true
  }
  // Parse is the stateful counterpart of ParseCassandra. Besides QUERY,
  // EXECUTE and BATCH it handles two more opcodes:
  //   - PREPARE: the query is remembered under the frame's stream id so that
  //     ParseResponse can bind it to a statement id.
  //   - CassandraOpcodeClose: a statement id is forgotten.
  //
  // EXECUTE requests are rendered with the prepared query text when it is
  // known.
  //
  // Only v4 request frames (version byte 0x04) with an uncompressed body are
  // decoded; anything else yields "". The body is limited to the length
  // declared in the header, or to the available bytes when the capture is
  // truncated.
  func (p *CassandraParser) Parse(payload []byte) string {
  	if len(payload) < CassandraHeaderSize || payload[0] != 0x04 {
  		return ""
  	}
  	// Header flag 0x01: the body is compressed (algorithm negotiated at
  	// STARTUP); its bytes are not CQL, so they must not be decoded as such.
  	if payload[1]&0x01 != 0 {
  		return ""
  	}
  	// NOTE(review): header flag 0x04 (custom payload) prefixes QUERY/PREPARE/
  	// EXECUTE/BATCH bodies with a [bytes map] that is not skipped here.
  	streamID := int16(binary.BigEndian.Uint16(payload[2:4]))
  	opcode := payload[4]
  	body := payload[CassandraHeaderSize:]
  	// Compare unsigned so a declared length >= 2^31 cannot turn negative on
  	// 32-bit platforms and panic in the slice expression.
  	if declared := binary.BigEndian.Uint32(payload[5:9]); uint64(declared) < uint64(len(body)) {
  		body = body[:declared]
  	}
  	switch opcode {
  	case CassandraOpcodeQuery:
  		return parseQuery(body)
  	case CassandraOpcodeExecute:
  		return p.parseExecuteWithStatements(body)
  	case CassandraOpcodeBatch:
  		return parseBatch(body)
  	case CassandraOpcodePrepare:
  		return p.parsePrepare(body, streamID)
  	case CassandraOpcodeClose:
  		return p.parseClose(body)
  	default:
  		return ""
  	}
  }
  // parseExecuteWithStatements renders an EXECUTE request body using the
  // prepared statements learned so far. The body starts with the statement id
  // as [short bytes]. It returns "" when the id is empty or runs past the end
  // of body.
  //
  // Output, in order of preference:
  //   - "EXECUTE: <query>" when the hex id is in preparedStatements.
  //   - "EXECUTE: <lastPrepareQuery>" when the id is unknown but a PREPARE
  //     has been seen.
  //   - "EXECUTE [<hex id>]" otherwise, with the id cut to 16 hex characters
  //     plus "..." when longer.
  //
  // NOTE(review): the lastPrepareQuery fallback ignores the id. An EXECUTE of
  // a statement prepared before capture started (or on another connection)
  // is therefore reported with whatever query was prepared last.
  func (p *CassandraParser) parseExecuteWithStatements(body []byte) string {
  	if len(body) < 2 {
  		return ""
  	}
  	idLen := int(binary.BigEndian.Uint16(body))
  	if idLen == 0 || idLen > len(body)-2 {
  		return ""
  	}
  	id := fmt.Sprintf("%x", body[2:2+idLen])
  	switch statement, known := p.preparedStatements[id]; {
  	case known:
  		return "EXECUTE: " + statement
  	case p.lastPrepareQuery != "":
  		return "EXECUTE: " + p.lastPrepareQuery
  	case len(id) > 16:
  		return "EXECUTE [" + id[:16] + "...]"
  	default:
  		return "EXECUTE [" + id + "]"
  	}
  }
  // parsePrepare renders a PREPARE request body. The body starts with the query
  // as a [long string], decoded by parseQuery.
  //
  // When a query is found, it is stored in pendingPrepares under streamID,
  // where ParseResponse can pair it with the statement id in the matching
  // result. It also becomes lastPrepareQuery, and the function returns
  // "PREPARE: <query>". Otherwise nothing is recorded and "" is returned.
  func (p *CassandraParser) parsePrepare(body []byte, streamID int16) string {
  	if query := parseQuery(body); query != "" {
  		p.pendingPrepares[streamID] = query
  		p.lastPrepareQuery = query
  		return fmt.Sprintf("PREPARE: %s", query)
  	}
  	return ""
  }
  // parseClose handles a CassandraOpcodeClose body, which starts with a
  // statement id as [short bytes]. The hex-encoded id is removed from
  // preparedStatements and "CLOSE <hex id>" is returned. When the id is empty
  // or runs past the end of body, nothing changes and "" is returned.
  func (p *CassandraParser) parseClose(body []byte) string {
  	if len(body) < 2 {
  		return ""
  	}
  	idLen := int(binary.BigEndian.Uint16(body))
  	if idLen == 0 || 2+idLen > len(body) {
  		return ""
  	}
  	id := fmt.Sprintf("%x", body[2:2+idLen])
  	delete(p.preparedStatements, id)
  	return "CLOSE " + id
  }
  312. // GetLastPrepareQuery 返回最近一次 PREPARE 的查询语句(用于调试)
  313. func (p *CassandraParser) GetLastPrepareQuery() string {
  314. return p.lastPrepareQuery
  315. }
  316. // ParseResponse 解析 Cassandra 响应,用于关联 PREPARE 请求和响应
  317. // 响应格式: version(1) + flags(1) + stream_id(2) + opcode(1) + length(4) + body
  318. func (p *CassandraParser) ParseResponse(payload []byte) {
  319. if len(payload) < CassandraHeaderSize {
  320. return
  321. }
  322. version := payload[0]
  323. if version != 0x84 { // RESPONSE_FRAME
  324. return
  325. }
  326. // 提取 stream_id
  327. streamId := int16(binary.BigEndian.Uint16(payload[2:4]))
  328. opcode := payload[4]
  329. bodyLength := int(binary.BigEndian.Uint32(payload[5:9]))
  330. if len(payload) < CassandraHeaderSize+bodyLength {
  331. bodyLength = len(payload) - CassandraHeaderSize
  332. }
  333. body := payload[CassandraHeaderSize:]
  334. if len(body) > bodyLength {
  335. body = body[:bodyLength]
  336. }
  337. // 只处理 RESULT 类型的响应(PREPARE 响应是 RESULT 类型)
  338. if opcode == 0x08 { // CASSANDRA_OPCODE_RESULT
  339. // 检查是否是 PREPARE 响应
  340. // PREPARE 响应的 body 格式: result kind (4 bytes) + statement id (short bytes) + ...
  341. if len(body) >= 4 {
  342. resultKind := binary.BigEndian.Uint32(body[0:4])
  343. // result kind = 0x0004 表示 PREPARED
  344. if resultKind == 0x0004 {
  345. // 提取 statement id
  346. if len(body) >= 6 {
  347. stmtIdLen := int(binary.BigEndian.Uint16(body[4:6]))
  348. if stmtIdLen > 0 && len(body) >= 6+stmtIdLen {
  349. stmtIdBytes := body[6 : 6+stmtIdLen]
  350. stmtId := fmt.Sprintf("%x", stmtIdBytes)
  351. // 从 pendingPrepares 中查找对应的查询语句
  352. if query, ok := p.pendingPrepares[streamId]; ok {
  353. // 关联 statement id 和查询语句
  354. p.preparedStatements[stmtId] = query
  355. // 删除临时映射
  356. delete(p.pendingPrepares, streamId)
  357. }
  358. }
  359. }
  360. }
  361. }
  362. }
  363. }