apm_exporter.go 31 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111
  1. package otlptrace
  2. import (
  3. "crypto/md5"
  4. "encoding/json"
  5. "fmt"
  6. . "github.com/coroot/coroot-node-agent/ebpftracer"
  7. "github.com/coroot/coroot-node-agent/ebpftracer/l7"
  8. "github.com/coroot/coroot-node-agent/utils"
  9. klog "github.com/sirupsen/logrus"
  10. "math"
  11. "net/url"
  12. "sort"
  13. "strconv"
  14. "sync"
  15. "go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/tracetransform"
  16. tracesdk "go.opentelemetry.io/otel/sdk/trace"
  17. tracepb "go.opentelemetry.io/proto/otlp/trace/v1"
  18. )
  19. const (
  20. APP_SERVICE_TYPE = "APPLICATION"
  21. SQL_SERVICE_TYPE = "SQL"
  22. NOSQL_SERVICE_TYPE = "NOSQL"
  23. HTTP_SERVICE_TYPE = "HTTP"
  24. NET_SERVICE_TYPE = "L7_NET"
  25. )
  26. const (
  27. GO_SERVICE_NAME = "GO"
  28. MYSQL_SERVICE_NAME = "MYSQL"
  29. DM_SERVICE_NAME = "DM"
  30. REDIS_SERVICE_NAME = "REDIS"
  31. HTTP_SERVICE_NAME = "HTTPCLIENT"
  32. POSTGRESQL_SERVICE_NAME = "POSTGRESQL"
  33. )
  34. type apmTraceSpan tracesdk.ReadOnlySpan
  35. // GO:0:10154813500555812:5450531005555981:5610250100539899:ee022542c3940f1b:1001025098564810:888ceb3df1bdbe2c:110
  36. type RootDataT struct {
  37. AccountId int `json:"account_id"`
  38. AgentId int64 `json:"agent_id"`
  39. AgentVersion string `json:"agent_version"`
  40. AppId int64 `json:"app_id"`
  41. AppIdFrom int64 `json:"app_id_from"` // from header app_id
  42. AppName string `json:"app_name"`
  43. CalledId int64 `json:"called_id"` // from header assumed_app_id
  44. ClientIp string `json:"client_ip"`
  45. CollTime uint64 `json:"coll_time"`
  46. Cpu int `json:"cpu"`
  47. Custom string `json:"custom"`
  48. HostId int64 `json:"host_id"`
  49. HostName string `json:"host_name"`
  50. HttpCode int64 `json:"http_code"`
  51. HttpMethod string `json:"http_method"`
  52. InstanceId int64 `json:"instance_id"`
  53. InstanceIdFrom int64 `json:"instance_id_from"` // from header instance_id
  54. LocalPort int64 `json:"local_port"`
  55. Maps []MapInfoT `json:"maps"`
  56. MemU int `json:"mem_u"`
  57. MemUP int `json:"mem_u_p"`
  58. OperType string `json:"oper_type"`
  59. Parameters []ParamStruct `json:"parameters,omitempty"`
  60. ParentTaskName int `json:"parent_task_name"`
  61. Period int `json:"period"`
  62. RespTime uint64 `json:"resp_time"`
  63. Sampling int `json:"sampling"`
  64. ServiceName string `json:"service_name"`
  65. ServiceType string `json:"service_type"`
  66. Sip string `json:"sip"`
  67. Sn string `json:"sn"`
  68. SpanIdFrom string `json:"span_id_from"` // from header span_id
  69. Sport int64 `json:"sport"`
  70. TId int `json:"t_id"`
  71. TName string `json:"t_name"`
  72. TraceId string `json:"trace_id"` // from header trace_id
  73. TransIds []interface{} `json:"trans_ids"`
  74. TypeFrom string `json:"type_from"`
  75. Uri string `json:"uri"`
  76. UserDir int `json:"user_dir"`
  77. VipIds []interface{} `json:"vip_ids"`
  78. SrcAddr string `json:"src_addr"`
  79. DestinationAddr string `json:"destination_addr"`
  80. }
  81. // ParamStruct 定义目标结构
  82. type ParamStruct struct {
  83. Name string `json:"name"`
  84. Values []string `json:"values"`
  85. }
  86. type MapInfoT struct {
  87. Dbn string `json:"dbn,omitempty"`
  88. Exception int `json:"exception,omitempty"`
  89. ExceptionMsg string `json:"exception_msg,omitempty"`
  90. ExceptionStack string `json:"exception_stack,omitempty"`
  91. Ip string `json:"ip,omitempty"`
  92. Level int `json:"level"`
  93. MethodDesc string `json:"method_desc,omitempty"`
  94. MethodName string `json:"method_name"`
  95. Nid int `json:"nid"`
  96. OperType string `json:"oper_type,omitempty"`
  97. Pid int `json:"pid"`
  98. Port int64 `json:"port,omitempty"`
  99. Ps []string `json:"ps,omitempty"`
  100. PureTime uint64 `json:"pure_time"`
  101. ServiceName string `json:"service_name"`
  102. ServiceType string `json:"service_type"`
  103. StartTime uint64 `json:"-"`
  104. EndTime uint64 `json:"-"`
  105. StartTimeMs uint64 `json:"start_time"`
  106. EndTimeMs uint64 `json:"end_time"`
  107. WallTime uint64 `json:"wall_time"`
  108. Schema string `json:"schema,omitempty"`
  109. AssumedAppId int64 `json:"assumed_app_id,omitempty"`
  110. Uri string `json:"uri,omitempty"`
  111. SpanId string `json:"span_id,omitempty"`
  112. SrcAddr string `json:"src_addr,omitempty"`
  113. DestinationAddr string `json:"destination_addr,omitempty"`
  114. }
  115. type TraceMapT struct {
  116. RootData RootDataT
  117. Index int
  118. lock *sync.RWMutex
  119. TheEnd bool
  120. }
  121. var TraceRootMap map[string]*TraceMapT
  122. func init() {
  123. TraceRootMap = make(map[string]*TraceMapT)
  124. //go func() {
  125. // for {
  126. // //fmt.Println(G_sdl)
  127. // time.Sleep(5 * time.Second)
  128. // }
  129. //}()
  130. }
  131. var G_sdl int
  132. func tracetransformData(sdl []tracesdk.ReadOnlySpan) map[int][]RootDataT {
  133. //G_sdl += len(sdl)
  134. if len(sdl) == 0 {
  135. return nil
  136. }
  137. // 多次请求 sdl
  138. sendDataMap := make(map[int][]RootDataT)
  139. //sendData := []RootDataT{}
  140. for _, sd := range sdl {
  141. if sd == nil {
  142. continue
  143. }
  144. //traceId := sd.SpanContext().TraceID().String()
  145. //fmt.Println("------event_num---- "+sd.Name(), "--->", len(sd.Events())) // 一次请求完整数据
  146. // 构建map *RootDataT
  147. var rootData RootDataT
  148. rootData = initRootDataFromEvent()
  149. // build http入口 MapInfoT
  150. code_type := buildAppMapFromEvent(&rootData, sd)
  151. // 构建maps
  152. for _, event := range sd.Events() {
  153. //aaa, _ := json.Marshal(event)
  154. //fmt.Println("event.info", string(aaa))
  155. mNode := buildMapNodeFromEvent(event)
  156. switch EventType(event.EventType) {
  157. // stack
  158. case EventTypeFunEnt:
  159. // l7 event
  160. case EventTypeL7Request:
  161. switch l7.Protocol(event.ProtocolType) {
  162. case l7.ProtocolHTTP:
  163. buildHttpMapFromEvent(&mNode, event)
  164. case l7.ProtocolDNS:
  165. buildDNSMapEvent(&mNode, event)
  166. case l7.ProtocolMysql:
  167. buildSQLMapEvent(&mNode, event)
  168. case l7.ProtocolRedis:
  169. buildRedisMapEvent(&mNode, event)
  170. // dm
  171. case l7.ProtocolDM:
  172. buildSQLMapEvent(&mNode, event)
  173. case l7.ProtocolPostgres:
  174. buildSQLMapEvent(&mNode, event)
  175. }
  176. }
  177. rootData.Maps = append(rootData.Maps, mNode)
  178. //fmt.Println(event.Name)
  179. //buildAndAssemblyMapFromEvent(event, rootData)
  180. }
  181. buildLevelFromEvent(&rootData)
  182. sendDataMap[code_type] = append(sendDataMap[code_type], rootData)
  183. //a, _ := json.Marshal(rootData)
  184. //fmt.Println(string(a))
  185. //sendData = append(sendData, rootData)
  186. //if _, ok := TraceRootMap[traceId]; !ok {
  187. //TraceRootMap[traceId] = &TraceMapT{RootData: initRootData(traceId), Index: 1}
  188. //}
  189. //TraceRootMap[traceId].Index++
  190. //buildAndAssemblyMap(sd, TraceRootMap[traceId])
  191. }
  192. // 发送完整数据 | 大量长耗时请求会增加内存占用
  193. //sendData := []RootDataT{}
  194. //for traceId, v := range TraceRootMap {
  195. // if v.TheEnd {
  196. // buildLevel(v)
  197. // sendData = append(sendData, v.RootData)
  198. // delete(TraceRootMap, traceId)
  199. // //fmt.Println("the end!")
  200. // } else {
  201. // //fmt.Println("not end!")
  202. // }
  203. //}
  204. //Transform the categorized map into a slice
  205. data, _ := json.Marshal(sendDataMap)
  206. klog.Debug(string(data))
  207. //fmt.Println(len(sendData))
  208. //fmt.Println("sdl len:", len(sdl))
  209. return sendDataMap
  210. }
  211. type TimeMap struct {
  212. Time uint64
  213. Type int
  214. Map *MapInfoT
  215. }
  216. //type TraceMapT struct {
  217. // RootData RootDataT
  218. // Index int
  219. // lock *sync.RWMutex
  220. // TheEnd bool
  221. //}
  222. //func buildLevel(sdl *TraceMapT) {
  223. // nidMap := make(map[int]*MapInfoT)
  224. //
  225. // mapSlice := []TimeMap{}
  226. //
  227. // for i, v := range sdl.RootData.Maps {
  228. // if v.ServiceType == "APPLICATION" {
  229. // continue
  230. // }
  231. // nidMap[v.Nid] = &sdl.RootData.Maps[i]
  232. // timeStartMap := TimeMap{
  233. // Time: v.StartTime,
  234. // Type: 0,
  235. // Map: &sdl.RootData.Maps[i],
  236. // }
  237. // mapSlice = append(mapSlice, timeStartMap)
  238. // timeEndMap := TimeMap{
  239. // Time: v.EndTime,
  240. // Type: 1,
  241. // Map: &sdl.RootData.Maps[i],
  242. // }
  243. // mapSlice = append(mapSlice, timeEndMap)
  244. // }
  245. // sort.Slice(mapSlice, func(i, j int) bool {
  246. // return mapSlice[i].Time < mapSlice[j].Time
  247. // })
  248. //
  249. // funStack := []TimeMap{}
  250. //
  251. // currentNid := 1
  252. // Nid := 2
  253. // level := 2
  254. //
  255. // for _, v := range mapSlice {
  256. // // fmt.Println("SliceSliceindex", k, "value", v.Time, v.Type, v.Map.MethodName, v.Map.Nid)
  257. // if v.Type == 0 {
  258. // // 函数入口
  259. // funStack = append(funStack, v)
  260. // v.Map.Pid = currentNid
  261. // v.Map.Level = level
  262. // v.Map.Nid = Nid
  263. // currentNid = Nid
  264. // level += 1
  265. // Nid += 1
  266. // } else if v.Type == 1 {
  267. // // 函数出口
  268. // len := len(funStack)
  269. // funStack = funStack[:len-1]
  270. // if (len - 2) < 0 {
  271. // currentNid = 1
  272. // } else {
  273. // currentNid = funStack[len-2].Map.Nid
  274. // }
  275. //
  276. // level -= 1
  277. // }
  278. // }
  279. //}
  280. func buildLevelFromEvent(sdl *RootDataT) {
  281. nidMap := make(map[int]*MapInfoT)
  282. mapSlice := []TimeMap{}
  283. for i, v := range sdl.Maps {
  284. if v.ServiceType == "APPLICATION" {
  285. continue
  286. }
  287. nidMap[v.Nid] = &sdl.Maps[i]
  288. timeStartMap := TimeMap{
  289. Time: v.StartTime,
  290. Type: 0,
  291. Map: &sdl.Maps[i],
  292. }
  293. mapSlice = append(mapSlice, timeStartMap)
  294. timeEndMap := TimeMap{
  295. Time: v.EndTime,
  296. Type: 1,
  297. Map: &sdl.Maps[i],
  298. }
  299. mapSlice = append(mapSlice, timeEndMap)
  300. }
  301. sort.Slice(mapSlice, func(i, j int) bool {
  302. return mapSlice[i].Time < mapSlice[j].Time
  303. })
  304. funStack := []TimeMap{}
  305. currentNid := 1
  306. Nid := 2
  307. level := 2
  308. for _, v := range mapSlice {
  309. //klog.Debugln("SliceSliceindex", k, "value", v.Time, v.Type, v.Map.MethodName, v.Map.Nid)
  310. if v.Type == 0 {
  311. // 函数入口
  312. funStack = append(funStack, v)
  313. v.Map.Pid = currentNid
  314. v.Map.Level = level
  315. v.Map.Nid = Nid
  316. currentNid = Nid
  317. level += 1
  318. Nid += 1
  319. } else if v.Type == 1 {
  320. // 函数出口
  321. len := len(funStack)
  322. funStack = funStack[:len-1]
  323. if (len - 2) < 0 {
  324. currentNid = 1
  325. } else {
  326. currentNid = funStack[len-2].Map.Nid
  327. }
  328. level -= 1
  329. }
  330. }
  331. }
  332. //func initRootData(traceId string) RootDataT {
  333. // data := RootDataT{
  334. // AccountId: 110,
  335. // AgentId: 1011005252979954, // TODO 更新 基于 ip:port + process_name + exe路径生成
  336. // AgentVersion: "2.1.0",
  337. // AppId: 5410049101545798, // TODO 更新 基于appname生成
  338. // AppIdFrom: -1,
  339. // AppName: "eBPF-agent", // TODO 更新 ip:port || process_name
  340. // CalledId: -1,
  341. // ClientIp: "",
  342. // CollTime: 0,
  343. // Cpu: 0,
  344. // Custom: "",
  345. // HostId: 10154813500555812,
  346. // HostName: "localhost",
  347. // HttpCode: 0,
  348. // HttpMethod: "",
  349. // InstanceId: 1005051101515357, // TODO 更新 基于ip:port
  350. // InstanceIdFrom: -1,
  351. // Maps: []MapInfoT{},
  352. // MemU: 0,
  353. // MemUP: 0,
  354. // OperType: "",
  355. // Parameters: []interface{}{},
  356. // ParentTaskName: 0,
  357. // Period: -1,
  358. // RespTime: 0,
  359. // Sampling: 0,
  360. // ServiceName: "GO",
  361. // ServiceType: APP_SERVICE_TYPE,
  362. // Sip: "",
  363. // Sn: "",
  364. // SpanIdFrom: "",
  365. // Sport: 0,
  366. // TId: -1,
  367. // TName: "",
  368. // TraceId: traceId,
  369. // TransIds: []interface{}{},
  370. // TypeFrom: "",
  371. // Uri: "",
  372. // UserDir: 0,
  373. // VipIds: []interface{}{},
  374. // }
  375. // return data
  376. //}
  377. func initRootDataFromEvent() RootDataT {
  378. hostID := utils.GetHostID()
  379. accountID := utils.GetAccountID()
  380. data := RootDataT{
  381. // todo AccountId
  382. AccountId: accountID,
  383. AgentId: 0, // 基于 ip:port + process_name + exe路径生成
  384. AgentVersion: "2.1.0",
  385. AppId: 0, // 基于appname生成
  386. AppIdFrom: -1,
  387. AppName: "eBPF-agent", // server配置
  388. CalledId: -1,
  389. ClientIp: "",
  390. CollTime: 0,
  391. Cpu: 0,
  392. Custom: "",
  393. HostId: hostID,
  394. HostName: "localhost",
  395. HttpCode: 0,
  396. HttpMethod: "",
  397. InstanceId: 0, // 基于ip:port
  398. InstanceIdFrom: -1,
  399. Maps: []MapInfoT{},
  400. MemU: 0,
  401. MemUP: 0,
  402. OperType: "",
  403. Parameters: []ParamStruct{},
  404. ParentTaskName: 0,
  405. Period: -1,
  406. RespTime: 0,
  407. Sampling: 0,
  408. ServiceName: "",
  409. ServiceType: APP_SERVICE_TYPE,
  410. Sip: "",
  411. Sn: "",
  412. SpanIdFrom: "",
  413. Sport: 0,
  414. TId: -1,
  415. TName: "",
  416. TraceId: "",
  417. TransIds: []interface{}{},
  418. TypeFrom: "",
  419. Uri: "",
  420. UserDir: 0,
  421. VipIds: []interface{}{},
  422. SrcAddr: "",
  423. DestinationAddr: "",
  424. }
  425. return data
  426. }
  427. func initRootDataJava() RootDataT {
  428. data := RootDataT{
  429. AccountId: 110,
  430. AgentId: 3934815089541000, // TODO 更新 基于 ip:port + process_name + exe路径生成
  431. AgentVersion: "2.21.0",
  432. AppId: 3365853273187618, // TODO 更新 基于appname生成
  433. AppIdFrom: -1,
  434. AppName: "eBPF-javaApplication", // TODO 更新 ip:port || process_name
  435. CalledId: -1,
  436. ClientIp: "",
  437. CollTime: 0,
  438. Cpu: 0,
  439. Custom: "",
  440. HostId: 2315065183171055,
  441. HostName: "localhost",
  442. HttpCode: 0,
  443. HttpMethod: "",
  444. InstanceId: 1128864082033413, // TODO 更新 基于ip:port
  445. InstanceIdFrom: -1,
  446. Maps: []MapInfoT{},
  447. MemU: 0,
  448. MemUP: 0,
  449. OperType: "",
  450. Parameters: []ParamStruct{},
  451. ParentTaskName: 0,
  452. Period: -1,
  453. RespTime: 0,
  454. Sampling: 0,
  455. ServiceName: "TOMCAT",
  456. ServiceType: APP_SERVICE_TYPE,
  457. Sip: "",
  458. Sn: "",
  459. SpanIdFrom: "",
  460. Sport: 0,
  461. TId: -1,
  462. TName: "",
  463. TraceId: "",
  464. TransIds: []interface{}{},
  465. TypeFrom: "",
  466. Uri: "",
  467. UserDir: 0,
  468. VipIds: []interface{}{},
  469. }
  470. return data
  471. }
  472. //func initMapNode(spanSd *tracepb.Span) (MapInfoT, string) {
  473. // mNode := MapInfoT{
  474. // Exception: 0,
  475. // ExceptionMsg: "",
  476. // ExceptionStack: "",
  477. // Ip: "",
  478. // Level: 2,
  479. // Pid: 1,
  480. // Port: 0,
  481. // Ps: []string{},
  482. // ServiceName: "",
  483. // ServiceType: "",
  484. // WallTime: 0,
  485. // }
  486. // mNode.MethodName = spanSd.Name
  487. // mNode.PureTime = (spanSd.EndTimeUnixNano - spanSd.StartTimeUnixNano) / 1e3
  488. // mNode.WallTime = mNode.PureTime
  489. // mNode.StartTime = spanSd.StartTimeUnixNano
  490. // mNode.EndTime = spanSd.EndTimeUnixNano
  491. //
  492. // for _, attr := range spanSd.GetAttributes() {
  493. // fmt.Println(attr.Key, ":", attr.Value.GetValue())
  494. //
  495. // switch attr.Key {
  496. // case "nid":
  497. // mNode.Nid = int(attr.Value.GetIntValue())
  498. // case "pid":
  499. // mNode.Pid = int(attr.Value.GetIntValue())
  500. // case "level":
  501. // mNode.Level = int(attr.Value.GetIntValue())
  502. // }
  503. // }
  504. //
  505. // return mNode, spanSd.Name
  506. //}
  507. func buildMapNodeFromEvent(event tracesdk.Event) MapInfoT {
  508. mNode := MapInfoT{
  509. Exception: 0,
  510. ExceptionMsg: "",
  511. ExceptionStack: "",
  512. Ip: "",
  513. Level: 2,
  514. Pid: 1,
  515. Port: 0,
  516. Ps: []string{},
  517. ServiceName: "",
  518. ServiceType: "",
  519. WallTime: 0,
  520. }
  521. mNode.MethodName = event.Name
  522. //mNode.PureTime = (event.EndTimeUnixNano - event.StartTimeUnixNano) / 1e3
  523. //mNode.WallTime = mNode.PureTime
  524. //mNode.StartTime = spanSd.StartTimeUnixNano
  525. //mNode.EndTime = spanSd.EndTimeUnixNano
  526. for _, attr := range event.Attributes {
  527. //fmt.Println(event.Name, "--->buildMapNodeFromEvent--->", attr.Key, ":", attr.Value.AsInterface())
  528. switch attr.Key {
  529. case "nid":
  530. mNode.Nid = int(attr.Value.AsInt64())
  531. case "pid":
  532. mNode.Pid = int(attr.Value.AsInt64())
  533. case "level":
  534. mNode.Level = int(attr.Value.AsInt64())
  535. case "time.start_at":
  536. mNode.StartTime, mNode.StartTimeMs = cleanNsTime(attr.Value.AsInt64())
  537. case "time.end_at":
  538. mNode.EndTime, mNode.EndTimeMs = cleanNsTime(attr.Value.AsInt64())
  539. case "time.duration":
  540. //mNode.PureTime = uint64(attr.Value.AsInt64()) / 1e3
  541. mNode.WallTime = uint64(attr.Value.AsInt64()) / 1e3
  542. }
  543. }
  544. return mNode
  545. }
  546. func parseURIToParams(input string) (string, []ParamStruct, error) {
  547. // 解析输入 URI
  548. parsedURL, err := url.Parse(input)
  549. if err != nil {
  550. return "", nil, fmt.Errorf("failed to parse URI: %w", err)
  551. }
  552. // 提取查询参数
  553. queryParams := parsedURL.Query()
  554. // 转换为目标结构
  555. var params []ParamStruct
  556. for key, values := range queryParams {
  557. params = append(params, ParamStruct{
  558. Name: key,
  559. Values: values,
  560. })
  561. }
  562. return parsedURL.Path, params, nil
  563. }
  564. // 构建拼装
  565. //func buildAndAssemblyMap(sd apmTraceSpan, traceRoot *TraceMapT) MapInfoT {
  566. // mNode, mapType := initMapNode(span(sd))
  567. // switch mapType {
  568. // case "APPLICATION":
  569. // buildAppMap(&mNode, traceRoot, sd)
  570. // traceRoot.TheEnd = true
  571. // case "HTTP":
  572. // buildHttpMap(&mNode, sd)
  573. // case "Mysql":
  574. // buildMysqlMap(&mNode, sd)
  575. // case "Redis":
  576. // buildRedisMap(&mNode, sd)
  577. // }
  578. // if mapType != "" {
  579. // mNode.Nid = traceRoot.Index
  580. // traceRoot.RootData.Maps = append(traceRoot.RootData.Maps, mNode)
  581. // }
  582. // return mNode
  583. //}
  584. //func buildAndAssemblyMapFromEvent(event tracesdk.Event, traceRoot *RootDataT) MapInfoT {
  585. // mNode := buildMapNodeFromEvent(event)
  586. // switch mapType {
  587. // case "HTTP":
  588. // buildHttpMapFromEvent(mNode, event)
  589. // //case "Mysql":
  590. // // buildMysqlMap(mNode, sd)
  591. // //case "Redis":
  592. // // buildRedisMap(mNode, sd)
  593. // }
  594. // if mapType != "" {
  595. // //mNode.Nid = traceRoot.Index
  596. // traceRoot.Maps = append(traceRoot.Maps, mNode)
  597. // }
  598. // return mNode
  599. //}
  600. //func buildAppMap(mNode *MapInfoT, traceRoot *TraceMapT, sd apmTraceSpan) {
  601. // mNode.ServiceName = GO_SERVICE_NAME
  602. // mNode.ServiceType = APP_SERVICE_TYPE
  603. // mNode.MethodName = "net/http.(*Transport).roundTrip()"
  604. // mNode.Level = 1
  605. // mNode.Pid = 0
  606. // mNode.Nid = 1
  607. // // 构建root节点
  608. // traceRoot.RootData.RespTime = mNode.PureTime
  609. // traceRoot.RootData.CollTime = mNode.StartTime
  610. // traceRoot.Index = 1
  611. // for _, attr := range sd.Attributes() {
  612. // fmt.Println(attr.Key, ":", attr.Value.AsInterface())
  613. // switch attr.Key {
  614. // case "http.uri":
  615. // traceRoot.RootData.Uri = attr.Value.AsString()
  616. // case "http.method":
  617. // traceRoot.RootData.HttpMethod = attr.Value.AsString()
  618. // case "http.status_code":
  619. // traceRoot.RootData.HttpCode = attr.Value.AsInt64()
  620. // case "net.peer.name":
  621. // traceRoot.RootData.ClientIp = attr.Value.AsString()
  622. // traceRoot.RootData.Sip = attr.Value.AsString()
  623. // traceRoot.RootData.Sn = attr.Value.AsString()
  624. // case "net.peer.port":
  625. // traceRoot.RootData.Sport = attr.Value.AsInt64()
  626. // traceRoot.RootData.LocalPort = attr.Value.AsInt64()
  627. // case "server.trace_id_from":
  628. // traceRoot.RootData.TraceId = attr.Value.AsString()
  629. // case "server.called_id":
  630. // traceRoot.RootData.CalledId = attr.Value.AsInt64()
  631. // case "server.instance_id_from":
  632. // traceRoot.RootData.InstanceIdFrom = attr.Value.AsInt64()
  633. // case "server.app_id_from":
  634. // traceRoot.RootData.AppIdFrom = attr.Value.AsInt64()
  635. // case "server.span_id_from":
  636. // traceRoot.RootData.SpanIdFrom = attr.Value.AsString()
  637. // case "server.type_from":
  638. // traceRoot.RootData.TypeFrom = attr.Value.AsString()
  639. // }
  640. // }
  641. //
  642. //}
  643. func buildAppMapFromEvent(traceRoot *RootDataT, sd apmTraceSpan) int {
  644. mNode := MapInfoT{
  645. Exception: 0,
  646. ExceptionMsg: "",
  647. ExceptionStack: "",
  648. Ip: "",
  649. Level: 1,
  650. Pid: 1,
  651. Port: 0,
  652. Ps: []string{},
  653. ServiceName: "",
  654. ServiceType: "",
  655. WallTime: 0,
  656. }
  657. mNode.ServiceName = GO_SERVICE_NAME
  658. mNode.ServiceType = APP_SERVICE_TYPE
  659. mNode.MethodName = "Kernel Endpoint()"
  660. mNode.Level = 1
  661. mNode.Pid = 0
  662. mNode.Nid = 1
  663. var code_type int64
  664. // 构建root节点
  665. //traceRoot.RespTime = mNode.PureTimex
  666. //traceRoot.CollTime = mNode.StartTime
  667. for _, attr := range sd.Attributes() {
  668. klog.Debugln("Appmap:", attr.Key, ":", attr.Value.AsInterface())
  669. switch attr.Key {
  670. case "http.uri":
  671. traceRoot.Uri, traceRoot.Parameters, _ = parseURIToParams(attr.Value.AsString())
  672. case "http.method":
  673. traceRoot.HttpMethod = attr.Value.AsString()
  674. case "http.status_code":
  675. traceRoot.HttpCode = attr.Value.AsInt64()
  676. case "net.peer.name":
  677. // TODO 修改 ClientIp sip获取方式
  678. traceRoot.ClientIp = attr.Value.AsString()
  679. traceRoot.Sip = attr.Value.AsString()
  680. traceRoot.Sn = attr.Value.AsString()
  681. case "net.peer.port":
  682. traceRoot.Sport = attr.Value.AsInt64()
  683. traceRoot.LocalPort = attr.Value.AsInt64()
  684. case "server.trace_id_from":
  685. traceRoot.TraceId = attr.Value.AsString()
  686. case "server.called_id":
  687. traceRoot.CalledId = attr.Value.AsInt64()
  688. case "server.instance_id_from":
  689. traceRoot.InstanceIdFrom = attr.Value.AsInt64()
  690. case "server.app_id_from":
  691. traceRoot.AppIdFrom = attr.Value.AsInt64()
  692. case "server.span_id_from":
  693. traceRoot.SpanIdFrom = attr.Value.AsString()
  694. case "server.type_from":
  695. traceRoot.TypeFrom = attr.Value.AsString()
  696. case "time.start_at":
  697. mNode.StartTime, mNode.StartTimeMs = cleanNsTime(attr.Value.AsInt64())
  698. traceRoot.CollTime = mNode.StartTimeMs
  699. case "time.end_at":
  700. mNode.EndTime, mNode.EndTimeMs = cleanNsTime(attr.Value.AsInt64())
  701. case "time.duration":
  702. traceRoot.RespTime = uint64(attr.Value.AsInt64()) / 1e3
  703. mNode.PureTime = traceRoot.RespTime
  704. mNode.WallTime = uint64(attr.Value.AsInt64()) / 1e3
  705. case "server.code_type":
  706. code_type = attr.Value.AsInt64()
  707. case "server.app_name":
  708. traceRoot.AppName = attr.Value.AsString()
  709. case "server.service_name":
  710. traceRoot.ServiceName = attr.Value.AsString()
  711. mNode.ServiceName = attr.Value.AsString()
  712. case "server.app_id":
  713. traceRoot.AppId = attr.Value.AsInt64()
  714. case "server.agent_id":
  715. traceRoot.AgentId = attr.Value.AsInt64()
  716. case "server.instance_id":
  717. traceRoot.InstanceId = attr.Value.AsInt64()
  718. case "server.src_addr":
  719. traceRoot.SrcAddr = attr.Value.AsString()
  720. case "server.dst_addr":
  721. traceRoot.DestinationAddr = attr.Value.AsString()
  722. }
  723. }
  724. traceRoot.Maps = append(traceRoot.Maps, mNode)
  725. return int(code_type)
  726. }
  727. //func buildHttpMap(mNode *MapInfoT, sd apmTraceSpan) {
  728. // mNode.ServiceName = HTTP_SERVICE_NAME
  729. // mNode.ServiceType = HTTP_SERVICE_TYPE
  730. // mNode.Schema = "http"
  731. // mNode.MethodName = "net/http.serverHandler.ServeHTTP()"
  732. // var descAddr string
  733. // for _, attr := range sd.Attributes() {
  734. // //fmt.Println(attr.Key, ":", attr.Value.AsInterface())
  735. // switch attr.Key {
  736. // case "http.ip":
  737. // mNode.Ip = attr.Value.AsString()
  738. // descAddr += mNode.Ip
  739. // case "http.port":
  740. // mNode.Port = attr.Value.AsInt64()
  741. // descAddr += ":" + attr.Value.AsString()
  742. // case "http.uri":
  743. // mNode.Uri = attr.Value.AsString()
  744. // case "http.assumed_app_id":
  745. // mNode.AssumedAppId = attr.Value.AsInt64()
  746. // case "http.span_id":
  747. // mNode.SpanId = attr.Value.AsString()
  748. // }
  749. // }
  750. // //mNode.AssumedAppId = Md5ToInt64(descAddr, 16)
  751. //}
  752. func buildHttpMapFromEvent(mNode *MapInfoT, event tracesdk.Event) {
  753. mNode.ServiceName = HTTP_SERVICE_NAME
  754. mNode.ServiceType = HTTP_SERVICE_TYPE
  755. mNode.Schema = "http"
  756. //mNode.MethodName = "HTTP"
  757. //var descAddr string
  758. var method string
  759. for _, attr := range event.Attributes {
  760. klog.Debugln("HTTP--->", attr.Key, ":", attr.Value.AsInterface())
  761. switch attr.Key {
  762. case "http.ip":
  763. mNode.Ip = attr.Value.AsString()
  764. //descAddr += mNode.Ip
  765. case "http.port":
  766. mNode.Port = attr.Value.AsInt64()
  767. case "http.method":
  768. //mNode.MethodName += " " + attr.Value.AsString()
  769. method = attr.Value.AsString()
  770. //descAddr += ":" + attr.Value.AsString()
  771. case "http.uri":
  772. mNode.Uri = attr.Value.AsString()
  773. //mNode.MethodName += " " + attr.Value.AsString()
  774. case "http.assumed_app_id":
  775. mNode.AssumedAppId = attr.Value.AsInt64()
  776. case "http.span_id":
  777. mNode.SpanId = attr.Value.AsString()
  778. case "time.start_at":
  779. mNode.StartTime = uint64(attr.Value.AsInt64())
  780. case "time.end_at":
  781. mNode.EndTime = uint64(attr.Value.AsInt64())
  782. case "time.duration":
  783. //mNode.PureTime = uint64(attr.Value.AsInt64()) / 1e3
  784. mNode.WallTime = uint64(attr.Value.AsInt64()) / 1e3
  785. case "http.src_addr":
  786. mNode.SrcAddr = attr.Value.AsString()
  787. case "http.destination_addr":
  788. mNode.DestinationAddr = attr.Value.AsString()
  789. }
  790. }
  791. mNode.MethodName = fmt.Sprintf("%s %s %s:%d%s", "HTTP", method, mNode.Ip, mNode.Port, mNode.Uri)
  792. //mNode.AssumedAppId = Md5ToInt64(descAddr, 16)
  793. }
  794. //func buildMysqlMap(mNode *MapInfoT, sd apmTraceSpan) {
  795. // mNode.Dbn = "unknown"
  796. // mNode.ServiceName = MYSQL_SERVICE_NAME
  797. // mNode.ServiceType = SQL_SERVICE_TYPE
  798. // mNode.MethodName = "database/sql.Query()"
  799. // for _, attr := range sd.Attributes() {
  800. // //fmt.Println(attr.Key, ":", attr.Value.AsInterface())
  801. // switch attr.Key {
  802. // case "net.peer.name":
  803. // mNode.Ip = attr.Value.AsString()
  804. // case "net.peer.port":
  805. // mNode.Port = attr.Value.AsInt64()
  806. // case "db.statement":
  807. // query := attr.Value.AsString()
  808. // mNode.Ps = []string{query}
  809. // words := strings.Fields(query)
  810. // if len(words) > 0 {
  811. // mNode.OperType = strings.ToUpper(words[0])
  812. // }
  813. // }
  814. // }
  815. //}
  816. func buildSQLMapEvent(mNode *MapInfoT, event tracesdk.Event) {
  817. mNode.Dbn = "-"
  818. mNode.ServiceName = l7.Protocol(event.ProtocolType).ServiceNameString()
  819. mNode.ServiceType = SQL_SERVICE_TYPE
  820. //mNode.MethodName = "database/sql.Query()"
  821. for _, attr := range event.Attributes {
  822. //fmt.Println(attr.Key, ":", attr.Value.AsInterface())
  823. switch attr.Key {
  824. case "net.peer.name":
  825. mNode.Ip = attr.Value.AsString()
  826. case "net.peer.port":
  827. mNode.Port = attr.Value.AsInt64()
  828. case "db.statement":
  829. query := attr.Value.AsString()
  830. mNode.MethodName = query
  831. mNode.Ps = []string{query}
  832. //words := strings.Fields(query)
  833. //if len(words) > 0 {
  834. // mNode.OperType = strings.ToUpper(words[0])
  835. //}
  836. case "sql.exception":
  837. if attr.Value.AsBool() {
  838. mNode.Exception = 1
  839. } else {
  840. mNode.Exception = 0
  841. }
  842. case "sql.src_addr":
  843. mNode.SrcAddr = attr.Value.AsString()
  844. case "sql.destination_addr":
  845. mNode.DestinationAddr = attr.Value.AsString()
  846. }
  847. }
  848. }
  849. func buildDNSMapEvent(mNode *MapInfoT, event tracesdk.Event) {
  850. mNode.ServiceName = l7.Protocol(event.ProtocolType).ServiceNameString()
  851. mNode.ServiceType = NET_SERVICE_TYPE
  852. var _type string
  853. var fqdn string
  854. var ips string
  855. var ttl int64
  856. for _, attr := range event.Attributes {
  857. switch attr.Key {
  858. case "dns.type":
  859. _type = attr.Value.AsString()
  860. case "dns.fqdn":
  861. fqdn = attr.Value.AsString()
  862. case "dns.ttl":
  863. ttl = attr.Value.AsInt64()
  864. case "dns.ips":
  865. if attr.Value.AsString() != "" {
  866. ips = "Addr: " + attr.Value.AsString()
  867. }
  868. }
  869. }
  870. mNode.MethodName = fmt.Sprintf("DNS Name: %s Type: %s TTL: %d %s", fqdn, _type, ttl, ips)
  871. }
  872. func buildPostGreSqlMapEvent(mNode *MapInfoT, event tracesdk.Event) {
  873. mNode.Dbn = "-"
  874. mNode.ServiceName = POSTGRESQL_SERVICE_NAME
  875. mNode.ServiceType = SQL_SERVICE_TYPE
  876. //mNode.MethodName = "database/sql.Query()"
  877. for _, attr := range event.Attributes {
  878. //fmt.Println(attr.Key, ":", attr.Value.AsInterface())
  879. switch attr.Key {
  880. case "net.peer.name":
  881. mNode.Ip = attr.Value.AsString()
  882. case "net.peer.port":
  883. mNode.Port = attr.Value.AsInt64()
  884. case "db.statement":
  885. query := attr.Value.AsString()
  886. mNode.Ps = []string{query}
  887. //words := strings.Fields(query)
  888. //if len(words) > 0 {
  889. // mNode.OperType = strings.ToUpper(words[0])
  890. //}
  891. case "sql.exception":
  892. if attr.Value.AsBool() {
  893. mNode.Exception = 1
  894. } else {
  895. mNode.Exception = 0
  896. }
  897. case "sql.src_addr":
  898. mNode.SrcAddr = attr.Value.AsString()
  899. case "sql.destination_addr":
  900. mNode.DestinationAddr = attr.Value.AsString()
  901. }
  902. }
  903. }
  904. func buildMysqlMapEvent(mNode *MapInfoT, event tracesdk.Event) {
  905. mNode.Dbn = "-"
  906. mNode.ServiceName = MYSQL_SERVICE_NAME
  907. mNode.ServiceType = SQL_SERVICE_TYPE
  908. //mNode.MethodName = "database/sql.Query()"
  909. for _, attr := range event.Attributes {
  910. //fmt.Println(attr.Key, ":", attr.Value.AsInterface())
  911. switch attr.Key {
  912. case "net.peer.name":
  913. mNode.Ip = attr.Value.AsString()
  914. case "net.peer.port":
  915. mNode.Port = attr.Value.AsInt64()
  916. case "db.statement":
  917. query := attr.Value.AsString()
  918. mNode.MethodName = query
  919. mNode.Ps = []string{query}
  920. //words := strings.Fields(query)
  921. //if len(words) > 0 {
  922. // mNode.OperType = strings.ToUpper(words[0])
  923. //}
  924. case "sql.exception":
  925. if attr.Value.AsBool() {
  926. mNode.Exception = 1
  927. } else {
  928. mNode.Exception = 0
  929. }
  930. case "sql.src_addr":
  931. mNode.SrcAddr = attr.Value.AsString()
  932. case "sql.destination_addr":
  933. mNode.DestinationAddr = attr.Value.AsString()
  934. }
  935. }
  936. }
  937. func buildDMMapEvent(mNode *MapInfoT, event tracesdk.Event) {
  938. mNode.Dbn = "TEST"
  939. mNode.ServiceName = DM_SERVICE_NAME
  940. mNode.ServiceType = SQL_SERVICE_TYPE
  941. mNode.MethodName = "database/sql.Query()"
  942. for _, attr := range event.Attributes {
  943. //fmt.Println(attr.Key, ":", attr.Value.AsInterface())
  944. switch attr.Key {
  945. case "net.peer.name":
  946. mNode.Ip = attr.Value.AsString()
  947. case "net.peer.port":
  948. mNode.Port = attr.Value.AsInt64()
  949. case "db.statement":
  950. query := attr.Value.AsString()
  951. mNode.Ps = []string{query}
  952. //words := strings.Fields(query)
  953. //if len(words) > 0 {
  954. // mNode.OperType = strings.ToUpper(words[0])
  955. //}
  956. case "sql.exception":
  957. if attr.Value.AsBool() {
  958. mNode.Exception = 1
  959. } else {
  960. mNode.Exception = 0
  961. }
  962. case "sql.src_addr":
  963. mNode.SrcAddr = attr.Value.AsString()
  964. case "sql.destination_addr":
  965. mNode.DestinationAddr = attr.Value.AsString()
  966. }
  967. }
  968. }
  969. func buildRedisMapEvent(mNode *MapInfoT, event tracesdk.Event) {
  970. mNode.ServiceName = REDIS_SERVICE_NAME
  971. mNode.ServiceType = NOSQL_SERVICE_TYPE
  972. //mNode.MethodName = span(sd).Name + " query"
  973. //mNode.MethodName = "redis.Do()"
  974. for _, attr := range event.Attributes {
  975. //fmt.Println(attr.Key, ":", attr.Value.AsInterface())
  976. switch attr.Key {
  977. case "net.peer.name":
  978. mNode.Ip = attr.Value.AsString()
  979. case "net.peer.port":
  980. mNode.Port = attr.Value.AsInt64()
  981. case "db.statement":
  982. query := attr.Value.AsString()
  983. mNode.MethodName = query
  984. mNode.Ps = []string{query}
  985. //words := strings.Fields(query)
  986. //if len(words) > 0 {
  987. // mNode.OperType = strings.ToUpper(words[0])
  988. //}
  989. case "nosql.src_addr":
  990. mNode.SrcAddr = attr.Value.AsString()
  991. case "nosql.destination_addr":
  992. mNode.DestinationAddr = attr.Value.AsString()
  993. }
  994. }
  995. }
  996. func isEnter(_type string) bool {
  997. if _type == "APPLICATION" {
  998. return true
  999. }
  1000. return false
  1001. }
  1002. func span(sd apmTraceSpan) *tracepb.Span {
  1003. if sd == nil {
  1004. return nil
  1005. }
  1006. tid := sd.SpanContext().TraceID()
  1007. sid := sd.SpanContext().SpanID()
  1008. s := &tracepb.Span{
  1009. TraceId: tid[:],
  1010. SpanId: sid[:],
  1011. TraceState: sd.SpanContext().TraceState().String(),
  1012. //Status: status(sd.Status().Code, sd.Status().Description),
  1013. StartTimeUnixNano: uint64(sd.StartTime().UnixNano()),
  1014. EndTimeUnixNano: uint64(sd.EndTime().UnixNano()),
  1015. //Links: links(sd.Links()),
  1016. //Kind: spanKind(sd.SpanKind()),
  1017. Name: sd.Name(),
  1018. Attributes: tracetransform.KeyValues(sd.Attributes()),
  1019. //Events: spanEvents(sd.Events()),
  1020. DroppedAttributesCount: uint32(sd.DroppedAttributes()),
  1021. DroppedEventsCount: uint32(sd.DroppedEvents()),
  1022. DroppedLinksCount: uint32(sd.DroppedLinks()),
  1023. }
  1024. if psid := sd.Parent().SpanID(); psid.IsValid() {
  1025. s.ParentSpanId = psid[:]
  1026. }
  1027. return s
  1028. }
  1029. func Md5ToInt64(strParam string, Len int) int64 {
  1030. sign := md5.Sum([]byte(strParam))
  1031. signStr := fmt.Sprintf("%x", sign)
  1032. charArr := []rune(signStr)
  1033. var intStr string
  1034. for _, value := range charArr {
  1035. intStr += strconv.Itoa(int(value))
  1036. }
  1037. intStr = intStr[:Len]
  1038. int64Data, err := strconv.ParseInt(intStr, 10, 64)
  1039. if err != nil {
  1040. return 0
  1041. }
  1042. return int64Data
  1043. }
  1044. // ns,ms
  1045. func cleanNsTime(time int64) (uint64, uint64) {
  1046. return uint64(time), uint64(math.Round(float64(time) / 1e6))
  1047. }