apm_exporter.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472
  1. package otlptrace
  2. import (
  3. "crypto/md5"
  4. "encoding/json"
  5. "fmt"
  6. "strconv"
  7. "strings"
  8. "sync"
  9. "time"
  10. "go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/tracetransform"
  11. tracesdk "go.opentelemetry.io/otel/sdk/trace"
  12. tracepb "go.opentelemetry.io/proto/otlp/trace/v1"
  13. )
  14. const (
  15. APP_SERVICE_TYPE = "APPLICATION"
  16. SQL_SERVICE_TYPE = "SQL"
  17. NOSQL_SERVICE_TYPE = "NOSQL"
  18. HTTP_SERVICE_TYPE = "HTTP"
  19. )
  20. const (
  21. GO_SERVICE_NAME = "GO"
  22. MYSQL_SERVICE_NAME = "MYSQL"
  23. REDIS_SERVICE_NAME = "REDIS"
  24. HTTP_SERVICE_NAME = "HTTPCLIENT"
  25. )
  26. type apmTraceSpan tracesdk.ReadOnlySpan
  27. // GO:0:10154813500555812:5450531005555981:5610250100539899:ee022542c3940f1b:1001025098564810:888ceb3df1bdbe2c:110
  28. type RootDataT struct {
  29. AccountId int `json:"account_id"`
  30. AgentId int64 `json:"agent_id"`
  31. AgentVersion string `json:"agent_version"`
  32. AppId int64 `json:"app_id"`
  33. AppIdFrom int64 `json:"app_id_from"` // from header app_id
  34. AppName string `json:"app_name"`
  35. CalledId int64 `json:"called_id"` // from header assumed_app_id
  36. ClientIp string `json:"client_ip"`
  37. CollTime uint64 `json:"coll_time"`
  38. Cpu int `json:"cpu"`
  39. Custom string `json:"custom"`
  40. HostId int64 `json:"host_id"`
  41. HostName string `json:"host_name"`
  42. HttpCode int64 `json:"http_code"`
  43. HttpMethod string `json:"http_method"`
  44. InstanceId int64 `json:"instance_id"`
  45. InstanceIdFrom int64 `json:"instance_id_from"` // from header instance_id
  46. LocalPort int64 `json:"local_port"`
  47. Maps []MapInfoT `json:"maps"`
  48. MemU int `json:"mem_u"`
  49. MemUP int `json:"mem_u_p"`
  50. OperType string `json:"oper_type"`
  51. Parameters []interface{} `json:"parameters"`
  52. ParentTaskName int `json:"parent_task_name"`
  53. Period int `json:"period"`
  54. RespTime uint64 `json:"resp_time"`
  55. Sampling int `json:"sampling"`
  56. ServiceName string `json:"service_name"`
  57. ServiceType string `json:"service_type"`
  58. Sip string `json:"sip"`
  59. Sn string `json:"sn"`
  60. SpanIdFrom string `json:"span_id_from"` // from header span_id
  61. Sport int64 `json:"sport"`
  62. TId int `json:"t_id"`
  63. TName string `json:"t_name"`
  64. TraceId string `json:"trace_id"` // from header trace_id
  65. TransIds []interface{} `json:"trans_ids"`
  66. TypeFrom string `json:"type_from"`
  67. Uri string `json:"uri"`
  68. UserDir int `json:"user_dir"`
  69. VipIds []interface{} `json:"vip_ids"`
  70. }
  71. type MapInfoT struct {
  72. Dbn string `json:"dbn,omitempty"`
  73. Exception int `json:"exception,omitempty"`
  74. ExceptionMsg string `json:"exception_msg,omitempty"`
  75. ExceptionStack string `json:"exception_stack,omitempty"`
  76. Ip string `json:"ip,omitempty"`
  77. Level int `json:"level"`
  78. MethodDesc string `json:"method_desc,omitempty"`
  79. MethodName string `json:"method_name"`
  80. Nid int `json:"nid"`
  81. OperType string `json:"oper_type,omitempty"`
  82. Pid int `json:"pid"`
  83. Port int64 `json:"port,omitempty"`
  84. Ps []string `json:"ps,omitempty"`
  85. PureTime uint64 `json:"pure_time"`
  86. ServiceName string `json:"service_name"`
  87. ServiceType string `json:"service_type"`
  88. StartTime uint64 `json:"start_time"`
  89. WallTime uint64 `json:"wall_time"`
  90. Schema string `json:"schema,omitempty"`
  91. AssumedAppId int64 `json:"assumed_app_id,omitempty"`
  92. Uri string `json:"uri,omitempty"`
  93. SpanId string `json:"span_id,omitempty"`
  94. }
  95. type TraceMapT struct {
  96. RootData RootDataT
  97. Index int
  98. lock *sync.RWMutex
  99. TheEnd bool
  100. }
  101. var TraceRootMap map[string]*TraceMapT
  102. func init() {
  103. TraceRootMap = make(map[string]*TraceMapT)
  104. go func() {
  105. for {
  106. //fmt.Println(G_sdl)
  107. time.Sleep(5 * time.Second)
  108. }
  109. }()
  110. }
  111. var G_sdl int
  112. func tracetransformData(sdl []tracesdk.ReadOnlySpan) []RootDataT {
  113. G_sdl += len(sdl)
  114. if len(sdl) == 0 {
  115. return nil
  116. }
  117. for _, sd := range sdl {
  118. if sd == nil {
  119. continue
  120. }
  121. traceId := sd.SpanContext().TraceID().String()
  122. if _, ok := TraceRootMap[traceId]; !ok {
  123. TraceRootMap[traceId] = &TraceMapT{RootData: initRootData(traceId), Index: 1}
  124. }
  125. TraceRootMap[traceId].Index++
  126. buildAndAssemblyMap(sd, TraceRootMap[traceId])
  127. }
  128. // 发送完整数据 | 大量长耗时请求会增加内存占用
  129. sendData := []RootDataT{}
  130. for traceId, v := range TraceRootMap {
  131. if v.TheEnd {
  132. buildLevel(v)
  133. sendData = append(sendData, v.RootData)
  134. delete(TraceRootMap, traceId)
  135. //fmt.Println("the end!")
  136. } else {
  137. //fmt.Println("not end!")
  138. }
  139. }
  140. // Transform the categorized map into a slice
  141. aa, err := json.Marshal(sendData)
  142. fmt.Println(err)
  143. fmt.Println(string(aa))
  144. fmt.Println(len(sendData))
  145. fmt.Println(len(sdl))
  146. return sendData
  147. }
  148. func buildLevelRecursion(parentMap map[int]map[int]*MapInfoT, pid int, level int) {
  149. for i, v := range parentMap[pid] {
  150. parentMap[pid][i].Level = level
  151. // fmt.Printf("buildLevelRecursion: %x, %x\n", v.Nid, v.Nid%0x1000000)
  152. buildLevelRecursion(parentMap, v.Nid%0x1000000, level+1)
  153. }
  154. }
  155. func buildLevel(sdl *TraceMapT) {
  156. parentMap := make(map[int]map[int]*MapInfoT)
  157. nidMap := make(map[int]*MapInfoT)
  158. for i, v := range sdl.RootData.Maps {
  159. nidMap[v.Nid%0x1000000] = &sdl.RootData.Maps[i]
  160. }
  161. // 没有 pid 的放到 application 上
  162. for i, v := range sdl.RootData.Maps {
  163. _, ok := nidMap[v.Pid]
  164. if !ok && v.Pid != 0 {
  165. sdl.RootData.Maps[i].Pid = 1
  166. }
  167. }
  168. // fmt.Println("nidMapnidMapnidMapnidMap--------------")
  169. // fmt.Println(nidMap)
  170. // fmt.Println(sdl.RootData.Maps)
  171. // 构建 pid 映射map
  172. for i, v := range sdl.RootData.Maps {
  173. _, ok := parentMap[v.Pid]
  174. if !ok {
  175. parentMap[v.Pid] = make(map[int]*MapInfoT)
  176. }
  177. parentMap[v.Pid][v.Nid] = &sdl.RootData.Maps[i]
  178. }
  179. // fmt.Println(parentMap)
  180. // fmt.Println("nidMapnidMapnidMapnidMap--------------||||")
  181. // 构建层级
  182. level := 1
  183. buildLevelRecursion(parentMap, 0, level)
  184. }
  185. func initRootData(traceId string) RootDataT {
  186. data := RootDataT{
  187. AccountId: 110,
  188. AgentId: 1011005252979954, // TODO 更新 基于 ip:port + process_name + exe路径生成
  189. AgentVersion: "2.1.0",
  190. AppId: 5410049101545798, // TODO 更新 基于appname生成
  191. AppIdFrom: -1,
  192. AppName: "eBPF-agent", // TODO 更新 ip:port || process_name
  193. CalledId: -1,
  194. ClientIp: "",
  195. CollTime: 0,
  196. Cpu: 0,
  197. Custom: "",
  198. HostId: 10154813500555812,
  199. HostName: "localhost",
  200. HttpCode: 0,
  201. HttpMethod: "",
  202. InstanceId: 1005051101515357, // TODO 更新 基于ip:port
  203. InstanceIdFrom: -1,
  204. Maps: []MapInfoT{},
  205. MemU: 0,
  206. MemUP: 0,
  207. OperType: "",
  208. Parameters: []interface{}{},
  209. ParentTaskName: 0,
  210. Period: -1,
  211. RespTime: 0,
  212. Sampling: 0,
  213. ServiceName: "GO",
  214. ServiceType: APP_SERVICE_TYPE,
  215. Sip: "",
  216. Sn: "",
  217. SpanIdFrom: "",
  218. Sport: 0,
  219. TId: -1,
  220. TName: "",
  221. TraceId: traceId,
  222. TransIds: []interface{}{},
  223. TypeFrom: "",
  224. Uri: "",
  225. UserDir: 0,
  226. VipIds: []interface{}{},
  227. }
  228. return data
  229. }
  230. func initMapNode(spanSd *tracepb.Span) (MapInfoT, string) {
  231. mNode := MapInfoT{
  232. Exception: 0,
  233. ExceptionMsg: "",
  234. ExceptionStack: "",
  235. Ip: "",
  236. Level: 2,
  237. Pid: 1,
  238. Port: 0,
  239. Ps: []string{},
  240. ServiceName: "",
  241. ServiceType: "",
  242. WallTime: 0,
  243. }
  244. mNode.MethodName = spanSd.Name
  245. mNode.PureTime = (spanSd.EndTimeUnixNano - spanSd.StartTimeUnixNano) / 1e3
  246. mNode.WallTime = mNode.PureTime
  247. mNode.StartTime = spanSd.StartTimeUnixNano / 1e6
  248. for _, attr := range spanSd.GetAttributes() {
  249. switch attr.Key {
  250. case "nid":
  251. mNode.Nid = int(attr.Value.GetIntValue())
  252. case "pid":
  253. mNode.Pid = int(attr.Value.GetIntValue())
  254. case "level":
  255. mNode.Level = int(attr.Value.GetIntValue())
  256. }
  257. }
  258. return mNode, spanSd.Name
  259. }
  260. // 构建拼装
  261. func buildAndAssemblyMap(sd apmTraceSpan, traceRoot *TraceMapT) MapInfoT {
  262. mNode, mapType := initMapNode(span(sd))
  263. switch mapType {
  264. case "APPLICATION":
  265. buildAppMap(&mNode, traceRoot, sd)
  266. traceRoot.TheEnd = true
  267. case "HTTP":
  268. buildHttpMap(&mNode, sd)
  269. case "Mysql":
  270. buildMysqlMap(&mNode, sd)
  271. case "Redis":
  272. buildRedisMap(&mNode, sd)
  273. }
  274. if mapType != "" {
  275. // mNode.Nid = traceRoot.Index
  276. traceRoot.RootData.Maps = append(traceRoot.RootData.Maps, mNode)
  277. }
  278. return mNode
  279. }
  280. func buildAppMap(mNode *MapInfoT, traceRoot *TraceMapT, sd apmTraceSpan) {
  281. mNode.ServiceName = GO_SERVICE_NAME
  282. mNode.ServiceType = APP_SERVICE_TYPE
  283. mNode.MethodName = "net/http.(*Transport).roundTrip()"
  284. mNode.Level = 1
  285. mNode.Pid = 0
  286. mNode.Nid = 1
  287. // 构建root节点
  288. traceRoot.RootData.RespTime = mNode.PureTime
  289. traceRoot.RootData.CollTime = mNode.StartTime
  290. traceRoot.Index = 1
  291. for _, attr := range sd.Attributes() {
  292. //fmt.Println(attr.Key, ":", attr.Value.AsInterface())
  293. switch attr.Key {
  294. case "http.uri":
  295. traceRoot.RootData.Uri = attr.Value.AsString()
  296. case "http.method":
  297. traceRoot.RootData.HttpMethod = attr.Value.AsString()
  298. case "http.status_code":
  299. traceRoot.RootData.HttpCode = attr.Value.AsInt64()
  300. case "net.peer.name":
  301. traceRoot.RootData.ClientIp = attr.Value.AsString()
  302. traceRoot.RootData.Sip = attr.Value.AsString()
  303. traceRoot.RootData.Sn = attr.Value.AsString()
  304. case "net.peer.port":
  305. traceRoot.RootData.Sport = attr.Value.AsInt64()
  306. traceRoot.RootData.LocalPort = attr.Value.AsInt64()
  307. case "server.trace_id_from":
  308. traceRoot.RootData.TraceId = attr.Value.AsString()
  309. case "server.called_id":
  310. traceRoot.RootData.CalledId = attr.Value.AsInt64()
  311. case "server.instance_id_from":
  312. traceRoot.RootData.InstanceIdFrom = attr.Value.AsInt64()
  313. case "server.app_id_from":
  314. traceRoot.RootData.AppIdFrom = attr.Value.AsInt64()
  315. case "server.span_id_from":
  316. traceRoot.RootData.SpanIdFrom = attr.Value.AsString()
  317. case "server.type_from":
  318. traceRoot.RootData.TypeFrom = attr.Value.AsString()
  319. }
  320. }
  321. }
  322. func buildHttpMap(mNode *MapInfoT, sd apmTraceSpan) {
  323. mNode.ServiceName = HTTP_SERVICE_NAME
  324. mNode.ServiceType = HTTP_SERVICE_TYPE
  325. mNode.Schema = "http"
  326. mNode.MethodName = "net/http.serverHandler.ServeHTTP()"
  327. var descAddr string
  328. for _, attr := range sd.Attributes() {
  329. //fmt.Println(attr.Key, ":", attr.Value.AsInterface())
  330. switch attr.Key {
  331. case "http.ip":
  332. mNode.Ip = attr.Value.AsString()
  333. descAddr += mNode.Ip
  334. case "http.port":
  335. mNode.Port = attr.Value.AsInt64()
  336. descAddr += ":" + attr.Value.AsString()
  337. case "http.uri":
  338. mNode.Uri = attr.Value.AsString()
  339. case "http.assumed_app_id":
  340. mNode.AssumedAppId = attr.Value.AsInt64()
  341. case "http.span_id":
  342. mNode.SpanId = attr.Value.AsString()
  343. }
  344. }
  345. //mNode.AssumedAppId = Md5ToInt64(descAddr, 16)
  346. }
  347. func buildMysqlMap(mNode *MapInfoT, sd apmTraceSpan) {
  348. mNode.Dbn = "unknown"
  349. mNode.ServiceName = MYSQL_SERVICE_NAME
  350. mNode.ServiceType = SQL_SERVICE_TYPE
  351. mNode.MethodName = "database/sql.Query()"
  352. for _, attr := range sd.Attributes() {
  353. //fmt.Println(attr.Key, ":", attr.Value.AsInterface())
  354. switch attr.Key {
  355. case "net.peer.name":
  356. mNode.Ip = attr.Value.AsString()
  357. case "net.peer.port":
  358. mNode.Port = attr.Value.AsInt64()
  359. case "db.statement":
  360. query := attr.Value.AsString()
  361. mNode.Ps = []string{query}
  362. words := strings.Fields(query)
  363. if len(words) > 0 {
  364. mNode.OperType = strings.ToUpper(words[0])
  365. }
  366. }
  367. }
  368. }
  369. func buildRedisMap(mNode *MapInfoT, sd apmTraceSpan) {
  370. mNode.ServiceName = REDIS_SERVICE_NAME
  371. mNode.ServiceType = NOSQL_SERVICE_TYPE
  372. //mNode.MethodName = span(sd).Name + " query"
  373. mNode.MethodName = "redis.Do()"
  374. for _, attr := range sd.Attributes() {
  375. //fmt.Println(attr.Key, ":", attr.Value.AsInterface())
  376. switch attr.Key {
  377. case "net.peer.name":
  378. mNode.Ip = attr.Value.AsString()
  379. case "net.peer.port":
  380. mNode.Port = attr.Value.AsInt64()
  381. case "db.statement":
  382. query := attr.Value.AsString()
  383. mNode.Ps = []string{query}
  384. words := strings.Fields(query)
  385. if len(words) > 0 {
  386. mNode.OperType = strings.ToUpper(words[0])
  387. }
  388. }
  389. }
  390. }
  391. func isEnter(_type string) bool {
  392. if _type == "APPLICATION" {
  393. return true
  394. }
  395. return false
  396. }
  397. func span(sd apmTraceSpan) *tracepb.Span {
  398. if sd == nil {
  399. return nil
  400. }
  401. tid := sd.SpanContext().TraceID()
  402. sid := sd.SpanContext().SpanID()
  403. s := &tracepb.Span{
  404. TraceId: tid[:],
  405. SpanId: sid[:],
  406. TraceState: sd.SpanContext().TraceState().String(),
  407. //Status: status(sd.Status().Code, sd.Status().Description),
  408. StartTimeUnixNano: uint64(sd.StartTime().UnixNano()),
  409. EndTimeUnixNano: uint64(sd.EndTime().UnixNano()),
  410. //Links: links(sd.Links()),
  411. //Kind: spanKind(sd.SpanKind()),
  412. Name: sd.Name(),
  413. Attributes: tracetransform.KeyValues(sd.Attributes()),
  414. //Events: spanEvents(sd.Events()),
  415. DroppedAttributesCount: uint32(sd.DroppedAttributes()),
  416. DroppedEventsCount: uint32(sd.DroppedEvents()),
  417. DroppedLinksCount: uint32(sd.DroppedLinks()),
  418. }
  419. if psid := sd.Parent().SpanID(); psid.IsValid() {
  420. s.ParentSpanId = psid[:]
  421. }
  422. return s
  423. }
  424. func Md5ToInt64(strParam string, Len int) int64 {
  425. sign := md5.Sum([]byte(strParam))
  426. signStr := fmt.Sprintf("%x", sign)
  427. charArr := []rune(signStr)
  428. var intStr string
  429. for _, value := range charArr {
  430. intStr += strconv.Itoa(int(value))
  431. }
  432. intStr = intStr[:Len]
  433. int64Data, err := strconv.ParseInt(intStr, 10, 64)
  434. if err != nil {
  435. return 0
  436. }
  437. return int64Data
  438. }