// apm_exporter.go
  1. package otlptrace
  2. import (
  3. "crypto/md5"
  4. "encoding/json"
  5. "fmt"
  6. . "github.com/coroot/coroot-node-agent/ebpftracer"
  7. "github.com/coroot/coroot-node-agent/ebpftracer/l7"
  8. "github.com/coroot/coroot-node-agent/utils"
  9. klog "github.com/sirupsen/logrus"
  10. "math"
  11. "net/url"
  12. "sort"
  13. "strconv"
  14. "sync"
  15. "go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/tracetransform"
  16. tracesdk "go.opentelemetry.io/otel/sdk/trace"
  17. tracepb "go.opentelemetry.io/proto/otlp/trace/v1"
  18. )
// Service type identifiers. The builders in this file assign them to the
// ServiceType field of RootDataT and MapInfoT.
const (
	APP_SERVICE_TYPE   = "APPLICATION" // entry (application) node
	SQL_SERVICE_TYPE   = "SQL"         // SQL database calls
	NOSQL_SERVICE_TYPE = "NOSQL"       // not referenced in the visible part of this file
	HTTP_SERVICE_TYPE  = "HTTP"        // outbound HTTP calls
	NET_SERVICE_TYPE   = "L7_NET"      // used for DNS nodes
)
// Service name identifiers. The builders in this file assign them to the
// ServiceName field of MapInfoT nodes.
const (
	GO_SERVICE_NAME         = "GO" // default service name of the entry node
	MYSQL_SERVICE_NAME      = "MYSQL"
	DM_SERVICE_NAME         = "DM"
	REDIS_SERVICE_NAME      = "REDIS"
	MONGO_SERVICE_NAME      = "MONGODB"
	HTTP_SERVICE_NAME       = "HTTPCLIENT" // service name of HTTP nodes
	POSTGRESQL_SERVICE_NAME = "POSTGRESQL"
)
// apmTraceSpan is a named type whose underlying type is
// tracesdk.ReadOnlySpan; the span-level builders take it as their parameter.
type apmTraceSpan tracesdk.ReadOnlySpan
// RootDataT is the JSON-serialisable record built for one span: request-level
// metadata plus the list of call nodes in Maps. Fields are marshalled in
// declaration order.
//
// NOTE(review): the line below looks like a sample of a colon-separated
// propagation header value — confirm against the producer.
// GO:0:10154813500555812:5450531005555981:5610250100539899:ee022542c3940f1b:1001025098564810:888ceb3df1bdbe2c:110
type RootDataT struct {
	AccountId       int           `json:"account_id"`
	AgentId         int64         `json:"agent_id"`
	AgentVersion    string        `json:"agent_version"`
	AppId           int64         `json:"app_id"`
	AppIdFrom       int64         `json:"app_id_from"` // from header app_id
	AppName         string        `json:"app_name"`
	CalledId        int64         `json:"called_id"` // from header assumed_app_id
	ClientIp        string        `json:"client_ip"`
	CollTime        uint64        `json:"coll_time"`
	Cpu             int           `json:"cpu"`
	Custom          string        `json:"custom"`
	HostId          int64         `json:"host_id"`
	HostName        string        `json:"host_name"`
	HttpCode        int64         `json:"http_code"`
	HttpMethod      string        `json:"http_method"`
	InstanceId      int64         `json:"instance_id"`
	InstanceIdFrom  int64         `json:"instance_id_from"` // from header instance_id
	LocalPort       int64         `json:"local_port"`
	Maps            []MapInfoT    `json:"maps"` // call nodes; the entry node is appended first
	MemU            int           `json:"mem_u"`
	MemUP           int           `json:"mem_u_p"`
	OperType        string        `json:"oper_type"`
	Parameters      []ParamStruct `json:"parameters,omitempty"` // query parameters parsed from the request URI
	ParentTaskName  string        `json:"parent_task_name"`
	Period          int           `json:"period"`
	RespTime        uint64        `json:"resp_time"`
	Sampling        int           `json:"sampling"`
	ServiceName     string        `json:"service_name"`
	ServiceType     string        `json:"service_type"`
	Sip             string        `json:"sip"`
	Sn              string        `json:"sn"`
	SpanIdFrom      string        `json:"span_id_from"` // from header span_id
	Sport           int64         `json:"sport"`
	TId             int           `json:"t_id"`
	TName           string        `json:"t_name"`
	TraceId         string        `json:"trace_id"` // from header trace_id
	TransIds        []interface{} `json:"trans_ids"`
	TypeFrom        string        `json:"type_from"`
	Uri             string        `json:"uri"`
	UserDir         string        `json:"user_dir"`
	VipIds          []interface{} `json:"vip_ids"`
	SrcAddr         string        `json:"src_addr"`
	DestinationAddr string        `json:"destination_addr"`
	// Fields newly added for op.
	Pid         uint32 `json:"pid"`
	ContainerID string `json:"container_id"`
}
// ParamStruct defines the target structure for one query parameter: the
// parameter name and every value supplied for it.
type ParamStruct struct {
	Name   string   `json:"name"`
	Values []string `json:"values"`
}
// MapInfoT describes a single call node stored in RootDataT.Maps.
//
// Nid, Pid and Level encode the tree position: buildLevelFromEvent assigns
// Nid as the node id, Pid as the Nid of the parent node and Level as the depth.
type MapInfoT struct {
	Dbn            string   `json:"dbn,omitempty"`
	Exception      int      `json:"exception,omitempty"` // set to 1 by the SQL builder when the event reports an exception
	ExceptionMsg   string   `json:"exception_msg,omitempty"`
	ExceptionStack string   `json:"exception_stack,omitempty"`
	Ip             string   `json:"ip,omitempty"`
	Level          int      `json:"level"`
	MethodDesc     string   `json:"method_desc,omitempty"`
	MethodName     string   `json:"method_name"`
	Nid            int      `json:"nid"`
	OperType       string   `json:"oper_type,omitempty"`
	Pid            int      `json:"pid"`
	Port           int64    `json:"port,omitempty"`
	Ps             []string `json:"ps,omitempty"`
	PureTime       uint64   `json:"pure_time"`
	ServiceName    string   `json:"service_name"`
	ServiceType    string   `json:"service_type"`
	StartTime      uint64   `json:"-"` // excluded from JSON; used to order nodes in buildLevelFromEvent
	EndTime        uint64   `json:"-"` // excluded from JSON; used to order nodes in buildLevelFromEvent
	StartTimeMs    uint64   `json:"start_time"` // serialised as "start_time"
	EndTimeMs      uint64   `json:"end_time"`   // serialised as "end_time"
	WallTime       uint64   `json:"wall_time"`
	Schema         string   `json:"schema,omitempty"`
	AssumedAppId   int64    `json:"assumed_app_id,omitempty"`
	Uri            string   `json:"uri,omitempty"`
	SpanId         string   `json:"span_id,omitempty"`
	SrcAddr        string   `json:"src_addr,omitempty"`
	DestinationAddr string  `json:"destination_addr,omitempty"`
}
// TraceMapT pairs a RootDataT with bookkeeping fields. In the visible part of
// this file it is referenced only as the value type of TraceRootMap and by
// commented-out code.
type TraceMapT struct {
	RootData RootDataT
	Index    int
	lock     *sync.RWMutex // NOTE(review): never initialised in the visible code — confirm before use
	TheEnd   bool
}
  125. var TraceRootMap map[string]*TraceMapT
  126. func init() {
  127. TraceRootMap = make(map[string]*TraceMapT)
  128. //go func() {
  129. // for {
  130. // //fmt.Println(G_sdl)
  131. // time.Sleep(5 * time.Second)
  132. // }
  133. //}()
  134. }
  135. var G_sdl int
  136. func tracetransformData(sdl []tracesdk.ReadOnlySpan) map[int][]RootDataT {
  137. //G_sdl += len(sdl)
  138. if len(sdl) == 0 {
  139. return nil
  140. }
  141. // 多次请求 sdl
  142. sendDataMap := make(map[int][]RootDataT)
  143. //sendData := []RootDataT{}
  144. for _, sd := range sdl {
  145. if sd == nil {
  146. continue
  147. }
  148. //traceId := sd.SpanContext().TraceID().String()
  149. //fmt.Println("------event_num---- "+sd.Name(), "--->", len(sd.Events())) // 一次请求完整数据
  150. // 构建map *RootDataT
  151. var rootData RootDataT
  152. rootData = initRootDataFromEvent()
  153. // build http入口 MapInfoT
  154. code_type := buildAppMapFromEvent(&rootData, sd)
  155. // 构建maps
  156. for _, event := range sd.Events() {
  157. //aaa, _ := json.Marshal(event)
  158. //fmt.Println("event.info", string(aaa))
  159. mNode := buildMapNodeFromEvent(event)
  160. switch EventType(event.EventType) {
  161. // stack
  162. case EventTypeFunEnt:
  163. // l7 event
  164. case EventTypeL7Request:
  165. switch l7.Protocol(event.ProtocolType) {
  166. case l7.ProtocolHTTP:
  167. buildHttpMapFromEvent(&mNode, event)
  168. case l7.ProtocolDNS:
  169. buildDNSMapEvent(&mNode, event)
  170. case l7.ProtocolMysql:
  171. buildSQLMapEvent(&mNode, event)
  172. case l7.ProtocolRedis:
  173. buildRedisMapEvent(&mNode, event)
  174. case l7.ProtocolMongo:
  175. buildMongoMapEvent(&mNode, event)
  176. // dm
  177. case l7.ProtocolDM:
  178. buildSQLMapEvent(&mNode, event)
  179. case l7.ProtocolPostgres:
  180. buildSQLMapEvent(&mNode, event)
  181. }
  182. }
  183. rootData.Maps = append(rootData.Maps, mNode)
  184. //fmt.Println(event.Name)
  185. //buildAndAssemblyMapFromEvent(event, rootData)
  186. }
  187. buildLevelFromEvent(&rootData)
  188. sendDataMap[code_type] = append(sendDataMap[code_type], rootData)
  189. //a, _ := json.Marshal(rootData)
  190. //fmt.Println(string(a))
  191. //sendData = append(sendData, rootData)
  192. //if _, ok := TraceRootMap[traceId]; !ok {
  193. //TraceRootMap[traceId] = &TraceMapT{RootData: initRootData(traceId), Index: 1}
  194. //}
  195. //TraceRootMap[traceId].Index++
  196. //buildAndAssemblyMap(sd, TraceRootMap[traceId])
  197. }
  198. // 发送完整数据 | 大量长耗时请求会增加内存占用
  199. //sendData := []RootDataT{}
  200. //for traceId, v := range TraceRootMap {
  201. // if v.TheEnd {
  202. // buildLevel(v)
  203. // sendData = append(sendData, v.RootData)
  204. // delete(TraceRootMap, traceId)
  205. // //fmt.Println("the end!")
  206. // } else {
  207. // //fmt.Println("not end!")
  208. // }
  209. //}
  210. //Transform the categorized map into a slice
  211. data, _ := json.Marshal(sendDataMap)
  212. klog.Debug(string(data))
  213. //fmt.Println(len(sendData))
  214. //fmt.Println("sdl len:", len(sdl))
  215. return sendDataMap
  216. }
// TimeMap is one timeline entry used by buildLevelFromEvent: the start or the
// end of a node.
type TimeMap struct {
	Time uint64    // the node's StartTime or EndTime
	Type int       // 0 for a start entry, 1 for an end entry
	Map  *MapInfoT // the node this entry belongs to
}
  222. //type TraceMapT struct {
  223. // RootData RootDataT
  224. // Index int
  225. // lock *sync.RWMutex
  226. // TheEnd bool
  227. //}
  228. //func buildLevel(sdl *TraceMapT) {
  229. // nidMap := make(map[int]*MapInfoT)
  230. //
  231. // mapSlice := []TimeMap{}
  232. //
  233. // for i, v := range sdl.RootData.Maps {
  234. // if v.ServiceType == "APPLICATION" {
  235. // continue
  236. // }
  237. // nidMap[v.Nid] = &sdl.RootData.Maps[i]
  238. // timeStartMap := TimeMap{
  239. // Time: v.StartTime,
  240. // Type: 0,
  241. // Map: &sdl.RootData.Maps[i],
  242. // }
  243. // mapSlice = append(mapSlice, timeStartMap)
  244. // timeEndMap := TimeMap{
  245. // Time: v.EndTime,
  246. // Type: 1,
  247. // Map: &sdl.RootData.Maps[i],
  248. // }
  249. // mapSlice = append(mapSlice, timeEndMap)
  250. // }
  251. // sort.Slice(mapSlice, func(i, j int) bool {
  252. // return mapSlice[i].Time < mapSlice[j].Time
  253. // })
  254. //
  255. // funStack := []TimeMap{}
  256. //
  257. // currentNid := 1
  258. // Nid := 2
  259. // level := 2
  260. //
  261. // for _, v := range mapSlice {
  262. // // fmt.Println("SliceSliceindex", k, "value", v.Time, v.Type, v.Map.MethodName, v.Map.Nid)
  263. // if v.Type == 0 {
  264. // // 函数入口
  265. // funStack = append(funStack, v)
  266. // v.Map.Pid = currentNid
  267. // v.Map.Level = level
  268. // v.Map.Nid = Nid
  269. // currentNid = Nid
  270. // level += 1
  271. // Nid += 1
  272. // } else if v.Type == 1 {
  273. // // 函数出口
  274. // len := len(funStack)
  275. // funStack = funStack[:len-1]
  276. // if (len - 2) < 0 {
  277. // currentNid = 1
  278. // } else {
  279. // currentNid = funStack[len-2].Map.Nid
  280. // }
  281. //
  282. // level -= 1
  283. // }
  284. // }
  285. //}
  286. func buildLevelFromEvent(sdl *RootDataT) {
  287. nidMap := make(map[int]*MapInfoT)
  288. mapSlice := []TimeMap{}
  289. for i, v := range sdl.Maps {
  290. if v.ServiceType == "APPLICATION" {
  291. continue
  292. }
  293. nidMap[v.Nid] = &sdl.Maps[i]
  294. timeStartMap := TimeMap{
  295. Time: v.StartTime,
  296. Type: 0,
  297. Map: &sdl.Maps[i],
  298. }
  299. mapSlice = append(mapSlice, timeStartMap)
  300. timeEndMap := TimeMap{
  301. Time: v.EndTime,
  302. Type: 1,
  303. Map: &sdl.Maps[i],
  304. }
  305. mapSlice = append(mapSlice, timeEndMap)
  306. }
  307. sort.Slice(mapSlice, func(i, j int) bool {
  308. return mapSlice[i].Time < mapSlice[j].Time
  309. })
  310. funStack := []TimeMap{}
  311. currentNid := 1
  312. Nid := 2
  313. level := 2
  314. for _, v := range mapSlice {
  315. //klog.Debugln("SliceSliceindex", k, "value", v.Time, v.Type, v.Map.MethodName, v.Map.Nid)
  316. if v.Type == 0 {
  317. // 函数入口
  318. funStack = append(funStack, v)
  319. v.Map.Pid = currentNid
  320. v.Map.Level = level
  321. v.Map.Nid = Nid
  322. currentNid = Nid
  323. level += 1
  324. Nid += 1
  325. } else if v.Type == 1 {
  326. // 函数出口
  327. len := len(funStack)
  328. funStack = funStack[:len-1]
  329. if (len - 2) < 0 {
  330. currentNid = 1
  331. } else {
  332. currentNid = funStack[len-2].Map.Nid
  333. }
  334. level -= 1
  335. }
  336. }
  337. }
  338. //func initRootData(traceId string) RootDataT {
  339. // data := RootDataT{
  340. // AccountId: 110,
  341. // AgentId: 1011005252979954, // TODO 更新 基于 ip:port + process_name + exe路径生成
  342. // AgentVersion: "2.1.0",
  343. // AppId: 5410049101545798, // TODO 更新 基于appname生成
  344. // AppIdFrom: -1,
  345. // AppName: "eBPF-agent", // TODO 更新 ip:port || process_name
  346. // CalledId: -1,
  347. // ClientIp: "",
  348. // CollTime: 0,
  349. // Cpu: 0,
  350. // Custom: "",
  351. // HostId: 10154813500555812,
  352. // HostName: "localhost",
  353. // HttpCode: 0,
  354. // HttpMethod: "",
  355. // InstanceId: 1005051101515357, // TODO 更新 基于ip:port
  356. // InstanceIdFrom: -1,
  357. // Maps: []MapInfoT{},
  358. // MemU: 0,
  359. // MemUP: 0,
  360. // OperType: "",
  361. // Parameters: []interface{}{},
  362. // ParentTaskName: 0,
  363. // Period: -1,
  364. // RespTime: 0,
  365. // Sampling: 0,
  366. // ServiceName: "GO",
  367. // ServiceType: APP_SERVICE_TYPE,
  368. // Sip: "",
  369. // Sn: "",
  370. // SpanIdFrom: "",
  371. // Sport: 0,
  372. // TId: -1,
  373. // TName: "",
  374. // TraceId: traceId,
  375. // TransIds: []interface{}{},
  376. // TypeFrom: "",
  377. // Uri: "",
  378. // UserDir: 0,
  379. // VipIds: []interface{}{},
  380. // }
  381. // return data
  382. //}
  383. func initRootDataFromEvent() RootDataT {
  384. hostID := utils.GetHostID()
  385. accountID := utils.GetAccountID()
  386. data := RootDataT{
  387. // todo AccountId
  388. AccountId: accountID,
  389. AgentId: 0, // 基于 ip:port + process_name + exe路径生成
  390. AgentVersion: "2.1.0",
  391. AppId: 0, // 基于appname生成
  392. AppIdFrom: -1,
  393. AppName: "eBPF-agent", // server配置
  394. CalledId: -1,
  395. ClientIp: "",
  396. CollTime: 0,
  397. Cpu: 0,
  398. Custom: "",
  399. HostId: hostID,
  400. HostName: "localhost",
  401. HttpCode: 0,
  402. HttpMethod: "",
  403. InstanceId: 0, // 基于ip:port
  404. InstanceIdFrom: -1,
  405. Maps: []MapInfoT{},
  406. MemU: 0,
  407. MemUP: 0,
  408. OperType: "",
  409. Parameters: []ParamStruct{},
  410. ParentTaskName: "",
  411. Period: -1,
  412. RespTime: 0,
  413. Sampling: 0,
  414. ServiceName: "",
  415. ServiceType: APP_SERVICE_TYPE,
  416. Sip: "",
  417. Sn: "",
  418. SpanIdFrom: "",
  419. Sport: 0,
  420. TId: -1,
  421. TName: "",
  422. TraceId: "",
  423. TransIds: []interface{}{},
  424. TypeFrom: "",
  425. Uri: "",
  426. UserDir: "",
  427. VipIds: []interface{}{},
  428. SrcAddr: "",
  429. DestinationAddr: "",
  430. }
  431. return data
  432. }
  433. func initRootDataJava() RootDataT {
  434. data := RootDataT{
  435. AccountId: 110,
  436. AgentId: 3934815089541000, // TODO 更新 基于 ip:port + process_name + exe路径生成
  437. AgentVersion: "2.21.0",
  438. AppId: 3365853273187618, // TODO 更新 基于appname生成
  439. AppIdFrom: -1,
  440. AppName: "eBPF-javaApplication", // TODO 更新 ip:port || process_name
  441. CalledId: -1,
  442. ClientIp: "",
  443. CollTime: 0,
  444. Cpu: 0,
  445. Custom: "",
  446. HostId: 2315065183171055,
  447. HostName: "localhost",
  448. HttpCode: 0,
  449. HttpMethod: "",
  450. InstanceId: 1128864082033413, // TODO 更新 基于ip:port
  451. InstanceIdFrom: -1,
  452. Maps: []MapInfoT{},
  453. MemU: 0,
  454. MemUP: 0,
  455. OperType: "",
  456. Parameters: []ParamStruct{},
  457. ParentTaskName: "",
  458. Period: -1,
  459. RespTime: 0,
  460. Sampling: 0,
  461. ServiceName: "TOMCAT",
  462. ServiceType: APP_SERVICE_TYPE,
  463. Sip: "",
  464. Sn: "",
  465. SpanIdFrom: "",
  466. Sport: 0,
  467. TId: -1,
  468. TName: "",
  469. TraceId: "",
  470. TransIds: []interface{}{},
  471. TypeFrom: "",
  472. Uri: "",
  473. UserDir: "",
  474. VipIds: []interface{}{},
  475. }
  476. return data
  477. }
  478. //func initMapNode(spanSd *tracepb.Span) (MapInfoT, string) {
  479. // mNode := MapInfoT{
  480. // Exception: 0,
  481. // ExceptionMsg: "",
  482. // ExceptionStack: "",
  483. // Ip: "",
  484. // Level: 2,
  485. // Pid: 1,
  486. // Port: 0,
  487. // Ps: []string{},
  488. // ServiceName: "",
  489. // ServiceType: "",
  490. // WallTime: 0,
  491. // }
  492. // mNode.MethodName = spanSd.Name
  493. // mNode.PureTime = (spanSd.EndTimeUnixNano - spanSd.StartTimeUnixNano) / 1e3
  494. // mNode.WallTime = mNode.PureTime
  495. // mNode.StartTime = spanSd.StartTimeUnixNano
  496. // mNode.EndTime = spanSd.EndTimeUnixNano
  497. //
  498. // for _, attr := range spanSd.GetAttributes() {
  499. // fmt.Println(attr.Key, ":", attr.Value.GetValue())
  500. //
  501. // switch attr.Key {
  502. // case "nid":
  503. // mNode.Nid = int(attr.Value.GetIntValue())
  504. // case "pid":
  505. // mNode.Pid = int(attr.Value.GetIntValue())
  506. // case "level":
  507. // mNode.Level = int(attr.Value.GetIntValue())
  508. // }
  509. // }
  510. //
  511. // return mNode, spanSd.Name
  512. //}
  513. func buildMapNodeFromEvent(event tracesdk.Event) MapInfoT {
  514. mNode := MapInfoT{
  515. Exception: 0,
  516. ExceptionMsg: "",
  517. ExceptionStack: "",
  518. Ip: "",
  519. Level: 2,
  520. Pid: 1,
  521. Port: 0,
  522. Ps: []string{},
  523. ServiceName: "",
  524. ServiceType: "",
  525. WallTime: 0,
  526. }
  527. mNode.MethodName = event.Name
  528. //mNode.PureTime = (event.EndTimeUnixNano - event.StartTimeUnixNano) / 1e3
  529. //mNode.WallTime = mNode.PureTime
  530. //mNode.StartTime = spanSd.StartTimeUnixNano
  531. //mNode.EndTime = spanSd.EndTimeUnixNano
  532. for _, attr := range event.Attributes {
  533. //fmt.Println(event.Name, "--->buildMapNodeFromEvent--->", attr.Key, ":", attr.Value.AsInterface())
  534. switch attr.Key {
  535. case "nid":
  536. mNode.Nid = int(attr.Value.AsInt64())
  537. case "pid":
  538. mNode.Pid = int(attr.Value.AsInt64())
  539. case "level":
  540. mNode.Level = int(attr.Value.AsInt64())
  541. case "time.start_at":
  542. mNode.StartTime, mNode.StartTimeMs = cleanNsTime(attr.Value.AsInt64())
  543. case "time.end_at":
  544. mNode.EndTime, mNode.EndTimeMs = cleanNsTime(attr.Value.AsInt64())
  545. case "time.duration":
  546. //mNode.PureTime = uint64(attr.Value.AsInt64()) / 1e3
  547. mNode.WallTime = uint64(attr.Value.AsInt64()) / 1e3
  548. }
  549. }
  550. return mNode
  551. }
  552. func parseURIToParams(input string) (string, []ParamStruct, error) {
  553. // 解析输入 URI
  554. parsedURL, err := url.Parse(input)
  555. if err != nil {
  556. return "", nil, fmt.Errorf("failed to parse URI: %w", err)
  557. }
  558. // 提取查询参数
  559. queryParams := parsedURL.Query()
  560. // 转换为目标结构
  561. var params []ParamStruct
  562. for key, values := range queryParams {
  563. params = append(params, ParamStruct{
  564. Name: key,
  565. Values: values,
  566. })
  567. }
  568. return parsedURL.Path, params, nil
  569. }
  570. // 构建拼装
  571. //func buildAndAssemblyMap(sd apmTraceSpan, traceRoot *TraceMapT) MapInfoT {
  572. // mNode, mapType := initMapNode(span(sd))
  573. // switch mapType {
  574. // case "APPLICATION":
  575. // buildAppMap(&mNode, traceRoot, sd)
  576. // traceRoot.TheEnd = true
  577. // case "HTTP":
  578. // buildHttpMap(&mNode, sd)
  579. // case "Mysql":
  580. // buildMysqlMap(&mNode, sd)
  581. // case "Redis":
  582. // buildRedisMap(&mNode, sd)
  583. // }
  584. // if mapType != "" {
  585. // mNode.Nid = traceRoot.Index
  586. // traceRoot.RootData.Maps = append(traceRoot.RootData.Maps, mNode)
  587. // }
  588. // return mNode
  589. //}
  590. //func buildAndAssemblyMapFromEvent(event tracesdk.Event, traceRoot *RootDataT) MapInfoT {
  591. // mNode := buildMapNodeFromEvent(event)
  592. // switch mapType {
  593. // case "HTTP":
  594. // buildHttpMapFromEvent(mNode, event)
  595. // //case "Mysql":
  596. // // buildMysqlMap(mNode, sd)
  597. // //case "Redis":
  598. // // buildRedisMap(mNode, sd)
  599. // }
  600. // if mapType != "" {
  601. // //mNode.Nid = traceRoot.Index
  602. // traceRoot.Maps = append(traceRoot.Maps, mNode)
  603. // }
  604. // return mNode
  605. //}
  606. //func buildAppMap(mNode *MapInfoT, traceRoot *TraceMapT, sd apmTraceSpan) {
  607. // mNode.ServiceName = GO_SERVICE_NAME
  608. // mNode.ServiceType = APP_SERVICE_TYPE
  609. // mNode.MethodName = "net/http.(*Transport).roundTrip()"
  610. // mNode.Level = 1
  611. // mNode.Pid = 0
  612. // mNode.Nid = 1
  613. // // 构建root节点
  614. // traceRoot.RootData.RespTime = mNode.PureTime
  615. // traceRoot.RootData.CollTime = mNode.StartTime
  616. // traceRoot.Index = 1
  617. // for _, attr := range sd.Attributes() {
  618. // fmt.Println(attr.Key, ":", attr.Value.AsInterface())
  619. // switch attr.Key {
  620. // case "http.uri":
  621. // traceRoot.RootData.Uri = attr.Value.AsString()
  622. // case "http.method":
  623. // traceRoot.RootData.HttpMethod = attr.Value.AsString()
  624. // case "http.status_code":
  625. // traceRoot.RootData.HttpCode = attr.Value.AsInt64()
  626. // case "net.peer.name":
  627. // traceRoot.RootData.ClientIp = attr.Value.AsString()
  628. // traceRoot.RootData.Sip = attr.Value.AsString()
  629. // traceRoot.RootData.Sn = attr.Value.AsString()
  630. // case "net.peer.port":
  631. // traceRoot.RootData.Sport = attr.Value.AsInt64()
  632. // traceRoot.RootData.LocalPort = attr.Value.AsInt64()
  633. // case "server.trace_id_from":
  634. // traceRoot.RootData.TraceId = attr.Value.AsString()
  635. // case "server.called_id":
  636. // traceRoot.RootData.CalledId = attr.Value.AsInt64()
  637. // case "server.instance_id_from":
  638. // traceRoot.RootData.InstanceIdFrom = attr.Value.AsInt64()
  639. // case "server.app_id_from":
  640. // traceRoot.RootData.AppIdFrom = attr.Value.AsInt64()
  641. // case "server.span_id_from":
  642. // traceRoot.RootData.SpanIdFrom = attr.Value.AsString()
  643. // case "server.type_from":
  644. // traceRoot.RootData.TypeFrom = attr.Value.AsString()
  645. // }
  646. // }
  647. //
  648. //}
  649. func buildAppMapFromEvent(traceRoot *RootDataT, sd apmTraceSpan) int {
  650. mNode := MapInfoT{
  651. Exception: 0,
  652. ExceptionMsg: "",
  653. ExceptionStack: "",
  654. Ip: "",
  655. Level: 1,
  656. Pid: 1,
  657. Port: 0,
  658. Ps: []string{},
  659. ServiceName: "",
  660. ServiceType: "",
  661. WallTime: 0,
  662. }
  663. mNode.ServiceName = GO_SERVICE_NAME
  664. mNode.ServiceType = APP_SERVICE_TYPE
  665. mNode.MethodName = "Kernel Endpoint()"
  666. mNode.Level = 1
  667. mNode.Pid = 0
  668. mNode.Nid = 1
  669. var code_type int64
  670. // 构建root节点
  671. //traceRoot.RespTime = mNode.PureTimex
  672. //traceRoot.CollTime = mNode.StartTime
  673. for _, attr := range sd.Attributes() {
  674. klog.Debugln("Appmap:", attr.Key, ":", attr.Value.AsInterface())
  675. switch attr.Key {
  676. case "http.uri":
  677. traceRoot.Uri, traceRoot.Parameters, _ = parseURIToParams(attr.Value.AsString())
  678. case "http.method":
  679. traceRoot.HttpMethod = attr.Value.AsString()
  680. case "http.status_code":
  681. traceRoot.HttpCode = attr.Value.AsInt64()
  682. case "net.peer.name":
  683. // TODO 修改 ClientIp sip获取方式
  684. traceRoot.ClientIp = attr.Value.AsString()
  685. traceRoot.Sip = attr.Value.AsString()
  686. traceRoot.Sn = attr.Value.AsString()
  687. case "net.peer.port":
  688. traceRoot.Sport = attr.Value.AsInt64()
  689. traceRoot.LocalPort = attr.Value.AsInt64()
  690. case "server.trace_id_from":
  691. traceRoot.TraceId = attr.Value.AsString()
  692. case "server.called_id":
  693. traceRoot.CalledId = attr.Value.AsInt64()
  694. case "server.instance_id_from":
  695. traceRoot.InstanceIdFrom = attr.Value.AsInt64()
  696. case "server.app_id_from":
  697. traceRoot.AppIdFrom = attr.Value.AsInt64()
  698. case "server.span_id_from":
  699. traceRoot.SpanIdFrom = attr.Value.AsString()
  700. case "server.type_from":
  701. traceRoot.TypeFrom = attr.Value.AsString()
  702. case "time.start_at":
  703. mNode.StartTime, mNode.StartTimeMs = cleanNsTime(attr.Value.AsInt64())
  704. traceRoot.CollTime = mNode.StartTimeMs
  705. case "time.end_at":
  706. mNode.EndTime, mNode.EndTimeMs = cleanNsTime(attr.Value.AsInt64())
  707. case "time.duration":
  708. traceRoot.RespTime = uint64(attr.Value.AsInt64()) / 1e3
  709. mNode.PureTime = traceRoot.RespTime
  710. mNode.WallTime = uint64(attr.Value.AsInt64()) / 1e3
  711. case "server.code_type":
  712. code_type = attr.Value.AsInt64()
  713. case "server.app_name":
  714. traceRoot.AppName = attr.Value.AsString()
  715. case "server.service_name":
  716. traceRoot.ServiceName = attr.Value.AsString()
  717. mNode.ServiceName = attr.Value.AsString()
  718. case "server.app_id":
  719. traceRoot.AppId = attr.Value.AsInt64()
  720. case "server.agent_id":
  721. traceRoot.AgentId = attr.Value.AsInt64()
  722. case "server.instance_id":
  723. traceRoot.InstanceId = attr.Value.AsInt64()
  724. case "server.src_addr":
  725. traceRoot.SrcAddr = attr.Value.AsString()
  726. case "server.dst_addr":
  727. traceRoot.DestinationAddr = attr.Value.AsString()
  728. case "server.pid":
  729. traceRoot.Pid = uint32(attr.Value.AsInt64())
  730. case "server.container_id":
  731. traceRoot.ContainerID = attr.Value.AsString()
  732. }
  733. }
  734. traceRoot.Maps = append(traceRoot.Maps, mNode)
  735. return int(code_type)
  736. }
  737. //func buildHttpMap(mNode *MapInfoT, sd apmTraceSpan) {
  738. // mNode.ServiceName = HTTP_SERVICE_NAME
  739. // mNode.ServiceType = HTTP_SERVICE_TYPE
  740. // mNode.Schema = "http"
  741. // mNode.MethodName = "net/http.serverHandler.ServeHTTP()"
  742. // var descAddr string
  743. // for _, attr := range sd.Attributes() {
  744. // //fmt.Println(attr.Key, ":", attr.Value.AsInterface())
  745. // switch attr.Key {
  746. // case "http.ip":
  747. // mNode.Ip = attr.Value.AsString()
  748. // descAddr += mNode.Ip
  749. // case "http.port":
  750. // mNode.Port = attr.Value.AsInt64()
  751. // descAddr += ":" + attr.Value.AsString()
  752. // case "http.uri":
  753. // mNode.Uri = attr.Value.AsString()
  754. // case "http.assumed_app_id":
  755. // mNode.AssumedAppId = attr.Value.AsInt64()
  756. // case "http.span_id":
  757. // mNode.SpanId = attr.Value.AsString()
  758. // }
  759. // }
  760. // //mNode.AssumedAppId = Md5ToInt64(descAddr, 16)
  761. //}
  762. func buildHttpMapFromEvent(mNode *MapInfoT, event tracesdk.Event) {
  763. mNode.ServiceName = HTTP_SERVICE_NAME
  764. mNode.ServiceType = HTTP_SERVICE_TYPE
  765. mNode.Schema = "http"
  766. //mNode.MethodName = "HTTP"
  767. //var descAddr string
  768. var method string
  769. for _, attr := range event.Attributes {
  770. klog.Debugln("HTTP--->", attr.Key, ":", attr.Value.AsInterface())
  771. switch attr.Key {
  772. case "http.ip":
  773. mNode.Ip = attr.Value.AsString()
  774. //descAddr += mNode.Ip
  775. case "http.port":
  776. mNode.Port = attr.Value.AsInt64()
  777. case "http.method":
  778. //mNode.MethodName += " " + attr.Value.AsString()
  779. method = attr.Value.AsString()
  780. //descAddr += ":" + attr.Value.AsString()
  781. case "http.uri":
  782. mNode.Uri = attr.Value.AsString()
  783. //mNode.MethodName += " " + attr.Value.AsString()
  784. case "http.assumed_app_id":
  785. mNode.AssumedAppId = attr.Value.AsInt64()
  786. case "http.span_id":
  787. mNode.SpanId = attr.Value.AsString()
  788. case "time.start_at":
  789. mNode.StartTime = uint64(attr.Value.AsInt64())
  790. case "time.end_at":
  791. mNode.EndTime = uint64(attr.Value.AsInt64())
  792. case "time.duration":
  793. //mNode.PureTime = uint64(attr.Value.AsInt64()) / 1e3
  794. mNode.WallTime = uint64(attr.Value.AsInt64()) / 1e3
  795. case "http.src_addr":
  796. mNode.SrcAddr = attr.Value.AsString()
  797. case "http.destination_addr":
  798. mNode.DestinationAddr = attr.Value.AsString()
  799. }
  800. }
  801. mNode.MethodName = fmt.Sprintf("%s %s %s:%d%s", "HTTP", method, mNode.Ip, mNode.Port, mNode.Uri)
  802. //mNode.AssumedAppId = Md5ToInt64(descAddr, 16)
  803. }
  804. //func buildMysqlMap(mNode *MapInfoT, sd apmTraceSpan) {
  805. // mNode.Dbn = "unknown"
  806. // mNode.ServiceName = MYSQL_SERVICE_NAME
  807. // mNode.ServiceType = SQL_SERVICE_TYPE
  808. // mNode.MethodName = "database/sql.Query()"
  809. // for _, attr := range sd.Attributes() {
  810. // //fmt.Println(attr.Key, ":", attr.Value.AsInterface())
  811. // switch attr.Key {
  812. // case "net.peer.name":
  813. // mNode.Ip = attr.Value.AsString()
  814. // case "net.peer.port":
  815. // mNode.Port = attr.Value.AsInt64()
  816. // case "db.statement":
  817. // query := attr.Value.AsString()
  818. // mNode.Ps = []string{query}
  819. // words := strings.Fields(query)
  820. // if len(words) > 0 {
  821. // mNode.OperType = strings.ToUpper(words[0])
  822. // }
  823. // }
  824. // }
  825. //}
  826. func buildSQLMapEvent(mNode *MapInfoT, event tracesdk.Event) {
  827. mNode.Dbn = "-"
  828. mNode.ServiceName = l7.Protocol(event.ProtocolType).ServiceNameString()
  829. mNode.ServiceType = SQL_SERVICE_TYPE
  830. //mNode.MethodName = "database/sql.Query()"
  831. for _, attr := range event.Attributes {
  832. //fmt.Println(attr.Key, ":", attr.Value.AsInterface())
  833. switch attr.Key {
  834. case "net.peer.name":
  835. mNode.Ip = attr.Value.AsString()
  836. case "net.peer.port":
  837. mNode.Port = attr.Value.AsInt64()
  838. case "db.statement":
  839. query := attr.Value.AsString()
  840. mNode.MethodName = query
  841. mNode.Ps = []string{query}
  842. //words := strings.Fields(query)
  843. //if len(words) > 0 {
  844. // mNode.OperType = strings.ToUpper(words[0])
  845. //}
  846. case "sql.exception":
  847. if attr.Value.AsBool() {
  848. mNode.Exception = 1
  849. } else {
  850. mNode.Exception = 0
  851. }
  852. case "sql.exception_msg":
  853. mNode.ExceptionMsg = attr.Value.AsString()
  854. case "sql.src_addr":
  855. mNode.SrcAddr = attr.Value.AsString()
  856. case "sql.destination_addr":
  857. mNode.DestinationAddr = attr.Value.AsString()
  858. }
  859. }
  860. }
  861. func buildDNSMapEvent(mNode *MapInfoT, event tracesdk.Event) {
  862. mNode.ServiceName = l7.Protocol(event.ProtocolType).ServiceNameString()
  863. mNode.ServiceType = NET_SERVICE_TYPE
  864. var _type string
  865. var fqdn string
  866. var ips string
  867. var ttl int64
  868. for _, attr := range event.Attributes {
  869. switch attr.Key {
  870. case "dns.type":
  871. _type = attr.Value.AsString()
  872. case "dns.fqdn":
  873. fqdn = attr.Value.AsString()
  874. case "dns.ttl":
  875. ttl = attr.Value.AsInt64()
  876. case "dns.ips":
  877. if attr.Value.AsString() != "" {
  878. ips = "Addr: " + attr.Value.AsString()
  879. }
  880. }
  881. }
  882. mNode.MethodName = fmt.Sprintf("DNS Name: %s Type: %s TTL: %d %s", fqdn, _type, ttl, ips)
  883. }
  884. func buildPostGreSqlMapEvent(mNode *MapInfoT, event tracesdk.Event) {
  885. mNode.Dbn = "-"
  886. mNode.ServiceName = POSTGRESQL_SERVICE_NAME
  887. mNode.ServiceType = SQL_SERVICE_TYPE
  888. //mNode.MethodName = "database/sql.Query()"
  889. for _, attr := range event.Attributes {
  890. //fmt.Println(attr.Key, ":", attr.Value.AsInterface())
  891. switch attr.Key {
  892. case "net.peer.name":
  893. mNode.Ip = attr.Value.AsString()
  894. case "net.peer.port":
  895. mNode.Port = attr.Value.AsInt64()
  896. case "db.statement":
  897. query := attr.Value.AsString()
  898. mNode.Ps = []string{query}
  899. //words := strings.Fields(query)
  900. //if len(words) > 0 {
  901. // mNode.OperType = strings.ToUpper(words[0])
  902. //}
  903. case "sql.exception":
  904. if attr.Value.AsBool() {
  905. mNode.Exception = 1
  906. } else {
  907. mNode.Exception = 0
  908. }
  909. case "sql.src_addr":
  910. mNode.SrcAddr = attr.Value.AsString()
  911. case "sql.destination_addr":
  912. mNode.DestinationAddr = attr.Value.AsString()
  913. }
  914. }
  915. }
  916. func buildMysqlMapEvent(mNode *MapInfoT, event tracesdk.Event) {
  917. mNode.Dbn = "-"
  918. mNode.ServiceName = MYSQL_SERVICE_NAME
  919. mNode.ServiceType = SQL_SERVICE_TYPE
  920. //mNode.MethodName = "database/sql.Query()"
  921. for _, attr := range event.Attributes {
  922. //fmt.Println(attr.Key, ":", attr.Value.AsInterface())
  923. switch attr.Key {
  924. case "net.peer.name":
  925. mNode.Ip = attr.Value.AsString()
  926. case "net.peer.port":
  927. mNode.Port = attr.Value.AsInt64()
  928. case "db.statement":
  929. query := attr.Value.AsString()
  930. mNode.MethodName = query
  931. mNode.Ps = []string{query}
  932. //words := strings.Fields(query)
  933. //if len(words) > 0 {
  934. // mNode.OperType = strings.ToUpper(words[0])
  935. //}
  936. case "sql.exception":
  937. if attr.Value.AsBool() {
  938. mNode.Exception = 1
  939. } else {
  940. mNode.Exception = 0
  941. }
  942. case "sql.src_addr":
  943. mNode.SrcAddr = attr.Value.AsString()
  944. case "sql.destination_addr":
  945. mNode.DestinationAddr = attr.Value.AsString()
  946. }
  947. }
  948. }
  949. func buildDMMapEvent(mNode *MapInfoT, event tracesdk.Event) {
  950. mNode.Dbn = "TEST"
  951. mNode.ServiceName = DM_SERVICE_NAME
  952. mNode.ServiceType = SQL_SERVICE_TYPE
  953. mNode.MethodName = "database/sql.Query()"
  954. for _, attr := range event.Attributes {
  955. //fmt.Println(attr.Key, ":", attr.Value.AsInterface())
  956. switch attr.Key {
  957. case "net.peer.name":
  958. mNode.Ip = attr.Value.AsString()
  959. case "net.peer.port":
  960. mNode.Port = attr.Value.AsInt64()
  961. case "db.statement":
  962. query := attr.Value.AsString()
  963. mNode.Ps = []string{query}
  964. //words := strings.Fields(query)
  965. //if len(words) > 0 {
  966. // mNode.OperType = strings.ToUpper(words[0])
  967. //}
  968. case "sql.exception":
  969. if attr.Value.AsBool() {
  970. mNode.Exception = 1
  971. } else {
  972. mNode.Exception = 0
  973. }
  974. case "sql.src_addr":
  975. mNode.SrcAddr = attr.Value.AsString()
  976. case "sql.destination_addr":
  977. mNode.DestinationAddr = attr.Value.AsString()
  978. }
  979. }
  980. }
  981. func buildRedisMapEvent(mNode *MapInfoT, event tracesdk.Event) {
  982. mNode.ServiceName = REDIS_SERVICE_NAME
  983. mNode.ServiceType = NOSQL_SERVICE_TYPE
  984. //mNode.MethodName = span(sd).Name + " query"
  985. //mNode.MethodName = "redis.Do()"
  986. for _, attr := range event.Attributes {
  987. //fmt.Println(attr.Key, ":", attr.Value.AsInterface())
  988. switch attr.Key {
  989. case "net.peer.name":
  990. mNode.Ip = attr.Value.AsString()
  991. case "net.peer.port":
  992. mNode.Port = attr.Value.AsInt64()
  993. case "db.statement":
  994. query := attr.Value.AsString()
  995. mNode.MethodName = query
  996. mNode.Ps = []string{query}
  997. //words := strings.Fields(query)
  998. //if len(words) > 0 {
  999. // mNode.OperType = strings.ToUpper(words[0])
  1000. //}
  1001. case "nosql.src_addr":
  1002. mNode.SrcAddr = attr.Value.AsString()
  1003. case "nosql.destination_addr":
  1004. mNode.DestinationAddr = attr.Value.AsString()
  1005. }
  1006. }
  1007. }
  1008. func buildMongoMapEvent(mNode *MapInfoT, event tracesdk.Event) {
  1009. mNode.ServiceName = MONGO_SERVICE_NAME
  1010. mNode.ServiceType = NOSQL_SERVICE_TYPE
  1011. for _, attr := range event.Attributes {
  1012. switch attr.Key {
  1013. case "net.peer.name":
  1014. mNode.Ip = attr.Value.AsString()
  1015. case "net.peer.port":
  1016. mNode.Port = attr.Value.AsInt64()
  1017. case "db.statement":
  1018. query := attr.Value.AsString()
  1019. mNode.MethodName = query
  1020. mNode.Ps = []string{query}
  1021. case "nosql.src_addr":
  1022. mNode.SrcAddr = attr.Value.AsString()
  1023. case "nosql.destination_addr":
  1024. mNode.DestinationAddr = attr.Value.AsString()
  1025. }
  1026. }
  1027. }
  // isEnter reports whether _type is exactly the string "APPLICATION".
  // The comparison is case-sensitive.
  func isEnter(_type string) bool {
  	return _type == "APPLICATION"
  }
  1034. func span(sd apmTraceSpan) *tracepb.Span {
  1035. if sd == nil {
  1036. return nil
  1037. }
  1038. tid := sd.SpanContext().TraceID()
  1039. sid := sd.SpanContext().SpanID()
  1040. s := &tracepb.Span{
  1041. TraceId: tid[:],
  1042. SpanId: sid[:],
  1043. TraceState: sd.SpanContext().TraceState().String(),
  1044. //Status: status(sd.Status().Code, sd.Status().Description),
  1045. StartTimeUnixNano: uint64(sd.StartTime().UnixNano()),
  1046. EndTimeUnixNano: uint64(sd.EndTime().UnixNano()),
  1047. //Links: links(sd.Links()),
  1048. //Kind: spanKind(sd.SpanKind()),
  1049. Name: sd.Name(),
  1050. Attributes: tracetransform.KeyValues(sd.Attributes()),
  1051. //Events: spanEvents(sd.Events()),
  1052. DroppedAttributesCount: uint32(sd.DroppedAttributes()),
  1053. DroppedEventsCount: uint32(sd.DroppedEvents()),
  1054. DroppedLinksCount: uint32(sd.DroppedLinks()),
  1055. }
  1056. if psid := sd.Parent().SpanID(); psid.IsValid() {
  1057. s.ParentSpanId = psid[:]
  1058. }
  1059. return s
  1060. }
  // Md5ToInt64 derives a decimal integer from the MD5 digest of strParam.
  //
  // The digest is rendered as 32 lowercase hex characters. Each character is
  // replaced by the decimal value of its ASCII code ('0'..'9' -> 48..57,
  // 'a'..'f' -> 97..102), the pieces are concatenated, and the first Len
  // decimal digits of that string are parsed as a base-10 int64.
  //
  // It returns 0 when Len is not positive or when the selected prefix does not
  // fit in an int64. A Len larger than the number of available digits is
  // clamped to that number instead of panicking on an out-of-range slice.
  func Md5ToInt64(strParam string, Len int) int64 {
  	if Len <= 0 {
  		return 0
  	}

  	sum := md5.Sum([]byte(strParam))
  	hexDigest := fmt.Sprintf("%x", sum)

  	// Every hex character expands to at most three decimal digits.
  	digits := make([]byte, 0, 3*len(hexDigest))
  	for i := 0; i < len(hexDigest); i++ {
  		digits = strconv.AppendInt(digits, int64(hexDigest[i]), 10)
  	}

  	if Len > len(digits) {
  		Len = len(digits)
  	}

  	n, err := strconv.ParseInt(string(digits[:Len]), 10, 64)
  	if err != nil {
  		// Typically an overflow: the requested prefix has too many digits.
  		return 0
  	}
  	return n
  }
  // cleanNsTime returns two views of a nanosecond quantity: the value itself
  // converted to uint64, and the value divided by 1e6 (nanoseconds to
  // milliseconds) rounded to the nearest integer with math.Round.
  func cleanNsTime(time int64) (uint64, uint64) {
  	nanos := uint64(time)
  	millis := uint64(math.Round(float64(time) / 1e6))
  	return nanos, millis
  }