| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144 |
- package otlptrace
- import (
- "crypto/md5"
- "encoding/json"
- "fmt"
- . "github.com/coroot/coroot-node-agent/ebpftracer"
- "github.com/coroot/coroot-node-agent/ebpftracer/l7"
- "github.com/coroot/coroot-node-agent/utils"
- klog "github.com/sirupsen/logrus"
- "math"
- "net/url"
- "sort"
- "strconv"
- "sync"
- "go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/tracetransform"
- tracesdk "go.opentelemetry.io/otel/sdk/trace"
- tracepb "go.opentelemetry.io/proto/otlp/trace/v1"
- )
- const (
- APP_SERVICE_TYPE = "APPLICATION"
- SQL_SERVICE_TYPE = "SQL"
- NOSQL_SERVICE_TYPE = "NOSQL"
- HTTP_SERVICE_TYPE = "HTTP"
- NET_SERVICE_TYPE = "L7_NET"
- )
- const (
- GO_SERVICE_NAME = "GO"
- MYSQL_SERVICE_NAME = "MYSQL"
- DM_SERVICE_NAME = "DM"
- REDIS_SERVICE_NAME = "REDIS"
- MONGO_SERVICE_NAME = "MONGODB"
- HTTP_SERVICE_NAME = "HTTPCLIENT"
- POSTGRESQL_SERVICE_NAME = "POSTGRESQL"
- )
- type apmTraceSpan tracesdk.ReadOnlySpan
- // GO:0:10154813500555812:5450531005555981:5610250100539899:ee022542c3940f1b:1001025098564810:888ceb3df1bdbe2c:110
- type RootDataT struct {
- AccountId int `json:"account_id"`
- AgentId int64 `json:"agent_id"`
- AgentVersion string `json:"agent_version"`
- AppId int64 `json:"app_id"`
- AppIdFrom int64 `json:"app_id_from"` // from header app_id
- AppName string `json:"app_name"`
- CalledId int64 `json:"called_id"` // from header assumed_app_id
- ClientIp string `json:"client_ip"`
- CollTime uint64 `json:"coll_time"`
- Cpu int `json:"cpu"`
- Custom string `json:"custom"`
- HostId int64 `json:"host_id"`
- HostName string `json:"host_name"`
- HttpCode int64 `json:"http_code"`
- HttpMethod string `json:"http_method"`
- InstanceId int64 `json:"instance_id"`
- InstanceIdFrom int64 `json:"instance_id_from"` // from header instance_id
- LocalPort int64 `json:"local_port"`
- Maps []MapInfoT `json:"maps"`
- MemU int `json:"mem_u"`
- MemUP int `json:"mem_u_p"`
- OperType string `json:"oper_type"`
- Parameters []ParamStruct `json:"parameters,omitempty"`
- ParentTaskName string `json:"parent_task_name"`
- Period int `json:"period"`
- RespTime uint64 `json:"resp_time"`
- Sampling int `json:"sampling"`
- ServiceName string `json:"service_name"`
- ServiceType string `json:"service_type"`
- Sip string `json:"sip"`
- Sn string `json:"sn"`
- SpanIdFrom string `json:"span_id_from"` // from header span_id
- Sport int64 `json:"sport"`
- TId int `json:"t_id"`
- TName string `json:"t_name"`
- TraceId string `json:"trace_id"` // from header trace_id
- TransIds []interface{} `json:"trans_ids"`
- TypeFrom string `json:"type_from"`
- Uri string `json:"uri"`
- UserDir string `json:"user_dir"`
- VipIds []interface{} `json:"vip_ids"`
- SrcAddr string `json:"src_addr"`
- DestinationAddr string `json:"destination_addr"`
- // op 新增字段
- Pid uint32 `json:"pid"`
- ContainerID string `json:"container_id"`
- }
- // ParamStruct 定义目标结构
- type ParamStruct struct {
- Name string `json:"name"`
- Values []string `json:"values"`
- }
- type MapInfoT struct {
- Dbn string `json:"dbn,omitempty"`
- Exception int `json:"exception,omitempty"`
- ExceptionMsg string `json:"exception_msg,omitempty"`
- ExceptionStack string `json:"exception_stack,omitempty"`
- Ip string `json:"ip,omitempty"`
- Level int `json:"level"`
- MethodDesc string `json:"method_desc,omitempty"`
- MethodName string `json:"method_name"`
- Nid int `json:"nid"`
- OperType string `json:"oper_type,omitempty"`
- Pid int `json:"pid"`
- Port int64 `json:"port,omitempty"`
- Ps []string `json:"ps,omitempty"`
- PureTime uint64 `json:"pure_time"`
- ServiceName string `json:"service_name"`
- ServiceType string `json:"service_type"`
- StartTime uint64 `json:"-"`
- EndTime uint64 `json:"-"`
- StartTimeMs uint64 `json:"start_time"`
- EndTimeMs uint64 `json:"end_time"`
- WallTime uint64 `json:"wall_time"`
- Schema string `json:"schema,omitempty"`
- AssumedAppId int64 `json:"assumed_app_id,omitempty"`
- Uri string `json:"uri,omitempty"`
- SpanId string `json:"span_id,omitempty"`
- SrcAddr string `json:"src_addr,omitempty"`
- DestinationAddr string `json:"destination_addr,omitempty"`
- }
- type TraceMapT struct {
- RootData RootDataT
- Index int
- lock *sync.RWMutex
- TheEnd bool
- }
- var TraceRootMap map[string]*TraceMapT
- func init() {
- TraceRootMap = make(map[string]*TraceMapT)
- //go func() {
- // for {
- // //fmt.Println(G_sdl)
- // time.Sleep(5 * time.Second)
- // }
- //}()
- }
- var G_sdl int
- func tracetransformData(sdl []tracesdk.ReadOnlySpan) map[int][]RootDataT {
- //G_sdl += len(sdl)
- if len(sdl) == 0 {
- return nil
- }
- // 多次请求 sdl
- sendDataMap := make(map[int][]RootDataT)
- //sendData := []RootDataT{}
- for _, sd := range sdl {
- if sd == nil {
- continue
- }
- //traceId := sd.SpanContext().TraceID().String()
- //fmt.Println("------event_num---- "+sd.Name(), "--->", len(sd.Events())) // 一次请求完整数据
- // 构建map *RootDataT
- var rootData RootDataT
- rootData = initRootDataFromEvent()
- // build http入口 MapInfoT
- code_type := buildAppMapFromEvent(&rootData, sd)
- // 构建maps
- for _, event := range sd.Events() {
- //aaa, _ := json.Marshal(event)
- //fmt.Println("event.info", string(aaa))
- mNode := buildMapNodeFromEvent(event)
- switch EventType(event.EventType) {
- // stack
- case EventTypeFunEnt:
- // l7 event
- case EventTypeL7Request:
- switch l7.Protocol(event.ProtocolType) {
- case l7.ProtocolHTTP:
- buildHttpMapFromEvent(&mNode, event)
- case l7.ProtocolDNS:
- buildDNSMapEvent(&mNode, event)
- case l7.ProtocolMysql:
- buildSQLMapEvent(&mNode, event)
- case l7.ProtocolRedis:
- buildRedisMapEvent(&mNode, event)
- case l7.ProtocolMongo:
- buildMongoMapEvent(&mNode, event)
- // dm
- case l7.ProtocolDM:
- buildSQLMapEvent(&mNode, event)
- case l7.ProtocolPostgres:
- buildSQLMapEvent(&mNode, event)
- }
- }
- rootData.Maps = append(rootData.Maps, mNode)
- //fmt.Println(event.Name)
- //buildAndAssemblyMapFromEvent(event, rootData)
- }
- buildLevelFromEvent(&rootData)
- sendDataMap[code_type] = append(sendDataMap[code_type], rootData)
- //a, _ := json.Marshal(rootData)
- //fmt.Println(string(a))
- //sendData = append(sendData, rootData)
- //if _, ok := TraceRootMap[traceId]; !ok {
- //TraceRootMap[traceId] = &TraceMapT{RootData: initRootData(traceId), Index: 1}
- //}
- //TraceRootMap[traceId].Index++
- //buildAndAssemblyMap(sd, TraceRootMap[traceId])
- }
- // 发送完整数据 | 大量长耗时请求会增加内存占用
- //sendData := []RootDataT{}
- //for traceId, v := range TraceRootMap {
- // if v.TheEnd {
- // buildLevel(v)
- // sendData = append(sendData, v.RootData)
- // delete(TraceRootMap, traceId)
- // //fmt.Println("the end!")
- // } else {
- // //fmt.Println("not end!")
- // }
- //}
- //Transform the categorized map into a slice
- data, _ := json.Marshal(sendDataMap)
- klog.Debug(string(data))
- //fmt.Println(len(sendData))
- //fmt.Println("sdl len:", len(sdl))
- return sendDataMap
- }
- type TimeMap struct {
- Time uint64
- Type int
- Map *MapInfoT
- }
- //type TraceMapT struct {
- // RootData RootDataT
- // Index int
- // lock *sync.RWMutex
- // TheEnd bool
- //}
- //func buildLevel(sdl *TraceMapT) {
- // nidMap := make(map[int]*MapInfoT)
- //
- // mapSlice := []TimeMap{}
- //
- // for i, v := range sdl.RootData.Maps {
- // if v.ServiceType == "APPLICATION" {
- // continue
- // }
- // nidMap[v.Nid] = &sdl.RootData.Maps[i]
- // timeStartMap := TimeMap{
- // Time: v.StartTime,
- // Type: 0,
- // Map: &sdl.RootData.Maps[i],
- // }
- // mapSlice = append(mapSlice, timeStartMap)
- // timeEndMap := TimeMap{
- // Time: v.EndTime,
- // Type: 1,
- // Map: &sdl.RootData.Maps[i],
- // }
- // mapSlice = append(mapSlice, timeEndMap)
- // }
- // sort.Slice(mapSlice, func(i, j int) bool {
- // return mapSlice[i].Time < mapSlice[j].Time
- // })
- //
- // funStack := []TimeMap{}
- //
- // currentNid := 1
- // Nid := 2
- // level := 2
- //
- // for _, v := range mapSlice {
- // // fmt.Println("SliceSliceindex", k, "value", v.Time, v.Type, v.Map.MethodName, v.Map.Nid)
- // if v.Type == 0 {
- // // 函数入口
- // funStack = append(funStack, v)
- // v.Map.Pid = currentNid
- // v.Map.Level = level
- // v.Map.Nid = Nid
- // currentNid = Nid
- // level += 1
- // Nid += 1
- // } else if v.Type == 1 {
- // // 函数出口
- // len := len(funStack)
- // funStack = funStack[:len-1]
- // if (len - 2) < 0 {
- // currentNid = 1
- // } else {
- // currentNid = funStack[len-2].Map.Nid
- // }
- //
- // level -= 1
- // }
- // }
- //}
- func buildLevelFromEvent(sdl *RootDataT) {
- nidMap := make(map[int]*MapInfoT)
- mapSlice := []TimeMap{}
- for i, v := range sdl.Maps {
- if v.ServiceType == "APPLICATION" {
- continue
- }
- nidMap[v.Nid] = &sdl.Maps[i]
- timeStartMap := TimeMap{
- Time: v.StartTime,
- Type: 0,
- Map: &sdl.Maps[i],
- }
- mapSlice = append(mapSlice, timeStartMap)
- timeEndMap := TimeMap{
- Time: v.EndTime,
- Type: 1,
- Map: &sdl.Maps[i],
- }
- mapSlice = append(mapSlice, timeEndMap)
- }
- sort.Slice(mapSlice, func(i, j int) bool {
- return mapSlice[i].Time < mapSlice[j].Time
- })
- funStack := []TimeMap{}
- currentNid := 1
- Nid := 2
- level := 2
- for _, v := range mapSlice {
- //klog.Debugln("SliceSliceindex", k, "value", v.Time, v.Type, v.Map.MethodName, v.Map.Nid)
- if v.Type == 0 {
- // 函数入口
- funStack = append(funStack, v)
- v.Map.Pid = currentNid
- v.Map.Level = level
- v.Map.Nid = Nid
- currentNid = Nid
- level += 1
- Nid += 1
- } else if v.Type == 1 {
- // 函数出口
- len := len(funStack)
- funStack = funStack[:len-1]
- if (len - 2) < 0 {
- currentNid = 1
- } else {
- currentNid = funStack[len-2].Map.Nid
- }
- level -= 1
- }
- }
- }
- //func initRootData(traceId string) RootDataT {
- // data := RootDataT{
- // AccountId: 110,
- // AgentId: 1011005252979954, // TODO 更新 基于 ip:port + process_name + exe路径生成
- // AgentVersion: "2.1.0",
- // AppId: 5410049101545798, // TODO 更新 基于appname生成
- // AppIdFrom: -1,
- // AppName: "eBPF-agent", // TODO 更新 ip:port || process_name
- // CalledId: -1,
- // ClientIp: "",
- // CollTime: 0,
- // Cpu: 0,
- // Custom: "",
- // HostId: 10154813500555812,
- // HostName: "localhost",
- // HttpCode: 0,
- // HttpMethod: "",
- // InstanceId: 1005051101515357, // TODO 更新 基于ip:port
- // InstanceIdFrom: -1,
- // Maps: []MapInfoT{},
- // MemU: 0,
- // MemUP: 0,
- // OperType: "",
- // Parameters: []interface{}{},
- // ParentTaskName: 0,
- // Period: -1,
- // RespTime: 0,
- // Sampling: 0,
- // ServiceName: "GO",
- // ServiceType: APP_SERVICE_TYPE,
- // Sip: "",
- // Sn: "",
- // SpanIdFrom: "",
- // Sport: 0,
- // TId: -1,
- // TName: "",
- // TraceId: traceId,
- // TransIds: []interface{}{},
- // TypeFrom: "",
- // Uri: "",
- // UserDir: 0,
- // VipIds: []interface{}{},
- // }
- // return data
- //}
- func initRootDataFromEvent() RootDataT {
- hostID := utils.GetHostID()
- accountID := utils.GetAccountID()
- data := RootDataT{
- // todo AccountId
- AccountId: accountID,
- AgentId: 0, // 基于 ip:port + process_name + exe路径生成
- AgentVersion: "2.1.0",
- AppId: 0, // 基于appname生成
- AppIdFrom: -1,
- AppName: "eBPF-agent", // server配置
- CalledId: -1,
- ClientIp: "",
- CollTime: 0,
- Cpu: 0,
- Custom: "",
- HostId: hostID,
- HostName: "localhost",
- HttpCode: 0,
- HttpMethod: "",
- InstanceId: 0, // 基于ip:port
- InstanceIdFrom: -1,
- Maps: []MapInfoT{},
- MemU: 0,
- MemUP: 0,
- OperType: "",
- Parameters: []ParamStruct{},
- ParentTaskName: "",
- Period: -1,
- RespTime: 0,
- Sampling: 0,
- ServiceName: "",
- ServiceType: APP_SERVICE_TYPE,
- Sip: "",
- Sn: "",
- SpanIdFrom: "",
- Sport: 0,
- TId: -1,
- TName: "",
- TraceId: "",
- TransIds: []interface{}{},
- TypeFrom: "",
- Uri: "",
- UserDir: "",
- VipIds: []interface{}{},
- SrcAddr: "",
- DestinationAddr: "",
- }
- return data
- }
- func initRootDataJava() RootDataT {
- data := RootDataT{
- AccountId: 110,
- AgentId: 3934815089541000, // TODO 更新 基于 ip:port + process_name + exe路径生成
- AgentVersion: "2.21.0",
- AppId: 3365853273187618, // TODO 更新 基于appname生成
- AppIdFrom: -1,
- AppName: "eBPF-javaApplication", // TODO 更新 ip:port || process_name
- CalledId: -1,
- ClientIp: "",
- CollTime: 0,
- Cpu: 0,
- Custom: "",
- HostId: 2315065183171055,
- HostName: "localhost",
- HttpCode: 0,
- HttpMethod: "",
- InstanceId: 1128864082033413, // TODO 更新 基于ip:port
- InstanceIdFrom: -1,
- Maps: []MapInfoT{},
- MemU: 0,
- MemUP: 0,
- OperType: "",
- Parameters: []ParamStruct{},
- ParentTaskName: "",
- Period: -1,
- RespTime: 0,
- Sampling: 0,
- ServiceName: "TOMCAT",
- ServiceType: APP_SERVICE_TYPE,
- Sip: "",
- Sn: "",
- SpanIdFrom: "",
- Sport: 0,
- TId: -1,
- TName: "",
- TraceId: "",
- TransIds: []interface{}{},
- TypeFrom: "",
- Uri: "",
- UserDir: "",
- VipIds: []interface{}{},
- }
- return data
- }
- //func initMapNode(spanSd *tracepb.Span) (MapInfoT, string) {
- // mNode := MapInfoT{
- // Exception: 0,
- // ExceptionMsg: "",
- // ExceptionStack: "",
- // Ip: "",
- // Level: 2,
- // Pid: 1,
- // Port: 0,
- // Ps: []string{},
- // ServiceName: "",
- // ServiceType: "",
- // WallTime: 0,
- // }
- // mNode.MethodName = spanSd.Name
- // mNode.PureTime = (spanSd.EndTimeUnixNano - spanSd.StartTimeUnixNano) / 1e3
- // mNode.WallTime = mNode.PureTime
- // mNode.StartTime = spanSd.StartTimeUnixNano
- // mNode.EndTime = spanSd.EndTimeUnixNano
- //
- // for _, attr := range spanSd.GetAttributes() {
- // fmt.Println(attr.Key, ":", attr.Value.GetValue())
- //
- // switch attr.Key {
- // case "nid":
- // mNode.Nid = int(attr.Value.GetIntValue())
- // case "pid":
- // mNode.Pid = int(attr.Value.GetIntValue())
- // case "level":
- // mNode.Level = int(attr.Value.GetIntValue())
- // }
- // }
- //
- // return mNode, spanSd.Name
- //}
- func buildMapNodeFromEvent(event tracesdk.Event) MapInfoT {
- mNode := MapInfoT{
- Exception: 0,
- ExceptionMsg: "",
- ExceptionStack: "",
- Ip: "",
- Level: 2,
- Pid: 1,
- Port: 0,
- Ps: []string{},
- ServiceName: "",
- ServiceType: "",
- WallTime: 0,
- }
- mNode.MethodName = event.Name
- //mNode.PureTime = (event.EndTimeUnixNano - event.StartTimeUnixNano) / 1e3
- //mNode.WallTime = mNode.PureTime
- //mNode.StartTime = spanSd.StartTimeUnixNano
- //mNode.EndTime = spanSd.EndTimeUnixNano
- for _, attr := range event.Attributes {
- //fmt.Println(event.Name, "--->buildMapNodeFromEvent--->", attr.Key, ":", attr.Value.AsInterface())
- switch attr.Key {
- case "nid":
- mNode.Nid = int(attr.Value.AsInt64())
- case "pid":
- mNode.Pid = int(attr.Value.AsInt64())
- case "level":
- mNode.Level = int(attr.Value.AsInt64())
- case "time.start_at":
- mNode.StartTime, mNode.StartTimeMs = cleanNsTime(attr.Value.AsInt64())
- case "time.end_at":
- mNode.EndTime, mNode.EndTimeMs = cleanNsTime(attr.Value.AsInt64())
- case "time.duration":
- //mNode.PureTime = uint64(attr.Value.AsInt64()) / 1e3
- mNode.WallTime = uint64(attr.Value.AsInt64()) / 1e3
- }
- }
- return mNode
- }
- func parseURIToParams(input string) (string, []ParamStruct, error) {
- // 解析输入 URI
- parsedURL, err := url.Parse(input)
- if err != nil {
- return "", nil, fmt.Errorf("failed to parse URI: %w", err)
- }
- // 提取查询参数
- queryParams := parsedURL.Query()
- // 转换为目标结构
- var params []ParamStruct
- for key, values := range queryParams {
- params = append(params, ParamStruct{
- Name: key,
- Values: values,
- })
- }
- return parsedURL.Path, params, nil
- }
- // 构建拼装
- //func buildAndAssemblyMap(sd apmTraceSpan, traceRoot *TraceMapT) MapInfoT {
- // mNode, mapType := initMapNode(span(sd))
- // switch mapType {
- // case "APPLICATION":
- // buildAppMap(&mNode, traceRoot, sd)
- // traceRoot.TheEnd = true
- // case "HTTP":
- // buildHttpMap(&mNode, sd)
- // case "Mysql":
- // buildMysqlMap(&mNode, sd)
- // case "Redis":
- // buildRedisMap(&mNode, sd)
- // }
- // if mapType != "" {
- // mNode.Nid = traceRoot.Index
- // traceRoot.RootData.Maps = append(traceRoot.RootData.Maps, mNode)
- // }
- // return mNode
- //}
- //func buildAndAssemblyMapFromEvent(event tracesdk.Event, traceRoot *RootDataT) MapInfoT {
- // mNode := buildMapNodeFromEvent(event)
- // switch mapType {
- // case "HTTP":
- // buildHttpMapFromEvent(mNode, event)
- // //case "Mysql":
- // // buildMysqlMap(mNode, sd)
- // //case "Redis":
- // // buildRedisMap(mNode, sd)
- // }
- // if mapType != "" {
- // //mNode.Nid = traceRoot.Index
- // traceRoot.Maps = append(traceRoot.Maps, mNode)
- // }
- // return mNode
- //}
- //func buildAppMap(mNode *MapInfoT, traceRoot *TraceMapT, sd apmTraceSpan) {
- // mNode.ServiceName = GO_SERVICE_NAME
- // mNode.ServiceType = APP_SERVICE_TYPE
- // mNode.MethodName = "net/http.(*Transport).roundTrip()"
- // mNode.Level = 1
- // mNode.Pid = 0
- // mNode.Nid = 1
- // // 构建root节点
- // traceRoot.RootData.RespTime = mNode.PureTime
- // traceRoot.RootData.CollTime = mNode.StartTime
- // traceRoot.Index = 1
- // for _, attr := range sd.Attributes() {
- // fmt.Println(attr.Key, ":", attr.Value.AsInterface())
- // switch attr.Key {
- // case "http.uri":
- // traceRoot.RootData.Uri = attr.Value.AsString()
- // case "http.method":
- // traceRoot.RootData.HttpMethod = attr.Value.AsString()
- // case "http.status_code":
- // traceRoot.RootData.HttpCode = attr.Value.AsInt64()
- // case "net.peer.name":
- // traceRoot.RootData.ClientIp = attr.Value.AsString()
- // traceRoot.RootData.Sip = attr.Value.AsString()
- // traceRoot.RootData.Sn = attr.Value.AsString()
- // case "net.peer.port":
- // traceRoot.RootData.Sport = attr.Value.AsInt64()
- // traceRoot.RootData.LocalPort = attr.Value.AsInt64()
- // case "server.trace_id_from":
- // traceRoot.RootData.TraceId = attr.Value.AsString()
- // case "server.called_id":
- // traceRoot.RootData.CalledId = attr.Value.AsInt64()
- // case "server.instance_id_from":
- // traceRoot.RootData.InstanceIdFrom = attr.Value.AsInt64()
- // case "server.app_id_from":
- // traceRoot.RootData.AppIdFrom = attr.Value.AsInt64()
- // case "server.span_id_from":
- // traceRoot.RootData.SpanIdFrom = attr.Value.AsString()
- // case "server.type_from":
- // traceRoot.RootData.TypeFrom = attr.Value.AsString()
- // }
- // }
- //
- //}
- func buildAppMapFromEvent(traceRoot *RootDataT, sd apmTraceSpan) int {
- mNode := MapInfoT{
- Exception: 0,
- ExceptionMsg: "",
- ExceptionStack: "",
- Ip: "",
- Level: 1,
- Pid: 1,
- Port: 0,
- Ps: []string{},
- ServiceName: "",
- ServiceType: "",
- WallTime: 0,
- }
- mNode.ServiceName = GO_SERVICE_NAME
- mNode.ServiceType = APP_SERVICE_TYPE
- mNode.MethodName = "Kernel Endpoint()"
- mNode.Level = 1
- mNode.Pid = 0
- mNode.Nid = 1
- var code_type int64
- // 构建root节点
- //traceRoot.RespTime = mNode.PureTimex
- //traceRoot.CollTime = mNode.StartTime
- for _, attr := range sd.Attributes() {
- klog.Debugln("Appmap:", attr.Key, ":", attr.Value.AsInterface())
- switch attr.Key {
- case "http.uri":
- traceRoot.Uri, traceRoot.Parameters, _ = parseURIToParams(attr.Value.AsString())
- case "http.method":
- traceRoot.HttpMethod = attr.Value.AsString()
- case "http.status_code":
- traceRoot.HttpCode = attr.Value.AsInt64()
- case "net.peer.name":
- // TODO 修改 ClientIp sip获取方式
- traceRoot.ClientIp = attr.Value.AsString()
- traceRoot.Sip = attr.Value.AsString()
- traceRoot.Sn = attr.Value.AsString()
- case "net.peer.port":
- traceRoot.Sport = attr.Value.AsInt64()
- traceRoot.LocalPort = attr.Value.AsInt64()
- case "server.trace_id_from":
- traceRoot.TraceId = attr.Value.AsString()
- case "server.called_id":
- traceRoot.CalledId = attr.Value.AsInt64()
- case "server.instance_id_from":
- traceRoot.InstanceIdFrom = attr.Value.AsInt64()
- case "server.app_id_from":
- traceRoot.AppIdFrom = attr.Value.AsInt64()
- case "server.span_id_from":
- traceRoot.SpanIdFrom = attr.Value.AsString()
- case "server.type_from":
- traceRoot.TypeFrom = attr.Value.AsString()
- case "time.start_at":
- mNode.StartTime, mNode.StartTimeMs = cleanNsTime(attr.Value.AsInt64())
- traceRoot.CollTime = mNode.StartTimeMs
- case "time.end_at":
- mNode.EndTime, mNode.EndTimeMs = cleanNsTime(attr.Value.AsInt64())
- case "time.duration":
- traceRoot.RespTime = uint64(attr.Value.AsInt64()) / 1e3
- mNode.PureTime = traceRoot.RespTime
- mNode.WallTime = uint64(attr.Value.AsInt64()) / 1e3
- case "server.code_type":
- code_type = attr.Value.AsInt64()
- case "server.app_name":
- traceRoot.AppName = attr.Value.AsString()
- case "server.service_name":
- traceRoot.ServiceName = attr.Value.AsString()
- mNode.ServiceName = attr.Value.AsString()
- case "server.app_id":
- traceRoot.AppId = attr.Value.AsInt64()
- case "server.agent_id":
- traceRoot.AgentId = attr.Value.AsInt64()
- case "server.instance_id":
- traceRoot.InstanceId = attr.Value.AsInt64()
- case "server.src_addr":
- traceRoot.SrcAddr = attr.Value.AsString()
- case "server.dst_addr":
- traceRoot.DestinationAddr = attr.Value.AsString()
- case "server.pid":
- traceRoot.Pid = uint32(attr.Value.AsInt64())
- case "server.container_id":
- traceRoot.ContainerID = attr.Value.AsString()
- }
- }
- traceRoot.Maps = append(traceRoot.Maps, mNode)
- return int(code_type)
- }
- //func buildHttpMap(mNode *MapInfoT, sd apmTraceSpan) {
- // mNode.ServiceName = HTTP_SERVICE_NAME
- // mNode.ServiceType = HTTP_SERVICE_TYPE
- // mNode.Schema = "http"
- // mNode.MethodName = "net/http.serverHandler.ServeHTTP()"
- // var descAddr string
- // for _, attr := range sd.Attributes() {
- // //fmt.Println(attr.Key, ":", attr.Value.AsInterface())
- // switch attr.Key {
- // case "http.ip":
- // mNode.Ip = attr.Value.AsString()
- // descAddr += mNode.Ip
- // case "http.port":
- // mNode.Port = attr.Value.AsInt64()
- // descAddr += ":" + attr.Value.AsString()
- // case "http.uri":
- // mNode.Uri = attr.Value.AsString()
- // case "http.assumed_app_id":
- // mNode.AssumedAppId = attr.Value.AsInt64()
- // case "http.span_id":
- // mNode.SpanId = attr.Value.AsString()
- // }
- // }
- // //mNode.AssumedAppId = Md5ToInt64(descAddr, 16)
- //}
- func buildHttpMapFromEvent(mNode *MapInfoT, event tracesdk.Event) {
- mNode.ServiceName = HTTP_SERVICE_NAME
- mNode.ServiceType = HTTP_SERVICE_TYPE
- mNode.Schema = "http"
- //mNode.MethodName = "HTTP"
- //var descAddr string
- var method string
- for _, attr := range event.Attributes {
- klog.Debugln("HTTP--->", attr.Key, ":", attr.Value.AsInterface())
- switch attr.Key {
- case "http.ip":
- mNode.Ip = attr.Value.AsString()
- //descAddr += mNode.Ip
- case "http.port":
- mNode.Port = attr.Value.AsInt64()
- case "http.method":
- //mNode.MethodName += " " + attr.Value.AsString()
- method = attr.Value.AsString()
- //descAddr += ":" + attr.Value.AsString()
- case "http.uri":
- mNode.Uri = attr.Value.AsString()
- //mNode.MethodName += " " + attr.Value.AsString()
- case "http.assumed_app_id":
- mNode.AssumedAppId = attr.Value.AsInt64()
- case "http.span_id":
- mNode.SpanId = attr.Value.AsString()
- case "time.start_at":
- mNode.StartTime = uint64(attr.Value.AsInt64())
- case "time.end_at":
- mNode.EndTime = uint64(attr.Value.AsInt64())
- case "time.duration":
- //mNode.PureTime = uint64(attr.Value.AsInt64()) / 1e3
- mNode.WallTime = uint64(attr.Value.AsInt64()) / 1e3
- case "http.src_addr":
- mNode.SrcAddr = attr.Value.AsString()
- case "http.destination_addr":
- mNode.DestinationAddr = attr.Value.AsString()
- }
- }
- mNode.MethodName = fmt.Sprintf("%s %s %s:%d%s", "HTTP", method, mNode.Ip, mNode.Port, mNode.Uri)
- //mNode.AssumedAppId = Md5ToInt64(descAddr, 16)
- }
- //func buildMysqlMap(mNode *MapInfoT, sd apmTraceSpan) {
- // mNode.Dbn = "unknown"
- // mNode.ServiceName = MYSQL_SERVICE_NAME
- // mNode.ServiceType = SQL_SERVICE_TYPE
- // mNode.MethodName = "database/sql.Query()"
- // for _, attr := range sd.Attributes() {
- // //fmt.Println(attr.Key, ":", attr.Value.AsInterface())
- // switch attr.Key {
- // case "net.peer.name":
- // mNode.Ip = attr.Value.AsString()
- // case "net.peer.port":
- // mNode.Port = attr.Value.AsInt64()
- // case "db.statement":
- // query := attr.Value.AsString()
- // mNode.Ps = []string{query}
- // words := strings.Fields(query)
- // if len(words) > 0 {
- // mNode.OperType = strings.ToUpper(words[0])
- // }
- // }
- // }
- //}
- func buildSQLMapEvent(mNode *MapInfoT, event tracesdk.Event) {
- mNode.Dbn = "-"
- mNode.ServiceName = l7.Protocol(event.ProtocolType).ServiceNameString()
- mNode.ServiceType = SQL_SERVICE_TYPE
- //mNode.MethodName = "database/sql.Query()"
- for _, attr := range event.Attributes {
- //fmt.Println(attr.Key, ":", attr.Value.AsInterface())
- switch attr.Key {
- case "net.peer.name":
- mNode.Ip = attr.Value.AsString()
- case "net.peer.port":
- mNode.Port = attr.Value.AsInt64()
- case "db.statement":
- query := attr.Value.AsString()
- mNode.MethodName = query
- mNode.Ps = []string{query}
- //words := strings.Fields(query)
- //if len(words) > 0 {
- // mNode.OperType = strings.ToUpper(words[0])
- //}
- case "sql.exception":
- if attr.Value.AsBool() {
- mNode.Exception = 1
- } else {
- mNode.Exception = 0
- }
- case "sql.exception_msg":
- mNode.ExceptionMsg = attr.Value.AsString()
- case "sql.src_addr":
- mNode.SrcAddr = attr.Value.AsString()
- case "sql.destination_addr":
- mNode.DestinationAddr = attr.Value.AsString()
- }
- }
- }
- func buildDNSMapEvent(mNode *MapInfoT, event tracesdk.Event) {
- mNode.ServiceName = l7.Protocol(event.ProtocolType).ServiceNameString()
- mNode.ServiceType = NET_SERVICE_TYPE
- var _type string
- var fqdn string
- var ips string
- var ttl int64
- for _, attr := range event.Attributes {
- switch attr.Key {
- case "dns.type":
- _type = attr.Value.AsString()
- case "dns.fqdn":
- fqdn = attr.Value.AsString()
- case "dns.ttl":
- ttl = attr.Value.AsInt64()
- case "dns.ips":
- if attr.Value.AsString() != "" {
- ips = "Addr: " + attr.Value.AsString()
- }
- }
- }
- mNode.MethodName = fmt.Sprintf("DNS Name: %s Type: %s TTL: %d %s", fqdn, _type, ttl, ips)
- }
- func buildPostGreSqlMapEvent(mNode *MapInfoT, event tracesdk.Event) {
- mNode.Dbn = "-"
- mNode.ServiceName = POSTGRESQL_SERVICE_NAME
- mNode.ServiceType = SQL_SERVICE_TYPE
- //mNode.MethodName = "database/sql.Query()"
- for _, attr := range event.Attributes {
- //fmt.Println(attr.Key, ":", attr.Value.AsInterface())
- switch attr.Key {
- case "net.peer.name":
- mNode.Ip = attr.Value.AsString()
- case "net.peer.port":
- mNode.Port = attr.Value.AsInt64()
- case "db.statement":
- query := attr.Value.AsString()
- mNode.Ps = []string{query}
- //words := strings.Fields(query)
- //if len(words) > 0 {
- // mNode.OperType = strings.ToUpper(words[0])
- //}
- case "sql.exception":
- if attr.Value.AsBool() {
- mNode.Exception = 1
- } else {
- mNode.Exception = 0
- }
- case "sql.src_addr":
- mNode.SrcAddr = attr.Value.AsString()
- case "sql.destination_addr":
- mNode.DestinationAddr = attr.Value.AsString()
- }
- }
- }
- func buildMysqlMapEvent(mNode *MapInfoT, event tracesdk.Event) {
- mNode.Dbn = "-"
- mNode.ServiceName = MYSQL_SERVICE_NAME
- mNode.ServiceType = SQL_SERVICE_TYPE
- //mNode.MethodName = "database/sql.Query()"
- for _, attr := range event.Attributes {
- //fmt.Println(attr.Key, ":", attr.Value.AsInterface())
- switch attr.Key {
- case "net.peer.name":
- mNode.Ip = attr.Value.AsString()
- case "net.peer.port":
- mNode.Port = attr.Value.AsInt64()
- case "db.statement":
- query := attr.Value.AsString()
- mNode.MethodName = query
- mNode.Ps = []string{query}
- //words := strings.Fields(query)
- //if len(words) > 0 {
- // mNode.OperType = strings.ToUpper(words[0])
- //}
- case "sql.exception":
- if attr.Value.AsBool() {
- mNode.Exception = 1
- } else {
- mNode.Exception = 0
- }
- case "sql.src_addr":
- mNode.SrcAddr = attr.Value.AsString()
- case "sql.destination_addr":
- mNode.DestinationAddr = attr.Value.AsString()
- }
- }
- }
- func buildDMMapEvent(mNode *MapInfoT, event tracesdk.Event) {
- mNode.Dbn = "TEST"
- mNode.ServiceName = DM_SERVICE_NAME
- mNode.ServiceType = SQL_SERVICE_TYPE
- mNode.MethodName = "database/sql.Query()"
- for _, attr := range event.Attributes {
- //fmt.Println(attr.Key, ":", attr.Value.AsInterface())
- switch attr.Key {
- case "net.peer.name":
- mNode.Ip = attr.Value.AsString()
- case "net.peer.port":
- mNode.Port = attr.Value.AsInt64()
- case "db.statement":
- query := attr.Value.AsString()
- mNode.Ps = []string{query}
- //words := strings.Fields(query)
- //if len(words) > 0 {
- // mNode.OperType = strings.ToUpper(words[0])
- //}
- case "sql.exception":
- if attr.Value.AsBool() {
- mNode.Exception = 1
- } else {
- mNode.Exception = 0
- }
- case "sql.src_addr":
- mNode.SrcAddr = attr.Value.AsString()
- case "sql.destination_addr":
- mNode.DestinationAddr = attr.Value.AsString()
- }
- }
- }
- func buildRedisMapEvent(mNode *MapInfoT, event tracesdk.Event) {
- mNode.ServiceName = REDIS_SERVICE_NAME
- mNode.ServiceType = NOSQL_SERVICE_TYPE
- //mNode.MethodName = span(sd).Name + " query"
- //mNode.MethodName = "redis.Do()"
- for _, attr := range event.Attributes {
- //fmt.Println(attr.Key, ":", attr.Value.AsInterface())
- switch attr.Key {
- case "net.peer.name":
- mNode.Ip = attr.Value.AsString()
- case "net.peer.port":
- mNode.Port = attr.Value.AsInt64()
- case "db.statement":
- query := attr.Value.AsString()
- mNode.MethodName = query
- mNode.Ps = []string{query}
- //words := strings.Fields(query)
- //if len(words) > 0 {
- // mNode.OperType = strings.ToUpper(words[0])
- //}
- case "nosql.src_addr":
- mNode.SrcAddr = attr.Value.AsString()
- case "nosql.destination_addr":
- mNode.DestinationAddr = attr.Value.AsString()
- }
- }
- }
- func buildMongoMapEvent(mNode *MapInfoT, event tracesdk.Event) {
- mNode.ServiceName = MONGO_SERVICE_NAME
- mNode.ServiceType = NOSQL_SERVICE_TYPE
- for _, attr := range event.Attributes {
- switch attr.Key {
- case "net.peer.name":
- mNode.Ip = attr.Value.AsString()
- case "net.peer.port":
- mNode.Port = attr.Value.AsInt64()
- case "db.statement":
- query := attr.Value.AsString()
- mNode.MethodName = query
- mNode.Ps = []string{query}
- case "nosql.src_addr":
- mNode.SrcAddr = attr.Value.AsString()
- case "nosql.destination_addr":
- mNode.DestinationAddr = attr.Value.AsString()
- }
- }
- }
- func isEnter(_type string) bool {
- if _type == "APPLICATION" {
- return true
- }
- return false
- }
- func span(sd apmTraceSpan) *tracepb.Span {
- if sd == nil {
- return nil
- }
- tid := sd.SpanContext().TraceID()
- sid := sd.SpanContext().SpanID()
- s := &tracepb.Span{
- TraceId: tid[:],
- SpanId: sid[:],
- TraceState: sd.SpanContext().TraceState().String(),
- //Status: status(sd.Status().Code, sd.Status().Description),
- StartTimeUnixNano: uint64(sd.StartTime().UnixNano()),
- EndTimeUnixNano: uint64(sd.EndTime().UnixNano()),
- //Links: links(sd.Links()),
- //Kind: spanKind(sd.SpanKind()),
- Name: sd.Name(),
- Attributes: tracetransform.KeyValues(sd.Attributes()),
- //Events: spanEvents(sd.Events()),
- DroppedAttributesCount: uint32(sd.DroppedAttributes()),
- DroppedEventsCount: uint32(sd.DroppedEvents()),
- DroppedLinksCount: uint32(sd.DroppedLinks()),
- }
- if psid := sd.Parent().SpanID(); psid.IsValid() {
- s.ParentSpanId = psid[:]
- }
- return s
- }
- func Md5ToInt64(strParam string, Len int) int64 {
- sign := md5.Sum([]byte(strParam))
- signStr := fmt.Sprintf("%x", sign)
- charArr := []rune(signStr)
- var intStr string
- for _, value := range charArr {
- intStr += strconv.Itoa(int(value))
- }
- intStr = intStr[:Len]
- int64Data, err := strconv.ParseInt(intStr, 10, 64)
- if err != nil {
- return 0
- }
- return int64Data
- }
- // ns,ms
- func cleanNsTime(time int64) (uint64, uint64) {
- return uint64(time), uint64(math.Round(float64(time) / 1e6))
- }
|