| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406 |
- package otlptrace
- import (
- "crypto/md5"
- "encoding/json"
- "fmt"
- "go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/tracetransform"
- tracesdk "go.opentelemetry.io/otel/sdk/trace"
- tracepb "go.opentelemetry.io/proto/otlp/trace/v1"
- "strconv"
- "strings"
- "sync"
- )
// Service type labels. They are written to the ServiceType field of the root
// record (initRootData) and of each map node (the build*Map functions).
- const (
- APP_SERVICE_TYPE = "APPLICATION" // used by initRootData and buildAppMap
- SQL_SERVICE_TYPE = "SQL" // used by buildMysqlMap
- NOSQL_SERVICE_TYPE = "NOSQL" // used by buildRedisMap
- HTTP_SERVICE_TYPE = "HTTP" // used by buildHttpMap
- )
// Service name labels. They are written to the ServiceName field of each map
// node by the matching build*Map function.
- const (
- GO_SERVICE_NAME = "GO" // used by buildAppMap
- MYSQL_SERVICE_NAME = "MYSQL" // used by buildMysqlMap
- REDIS_SERVICE_NAME = "REDIS" // used by buildRedisMap
- HTTP_SERVICE_NAME = "HTTPCLIENT" // used by buildHttpMap
- )
// apmTraceSpan is a defined type whose underlying type is the SDK's
// tracesdk.ReadOnlySpan interface. It is the span parameter type of span() and
// of the build*Map helpers in this file.
- type apmTraceSpan tracesdk.ReadOnlySpan
- // GO:0:10154813500555812:5450531005555981:5610250100539899:ee022542c3940f1b:1001025098564810:888ceb3df1bdbe2c:110
// RootDataT is the per-trace record returned by tracetransformData. It is
// created with fixed defaults by initRootData, receives one Maps entry per span
// from buildAndAssemblyMap, and has its request-level fields filled in by
// buildAppMap when the "APPLICATION" span is seen. The json tags give the field
// names used when the record is marshalled.
- type RootDataT struct {
- AccountId int `json:"account_id"`
- AgentId int64 `json:"agent_id"`
- AgentVersion string `json:"agent_version"`
- AppId int64 `json:"app_id"`
- AppIdFrom int64 `json:"app_id_from"` // from header app_id
- AppName string `json:"app_name"`
- CalledId int64 `json:"called_id"` // from header assumed_app_id
- ClientIp string `json:"client_ip"` // "net.peer.name" attribute (buildAppMap)
- CollTime uint64 `json:"coll_time"` // start of the APPLICATION span: StartTimeUnixNano / 1e6
- Cpu int `json:"cpu"`
- Custom string `json:"custom"`
- HostId int64 `json:"host_id"`
- HostName string `json:"host_name"`
- HttpCode int64 `json:"http_code"` // "http.status_code" attribute (buildAppMap)
- HttpMethod string `json:"http_method"` // "http.method" attribute (buildAppMap)
- InstanceId int64 `json:"instance_id"`
- InstanceIdFrom int64 `json:"instance_id_from"` // from header instance_id
- LocalPort int64 `json:"local_port"` // "net.peer.port" attribute, same value as Sport
- Maps []MapInfoT `json:"maps"` // one node per named span, appended by buildAndAssemblyMap
- MemU int `json:"mem_u"`
- MemUP int `json:"mem_u_p"`
- OperType string `json:"oper_type"`
- Parameters []interface{} `json:"parameters"`
- ParentTaskName int `json:"parent_task_name"`
- Period int `json:"period"`
- RespTime uint64 `json:"resp_time"` // duration of the APPLICATION span: (end - start) / 1e3
- Sampling int `json:"sampling"`
- ServiceName string `json:"service_name"`
- ServiceType string `json:"service_type"`
- Sip string `json:"sip"` // "net.peer.name" attribute, same value as ClientIp
- Sn string `json:"sn"` // "net.peer.name" attribute, same value as ClientIp
- SpanIdFrom string `json:"span_id_from"` // from header span_id
- Sport int64 `json:"sport"` // "net.peer.port" attribute (buildAppMap)
- TId int `json:"t_id"`
- TName string `json:"t_name"`
- TraceId string `json:"trace_id"` // from header trace_id
- TransIds []interface{} `json:"trans_ids"`
- TypeFrom string `json:"type_from"`
- Uri string `json:"uri"` // "http.uri" attribute (buildAppMap)
- UserDir int `json:"user_dir"`
- VipIds []interface{} `json:"vip_ids"`
- }
// MapInfoT is one node of RootDataT.Maps and describes a single span. The
// generic fields are filled by initMapNode; the build*Map function selected by
// the span name fills the rest. Fields tagged omitempty are dropped from the
// JSON output when they hold their zero value.
- type MapInfoT struct {
- Dbn string `json:"dbn,omitempty"` // set to "unknown" by buildMysqlMap
- Exception int `json:"exception,omitempty"`
- ExceptionMsg string `json:"exception_msg,omitempty"`
- ExceptionStack string `json:"exception_stack,omitempty"`
- Ip string `json:"ip,omitempty"`
- Level int `json:"level"` // 1 for the APPLICATION node, 2 for every other node
- MethodDesc string `json:"method_desc,omitempty"`
- MethodName string `json:"method_name"`
- Nid int `json:"nid"` // copied from TraceMapT.Index when the node is appended
- OperType string `json:"oper_type,omitempty"` // upper-cased first word of "db.statement"
- Pid int `json:"pid"` // 0 for the APPLICATION node, 1 otherwise; presumably the parent Nid
- Port int64 `json:"port,omitempty"`
- Ps []string `json:"ps,omitempty"` // the "db.statement" text for MySQL/Redis nodes
- PureTime uint64 `json:"pure_time"` // (end - start) / 1e3 of the span's nanosecond timestamps
- ServiceName string `json:"service_name"`
- ServiceType string `json:"service_type"`
- StartTime uint64 `json:"start_time"` // StartTimeUnixNano / 1e6
- WallTime uint64 `json:"wall_time"` // always equal to PureTime in this file
- Schema string `json:"schema,omitempty"` // "http" for HTTP nodes
- AssumedAppId int64 `json:"assumed_app_id,omitempty"`
- Uri string `json:"uri,omitempty"`
- SpanId string `json:"span_id,omitempty"`
- }
// TraceMapT is the in-progress state kept in TraceRootMap for one trace while
// its spans are still arriving.
- type TraceMapT struct {
- RootData RootDataT // record under construction; returned once TheEnd is set
- Index int // node counter; incremented per span and copied into MapInfoT.Nid
// NOTE(review): lock is never initialised or used in the visible code, so it
// is a nil pointer; confirm before relying on it.
- lock *sync.RWMutex
- TheEnd bool // set when the "APPLICATION" span has been processed
- }
- var TraceRootMap map[string]*TraceMapT
- func init() {
- TraceRootMap = make(map[string]*TraceMapT)
- }
- func tracetransformData(sdl []tracesdk.ReadOnlySpan) []RootDataT {
- fmt.Println("len sdl", len(sdl))
- if len(sdl) == 0 {
- return nil
- }
- for _, sd := range sdl {
- if sd == nil {
- continue
- }
- traceId := sd.SpanContext().TraceID().String()
- if _, ok := TraceRootMap[traceId]; !ok {
- TraceRootMap[traceId] = &TraceMapT{RootData: initRootData(traceId), Index: 1}
- }
- TraceRootMap[traceId].Index++
- buildAndAssemblyMap(sd, TraceRootMap[traceId])
- }
- // 发送完整数据 | 大量长耗时请求会增加内存占用
- sendData := []RootDataT{}
- for traceId, v := range TraceRootMap {
- if v.TheEnd {
- sendData = append(sendData, v.RootData)
- delete(TraceRootMap, traceId)
- fmt.Println("the end!")
- } else {
- fmt.Println("not end!")
- }
- }
- // Transform the categorized map into a slice
- aa, err := json.Marshal(sendData)
- fmt.Println(err)
- fmt.Println(string(aa))
- fmt.Println(len(sendData))
- fmt.Println(len(sdl))
- return sendData
- }
- func initRootData(traceId string) RootDataT {
- data := RootDataT{
- AccountId: 110,
- AgentId: 1011005252979954, // TODO 更新 基于 ip:port + process_name + exe路径生成
- AgentVersion: "2.1.0",
- AppId: 5410049101545798, // TODO 更新 基于appname生成
- AppIdFrom: -1,
- AppName: "eBPF-agent", // TODO 更新 ip:port || process_name
- CalledId: -1,
- ClientIp: "",
- CollTime: 0,
- Cpu: 0,
- Custom: "",
- HostId: 10154813500555812,
- HostName: "localhost",
- HttpCode: 0,
- HttpMethod: "",
- InstanceId: 1005051101515357, // TODO 更新 基于ip:port
- InstanceIdFrom: -1,
- Maps: []MapInfoT{},
- MemU: 0,
- MemUP: 0,
- OperType: "",
- Parameters: []interface{}{},
- ParentTaskName: 0,
- Period: -1,
- RespTime: 0,
- Sampling: 0,
- ServiceName: "GO",
- ServiceType: APP_SERVICE_TYPE,
- Sip: "",
- Sn: "",
- SpanIdFrom: "",
- Sport: 0,
- TId: -1,
- TName: "",
- TraceId: traceId,
- TransIds: []interface{}{},
- TypeFrom: "",
- Uri: "",
- UserDir: 0,
- VipIds: []interface{}{},
- }
- return data
- }
- func initMapNode(spanSd *tracepb.Span) (MapInfoT, string) {
- mNode := MapInfoT{
- Exception: 0,
- ExceptionMsg: "",
- ExceptionStack: "",
- Ip: "",
- Level: 2,
- Pid: 1,
- Port: 0,
- Ps: []string{},
- ServiceName: "",
- ServiceType: "",
- WallTime: 0,
- }
- mNode.PureTime = (spanSd.EndTimeUnixNano - spanSd.StartTimeUnixNano) / 1e3
- mNode.WallTime = mNode.PureTime
- mNode.StartTime = spanSd.StartTimeUnixNano / 1e6
- return mNode, spanSd.Name
- }
- // 构建拼装
- func buildAndAssemblyMap(sd apmTraceSpan, traceRoot *TraceMapT) MapInfoT {
- mNode, mapType := initMapNode(span(sd))
- switch mapType {
- case "APPLICATION":
- buildAppMap(&mNode, traceRoot, sd)
- traceRoot.TheEnd = true
- case "HTTP":
- buildHttpMap(&mNode, sd)
- case "Mysql":
- buildMysqlMap(&mNode, sd)
- case "Redis":
- buildRedisMap(&mNode, sd)
- }
- if mapType != "" {
- mNode.Nid = traceRoot.Index
- traceRoot.RootData.Maps = append(traceRoot.RootData.Maps, mNode)
- }
- return mNode
- }
- func buildAppMap(mNode *MapInfoT, traceRoot *TraceMapT, sd apmTraceSpan) {
- mNode.ServiceName = GO_SERVICE_NAME
- mNode.ServiceType = APP_SERVICE_TYPE
- mNode.MethodName = "net/http.(*Transport).roundTrip()"
- mNode.Level = 1
- mNode.Pid = 0
- // 构建root节点
- traceRoot.RootData.RespTime = mNode.PureTime
- traceRoot.RootData.CollTime = mNode.StartTime
- traceRoot.Index = 1
- for _, attr := range sd.Attributes() {
- fmt.Println(attr.Key, ":", attr.Value.AsInterface())
- switch attr.Key {
- case "http.uri":
- traceRoot.RootData.Uri = attr.Value.AsString()
- case "http.method":
- traceRoot.RootData.HttpMethod = attr.Value.AsString()
- case "http.status_code":
- traceRoot.RootData.HttpCode = attr.Value.AsInt64()
- case "net.peer.name":
- traceRoot.RootData.ClientIp = attr.Value.AsString()
- traceRoot.RootData.Sip = attr.Value.AsString()
- traceRoot.RootData.Sn = attr.Value.AsString()
- case "net.peer.port":
- traceRoot.RootData.Sport = attr.Value.AsInt64()
- traceRoot.RootData.LocalPort = attr.Value.AsInt64()
- case "server.trace_id_from":
- traceRoot.RootData.TraceId = attr.Value.AsString()
- case "server.called_id":
- traceRoot.RootData.CalledId = attr.Value.AsInt64()
- case "server.instance_id_from":
- traceRoot.RootData.InstanceIdFrom = attr.Value.AsInt64()
- case "server.app_id_from":
- traceRoot.RootData.AppIdFrom = attr.Value.AsInt64()
- case "server.span_id_from":
- traceRoot.RootData.SpanIdFrom = attr.Value.AsString()
- }
- }
- }
- func buildHttpMap(mNode *MapInfoT, sd apmTraceSpan) {
- mNode.ServiceName = HTTP_SERVICE_NAME
- mNode.ServiceType = HTTP_SERVICE_TYPE
- mNode.Schema = "http"
- mNode.MethodName = "net/http.serverHandler.ServeHTTP()"
- var descAddr string
- for _, attr := range sd.Attributes() {
- fmt.Println(attr.Key, ":", attr.Value.AsInterface())
- switch attr.Key {
- case "http.ip":
- mNode.Ip = attr.Value.AsString()
- descAddr += mNode.Ip
- case "http.port":
- mNode.Port = attr.Value.AsInt64()
- descAddr += ":" + attr.Value.AsString()
- case "http.uri":
- mNode.Uri = attr.Value.AsString()
- case "http.assumed_app_id":
- mNode.AssumedAppId = attr.Value.AsInt64()
- case "http.span_id":
- mNode.SpanId = attr.Value.AsString()
- }
- }
- //mNode.AssumedAppId = Md5ToInt64(descAddr, 16)
- }
- func buildMysqlMap(mNode *MapInfoT, sd apmTraceSpan) {
- mNode.Dbn = "unknown"
- mNode.ServiceName = MYSQL_SERVICE_NAME
- mNode.ServiceType = SQL_SERVICE_TYPE
- mNode.MethodName = "database/sql.Query()"
- for _, attr := range sd.Attributes() {
- //fmt.Println(attr.Key, ":", attr.Value.AsInterface())
- switch attr.Key {
- case "net.peer.name":
- mNode.Ip = attr.Value.AsString()
- case "net.peer.port":
- mNode.Port = attr.Value.AsInt64()
- case "db.statement":
- query := attr.Value.AsString()
- mNode.Ps = []string{query}
- words := strings.Fields(query)
- if len(words) > 0 {
- mNode.OperType = strings.ToUpper(words[0])
- }
- }
- }
- }
- func buildRedisMap(mNode *MapInfoT, sd apmTraceSpan) {
- mNode.ServiceName = REDIS_SERVICE_NAME
- mNode.ServiceType = NOSQL_SERVICE_TYPE
- //mNode.MethodName = span(sd).Name + " query"
- mNode.MethodName = "redis.Do()"
- for _, attr := range sd.Attributes() {
- //fmt.Println(attr.Key, ":", attr.Value.AsInterface())
- switch attr.Key {
- case "net.peer.name":
- mNode.Ip = attr.Value.AsString()
- case "net.peer.port":
- mNode.Port = attr.Value.AsInt64()
- case "db.statement":
- query := attr.Value.AsString()
- mNode.Ps = []string{query}
- words := strings.Fields(query)
- if len(words) > 0 {
- mNode.OperType = strings.ToUpper(words[0])
- }
- }
- }
- }
// isEnter reports whether _type is exactly "APPLICATION", the span name that
// marks a trace's entry span. The comparison is case-sensitive.
func isEnter(_type string) bool {
	return _type == "APPLICATION"
}
- func span(sd apmTraceSpan) *tracepb.Span {
- if sd == nil {
- return nil
- }
- tid := sd.SpanContext().TraceID()
- sid := sd.SpanContext().SpanID()
- s := &tracepb.Span{
- TraceId: tid[:],
- SpanId: sid[:],
- TraceState: sd.SpanContext().TraceState().String(),
- //Status: status(sd.Status().Code, sd.Status().Description),
- StartTimeUnixNano: uint64(sd.StartTime().UnixNano()),
- EndTimeUnixNano: uint64(sd.EndTime().UnixNano()),
- //Links: links(sd.Links()),
- //Kind: spanKind(sd.SpanKind()),
- Name: sd.Name(),
- Attributes: tracetransform.KeyValues(sd.Attributes()),
- //Events: spanEvents(sd.Events()),
- DroppedAttributesCount: uint32(sd.DroppedAttributes()),
- DroppedEventsCount: uint32(sd.DroppedEvents()),
- DroppedLinksCount: uint32(sd.DroppedLinks()),
- }
- if psid := sd.Parent().SpanID(); psid.IsValid() {
- s.ParentSpanId = psid[:]
- }
- return s
- }
// Md5ToInt64 derives a decimal number from the MD5 digest of strParam. Each
// character of the lowercase hex digest is replaced by the decimal value of
// its character code ('0'-'9' become 48-57, 'a'-'f' become 97-102), the pieces
// are concatenated, and the first Len digits are parsed as a base-10 int64.
//
// It returns 0 when Len is not positive or when the selected digits do not fit
// in an int64, which always happens for Len >= 20. A Len larger than the digit
// string previously panicked on the slice expression; it now uses all
// available digits and therefore also yields 0.
func Md5ToInt64(strParam string, Len int) int64 {
	if Len <= 0 {
		return 0
	}

	sum := md5.Sum([]byte(strParam))
	hexDigest := fmt.Sprintf("%x", sum)

	// 32 hex characters expand to at most three digits each. Stop as soon as
	// enough digits are available; the rest would be discarded anyway.
	digits := make([]byte, 0, 3*len(hexDigest))
	for i := 0; i < len(hexDigest) && len(digits) < Len; i++ {
		digits = strconv.AppendInt(digits, int64(hexDigest[i]), 10)
	}
	if len(digits) > Len {
		digits = digits[:Len]
	}

	value, err := strconv.ParseInt(string(digits), 10, 64)
	if err != nil {
		return 0
	}
	return value
}
|