package otlptrace import ( "crypto/md5" "encoding/json" "fmt" "math" "net/url" "sort" "strconv" "sync" . "github.com/coroot/coroot-node-agent/ebpftracer" "github.com/coroot/coroot-node-agent/ebpftracer/l7" "github.com/coroot/coroot-node-agent/utils" klog "github.com/sirupsen/logrus" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/tracetransform" tracesdk "go.opentelemetry.io/otel/sdk/trace" tracepb "go.opentelemetry.io/proto/otlp/trace/v1" ) const ( APP_SERVICE_TYPE = "APPLICATION" SQL_SERVICE_TYPE = "SQL" NOSQL_SERVICE_TYPE = "NOSQL" HTTP_SERVICE_TYPE = "HTTP" NET_SERVICE_TYPE = "L7_NET" RPC_SERVICE_TYPE = "RPC" MQ_SERVICE_TYPE = "MQ" ) const ( GO_SERVICE_NAME = "GO" MYSQL_SERVICE_NAME = "MYSQL" DM_SERVICE_NAME = "DM" REDIS_SERVICE_NAME = "REDIS" MONGO_SERVICE_NAME = "MONGODB" HTTP_SERVICE_NAME = "HTTPCLIENT" POSTGRESQL_SERVICE_NAME = "POSTGRESQL" GRPC_SERVICE_NAME = "GRPC" ) type apmTraceSpan tracesdk.ReadOnlySpan // GO:0:10154813500555812:5450531005555981:5610250100539899:ee022542c3940f1b:1001025098564810:888ceb3df1bdbe2c:110 type RootDataT struct { AccountId int `json:"account_id"` AgentId int64 `json:"agent_id"` AgentVersion string `json:"agent_version"` AppId int64 `json:"app_id"` AppIdFrom int64 `json:"app_id_from"` // from header app_id AppName string `json:"app_name"` CalledId int64 `json:"called_id"` // from header assumed_app_id ClientIp string `json:"client_ip"` CollTime uint64 `json:"coll_time"` Cpu int `json:"cpu"` Custom string `json:"custom"` HostId int64 `json:"host_id"` HostName string `json:"host_name"` HttpCode int64 `json:"http_code"` HttpMethod string `json:"http_method"` InstanceId int64 `json:"instance_id"` InstanceIdFrom int64 `json:"instance_id_from"` // from header instance_id LocalPort int64 `json:"local_port"` Maps []MapInfoT `json:"maps"` MemU int `json:"mem_u"` MemUP int `json:"mem_u_p"` OperType string `json:"oper_type"` Parameters []ParamStruct `json:"parameters,omitempty"` ParentTaskName string `json:"parent_task_name"` Period int `json:"period"` RespTime uint64 `json:"resp_time"` Sampling int `json:"sampling"` ServiceName string `json:"service_name"` ServiceType string `json:"service_type"` Sip string `json:"sip"` Sn string `json:"sn"` SpanIdFrom string `json:"span_id_from"` // from header span_id Sport int64 `json:"sport"` TId int `json:"t_id"` TName string `json:"t_name"` TraceId string `json:"trace_id"` // from header trace_id TransIds []interface{} `json:"trans_ids"` TypeFrom string `json:"type_from"` Uri string `json:"uri"` UserDir string `json:"user_dir"` VipIds []interface{} `json:"vip_ids"` SrcAddr string `json:"src_addr"` DestinationAddr string `json:"destination_addr"` UserAgent string `json:"user_agent"` // op 新增字段 Pid uint32 `json:"pid"` ContainerID string `json:"container_id"` Sys string `json:"sys"` SystemUUID string `json:"system_uuid"` ParentSys string `json:"parent_sys"` // from cwother header: SysTag AppNameFrom string `json:"app_name_from"` // from cwother header: app_name ServiceTypeFrom string `json:"service_type_from"` // from cwother header: appServiceType } // ParamStruct 定义目标结构 type ParamStruct struct { Name string `json:"name"` Values []string `json:"values"` } type MapInfoT struct { Dbn string `json:"dbn,omitempty"` Exception int `json:"exception,omitempty"` ExceptionMsg string `json:"exception_msg,omitempty"` ExceptionStack string `json:"exception_stack,omitempty"` Ip string `json:"ip,omitempty"` Level int `json:"level"` MethodDesc string `json:"method_desc,omitempty"` MethodName string `json:"method_name"` Nid int `json:"nid"` OperType string `json:"oper_type,omitempty"` Pid int `json:"pid"` Port int64 `json:"port,omitempty"` Ps []string `json:"ps,omitempty"` PureTime uint64 `json:"pure_time"` ServiceName string `json:"service_name"` ServiceType string `json:"service_type"` StartTime uint64 `json:"-"` EndTime uint64 `json:"-"` StartTimeMs uint64 `json:"start_time"` EndTimeMs uint64 `json:"end_time"` WallTime uint64 `json:"wall_time"` Schema string `json:"schema,omitempty"` AssumedAppId int64 `json:"assumed_app_id,omitempty"` Uri string `json:"uri,omitempty"` SpanId string `json:"span_id,omitempty"` SrcAddr string `json:"src_addr,omitempty"` DestinationAddr string `json:"destination_addr,omitempty"` } type TraceMapT struct { RootData RootDataT Index int lock *sync.RWMutex TheEnd bool } var TraceRootMap map[string]*TraceMapT func init() { TraceRootMap = make(map[string]*TraceMapT) //go func() { // for { // //fmt.Println(G_sdl) // time.Sleep(5 * time.Second) // } //}() } var G_sdl int func tracetransformData(sdl []tracesdk.ReadOnlySpan) map[int][]RootDataT { //G_sdl += len(sdl) if len(sdl) == 0 { return nil } // 多次请求 sdl sendDataMap := make(map[int][]RootDataT) //sendData := []RootDataT{} for _, sd := range sdl { if sd == nil { continue } //traceId := sd.SpanContext().TraceID().String() //fmt.Println("------event_num---- "+sd.Name(), "--->", len(sd.Events())) // 一次请求完整数据 // 构建map *RootDataT var rootData RootDataT rootData = initRootDataFromEvent() // build http入口 MapInfoT code_type := buildAppMapFromEvent(&rootData, sd) // 构建maps for _, event := range sd.Events() { //aaa, _ := json.Marshal(event) //fmt.Println("event.info", string(aaa)) mNode := buildMapNodeFromEvent(event) switch EventType(event.EventType) { // stack case EventTypeFunEnt: // l7 event case EventTypeL7Request: switch l7.Protocol(event.ProtocolType) { case l7.ProtocolHTTP: buildHttpMapFromEvent(&mNode, event) case l7.ProtocolDNS: buildDNSMapEvent(&mNode, event) case l7.ProtocolMysql, l7.ProtocolMariaDB, l7.ProtocolTiDB, l7.ProtocolPostgres, l7.ProtocolDM: buildSQLMapEvent(&mNode, event) case l7.ProtocolRedis, l7.ProtocolMemcached, l7.ProtocolCassandra, l7.ProtocolES: buildNoSqlMapEvent(&mNode, event) case l7.ProtocolMongo: buildMongoMapEvent(&mNode, event) case l7.ProtocolGrpc: buildGrpcMapEvent(&mNode, event) case l7.ProtocolKafka: buildMQMapEvent(&mNode, event) } } rootData.Maps = append(rootData.Maps, mNode) //fmt.Println(event.Name) //buildAndAssemblyMapFromEvent(event, rootData) } buildLevelFromEvent(&rootData) sendDataMap[code_type] = append(sendDataMap[code_type], rootData) //a, _ := json.Marshal(rootData) //fmt.Println(string(a)) //sendData = append(sendData, rootData) //if _, ok := TraceRootMap[traceId]; !ok { //TraceRootMap[traceId] = &TraceMapT{RootData: initRootData(traceId), Index: 1} //} //TraceRootMap[traceId].Index++ //buildAndAssemblyMap(sd, TraceRootMap[traceId]) } // 发送完整数据 | 大量长耗时请求会增加内存占用 //sendData := []RootDataT{} //for traceId, v := range TraceRootMap { // if v.TheEnd { // buildLevel(v) // sendData = append(sendData, v.RootData) // delete(TraceRootMap, traceId) // //fmt.Println("the end!") // } else { // //fmt.Println("not end!") // } //} //Transform the categorized map into a slice data, _ := json.Marshal(sendDataMap) klog.Debug(string(data)) //fmt.Println(len(sendData)) //fmt.Println("sdl len:", len(sdl)) return sendDataMap } type TimeMap struct { Time uint64 Type int Map *MapInfoT } //type TraceMapT struct { // RootData RootDataT // Index int // lock *sync.RWMutex // TheEnd bool //} //func buildLevel(sdl *TraceMapT) { // nidMap := make(map[int]*MapInfoT) // // mapSlice := []TimeMap{} // // for i, v := range sdl.RootData.Maps { // if v.ServiceType == "APPLICATION" { // continue // } // nidMap[v.Nid] = &sdl.RootData.Maps[i] // timeStartMap := TimeMap{ // Time: v.StartTime, // Type: 0, // Map: &sdl.RootData.Maps[i], // } // mapSlice = append(mapSlice, timeStartMap) // timeEndMap := TimeMap{ // Time: v.EndTime, // Type: 1, // Map: &sdl.RootData.Maps[i], // } // mapSlice = append(mapSlice, timeEndMap) // } // sort.Slice(mapSlice, func(i, j int) bool { // return mapSlice[i].Time < mapSlice[j].Time // }) // // funStack := []TimeMap{} // // currentNid := 1 // Nid := 2 // level := 2 // // for _, v := range mapSlice { // // fmt.Println("SliceSliceindex", k, "value", v.Time, v.Type, v.Map.MethodName, v.Map.Nid) // if v.Type == 0 { // // 函数入口 // funStack = append(funStack, v) // v.Map.Pid = currentNid // v.Map.Level = level // v.Map.Nid = Nid // currentNid = Nid // level += 1 // Nid += 1 // } else if v.Type == 1 { // // 函数出口 // len := len(funStack) // funStack = funStack[:len-1] // if (len - 2) < 0 { // currentNid = 1 // } else { // currentNid = funStack[len-2].Map.Nid // } // // level -= 1 // } // } //} func buildLevelFromEvent(sdl *RootDataT) { nidMap := make(map[int]*MapInfoT) mapSlice := []TimeMap{} for i, v := range sdl.Maps { if v.ServiceType == "APPLICATION" || v.Level == 1 { continue } nidMap[v.Nid] = &sdl.Maps[i] timeStartMap := TimeMap{ Time: v.StartTime, Type: 0, Map: &sdl.Maps[i], } mapSlice = append(mapSlice, timeStartMap) timeEndMap := TimeMap{ Time: v.EndTime, Type: 1, Map: &sdl.Maps[i], } mapSlice = append(mapSlice, timeEndMap) } sort.Slice(mapSlice, func(i, j int) bool { return mapSlice[i].Time < mapSlice[j].Time }) funStack := []TimeMap{} currentNid := 1 Nid := 2 level := 2 for _, v := range mapSlice { //klog.Debugln("SliceSliceindex", k, "value", v.Time, v.Type, v.Map.MethodName, v.Map.Nid) if v.Type == 0 { // 函数入口 funStack = append(funStack, v) v.Map.Pid = currentNid v.Map.Level = level v.Map.Nid = Nid currentNid = Nid level += 1 Nid += 1 } else if v.Type == 1 { // 函数出口 len := len(funStack) funStack = funStack[:len-1] if (len - 2) < 0 { currentNid = 1 } else { currentNid = funStack[len-2].Map.Nid } level -= 1 } } } //func initRootData(traceId string) RootDataT { // data := RootDataT{ // AccountId: 110, // AgentId: 1011005252979954, // TODO 更新 基于 ip:port + process_name + exe路径生成 // AgentVersion: "2.1.0", // AppId: 5410049101545798, // TODO 更新 基于appname生成 // AppIdFrom: -1, // AppName: "eBPF-agent", // TODO 更新 ip:port || process_name // CalledId: -1, // ClientIp: "", // CollTime: 0, // Cpu: 0, // Custom: "", // HostId: 10154813500555812, // HostName: "localhost", // HttpCode: 0, // HttpMethod: "", // InstanceId: 1005051101515357, // TODO 更新 基于ip:port // InstanceIdFrom: -1, // Maps: []MapInfoT{}, // MemU: 0, // MemUP: 0, // OperType: "", // Parameters: []interface{}{}, // ParentTaskName: 0, // Period: -1, // RespTime: 0, // Sampling: 0, // ServiceName: "GO", // ServiceType: APP_SERVICE_TYPE, // Sip: "", // Sn: "", // SpanIdFrom: "", // Sport: 0, // TId: -1, // TName: "", // TraceId: traceId, // TransIds: []interface{}{}, // TypeFrom: "", // Uri: "", // UserDir: 0, // VipIds: []interface{}{}, // } // return data //} func initRootDataFromEvent() RootDataT { hostID := utils.GetHostID() accountID := utils.GetAccountID() sip := utils.GetHostIP() sysTag := utils.GetSysTag() systemUUID := utils.GetSystemUUID() agentVersion := utils.GetAgentVersion() data := RootDataT{ // todo AccountId AccountId: accountID, AgentId: 0, // 基于 ip:port + process_name + exe路径生成 AgentVersion: agentVersion, AppId: 0, // 基于appname生成 AppIdFrom: -1, AppName: "eBPF-agent", // server配置 CalledId: -1, ClientIp: "", CollTime: 0, Cpu: 0, Custom: "", HostId: hostID, HostName: "localhost", HttpCode: 0, HttpMethod: "", InstanceId: 0, // 基于ip:port InstanceIdFrom: -1, Maps: []MapInfoT{}, MemU: 0, MemUP: 0, OperType: "", Parameters: []ParamStruct{}, ParentTaskName: "", Period: -1, RespTime: 0, Sampling: 0, ServiceName: "", ServiceType: APP_SERVICE_TYPE, Sip: sip, Sn: "", SpanIdFrom: "", Sport: 0, TId: -1, TName: "", TraceId: "", TransIds: []interface{}{}, TypeFrom: "", Uri: "", UserDir: "", VipIds: []interface{}{}, SrcAddr: "", DestinationAddr: "", Sys: sysTag, SystemUUID: systemUUID, UserAgent: "", } return data } func initRootDataJava() RootDataT { data := RootDataT{ AccountId: 110, AgentId: 3934815089541000, // TODO 更新 基于 ip:port + process_name + exe路径生成 AgentVersion: "2.21.0", AppId: 3365853273187618, // TODO 更新 基于appname生成 AppIdFrom: -1, AppName: "eBPF-javaApplication", // TODO 更新 ip:port || process_name CalledId: -1, ClientIp: "", CollTime: 0, Cpu: 0, Custom: "", HostId: 2315065183171055, HostName: "localhost", HttpCode: 0, HttpMethod: "", InstanceId: 1128864082033413, // TODO 更新 基于ip:port InstanceIdFrom: -1, Maps: []MapInfoT{}, MemU: 0, MemUP: 0, OperType: "", Parameters: []ParamStruct{}, ParentTaskName: "", Period: -1, RespTime: 0, Sampling: 0, ServiceName: "TOMCAT", ServiceType: APP_SERVICE_TYPE, Sip: "", Sn: "", SpanIdFrom: "", Sport: 0, TId: -1, TName: "", TraceId: "", TransIds: []interface{}{}, TypeFrom: "", Uri: "", UserDir: "", VipIds: []interface{}{}, } return data } //func initMapNode(spanSd *tracepb.Span) (MapInfoT, string) { // mNode := MapInfoT{ // Exception: 0, // ExceptionMsg: "", // ExceptionStack: "", // Ip: "", // Level: 2, // Pid: 1, // Port: 0, // Ps: []string{}, // ServiceName: "", // ServiceType: "", // WallTime: 0, // } // mNode.MethodName = spanSd.Name // mNode.PureTime = (spanSd.EndTimeUnixNano - spanSd.StartTimeUnixNano) / 1e3 // mNode.WallTime = mNode.PureTime // mNode.StartTime = spanSd.StartTimeUnixNano // mNode.EndTime = spanSd.EndTimeUnixNano // // for _, attr := range spanSd.GetAttributes() { // fmt.Println(attr.Key, ":", attr.Value.GetValue()) // // switch attr.Key { // case "nid": // mNode.Nid = int(attr.Value.GetIntValue()) // case "pid": // mNode.Pid = int(attr.Value.GetIntValue()) // case "level": // mNode.Level = int(attr.Value.GetIntValue()) // } // } // // return mNode, spanSd.Name //} func buildMapNodeFromEvent(event tracesdk.Event) MapInfoT { mNode := MapInfoT{ Exception: 0, ExceptionMsg: "", ExceptionStack: "", Ip: "", Level: 2, Pid: 1, Port: 0, Ps: []string{}, ServiceName: "", ServiceType: "", WallTime: 0, } mNode.MethodName = event.Name //mNode.PureTime = (event.EndTimeUnixNano - event.StartTimeUnixNano) / 1e3 //mNode.WallTime = mNode.PureTime //mNode.StartTime = spanSd.StartTimeUnixNano //mNode.EndTime = spanSd.EndTimeUnixNano for _, attr := range event.Attributes { //fmt.Println(event.Name, "--->buildMapNodeFromEvent--->", attr.Key, ":", attr.Value.AsInterface()) switch attr.Key { case "nid": mNode.Nid = int(attr.Value.AsInt64()) case "pid": mNode.Pid = int(attr.Value.AsInt64()) case "level": mNode.Level = int(attr.Value.AsInt64()) case "time.start_at": mNode.StartTime, mNode.StartTimeMs = cleanNsTime(attr.Value.AsInt64()) case "time.end_at": mNode.EndTime, mNode.EndTimeMs = cleanNsTime(attr.Value.AsInt64()) case "time.duration": //mNode.PureTime = uint64(attr.Value.AsInt64()) / 1e3 mNode.WallTime = uint64(attr.Value.AsInt64()) / 1e3 } } return mNode } func parseURIToParams(input string) (string, []ParamStruct, error) { // 解析输入 URI parsedURL, err := url.Parse(input) if err != nil { return "", nil, fmt.Errorf("failed to parse URI: %w", err) } // 提取查询参数 queryParams := parsedURL.Query() // 转换为目标结构 var params []ParamStruct for key, values := range queryParams { params = append(params, ParamStruct{ Name: key, Values: values, }) } return parsedURL.Path, params, nil } // 构建拼装 //func buildAndAssemblyMap(sd apmTraceSpan, traceRoot *TraceMapT) MapInfoT { // mNode, mapType := initMapNode(span(sd)) // switch mapType { // case "APPLICATION": // buildAppMap(&mNode, traceRoot, sd) // traceRoot.TheEnd = true // case "HTTP": // buildHttpMap(&mNode, sd) // case "Mysql": // buildMysqlMap(&mNode, sd) // case "Redis": // buildRedisMap(&mNode, sd) // } // if mapType != "" { // mNode.Nid = traceRoot.Index // traceRoot.RootData.Maps = append(traceRoot.RootData.Maps, mNode) // } // return mNode //} //func buildAndAssemblyMapFromEvent(event tracesdk.Event, traceRoot *RootDataT) MapInfoT { // mNode := buildMapNodeFromEvent(event) // switch mapType { // case "HTTP": // buildHttpMapFromEvent(mNode, event) // //case "Mysql": // // buildMysqlMap(mNode, sd) // //case "Redis": // // buildRedisMap(mNode, sd) // } // if mapType != "" { // //mNode.Nid = traceRoot.Index // traceRoot.Maps = append(traceRoot.Maps, mNode) // } // return mNode //} //func buildAppMap(mNode *MapInfoT, traceRoot *TraceMapT, sd apmTraceSpan) { // mNode.ServiceName = GO_SERVICE_NAME // mNode.ServiceType = APP_SERVICE_TYPE // mNode.MethodName = "net/http.(*Transport).roundTrip()" // mNode.Level = 1 // mNode.Pid = 0 // mNode.Nid = 1 // // 构建root节点 // traceRoot.RootData.RespTime = mNode.PureTime // traceRoot.RootData.CollTime = mNode.StartTime // traceRoot.Index = 1 // for _, attr := range sd.Attributes() { // fmt.Println(attr.Key, ":", attr.Value.AsInterface()) // switch attr.Key { // case "http.uri": // traceRoot.RootData.Uri = attr.Value.AsString() // case "http.method": // traceRoot.RootData.HttpMethod = attr.Value.AsString() // case "http.status_code": // traceRoot.RootData.HttpCode = attr.Value.AsInt64() // case "net.peer.name": // traceRoot.RootData.ClientIp = attr.Value.AsString() // traceRoot.RootData.Sip = attr.Value.AsString() // traceRoot.RootData.Sn = attr.Value.AsString() // case "net.peer.port": // traceRoot.RootData.Sport = attr.Value.AsInt64() // traceRoot.RootData.LocalPort = attr.Value.AsInt64() // case "server.trace_id_from": // traceRoot.RootData.TraceId = attr.Value.AsString() // case "server.called_id": // traceRoot.RootData.CalledId = attr.Value.AsInt64() // case "server.instance_id_from": // traceRoot.RootData.InstanceIdFrom = attr.Value.AsInt64() // case "server.app_id_from": // traceRoot.RootData.AppIdFrom = attr.Value.AsInt64() // case "server.span_id_from": // traceRoot.RootData.SpanIdFrom = attr.Value.AsString() // case "server.type_from": // traceRoot.RootData.TypeFrom = attr.Value.AsString() // } // } // //} func buildAppMapFromEvent(traceRoot *RootDataT, sd apmTraceSpan) int { mNode := MapInfoT{ Exception: 0, ExceptionMsg: "", ExceptionStack: "", Ip: "", Level: 1, Pid: 1, Port: 0, Ps: []string{}, ServiceName: "", ServiceType: "", WallTime: 0, } mNode.ServiceName = GO_SERVICE_NAME mNode.ServiceType = APP_SERVICE_TYPE mNode.MethodName = "Kernel Endpoint()" mNode.Level = 1 mNode.Pid = 0 mNode.Nid = 1 var code_type int64 // 构建root节点 //traceRoot.RespTime = mNode.PureTimex //traceRoot.CollTime = mNode.StartTime for _, attr := range sd.Attributes() { klog.Debugln("Appmap:", attr.Key, ":", attr.Value.AsInterface()) switch attr.Key { case "http.uri": traceRoot.Uri, traceRoot.Parameters, _ = parseURIToParams(attr.Value.AsString()) case "rpc.uri": traceRoot.Uri = attr.Value.AsString() mNode.Uri = attr.Value.AsString() case "http.method": traceRoot.HttpMethod = attr.Value.AsString() case "http.status_code": traceRoot.HttpCode = attr.Value.AsInt64() case "net.peer.name": // TODO 修改 ClientIp sip获取方式 traceRoot.ClientIp = attr.Value.AsString() //traceRoot.Sip = attr.Value.AsString() traceRoot.Sn = attr.Value.AsString() case "net.peer.port": traceRoot.Sport = attr.Value.AsInt64() traceRoot.LocalPort = attr.Value.AsInt64() case "server.trace_id_from": traceRoot.TraceId = attr.Value.AsString() case "server.called_id": traceRoot.CalledId = attr.Value.AsInt64() case "server.instance_id_from": traceRoot.InstanceIdFrom = attr.Value.AsInt64() case "server.app_id_from": traceRoot.AppIdFrom = attr.Value.AsInt64() case "server.span_id_from": traceRoot.SpanIdFrom = attr.Value.AsString() case "server.type_from": traceRoot.TypeFrom = attr.Value.AsString() case "time.start_at": mNode.StartTime, mNode.StartTimeMs = cleanNsTime(attr.Value.AsInt64()) traceRoot.CollTime = mNode.StartTimeMs case "time.end_at": mNode.EndTime, mNode.EndTimeMs = cleanNsTime(attr.Value.AsInt64()) case "time.duration": traceRoot.RespTime = uint64(attr.Value.AsInt64()) / 1e3 mNode.PureTime = traceRoot.RespTime mNode.WallTime = uint64(attr.Value.AsInt64()) / 1e3 case "server.code_type": code_type = attr.Value.AsInt64() case "server.app_name": traceRoot.AppName = attr.Value.AsString() case "server.service_name": traceRoot.ServiceName = attr.Value.AsString() mNode.ServiceName = attr.Value.AsString() case "rpc.service_type": traceRoot.ServiceType = RPC_SERVICE_TYPE mNode.ServiceType = RPC_SERVICE_TYPE case "rpc.oper_type": traceRoot.OperType = "PROVIDER" mNode.OperType = "PROVIDER" // map tag case "service_type": mNode.ServiceType = attr.Value.AsString() // map tag case "oper_type": mNode.OperType = attr.Value.AsString() case "server.app_id": traceRoot.AppId = attr.Value.AsInt64() case "server.agent_id": traceRoot.AgentId = attr.Value.AsInt64() case "server.instance_id": traceRoot.InstanceId = attr.Value.AsInt64() case "server.src_addr": traceRoot.SrcAddr = attr.Value.AsString() case "server.dst_addr": traceRoot.DestinationAddr = attr.Value.AsString() case "server.pid": traceRoot.Pid = uint32(attr.Value.AsInt64()) case "server.container_id": traceRoot.ContainerID = attr.Value.AsString() case "server.user_agent": traceRoot.UserAgent = attr.Value.AsString() case "method_name": mNode.MethodName = attr.Value.AsString() case "mq.ip": mNode.Ip = attr.Value.AsString() case "mq.port": mNode.Port = attr.Value.AsInt64() // root tag case "server.service_type": traceRoot.ServiceType = attr.Value.AsString() // root tag case "server.oper_type": traceRoot.OperType = attr.Value.AsString() case "server.sysvc_from": // 解析 SysvcFrom 格式:{app_name_len}:app_name:{appServiceType_len}:appServiceType:{SysTagLen}[:SysTag] // 例如:08:eBPF-APP:12:APPLICATION:00 或 08:eBPF-APP:12:APPLICATION:03:tag sysvcFrom := attr.Value.AsString() appNameFrom, serviceTypeFrom, parentSys := parseSysvcFrom(sysvcFrom) traceRoot.AppNameFrom = appNameFrom traceRoot.ServiceTypeFrom = serviceTypeFrom traceRoot.ParentSys = parentSys case "service.parent_sys": // 支持直接从 attribute 传递 traceRoot.ParentSys = attr.Value.AsString() case "app_name_from": // 支持直接从 attribute 传递 traceRoot.AppNameFrom = attr.Value.AsString() } } traceRoot.Maps = append(traceRoot.Maps, mNode) return int(code_type) } //func buildHttpMap(mNode *MapInfoT, sd apmTraceSpan) { // mNode.ServiceName = HTTP_SERVICE_NAME // mNode.ServiceType = HTTP_SERVICE_TYPE // mNode.Schema = "http" // mNode.MethodName = "net/http.serverHandler.ServeHTTP()" // var descAddr string // for _, attr := range sd.Attributes() { // //fmt.Println(attr.Key, ":", attr.Value.AsInterface()) // switch attr.Key { // case "http.ip": // mNode.Ip = attr.Value.AsString() // descAddr += mNode.Ip // case "http.port": // mNode.Port = attr.Value.AsInt64() // descAddr += ":" + attr.Value.AsString() // case "http.uri": // mNode.Uri = attr.Value.AsString() // case "http.assumed_app_id": // mNode.AssumedAppId = attr.Value.AsInt64() // case "http.span_id": // mNode.SpanId = attr.Value.AsString() // } // } // //mNode.AssumedAppId = Md5ToInt64(descAddr, 16) //} func buildGrpcMapEvent(mNode *MapInfoT, event tracesdk.Event) { mNode.ServiceName = GRPC_SERVICE_NAME mNode.ServiceType = RPC_SERVICE_TYPE mNode.Schema = "grpc" //mNode.MethodName = "HTTP" //var descAddr string // var method string for _, attr := range event.Attributes { switch attr.Key { case "rpc.ip": mNode.Ip = attr.Value.AsString() //descAddr += mNode.Ip case "rpc.port": mNode.Port = attr.Value.AsInt64() case "rpc.method": //mNode.MethodName += " " + attr.Value.AsString() // method = attr.Value.AsString() mNode.MethodName = attr.Value.AsString() //descAddr += ":" + attr.Value.AsString() case "rpc.uri": mNode.Uri = attr.Value.AsString() //mNode.MethodName += " " + attr.Value.AsString() case "rpc.oper_type": mNode.OperType = attr.Value.AsString() case "rpc.assumed_app_id": mNode.AssumedAppId = attr.Value.AsInt64() case "rpc.span_id": mNode.SpanId = attr.Value.AsString() case "time.start_at": mNode.StartTime = uint64(attr.Value.AsInt64()) case "time.end_at": mNode.EndTime = uint64(attr.Value.AsInt64()) case "time.duration": //mNode.PureTime = uint64(attr.Value.AsInt64()) / 1e3 mNode.WallTime = uint64(attr.Value.AsInt64()) / 1e3 } } // mNode.MethodName = fmt.Sprintf("%s %s %s:%d%s", "HTTP", method, mNode.Ip, mNode.Port, mNode.Uri) //mNode.AssumedAppId = Md5ToInt64(descAddr, 16) } func buildHttpMapFromEvent(mNode *MapInfoT, event tracesdk.Event) { mNode.ServiceName = HTTP_SERVICE_NAME mNode.ServiceType = HTTP_SERVICE_TYPE mNode.Schema = "http" //mNode.MethodName = "HTTP" //var descAddr string var method string for _, attr := range event.Attributes { klog.Debugln("HTTP--->", attr.Key, ":", attr.Value.AsInterface()) switch attr.Key { case "http.ip": mNode.Ip = attr.Value.AsString() //descAddr += mNode.Ip case "http.port": mNode.Port = attr.Value.AsInt64() case "http.method": //mNode.MethodName += " " + attr.Value.AsString() method = attr.Value.AsString() //descAddr += ":" + attr.Value.AsString() case "http.uri": mNode.Uri = attr.Value.AsString() //mNode.MethodName += " " + attr.Value.AsString() case "http.assumed_app_id": mNode.AssumedAppId = attr.Value.AsInt64() case "http.span_id": mNode.SpanId = attr.Value.AsString() case "time.start_at": mNode.StartTime = uint64(attr.Value.AsInt64()) case "time.end_at": mNode.EndTime = uint64(attr.Value.AsInt64()) case "time.duration": //mNode.PureTime = uint64(attr.Value.AsInt64()) / 1e3 mNode.WallTime = uint64(attr.Value.AsInt64()) / 1e3 case "http.src_addr": mNode.SrcAddr = attr.Value.AsString() case "http.destination_addr": mNode.DestinationAddr = attr.Value.AsString() case "http.is_tls": if attr.Value.AsBool() { mNode.Schema = "https" } } } mNode.MethodName = fmt.Sprintf("%s %s %s:%d%s", "HTTP", method, mNode.Ip, mNode.Port, mNode.Uri) //mNode.AssumedAppId = Md5ToInt64(descAddr, 16) } //func buildMysqlMap(mNode *MapInfoT, sd apmTraceSpan) { // mNode.Dbn = "unknown" // mNode.ServiceName = MYSQL_SERVICE_NAME // mNode.ServiceType = SQL_SERVICE_TYPE // mNode.MethodName = "database/sql.Query()" // for _, attr := range sd.Attributes() { // //fmt.Println(attr.Key, ":", attr.Value.AsInterface()) // switch attr.Key { // case "net.peer.name": // mNode.Ip = attr.Value.AsString() // case "net.peer.port": // mNode.Port = attr.Value.AsInt64() // case "db.statement": // query := attr.Value.AsString() // mNode.Ps = []string{query} // words := strings.Fields(query) // if len(words) > 0 { // mNode.OperType = strings.ToUpper(words[0]) // } // } // } //} func buildSQLMapEvent(mNode *MapInfoT, event tracesdk.Event) { mNode.Dbn = "-" mNode.ServiceName = l7.Protocol(event.ProtocolType).ServiceNameString() mNode.ServiceType = SQL_SERVICE_TYPE //mNode.MethodName = "database/sql.Query()" for _, attr := range event.Attributes { //fmt.Println(attr.Key, ":", attr.Value.AsInterface()) switch attr.Key { case "net.peer.name": mNode.Ip = attr.Value.AsString() case "net.peer.port": mNode.Port = attr.Value.AsInt64() case "db.statement": query := attr.Value.AsString() mNode.MethodName = query mNode.Ps = []string{query} //words := strings.Fields(query) //if len(words) > 0 { // mNode.OperType = strings.ToUpper(words[0]) //} case "sql.exception": if attr.Value.AsBool() { mNode.Exception = 1 } else { mNode.Exception = 0 } case "sql.exception_msg": mNode.ExceptionMsg = attr.Value.AsString() case "sql.src_addr": mNode.SrcAddr = attr.Value.AsString() case "sql.destination_addr": mNode.DestinationAddr = attr.Value.AsString() case "sql.dbn": mNode.Dbn = attr.Value.AsString() } } } func buildDNSMapEvent(mNode *MapInfoT, event tracesdk.Event) { mNode.ServiceName = l7.Protocol(event.ProtocolType).ServiceNameString() mNode.ServiceType = NET_SERVICE_TYPE var _type string var fqdn string var ips string var ttl int64 for _, attr := range event.Attributes { switch attr.Key { case "dns.type": _type = attr.Value.AsString() case "dns.fqdn": fqdn = attr.Value.AsString() case "dns.ttl": ttl = attr.Value.AsInt64() case "dns.ips": if attr.Value.AsString() != "" { ips = "Addr: " + attr.Value.AsString() } } } mNode.MethodName = fmt.Sprintf("DNS Name: %s Type: %s TTL: %d %s", fqdn, _type, ttl, ips) } func buildPostGreSqlMapEvent(mNode *MapInfoT, event tracesdk.Event) { mNode.Dbn = "-" mNode.ServiceName = POSTGRESQL_SERVICE_NAME mNode.ServiceType = SQL_SERVICE_TYPE //mNode.MethodName = "database/sql.Query()" for _, attr := range event.Attributes { //fmt.Println(attr.Key, ":", attr.Value.AsInterface()) switch attr.Key { case "net.peer.name": mNode.Ip = attr.Value.AsString() case "net.peer.port": mNode.Port = attr.Value.AsInt64() case "db.statement": query := attr.Value.AsString() mNode.Ps = []string{query} //words := strings.Fields(query) //if len(words) > 0 { // mNode.OperType = strings.ToUpper(words[0]) //} case "sql.exception": if attr.Value.AsBool() { mNode.Exception = 1 } else { mNode.Exception = 0 } case "sql.src_addr": mNode.SrcAddr = attr.Value.AsString() case "sql.destination_addr": mNode.DestinationAddr = attr.Value.AsString() } } } func buildMysqlMapEvent(mNode *MapInfoT, event tracesdk.Event) { mNode.Dbn = "-" mNode.ServiceName = MYSQL_SERVICE_NAME mNode.ServiceType = SQL_SERVICE_TYPE //mNode.MethodName = "database/sql.Query()" for _, attr := range event.Attributes { //fmt.Println(attr.Key, ":", attr.Value.AsInterface()) switch attr.Key { case "net.peer.name": mNode.Ip = attr.Value.AsString() case "net.peer.port": mNode.Port = attr.Value.AsInt64() case "db.statement": query := attr.Value.AsString() mNode.MethodName = query mNode.Ps = []string{query} //words := strings.Fields(query) //if len(words) > 0 { // mNode.OperType = strings.ToUpper(words[0]) //} case "sql.exception": if attr.Value.AsBool() { mNode.Exception = 1 } else { mNode.Exception = 0 } case "sql.src_addr": mNode.SrcAddr = attr.Value.AsString() case "sql.destination_addr": mNode.DestinationAddr = attr.Value.AsString() } } } func buildDMMapEvent(mNode *MapInfoT, event tracesdk.Event) { mNode.Dbn = "TEST" mNode.ServiceName = DM_SERVICE_NAME mNode.ServiceType = SQL_SERVICE_TYPE mNode.MethodName = "database/sql.Query()" for _, attr := range event.Attributes { //fmt.Println(attr.Key, ":", attr.Value.AsInterface()) switch attr.Key { case "net.peer.name": mNode.Ip = attr.Value.AsString() case "net.peer.port": mNode.Port = attr.Value.AsInt64() case "db.statement": query := attr.Value.AsString() mNode.Ps = []string{query} //words := strings.Fields(query) //if len(words) > 0 { // mNode.OperType = strings.ToUpper(words[0]) //} case "sql.exception": if attr.Value.AsBool() { mNode.Exception = 1 } else { mNode.Exception = 0 } case "sql.src_addr": mNode.SrcAddr = attr.Value.AsString() case "sql.destination_addr": mNode.DestinationAddr = attr.Value.AsString() } } } func buildNoSqlMapEvent(mNode *MapInfoT, event tracesdk.Event) { mNode.ServiceName = l7.Protocol(event.ProtocolType).ServiceNameString() mNode.ServiceType = NOSQL_SERVICE_TYPE for _, attr := range event.Attributes { switch attr.Key { case "net.peer.name": mNode.Ip = attr.Value.AsString() case "net.peer.port": mNode.Port = attr.Value.AsInt64() case "db.statement": query := attr.Value.AsString() mNode.MethodName = query mNode.Ps = []string{query} case "nosql.src_addr": mNode.SrcAddr = attr.Value.AsString() case "nosql.destination_addr": mNode.DestinationAddr = attr.Value.AsString() case "nosql.exception": if attr.Value.AsBool() { mNode.Exception = 1 } case "nosql.exception_msg": mNode.ExceptionMsg = attr.Value.AsString() } } } func buildMQMapEvent(mNode *MapInfoT, event tracesdk.Event) { mNode.ServiceName = l7.Protocol(event.ProtocolType).ServiceNameString() mNode.ServiceType = MQ_SERVICE_TYPE for _, attr := range event.Attributes { switch attr.Key { case "net.peer.name": mNode.Ip = attr.Value.AsString() case "net.peer.port": mNode.Port = attr.Value.AsInt64() case "mq.info": query := attr.Value.AsString() mNode.MethodName = query case "mq.topic": mNode.Uri = "/" + attr.Value.AsString() case "mq.destination_addr": mNode.DestinationAddr = attr.Value.AsString() case "mq.exception": if attr.Value.AsBool() { mNode.Exception = 1 } case "mq.exception_msg": mNode.ExceptionMsg = attr.Value.AsString() case "mq.oper_type": mNode.OperType = attr.Value.AsString() case "mq.assumed_app_id": mNode.AssumedAppId = attr.Value.AsInt64() case "mq.span_id": mNode.SpanId = attr.Value.AsString() } } } func buildMongoMapEvent(mNode *MapInfoT, event tracesdk.Event) { mNode.ServiceName = MONGO_SERVICE_NAME mNode.ServiceType = NOSQL_SERVICE_TYPE for _, attr := range event.Attributes { switch attr.Key { case "net.peer.name": mNode.Ip = attr.Value.AsString() case "net.peer.port": mNode.Port = attr.Value.AsInt64() case "db.statement": query := attr.Value.AsString() mNode.MethodName = query mNode.Ps = []string{query} case "nosql.src_addr": mNode.SrcAddr = attr.Value.AsString() case "nosql.destination_addr": mNode.DestinationAddr = attr.Value.AsString() } } } func isEnter(_type string) bool { if _type == "APPLICATION" { return true } return false } func span(sd apmTraceSpan) *tracepb.Span { if sd == nil { return nil } tid := sd.SpanContext().TraceID() sid := sd.SpanContext().SpanID() s := &tracepb.Span{ TraceId: tid[:], SpanId: sid[:], TraceState: sd.SpanContext().TraceState().String(), //Status: status(sd.Status().Code, sd.Status().Description), StartTimeUnixNano: uint64(sd.StartTime().UnixNano()), EndTimeUnixNano: uint64(sd.EndTime().UnixNano()), //Links: links(sd.Links()), //Kind: spanKind(sd.SpanKind()), Name: sd.Name(), Attributes: tracetransform.KeyValues(sd.Attributes()), //Events: spanEvents(sd.Events()), DroppedAttributesCount: uint32(sd.DroppedAttributes()), DroppedEventsCount: uint32(sd.DroppedEvents()), DroppedLinksCount: uint32(sd.DroppedLinks()), } if psid := sd.Parent().SpanID(); psid.IsValid() { s.ParentSpanId = psid[:] } return s } func Md5ToInt64(strParam string, Len int) int64 { sign := md5.Sum([]byte(strParam)) signStr := fmt.Sprintf("%x", sign) charArr := []rune(signStr) var intStr string for _, value := range charArr { intStr += strconv.Itoa(int(value)) } intStr = intStr[:Len] int64Data, err := strconv.ParseInt(intStr, 10, 64) if err != nil { return 0 } return int64Data } // ns,ms func cleanNsTime(time int64) (uint64, uint64) { return uint64(time), uint64(math.Round(float64(time) / 1e6)) } func parseLen2(s string, i int) (int, bool) { if len(s) < i+2 { return 0, false } c1, c2 := s[i], s[i+1] if c1 < '0' || c1 > '9' || c2 < '0' || c2 > '9' { return 0, false } return int(c1-'0')*10 + int(c2-'0'), true } func parseSysvcFrom(sysvcFrom string) (string, string, string) { // 新格式:{app_name_len}:app_name:{appServiceType_len}:appServiceType[:{SysTagLen}:SysTag] // 例如:08:eBPF-APP:12:APPLICATION 或 08:eBPF-APP:12:APPLICATION:03:tag // 最小长度:12 (如 "08:x:12:APPLICATION") if len(sysvcFrom) < 12 { return "", "", "" } // 解析 app_name_len(前2位) appNameLen, ok := parseLen2(sysvcFrom, 0) if !ok || appNameLen < 0 || appNameLen > 32 { return "", "", "" } // 检查第一个冒号 if len(sysvcFrom) < 3 || sysvcFrom[2] != ':' { return "", "", "" } // 提取 app_name(从位置 3 开始,长度为 appNameLen) appNameStart := 3 appNameEnd := appNameStart + appNameLen if len(sysvcFrom) < appNameEnd { return "", "", "" } appNameFrom := sysvcFrom[appNameStart:appNameEnd] // 检查 app_name 后的冒号 if len(sysvcFrom) < appNameEnd+1 || sysvcFrom[appNameEnd] != ':' { return appNameFrom, "", "" } // 解析 appServiceType_len(app_name 后的2位) appServiceTypeLenStart := appNameEnd + 1 appServiceTypeLen, ok := parseLen2(sysvcFrom, appServiceTypeLenStart) if !ok || appServiceTypeLen < 0 || appServiceTypeLen > 32 { return appNameFrom, "", "" } // 检查 appServiceType_len 后的冒号 appServiceTypeStart := appServiceTypeLenStart + 2 if len(sysvcFrom) < appServiceTypeStart+1 || sysvcFrom[appServiceTypeStart] != ':' { return appNameFrom, "", "" } // 提取 appServiceType appServiceTypeStart++ appServiceTypeEnd := appServiceTypeStart + appServiceTypeLen if len(sysvcFrom) < appServiceTypeEnd { return appNameFrom, "", "" } serviceTypeFrom := sysvcFrom[appServiceTypeStart:appServiceTypeEnd] // 检查 appServiceType 后的冒号 if len(sysvcFrom) < appServiceTypeEnd+1 || sysvcFrom[appServiceTypeEnd] != ':' { return appNameFrom, serviceTypeFrom, "" } // 解析 sysTagLen(appServiceType 后的2位) sysTagLenStart := appServiceTypeEnd + 1 sysTagLen, ok := parseLen2(sysvcFrom, sysTagLenStart) if !ok || sysTagLen < 0 || sysTagLen > 32 { return appNameFrom, serviceTypeFrom, "" } // 如果 sysTagLen == 0,则没有 sysTag if sysTagLen == 0 { return appNameFrom, serviceTypeFrom, "" } // 检查 sysTagLen 后的冒号 sysTagStart := sysTagLenStart + 2 if len(sysvcFrom) < sysTagStart+1 || sysvcFrom[sysTagStart] != ':' { return appNameFrom, serviceTypeFrom, "" } // 提取 sysTag sysTagStart++ sysTagEnd := sysTagStart + sysTagLen if len(sysvcFrom) < sysTagEnd { return appNameFrom, serviceTypeFrom, "" } parentSys := sysvcFrom[sysTagStart:sysTagEnd] return appNameFrom, serviceTypeFrom, parentSys }