| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427 |
- package l7
import (
	"encoding/binary"
	"fmt"
	"unicode"
	"unicode/utf8"
)
// Framing constants for the CQL native protocol as handled by this parser.
const (
	// CassandraHeaderSize is the length of a v4 frame header:
	// version(1) + flags(1) + stream_id(2) + opcode(1) + length(4).
	CassandraHeaderSize = 9

	// Request opcodes recognized by the parser, in ascending order.

	// CassandraOpcodeClose is treated by CassandraParser.Parse as "forget a
	// prepared statement". NOTE(review): the native protocol v4 spec defines no
	// CLOSE request; 0x04 was CREDENTIALS in protocol v1 and is unused in v4 —
	// confirm the intent of this constant.
	CassandraOpcodeClose = 0x04
	// CassandraOpcodeQuery is a QUERY request carrying CQL text.
	CassandraOpcodeQuery = 0x07
	// CassandraOpcodePrepare is a PREPARE request carrying CQL text to prepare.
	CassandraOpcodePrepare = 0x09
	// CassandraOpcodeExecute is an EXECUTE request referencing a prepared statement id.
	CassandraOpcodeExecute = 0x0A
	// CassandraOpcodeBatch is a BATCH request grouping several statements.
	CassandraOpcodeBatch = 0x0D
)
// CassandraParser keeps the state needed to show prepared statements by their
// CQL text. Parse records PREPARE requests and resolves EXECUTE statement ids.
// ParseResponse binds the statement id returned by the server to the recorded
// query. The zero value has nil maps, which Parse writes to, so construct it
// with NewCassandraParser. The struct has no locking of its own.
// NOTE(review): confirm that callers serialize Parse/ParseResponse calls.
type CassandraParser struct {
	preparedStatements map[string]string // statement_id (hex string) -> query
	pendingPrepares    map[int16]string  // stream_id -> query (links a PREPARE request to its response)
	lastPrepareQuery   string            // query text of the most recent PREPARE (fallback)
}
- func NewCassandraParser() *CassandraParser {
- return &CassandraParser{
- preparedStatements: make(map[string]string),
- pendingPrepares: make(map[int16]string),
- lastPrepareQuery: "",
- }
- }
// ParseCassandra returns a human-readable summary of one CQL native protocol v4
// request frame. It keeps no state between calls.
//
// Results by opcode:
//   - QUERY: the query text.
//   - EXECUTE: the prepared statement id only, because the statement text
//     needs state (see CassandraParser).
//   - BATCH: a batch summary.
//
// It returns "" for frames shorter than the header, for frames that are not
// v4 requests (version byte 0x04), and for any other opcode. When the frame
// holds fewer body bytes than its header declares, it parses the bytes that
// are present on a best-effort basis.
func ParseCassandra(payload []byte) string {
	if len(payload) < CassandraHeaderSize {
		return ""
	}
	// Version byte 0x04: protocol v4 with the direction bit clear (request).
	if payload[0] != 0x04 {
		return ""
	}
	opcode := payload[4]

	// Clamp the body to the declared length (bytes 5-8, big-endian). The
	// comparison is done in uint64 so that a declared length >= 2^31 cannot
	// overflow int on 32-bit platforms. Previously that produced a negative
	// slice bound and a panic on untrusted input.
	body := payload[CassandraHeaderSize:]
	if declared := uint64(binary.BigEndian.Uint32(payload[5:9])); declared < uint64(len(body)) {
		body = body[:declared]
	}

	switch opcode {
	case CassandraOpcodeQuery:
		return parseQuery(body)
	case CassandraOpcodeExecute:
		return parseExecute(body)
	case CassandraOpcodeBatch:
		return parseBatch(body)
	default:
		return ""
	}
}
// parseQuery extracts the CQL text from a QUERY body. PREPARE bodies use the
// same leading field, so parsePrepare reuses it.
//
// The body starts with a [long string]: a 4-byte big-endian signed length
// followed by that many bytes of text, which is not NUL-terminated. Whatever
// follows (consistency, flags, values) is ignored.
//
// Returns "" when the body is shorter than 4 bytes, when the length is zero or
// negative, or when the text does not pass isPrintableString. If the body ends
// before the declared length, the bytes that are present are returned with
// "..." appended, provided they look printable.
func parseQuery(body []byte) string {
	if len(body) < 4 {
		return ""
	}
	// The length is a signed [int]. Converting through int32 keeps a negative
	// (null) length negative on every platform. Without it, a 64-bit build
	// reads it as a huge positive length and reports trailing bytes as a
	// truncated query.
	queryLen := int(int32(binary.BigEndian.Uint32(body[0:4])))
	if queryLen <= 0 {
		return ""
	}

	text := body[4:]
	if queryLen > len(text) {
		// The frame was captured only partially: report what is available.
		if len(text) > 0 && isPrintableString(text) {
			return string(text) + "..."
		}
		return ""
	}

	queryBytes := text[:queryLen]
	// The protocol does not NUL-terminate strings, but some implementations may
	// append one; drop a single trailing NUL if present.
	if queryBytes[len(queryBytes)-1] == 0 {
		queryBytes = queryBytes[:len(queryBytes)-1]
	}
	if !isPrintableString(queryBytes) {
		return ""
	}
	return string(queryBytes)
}
// isPrintableString reports whether b looks like human-readable text.
//
// It decodes b as UTF-8, because CQL strings are UTF-8. A rune counts as
// printable when unicode.IsPrint accepts it or when it is '\n', '\r' or '\t'.
// Bytes that are not valid UTF-8 decode as utf8.RuneError with width 1 and
// count as non-printable. The result is true when at least 80% of the decoded
// runes are printable. Empty input is false.
//
// For pure-ASCII input the result is identical to the previous byte-based
// check. Valid non-ASCII text, such as CJK string literals, is no longer
// rejected as binary.
func isPrintableString(b []byte) bool {
	if len(b) == 0 {
		return false
	}
	total, printable := 0, 0
	for len(b) > 0 {
		r, size := utf8.DecodeRune(b)
		b = b[size:]
		total++
		if r == utf8.RuneError && size == 1 {
			continue // invalid UTF-8 byte: counts as non-printable
		}
		if unicode.IsPrint(r) || r == '\n' || r == '\r' || r == '\t' {
			printable++
		}
	}
	return printable*100/total >= 80
}
- // parseExecute 解析 EXECUTE 操作
- // EXECUTE 格式: statement id (short bytes) + ...
- func parseExecute(body []byte) string {
- if len(body) < 2 {
- return ""
- }
- // 读取 short bytes (2 bytes length + bytes)
- stmtIdLen := int(binary.BigEndian.Uint16(body[0:2]))
- if stmtIdLen < 0 || stmtIdLen > len(body)-2 {
- return ""
- }
- if stmtIdLen == 0 {
- return ""
- }
- stmtIdBytes := body[2 : 2+stmtIdLen]
- stmtId := fmt.Sprintf("%x", stmtIdBytes)
- return fmt.Sprintf("EXECUTE %s /* unknown */", stmtId)
- }
// parseBatch summarizes a BATCH request body.
//
// Layout (CQL native protocol v4):
//
//	<type:byte><n:short><entry_1>...<entry_n><consistency><flags>...
//
// Each entry has this form:
//
//	<kind:byte><string_or_id><count:short><value_1>...<value_count>
//
// Kind 0 carries a [long string] query and kind 1 a [short bytes] prepared
// statement id. Each entry's bound values must be skipped to reach the next
// entry. Values are assumed to be unnamed; the v4 spec marks named values in
// BATCH as non-functional. Parsing stops at the first truncated or malformed
// entry, and entries decoded before that point are still reported.
//
// Return values:
//   - "" if body is shorter than the 3-byte header.
//   - "BATCH (empty)" when the header declares zero entries.
//   - "<type>: <entry>" when exactly one entry was decoded.
//   - "<type>: [N queries]" when N > 1 entries were decoded.
//   - "<type> (empty)" when nothing could be decoded.
func parseBatch(body []byte) string {
	if len(body) < 3 {
		return ""
	}
	batchType := body[0]
	queriesCount := int(binary.BigEndian.Uint16(body[1:3]))
	if queriesCount == 0 {
		return "BATCH (empty)"
	}

	var queries []string
	offset := 3
	for i := 0; i < queriesCount && offset < len(body); i++ {
		desc, next, ok := parseBatchEntry(body, offset)
		if desc != "" {
			queries = append(queries, desc)
		}
		if !ok {
			// The entry was truncated or of an unknown kind, so its length is
			// unknown and no later entry can be located reliably.
			break
		}
		offset = next
	}

	batchTypeStr := "BATCH"
	switch batchType {
	case 0:
		batchTypeStr = "BATCH LOGGED"
	case 1:
		batchTypeStr = "BATCH UNLOGGED"
	case 2:
		batchTypeStr = "BATCH COUNTER"
	}

	switch len(queries) {
	case 0:
		return batchTypeStr + " (empty)"
	case 1:
		return fmt.Sprintf("%s: %s", batchTypeStr, queries[0])
	default:
		return fmt.Sprintf("%s: [%d queries]", batchTypeStr, len(queries))
	}
}

// parseBatchEntry decodes the batch entry that starts at body[offset]. The
// caller guarantees offset < len(body).
//
// It returns three values:
//   - desc: a display string for the entry, or "" if nothing displayable.
//   - next: the offset of the following entry.
//   - ok: false when the entry is truncated or of unknown kind, in which case
//     parsing cannot continue past it.
func parseBatchEntry(body []byte, offset int) (desc string, next int, ok bool) {
	kind := body[offset]
	offset++
	switch kind {
	case 0: // query text as [long string]
		if len(body)-offset < 4 {
			return "", offset, false
		}
		// Signed [int] length; a negative length is treated as an empty query.
		queryLen := int(int32(binary.BigEndian.Uint32(body[offset : offset+4])))
		offset += 4
		// Compare against the remaining length rather than offset+queryLen,
		// which could overflow int on 32-bit platforms.
		if queryLen > len(body)-offset {
			// Truncated capture: report the available prefix.
			if rest := body[offset:]; len(rest) > 0 && isPrintableString(rest) {
				desc = string(rest) + "..."
			}
			return desc, len(body), false
		}
		if queryLen > 0 {
			queryBytes := body[offset : offset+queryLen]
			// Drop a trailing NUL that some implementations may append.
			if queryBytes[len(queryBytes)-1] == 0 {
				queryBytes = queryBytes[:len(queryBytes)-1]
			}
			if isPrintableString(queryBytes) {
				desc = string(queryBytes)
			} else {
				desc = "<binary>"
			}
			offset += queryLen
		}
	case 1: // prepared statement id as [short bytes]
		if len(body)-offset < 2 {
			return "", offset, false
		}
		stmtIDLen := int(binary.BigEndian.Uint16(body[offset : offset+2]))
		offset += 2
		if stmtIDLen > len(body)-offset {
			return "", offset, false
		}
		if stmtIDLen > 0 {
			desc = fmt.Sprintf("EXECUTE %x", body[offset:offset+stmtIDLen])
		}
		offset += stmtIDLen
	default:
		// Unknown kind: the entry length cannot be determined.
		return "", offset, false
	}
	next, ok = skipBatchValues(body, offset)
	return desc, next, ok
}

// skipBatchValues skips the [short] value count and the [bytes] values that
// follow a batch entry starting at body[offset]. It returns the offset just
// past the values. ok is false if the body ends before all of them are
// present. A negative value length (null or unset value) carries no bytes.
func skipBatchValues(body []byte, offset int) (next int, ok bool) {
	if len(body)-offset < 2 {
		return offset, false
	}
	count := int(binary.BigEndian.Uint16(body[offset : offset+2]))
	offset += 2
	for j := 0; j < count; j++ {
		if len(body)-offset < 4 {
			return len(body), false
		}
		valueLen := int(int32(binary.BigEndian.Uint32(body[offset : offset+4])))
		offset += 4
		if valueLen > 0 {
			if valueLen > len(body)-offset {
				return len(body), false
			}
			offset += valueLen
		}
	}
	return offset, true
}
// Parse is the stateful counterpart of ParseCassandra. It returns a summary of
// one CQL native protocol v4 request frame and updates the parser's
// prepared-statement bookkeeping.
//
// Handling by opcode:
//   - QUERY and BATCH: summarized by the same helpers ParseCassandra uses.
//   - PREPARE: recorded under the frame's stream id, so that ParseResponse
//     can bind the statement id the server returns.
//   - EXECUTE: the statement id is resolved through the recorded statements.
//   - CassandraOpcodeClose: the named statement is removed.
//
// It returns "" for frames shorter than the header, for frames that are not
// v4 requests (version byte 0x04), and for any other opcode.
func (p *CassandraParser) Parse(payload []byte) string {
	if len(payload) < CassandraHeaderSize {
		return ""
	}
	if payload[0] != 0x04 { // v4 request frame
		return ""
	}
	// stream_id: signed big-endian short at bytes 2-3.
	streamID := int16(binary.BigEndian.Uint16(payload[2:4]))
	opcode := payload[4]

	// Clamp the body to the declared length. The comparison is done in uint64
	// so that a declared length >= 2^31 cannot overflow int (and cause a
	// slice-bounds panic) on 32-bit platforms.
	body := payload[CassandraHeaderSize:]
	if declared := uint64(binary.BigEndian.Uint32(payload[5:9])); declared < uint64(len(body)) {
		body = body[:declared]
	}

	switch opcode {
	case CassandraOpcodeQuery:
		return parseQuery(body)
	case CassandraOpcodeExecute:
		return p.parseExecuteWithStatements(body)
	case CassandraOpcodeBatch:
		return parseBatch(body)
	case CassandraOpcodePrepare:
		return p.parsePrepare(body, streamID)
	case CassandraOpcodeClose:
		return p.parseClose(body)
	default:
		return ""
	}
}
// parseExecuteWithStatements summarizes an EXECUTE body, resolving the
// prepared statement id ([short bytes] at the start of the body) to its CQL
// text when possible.
//
// Lookup order:
//  1. The statement recorded for this id: "EXECUTE: <query>".
//  2. Otherwise, the most recent PREPARE query, if any: "EXECUTE: <query>".
//  3. Otherwise, the hex id: "EXECUTE [<hex>]", shortened to its first 16 hex
//     digits plus "..." when longer.
//
// Returns "" when the body is too short, the id is empty, or the id is
// truncated.
func (p *CassandraParser) parseExecuteWithStatements(body []byte) string {
	if len(body) < 2 {
		return ""
	}
	idLen := int(binary.BigEndian.Uint16(body[:2]))
	if idLen == 0 || idLen > len(body)-2 {
		return ""
	}
	id := fmt.Sprintf("%x", body[2:2+idLen])

	if query, found := p.preparedStatements[id]; found {
		return "EXECUTE: " + query
	}
	// Fallback: use the most recent PREPARE even though the id did not match.
	// NOTE(review): this can attribute an EXECUTE to an unrelated statement,
	// e.g. one prepared before capture started; confirm this is acceptable.
	if p.lastPrepareQuery != "" {
		return "EXECUTE: " + p.lastPrepareQuery
	}

	shown, ellipsis := id, ""
	if len(id) > 16 {
		shown, ellipsis = id[:16], "..."
	}
	return "EXECUTE [" + shown + ellipsis + "]"
}
- // parsePrepare 解析 PREPARE 操作,保存预编译语句
- func (p *CassandraParser) parsePrepare(body []byte, streamId int16) string {
- query := parseQuery(body)
- if query == "" {
- return ""
- }
- // 保存查询语句和 stream_id 的映射,等待响应时关联 statement id
- p.pendingPrepares[streamId] = query
- // 同时保存为最近一次 PREPARE 的查询语句(备用方案)
- p.lastPrepareQuery = query
- return fmt.Sprintf("PREPARE: %s", query)
- }
// parseClose handles a CassandraOpcodeClose body. The body starts with a
// statement id as [short bytes]. The matching entry is removed from the
// prepared-statement table.
//
// Returns "CLOSE <hex id>", or "" when the body is too short, the id is empty,
// or the id is truncated; in those cases nothing is removed.
func (p *CassandraParser) parseClose(body []byte) string {
	if len(body) < 2 {
		return ""
	}
	n := int(binary.BigEndian.Uint16(body[0:2]))
	if n == 0 || len(body)-2 < n {
		return ""
	}
	key := fmt.Sprintf("%x", body[2:2+n])
	delete(p.preparedStatements, key)
	return "CLOSE " + key
}
- // GetLastPrepareQuery 返回最近一次 PREPARE 的查询语句(用于调试)
- func (p *CassandraParser) GetLastPrepareQuery() string {
- return p.lastPrepareQuery
- }
// ParseResponse inspects one server-to-client frame and, when it answers a
// PREPARE, records the returned statement id against the query that Parse
// saw on the same stream id.
//
// Frame layout: version(1) + flags(1) + stream_id(2) + opcode(1) + length(4) + body.
//
// It acts only on v4 response frames (version byte 0x84) with opcode RESULT
// (0x08) whose result kind is 0x0004 (Prepared) and whose statement id is
// non-empty. The pending PREPARE for that stream id, if any, is moved into
// the prepared-statement table. All other frames are ignored.
func (p *CassandraParser) ParseResponse(payload []byte) {
	if len(payload) < CassandraHeaderSize {
		return
	}
	if payload[0] != 0x84 { // v4 response frame
		return
	}
	if payload[4] != 0x08 { // RESULT: the only response type that answers a PREPARE
		return
	}
	streamID := int16(binary.BigEndian.Uint16(payload[2:4]))

	// Clamp the body to the declared length. The comparison is done in uint64
	// so that a declared length >= 2^31 cannot overflow int (and cause a
	// slice-bounds panic) on 32-bit platforms.
	body := payload[CassandraHeaderSize:]
	if declared := uint64(binary.BigEndian.Uint32(payload[5:9])); declared < uint64(len(body)) {
		body = body[:declared]
	}

	// RESULT body: result kind (4 bytes). For kind 0x0004 (Prepared) it is
	// followed by the statement id as [short bytes].
	if len(body) < 6 || binary.BigEndian.Uint32(body[0:4]) != 0x0004 {
		return
	}
	idLen := int(binary.BigEndian.Uint16(body[4:6]))
	if idLen == 0 || len(body) < 6+idLen {
		return
	}
	stmtID := fmt.Sprintf("%x", body[6:6+idLen])

	query, ok := p.pendingPrepares[streamID]
	if !ok {
		return
	}
	p.preparedStatements[stmtID] = query
	delete(p.pendingPrepares, streamID)
}
|