package otlptrace import ( "crypto/md5" "encoding/json" "fmt" "strconv" "strings" "sync" "time" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/tracetransform" tracesdk "go.opentelemetry.io/otel/sdk/trace" tracepb "go.opentelemetry.io/proto/otlp/trace/v1" ) const ( APP_SERVICE_TYPE = "APPLICATION" SQL_SERVICE_TYPE = "SQL" NOSQL_SERVICE_TYPE = "NOSQL" HTTP_SERVICE_TYPE = "HTTP" ) const ( GO_SERVICE_NAME = "GO" MYSQL_SERVICE_NAME = "MYSQL" REDIS_SERVICE_NAME = "REDIS" HTTP_SERVICE_NAME = "HTTPCLIENT" ) type apmTraceSpan tracesdk.ReadOnlySpan // GO:0:10154813500555812:5450531005555981:5610250100539899:ee022542c3940f1b:1001025098564810:888ceb3df1bdbe2c:110 type RootDataT struct { AccountId int `json:"account_id"` AgentId int64 `json:"agent_id"` AgentVersion string `json:"agent_version"` AppId int64 `json:"app_id"` AppIdFrom int64 `json:"app_id_from"` // from header app_id AppName string `json:"app_name"` CalledId int64 `json:"called_id"` // from header assumed_app_id ClientIp string `json:"client_ip"` CollTime uint64 `json:"coll_time"` Cpu int `json:"cpu"` Custom string `json:"custom"` HostId int64 `json:"host_id"` HostName string `json:"host_name"` HttpCode int64 `json:"http_code"` HttpMethod string `json:"http_method"` InstanceId int64 `json:"instance_id"` InstanceIdFrom int64 `json:"instance_id_from"` // from header instance_id LocalPort int64 `json:"local_port"` Maps []MapInfoT `json:"maps"` MemU int `json:"mem_u"` MemUP int `json:"mem_u_p"` OperType string `json:"oper_type"` Parameters []interface{} `json:"parameters"` ParentTaskName int `json:"parent_task_name"` Period int `json:"period"` RespTime uint64 `json:"resp_time"` Sampling int `json:"sampling"` ServiceName string `json:"service_name"` ServiceType string `json:"service_type"` Sip string `json:"sip"` Sn string `json:"sn"` SpanIdFrom string `json:"span_id_from"` // from header span_id Sport int64 `json:"sport"` TId int `json:"t_id"` TName string `json:"t_name"` TraceId string `json:"trace_id"` // from header trace_id TransIds []interface{} `json:"trans_ids"` TypeFrom string `json:"type_from"` Uri string `json:"uri"` UserDir int `json:"user_dir"` VipIds []interface{} `json:"vip_ids"` } type MapInfoT struct { Dbn string `json:"dbn,omitempty"` Exception int `json:"exception,omitempty"` ExceptionMsg string `json:"exception_msg,omitempty"` ExceptionStack string `json:"exception_stack,omitempty"` Ip string `json:"ip,omitempty"` Level int `json:"level"` MethodDesc string `json:"method_desc,omitempty"` MethodName string `json:"method_name"` Nid int `json:"nid"` OperType string `json:"oper_type,omitempty"` Pid int `json:"pid"` Port int64 `json:"port,omitempty"` Ps []string `json:"ps,omitempty"` PureTime uint64 `json:"pure_time"` ServiceName string `json:"service_name"` ServiceType string `json:"service_type"` StartTime uint64 `json:"start_time"` WallTime uint64 `json:"wall_time"` Schema string `json:"schema,omitempty"` AssumedAppId int64 `json:"assumed_app_id,omitempty"` Uri string `json:"uri,omitempty"` SpanId string `json:"span_id,omitempty"` } type TraceMapT struct { RootData RootDataT Index int lock *sync.RWMutex TheEnd bool } var TraceRootMap map[string]*TraceMapT func init() { TraceRootMap = make(map[string]*TraceMapT) go func() { for { //fmt.Println(G_sdl) time.Sleep(5 * time.Second) } }() } var G_sdl int func tracetransformData(sdl []tracesdk.ReadOnlySpan) []RootDataT { G_sdl += len(sdl) if len(sdl) == 0 { return nil } for _, sd := range sdl { if sd == nil { continue } traceId := sd.SpanContext().TraceID().String() if _, ok := TraceRootMap[traceId]; !ok { TraceRootMap[traceId] = &TraceMapT{RootData: initRootData(traceId), Index: 1} } TraceRootMap[traceId].Index++ buildAndAssemblyMap(sd, TraceRootMap[traceId]) } // 发送完整数据 | 大量长耗时请求会增加内存占用 sendData := []RootDataT{} for traceId, v := range TraceRootMap { if v.TheEnd { buildLevel(v) sendData = append(sendData, v.RootData) delete(TraceRootMap, traceId) //fmt.Println("the end!") } else { //fmt.Println("not end!") } } // Transform the categorized map into a slice aa, err := json.Marshal(sendData) fmt.Println(err) fmt.Println(string(aa)) fmt.Println(len(sendData)) fmt.Println(len(sdl)) return sendData } func buildLevelRecursion(parentMap map[int]map[int]*MapInfoT, pid int, level int) { for i, v := range parentMap[pid] { parentMap[pid][i].Level = level // fmt.Printf("buildLevelRecursion: %x, %x\n", v.Nid, v.Nid%0x1000000) buildLevelRecursion(parentMap, v.Nid%0x1000000, level+1) } } func buildLevel(sdl *TraceMapT) { parentMap := make(map[int]map[int]*MapInfoT) nidMap := make(map[int]*MapInfoT) for i, v := range sdl.RootData.Maps { nidMap[v.Nid%0x1000000] = &sdl.RootData.Maps[i] } // 没有 pid 的放到 application 上 for i, v := range sdl.RootData.Maps { _, ok := nidMap[v.Pid] if !ok && v.Pid != 0 { sdl.RootData.Maps[i].Pid = 1 } } // fmt.Println("nidMapnidMapnidMapnidMap--------------") // fmt.Println(nidMap) // fmt.Println(sdl.RootData.Maps) // 构建 pid 映射map for i, v := range sdl.RootData.Maps { _, ok := parentMap[v.Pid] if !ok { parentMap[v.Pid] = make(map[int]*MapInfoT) } parentMap[v.Pid][v.Nid] = &sdl.RootData.Maps[i] } // fmt.Println(parentMap) // fmt.Println("nidMapnidMapnidMapnidMap--------------||||") // 构建层级 level := 1 buildLevelRecursion(parentMap, 0, level) } func initRootData(traceId string) RootDataT { data := RootDataT{ AccountId: 110, AgentId: 1011005252979954, // TODO 更新 基于 ip:port + process_name + exe路径生成 AgentVersion: "2.1.0", AppId: 5410049101545798, // TODO 更新 基于appname生成 AppIdFrom: -1, AppName: "eBPF-agent", // TODO 更新 ip:port || process_name CalledId: -1, ClientIp: "", CollTime: 0, Cpu: 0, Custom: "", HostId: 10154813500555812, HostName: "localhost", HttpCode: 0, HttpMethod: "", InstanceId: 1005051101515357, // TODO 更新 基于ip:port InstanceIdFrom: -1, Maps: []MapInfoT{}, MemU: 0, MemUP: 0, OperType: "", Parameters: []interface{}{}, ParentTaskName: 0, Period: -1, RespTime: 0, Sampling: 0, ServiceName: "GO", ServiceType: APP_SERVICE_TYPE, Sip: "", Sn: "", SpanIdFrom: "", Sport: 0, TId: -1, TName: "", TraceId: traceId, TransIds: []interface{}{}, TypeFrom: "", Uri: "", UserDir: 0, VipIds: []interface{}{}, } return data } func initMapNode(spanSd *tracepb.Span) (MapInfoT, string) { mNode := MapInfoT{ Exception: 0, ExceptionMsg: "", ExceptionStack: "", Ip: "", Level: 2, Pid: 1, Port: 0, Ps: []string{}, ServiceName: "", ServiceType: "", WallTime: 0, } mNode.MethodName = spanSd.Name mNode.PureTime = (spanSd.EndTimeUnixNano - spanSd.StartTimeUnixNano) / 1e3 mNode.WallTime = mNode.PureTime mNode.StartTime = spanSd.StartTimeUnixNano / 1e6 for _, attr := range spanSd.GetAttributes() { switch attr.Key { case "nid": mNode.Nid = int(attr.Value.GetIntValue()) case "pid": mNode.Pid = int(attr.Value.GetIntValue()) case "level": mNode.Level = int(attr.Value.GetIntValue()) } } return mNode, spanSd.Name } // 构建拼装 func buildAndAssemblyMap(sd apmTraceSpan, traceRoot *TraceMapT) MapInfoT { mNode, mapType := initMapNode(span(sd)) switch mapType { case "APPLICATION": buildAppMap(&mNode, traceRoot, sd) traceRoot.TheEnd = true case "HTTP": buildHttpMap(&mNode, sd) case "Mysql": buildMysqlMap(&mNode, sd) case "Redis": buildRedisMap(&mNode, sd) } if mapType != "" { // mNode.Nid = traceRoot.Index traceRoot.RootData.Maps = append(traceRoot.RootData.Maps, mNode) } return mNode } func buildAppMap(mNode *MapInfoT, traceRoot *TraceMapT, sd apmTraceSpan) { mNode.ServiceName = GO_SERVICE_NAME mNode.ServiceType = APP_SERVICE_TYPE mNode.MethodName = "net/http.(*Transport).roundTrip()" mNode.Level = 1 mNode.Pid = 0 mNode.Nid = 1 // 构建root节点 traceRoot.RootData.RespTime = mNode.PureTime traceRoot.RootData.CollTime = mNode.StartTime traceRoot.Index = 1 for _, attr := range sd.Attributes() { //fmt.Println(attr.Key, ":", attr.Value.AsInterface()) switch attr.Key { case "http.uri": traceRoot.RootData.Uri = attr.Value.AsString() case "http.method": traceRoot.RootData.HttpMethod = attr.Value.AsString() case "http.status_code": traceRoot.RootData.HttpCode = attr.Value.AsInt64() case "net.peer.name": traceRoot.RootData.ClientIp = attr.Value.AsString() traceRoot.RootData.Sip = attr.Value.AsString() traceRoot.RootData.Sn = attr.Value.AsString() case "net.peer.port": traceRoot.RootData.Sport = attr.Value.AsInt64() traceRoot.RootData.LocalPort = attr.Value.AsInt64() case "server.trace_id_from": traceRoot.RootData.TraceId = attr.Value.AsString() case "server.called_id": traceRoot.RootData.CalledId = attr.Value.AsInt64() case "server.instance_id_from": traceRoot.RootData.InstanceIdFrom = attr.Value.AsInt64() case "server.app_id_from": traceRoot.RootData.AppIdFrom = attr.Value.AsInt64() case "server.span_id_from": traceRoot.RootData.SpanIdFrom = attr.Value.AsString() case "server.type_from": traceRoot.RootData.TypeFrom = attr.Value.AsString() } } } func buildHttpMap(mNode *MapInfoT, sd apmTraceSpan) { mNode.ServiceName = HTTP_SERVICE_NAME mNode.ServiceType = HTTP_SERVICE_TYPE mNode.Schema = "http" mNode.MethodName = "net/http.serverHandler.ServeHTTP()" var descAddr string for _, attr := range sd.Attributes() { //fmt.Println(attr.Key, ":", attr.Value.AsInterface()) switch attr.Key { case "http.ip": mNode.Ip = attr.Value.AsString() descAddr += mNode.Ip case "http.port": mNode.Port = attr.Value.AsInt64() descAddr += ":" + attr.Value.AsString() case "http.uri": mNode.Uri = attr.Value.AsString() case "http.assumed_app_id": mNode.AssumedAppId = attr.Value.AsInt64() case "http.span_id": mNode.SpanId = attr.Value.AsString() } } //mNode.AssumedAppId = Md5ToInt64(descAddr, 16) } func buildMysqlMap(mNode *MapInfoT, sd apmTraceSpan) { mNode.Dbn = "unknown" mNode.ServiceName = MYSQL_SERVICE_NAME mNode.ServiceType = SQL_SERVICE_TYPE mNode.MethodName = "database/sql.Query()" for _, attr := range sd.Attributes() { //fmt.Println(attr.Key, ":", attr.Value.AsInterface()) switch attr.Key { case "net.peer.name": mNode.Ip = attr.Value.AsString() case "net.peer.port": mNode.Port = attr.Value.AsInt64() case "db.statement": query := attr.Value.AsString() mNode.Ps = []string{query} words := strings.Fields(query) if len(words) > 0 { mNode.OperType = strings.ToUpper(words[0]) } } } } func buildRedisMap(mNode *MapInfoT, sd apmTraceSpan) { mNode.ServiceName = REDIS_SERVICE_NAME mNode.ServiceType = NOSQL_SERVICE_TYPE //mNode.MethodName = span(sd).Name + " query" mNode.MethodName = "redis.Do()" for _, attr := range sd.Attributes() { //fmt.Println(attr.Key, ":", attr.Value.AsInterface()) switch attr.Key { case "net.peer.name": mNode.Ip = attr.Value.AsString() case "net.peer.port": mNode.Port = attr.Value.AsInt64() case "db.statement": query := attr.Value.AsString() mNode.Ps = []string{query} words := strings.Fields(query) if len(words) > 0 { mNode.OperType = strings.ToUpper(words[0]) } } } } func isEnter(_type string) bool { if _type == "APPLICATION" { return true } return false } func span(sd apmTraceSpan) *tracepb.Span { if sd == nil { return nil } tid := sd.SpanContext().TraceID() sid := sd.SpanContext().SpanID() s := &tracepb.Span{ TraceId: tid[:], SpanId: sid[:], TraceState: sd.SpanContext().TraceState().String(), //Status: status(sd.Status().Code, sd.Status().Description), StartTimeUnixNano: uint64(sd.StartTime().UnixNano()), EndTimeUnixNano: uint64(sd.EndTime().UnixNano()), //Links: links(sd.Links()), //Kind: spanKind(sd.SpanKind()), Name: sd.Name(), Attributes: tracetransform.KeyValues(sd.Attributes()), //Events: spanEvents(sd.Events()), DroppedAttributesCount: uint32(sd.DroppedAttributes()), DroppedEventsCount: uint32(sd.DroppedEvents()), DroppedLinksCount: uint32(sd.DroppedLinks()), } if psid := sd.Parent().SpanID(); psid.IsValid() { s.ParentSpanId = psid[:] } return s } func Md5ToInt64(strParam string, Len int) int64 { sign := md5.Sum([]byte(strParam)) signStr := fmt.Sprintf("%x", sign) charArr := []rune(signStr) var intStr string for _, value := range charArr { intStr += strconv.Itoa(int(value)) } intStr = intStr[:Len] int64Data, err := strconv.ParseInt(intStr, 10, 64) if err != nil { return 0 } return int64Data }