package l7

import (
	"encoding/binary"
	"fmt"
)

// Frame layout constants and request opcodes of the Cassandra native (CQL
// binary) protocol that this parser recognizes.
const (
	// CassandraHeaderSize is the frame header length:
	// version(1) + flags(1) + stream_id(2) + opcode(1) + length(4).
	CassandraHeaderSize = 9

	// Request opcodes.
	// NOTE(review): the published v4 protocol spec does not appear to list a
	// CLOSE opcode (0x04 was CREDENTIALS in v1) — confirm where 0x04 comes from.
	CassandraOpcodeQuery   = 0x07
	CassandraOpcodeExecute = 0x0A
	CassandraOpcodeBatch   = 0x0D
	CassandraOpcodePrepare = 0x09
	CassandraOpcodeClose   = 0x04
)

// CassandraParser is the stateful variant of the parser: it remembers the
// queries seen in PREPARE requests so that later EXECUTE requests can be
// reported with their query text.
type CassandraParser struct {
	preparedStatements map[string]string // statement id (hex string) -> query
	pendingPrepares    map[int16]string  // stream id -> query; links a PREPARE request to its response
	lastPrepareQuery   string            // query of the most recent PREPARE (fallback)
}

// NewCassandraParser returns a CassandraParser with empty statement tables.
func NewCassandraParser() *CassandraParser {
	return &CassandraParser{
		preparedStatements: make(map[string]string),
		pendingPrepares:    make(map[int16]string),
	}
}

// ParseCassandra extracts a human-readable description of a Cassandra CQL
// request frame without keeping any state between calls.
//
// It handles the QUERY, EXECUTE, BATCH and PREPARE opcodes of version-4
// request frames (version byte 0x04). It returns "" when payload is shorter
// than a frame header, carries a different version byte, uses another opcode,
// or has a body that cannot be decoded. A payload shorter than the length
// declared in its header is parsed on a best-effort basis.
func ParseCassandra(payload []byte) string {
	if len(payload) < CassandraHeaderSize {
		return ""
	}
	if payload[0] != 0x04 { // only version-4 request frames
		return ""
	}
	opcode := payload[4]

	// Trim the body to the declared length when the payload carries more.
	// The comparison is done unsigned so that a huge declared length cannot
	// turn into a negative int (and a panicking slice bound) on 32-bit builds.
	body := payload[CassandraHeaderSize:]
	if declared := binary.BigEndian.Uint32(payload[5:9]); uint64(declared) < uint64(len(body)) {
		body = body[:declared]
	}

	switch opcode {
	case CassandraOpcodeQuery:
		return parseQuery(body)
	case CassandraOpcodeExecute:
		return parseExecute(body)
	case CassandraOpcodeBatch:
		return parseBatch(body)
	case CassandraOpcodePrepare:
		// A PREPARE body starts with the query as a long string, exactly like
		// QUERY; the output format matches CassandraParser.Parse.
		if query := parseQuery(body); query != "" {
			return "PREPARE: " + query
		}
		return ""
	default:
		return ""
	}
}

// parseQuery parses the body of a QUERY message.
// QUERY layout: query string (long string) + consistency (2 bytes) + flags (1 byte) + ...
// A long string is a 4-byte big-endian length followed by that many bytes
// (NOT null-terminated).
//
// The function returns the query text, or "" when body holds no usable
// string. When body ends before the declared length, the available part is
// returned with "..." appended.
func parseQuery(body []byte) string {
	if len(body) < 4 {
		return ""
	}

	// Length prefix of the long string (4 bytes, big-endian).
	queryLen := int(binary.BigEndian.Uint32(body[0:4]))
	if queryLen <= 0 {
		return ""
	}

	if queryLen > len(body)-4 {
		// Truncated capture: report whatever text is available.
		// isPrintableString rejects an empty remainder.
		partial := body[4:]
		if !isPrintableString(partial) {
			return ""
		}
		return string(partial) + "..."
	}

	queryBytes := body[4 : 4+queryLen]
	// The protocol does not null-terminate long strings, but some
	// implementations may append a terminator; drop it when present.
	if queryBytes[len(queryBytes)-1] == 0 {
		queryBytes = queryBytes[:len(queryBytes)-1]
	}
	if !isPrintableString(queryBytes) {
		return ""
	}
	return string(queryBytes)
}

// isPrintableString reports whether b looks like text: it is non-empty and at
// least 80% of its characters are printable.
//
// b is decoded as UTF-8, so a multi-byte character counts as one printable
// character instead of several unprintable bytes. Bytes that are not valid
// UTF-8, ASCII control characters other than newline, carriage return and
// tab, DEL, and the C1 control range count as unprintable.
func isPrintableString(b []byte) bool {
	if len(b) == 0 {
		return false
	}

	total, printable := 0, 0
	// Ranging over a string yields U+FFFD for every byte that is not part of
	// a valid UTF-8 sequence.
	for _, r := range string(b) {
		total++
		switch {
		case r == '\n' || r == '\r' || r == '\t':
			printable++
		case r == '\uFFFD':
			// invalid byte (or a literal replacement character)
		case r < 0x20 || (r >= 0x7F && r < 0xA0):
			// control character
		default:
			printable++
		}
	}
	return printable*100/total >= 80
}

// parseExecute parses the body of an EXECUTE message without any knowledge of
// previously prepared statements.
// EXECUTE layout: statement id (short bytes: 2-byte length + bytes) + ...
//
// It returns "EXECUTE <hex id> /* unknown */", or "" when the id is empty or
// does not fit in body.
func parseExecute(body []byte) string {
	if len(body) < 2 {
		return ""
	}
	idLen := int(binary.BigEndian.Uint16(body[0:2]))
	if idLen == 0 || idLen > len(body)-2 {
		return ""
	}
	stmtID := fmt.Sprintf("%x", body[2:2+idLen])
	return fmt.Sprintf("EXECUTE %s /* unknown */", stmtID)
}

// parseBatch parses the body of a BATCH message.
// BATCH layout: batch type (1 byte) + queries count (2 bytes) + queries...
func parseBatch(body []byte) string { if len(body) < 3 { return "" } batchType := body[0] queriesCount := int(binary.BigEndian.Uint16(body[1:3])) if queriesCount == 0 { return "BATCH (empty)" } offset := 3 var queries []string for i := 0; i < queriesCount && offset < len(body); i++ { if offset >= len(body) { break } // 每个查询: kind (1 byte) + query string 或 statement id if offset+1 > len(body) { break } kind := body[offset] offset++ switch kind { case 0: // QUERY if offset+4 > len(body) { break } queryLen := int(binary.BigEndian.Uint32(body[offset : offset+4])) offset += 4 if queryLen > 0 && offset+queryLen <= len(body) { queryBytes := body[offset : offset+queryLen] // 移除null terminator(如果存在) if len(queryBytes) > 0 && queryBytes[len(queryBytes)-1] == 0 { queryBytes = queryBytes[:len(queryBytes)-1] } // 验证是否为有效字符串 if isPrintableString(queryBytes) { queries = append(queries, string(queryBytes)) } else { queries = append(queries, "") } offset += queryLen } else if queryLen > 0 && offset < len(body) { // 数据被截断 availableData := len(body) - offset queryBytes := body[offset : offset+availableData] if isPrintableString(queryBytes) { queries = append(queries, string(queryBytes)+"...") } offset = len(body) // 到达末尾 } case 1: // EXECUTE (prepared statement) if offset+2 > len(body) { break } stmtIdLen := int(binary.BigEndian.Uint16(body[offset : offset+2])) offset += 2 if stmtIdLen > 0 && offset+stmtIdLen <= len(body) { stmtIdBytes := body[offset : offset+stmtIdLen] stmtId := fmt.Sprintf("%x", stmtIdBytes) queries = append(queries, fmt.Sprintf("EXECUTE %s", stmtId)) offset += stmtIdLen } default: // 跳过未知类型 break } } batchTypeStr := "BATCH" if batchType == 0 { batchTypeStr = "BATCH LOGGED" } else if batchType == 1 { batchTypeStr = "BATCH UNLOGGED" } else if batchType == 2 { batchTypeStr = "BATCH COUNTER" } if len(queries) == 0 { return batchTypeStr + " (empty)" } if len(queries) == 1 { return fmt.Sprintf("%s: %s", batchTypeStr, queries[0]) } return fmt.Sprintf("%s: [%d queries]", 
batchTypeStr, len(queries)) } // Parse 方法用于支持预编译语句(需要维护状态) func (p *CassandraParser) Parse(payload []byte) string { if len(payload) < CassandraHeaderSize { return "" } version := payload[0] if version != 0x04 { return "" } // 提取 stream_id (在 payload[2:4] 位置,big-endian) streamId := int16(binary.BigEndian.Uint16(payload[2:4])) opcode := payload[4] bodyLength := int(binary.BigEndian.Uint32(payload[5:9])) if len(payload) < CassandraHeaderSize+bodyLength { bodyLength = len(payload) - CassandraHeaderSize } body := payload[CassandraHeaderSize:] if len(body) > bodyLength { body = body[:bodyLength] } switch opcode { case CassandraOpcodeQuery: return parseQuery(body) case CassandraOpcodeExecute: return p.parseExecuteWithStatements(body) case CassandraOpcodeBatch: return parseBatch(body) case CassandraOpcodePrepare: return p.parsePrepare(body, streamId) case CassandraOpcodeClose: return p.parseClose(body) default: return "" } } // parseExecuteWithStatements 解析 EXECUTE 操作,尝试从预编译语句映射中查找 func (p *CassandraParser) parseExecuteWithStatements(body []byte) string { if len(body) < 2 { return "" } stmtIdLen := int(binary.BigEndian.Uint16(body[0:2])) if stmtIdLen < 0 || stmtIdLen > len(body)-2 { return "" } if stmtIdLen == 0 { return "" } stmtIdBytes := body[2 : 2+stmtIdLen] stmtId := fmt.Sprintf("%x", stmtIdBytes) statement, ok := p.preparedStatements[stmtId] if ok { return fmt.Sprintf("EXECUTE: %s", statement) } // 如果找不到预编译语句,尝试使用最近一次 PREPARE 的查询语句(备用方案) if p.lastPrepareQuery != "" { // 使用 lastPrepareQuery,即使 statement id 不匹配 return fmt.Sprintf("EXECUTE: %s", p.lastPrepareQuery) } // 如果还是找不到,显示简化的 statement id if len(stmtId) > 16 { return fmt.Sprintf("EXECUTE [%s...]", stmtId[:16]) } return fmt.Sprintf("EXECUTE [%s]", stmtId) } // parsePrepare 解析 PREPARE 操作,保存预编译语句 func (p *CassandraParser) parsePrepare(body []byte, streamId int16) string { query := parseQuery(body) if query == "" { return "" } // 保存查询语句和 stream_id 的映射,等待响应时关联 statement id p.pendingPrepares[streamId] = query // 
同时保存为最近一次 PREPARE 的查询语句(备用方案) p.lastPrepareQuery = query return fmt.Sprintf("PREPARE: %s", query) } // parseClose 解析 CLOSE 操作,清除预编译语句 func (p *CassandraParser) parseClose(body []byte) string { if len(body) < 2 { return "" } stmtIdLen := int(binary.BigEndian.Uint16(body[0:2])) if stmtIdLen < 0 || stmtIdLen > len(body)-2 { return "" } if stmtIdLen == 0 { return "" } stmtIdBytes := body[2 : 2+stmtIdLen] stmtId := fmt.Sprintf("%x", stmtIdBytes) delete(p.preparedStatements, stmtId) return fmt.Sprintf("CLOSE %s", stmtId) } // GetLastPrepareQuery 返回最近一次 PREPARE 的查询语句(用于调试) func (p *CassandraParser) GetLastPrepareQuery() string { return p.lastPrepareQuery } // ParseResponse 解析 Cassandra 响应,用于关联 PREPARE 请求和响应 // 响应格式: version(1) + flags(1) + stream_id(2) + opcode(1) + length(4) + body func (p *CassandraParser) ParseResponse(payload []byte) { if len(payload) < CassandraHeaderSize { return } version := payload[0] if version != 0x84 { // RESPONSE_FRAME return } // 提取 stream_id streamId := int16(binary.BigEndian.Uint16(payload[2:4])) opcode := payload[4] bodyLength := int(binary.BigEndian.Uint32(payload[5:9])) if len(payload) < CassandraHeaderSize+bodyLength { bodyLength = len(payload) - CassandraHeaderSize } body := payload[CassandraHeaderSize:] if len(body) > bodyLength { body = body[:bodyLength] } // 只处理 RESULT 类型的响应(PREPARE 响应是 RESULT 类型) if opcode == 0x08 { // CASSANDRA_OPCODE_RESULT // 检查是否是 PREPARE 响应 // PREPARE 响应的 body 格式: result kind (4 bytes) + statement id (short bytes) + ... 
if len(body) >= 4 { resultKind := binary.BigEndian.Uint32(body[0:4]) // result kind = 0x0004 表示 PREPARED if resultKind == 0x0004 { // 提取 statement id if len(body) >= 6 { stmtIdLen := int(binary.BigEndian.Uint16(body[4:6])) if stmtIdLen > 0 && len(body) >= 6+stmtIdLen { stmtIdBytes := body[6 : 6+stmtIdLen] stmtId := fmt.Sprintf("%x", stmtIdBytes) // 从 pendingPrepares 中查找对应的查询语句 if query, ok := p.pendingPrepares[streamId]; ok { // 关联 statement id 和查询语句 p.preparedStatements[stmtId] = query // 删除临时映射 delete(p.pendingPrepares, streamId) } } } } } } }