apm_exporter.go 32 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135
  1. package otlptrace
  2. import (
  3. "crypto/md5"
  4. "encoding/json"
  5. "fmt"
  6. . "github.com/coroot/coroot-node-agent/ebpftracer"
  7. "github.com/coroot/coroot-node-agent/ebpftracer/l7"
  8. "github.com/coroot/coroot-node-agent/utils"
  9. klog "github.com/sirupsen/logrus"
  10. "math"
  11. "net/url"
  12. "sort"
  13. "strconv"
  14. "sync"
  15. "go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/tracetransform"
  16. tracesdk "go.opentelemetry.io/otel/sdk/trace"
  17. tracepb "go.opentelemetry.io/proto/otlp/trace/v1"
  18. )
  19. const (
  20. APP_SERVICE_TYPE = "APPLICATION"
  21. SQL_SERVICE_TYPE = "SQL"
  22. NOSQL_SERVICE_TYPE = "NOSQL"
  23. HTTP_SERVICE_TYPE = "HTTP"
  24. NET_SERVICE_TYPE = "L7_NET"
  25. )
  26. const (
  27. GO_SERVICE_NAME = "GO"
  28. MYSQL_SERVICE_NAME = "MYSQL"
  29. DM_SERVICE_NAME = "DM"
  30. REDIS_SERVICE_NAME = "REDIS"
  31. MONGO_SERVICE_NAME = "MONGODB"
  32. HTTP_SERVICE_NAME = "HTTPCLIENT"
  33. POSTGRESQL_SERVICE_NAME = "POSTGRESQL"
  34. )
  35. type apmTraceSpan tracesdk.ReadOnlySpan
  36. // GO:0:10154813500555812:5450531005555981:5610250100539899:ee022542c3940f1b:1001025098564810:888ceb3df1bdbe2c:110
  37. type RootDataT struct {
  38. AccountId int `json:"account_id"`
  39. AgentId int64 `json:"agent_id"`
  40. AgentVersion string `json:"agent_version"`
  41. AppId int64 `json:"app_id"`
  42. AppIdFrom int64 `json:"app_id_from"` // from header app_id
  43. AppName string `json:"app_name"`
  44. CalledId int64 `json:"called_id"` // from header assumed_app_id
  45. ClientIp string `json:"client_ip"`
  46. CollTime uint64 `json:"coll_time"`
  47. Cpu int `json:"cpu"`
  48. Custom string `json:"custom"`
  49. HostId int64 `json:"host_id"`
  50. HostName string `json:"host_name"`
  51. HttpCode int64 `json:"http_code"`
  52. HttpMethod string `json:"http_method"`
  53. InstanceId int64 `json:"instance_id"`
  54. InstanceIdFrom int64 `json:"instance_id_from"` // from header instance_id
  55. LocalPort int64 `json:"local_port"`
  56. Maps []MapInfoT `json:"maps"`
  57. MemU int `json:"mem_u"`
  58. MemUP int `json:"mem_u_p"`
  59. OperType string `json:"oper_type"`
  60. Parameters []ParamStruct `json:"parameters,omitempty"`
  61. ParentTaskName int `json:"parent_task_name"`
  62. Period int `json:"period"`
  63. RespTime uint64 `json:"resp_time"`
  64. Sampling int `json:"sampling"`
  65. ServiceName string `json:"service_name"`
  66. ServiceType string `json:"service_type"`
  67. Sip string `json:"sip"`
  68. Sn string `json:"sn"`
  69. SpanIdFrom string `json:"span_id_from"` // from header span_id
  70. Sport int64 `json:"sport"`
  71. TId int `json:"t_id"`
  72. TName string `json:"t_name"`
  73. TraceId string `json:"trace_id"` // from header trace_id
  74. TransIds []interface{} `json:"trans_ids"`
  75. TypeFrom string `json:"type_from"`
  76. Uri string `json:"uri"`
  77. UserDir int `json:"user_dir"`
  78. VipIds []interface{} `json:"vip_ids"`
  79. SrcAddr string `json:"src_addr"`
  80. DestinationAddr string `json:"destination_addr"`
  81. }
  82. // ParamStruct 定义目标结构
  83. type ParamStruct struct {
  84. Name string `json:"name"`
  85. Values []string `json:"values"`
  86. }
  87. type MapInfoT struct {
  88. Dbn string `json:"dbn,omitempty"`
  89. Exception int `json:"exception,omitempty"`
  90. ExceptionMsg string `json:"exception_msg,omitempty"`
  91. ExceptionStack string `json:"exception_stack,omitempty"`
  92. Ip string `json:"ip,omitempty"`
  93. Level int `json:"level"`
  94. MethodDesc string `json:"method_desc,omitempty"`
  95. MethodName string `json:"method_name"`
  96. Nid int `json:"nid"`
  97. OperType string `json:"oper_type,omitempty"`
  98. Pid int `json:"pid"`
  99. Port int64 `json:"port,omitempty"`
  100. Ps []string `json:"ps,omitempty"`
  101. PureTime uint64 `json:"pure_time"`
  102. ServiceName string `json:"service_name"`
  103. ServiceType string `json:"service_type"`
  104. StartTime uint64 `json:"-"`
  105. EndTime uint64 `json:"-"`
  106. StartTimeMs uint64 `json:"start_time"`
  107. EndTimeMs uint64 `json:"end_time"`
  108. WallTime uint64 `json:"wall_time"`
  109. Schema string `json:"schema,omitempty"`
  110. AssumedAppId int64 `json:"assumed_app_id,omitempty"`
  111. Uri string `json:"uri,omitempty"`
  112. SpanId string `json:"span_id,omitempty"`
  113. SrcAddr string `json:"src_addr,omitempty"`
  114. DestinationAddr string `json:"destination_addr,omitempty"`
  115. }
  116. type TraceMapT struct {
  117. RootData RootDataT
  118. Index int
  119. lock *sync.RWMutex
  120. TheEnd bool
  121. }
  122. var TraceRootMap map[string]*TraceMapT
  123. func init() {
  124. TraceRootMap = make(map[string]*TraceMapT)
  125. //go func() {
  126. // for {
  127. // //fmt.Println(G_sdl)
  128. // time.Sleep(5 * time.Second)
  129. // }
  130. //}()
  131. }
  132. var G_sdl int
  133. func tracetransformData(sdl []tracesdk.ReadOnlySpan) map[int][]RootDataT {
  134. //G_sdl += len(sdl)
  135. if len(sdl) == 0 {
  136. return nil
  137. }
  138. // 多次请求 sdl
  139. sendDataMap := make(map[int][]RootDataT)
  140. //sendData := []RootDataT{}
  141. for _, sd := range sdl {
  142. if sd == nil {
  143. continue
  144. }
  145. //traceId := sd.SpanContext().TraceID().String()
  146. //fmt.Println("------event_num---- "+sd.Name(), "--->", len(sd.Events())) // 一次请求完整数据
  147. // 构建map *RootDataT
  148. var rootData RootDataT
  149. rootData = initRootDataFromEvent()
  150. // build http入口 MapInfoT
  151. code_type := buildAppMapFromEvent(&rootData, sd)
  152. // 构建maps
  153. for _, event := range sd.Events() {
  154. //aaa, _ := json.Marshal(event)
  155. //fmt.Println("event.info", string(aaa))
  156. mNode := buildMapNodeFromEvent(event)
  157. switch EventType(event.EventType) {
  158. // stack
  159. case EventTypeFunEnt:
  160. // l7 event
  161. case EventTypeL7Request:
  162. switch l7.Protocol(event.ProtocolType) {
  163. case l7.ProtocolHTTP:
  164. buildHttpMapFromEvent(&mNode, event)
  165. case l7.ProtocolDNS:
  166. buildDNSMapEvent(&mNode, event)
  167. case l7.ProtocolMysql:
  168. buildSQLMapEvent(&mNode, event)
  169. case l7.ProtocolRedis:
  170. buildRedisMapEvent(&mNode, event)
  171. case l7.ProtocolMongo:
  172. buildMongoMapEvent(&mNode, event)
  173. // dm
  174. case l7.ProtocolDM:
  175. buildSQLMapEvent(&mNode, event)
  176. case l7.ProtocolPostgres:
  177. buildSQLMapEvent(&mNode, event)
  178. }
  179. }
  180. rootData.Maps = append(rootData.Maps, mNode)
  181. //fmt.Println(event.Name)
  182. //buildAndAssemblyMapFromEvent(event, rootData)
  183. }
  184. buildLevelFromEvent(&rootData)
  185. sendDataMap[code_type] = append(sendDataMap[code_type], rootData)
  186. //a, _ := json.Marshal(rootData)
  187. //fmt.Println(string(a))
  188. //sendData = append(sendData, rootData)
  189. //if _, ok := TraceRootMap[traceId]; !ok {
  190. //TraceRootMap[traceId] = &TraceMapT{RootData: initRootData(traceId), Index: 1}
  191. //}
  192. //TraceRootMap[traceId].Index++
  193. //buildAndAssemblyMap(sd, TraceRootMap[traceId])
  194. }
  195. // 发送完整数据 | 大量长耗时请求会增加内存占用
  196. //sendData := []RootDataT{}
  197. //for traceId, v := range TraceRootMap {
  198. // if v.TheEnd {
  199. // buildLevel(v)
  200. // sendData = append(sendData, v.RootData)
  201. // delete(TraceRootMap, traceId)
  202. // //fmt.Println("the end!")
  203. // } else {
  204. // //fmt.Println("not end!")
  205. // }
  206. //}
  207. //Transform the categorized map into a slice
  208. data, _ := json.Marshal(sendDataMap)
  209. klog.Debug(string(data))
  210. //fmt.Println(len(sendData))
  211. //fmt.Println("sdl len:", len(sdl))
  212. return sendDataMap
  213. }
  214. type TimeMap struct {
  215. Time uint64
  216. Type int
  217. Map *MapInfoT
  218. }
  219. //type TraceMapT struct {
  220. // RootData RootDataT
  221. // Index int
  222. // lock *sync.RWMutex
  223. // TheEnd bool
  224. //}
  225. //func buildLevel(sdl *TraceMapT) {
  226. // nidMap := make(map[int]*MapInfoT)
  227. //
  228. // mapSlice := []TimeMap{}
  229. //
  230. // for i, v := range sdl.RootData.Maps {
  231. // if v.ServiceType == "APPLICATION" {
  232. // continue
  233. // }
  234. // nidMap[v.Nid] = &sdl.RootData.Maps[i]
  235. // timeStartMap := TimeMap{
  236. // Time: v.StartTime,
  237. // Type: 0,
  238. // Map: &sdl.RootData.Maps[i],
  239. // }
  240. // mapSlice = append(mapSlice, timeStartMap)
  241. // timeEndMap := TimeMap{
  242. // Time: v.EndTime,
  243. // Type: 1,
  244. // Map: &sdl.RootData.Maps[i],
  245. // }
  246. // mapSlice = append(mapSlice, timeEndMap)
  247. // }
  248. // sort.Slice(mapSlice, func(i, j int) bool {
  249. // return mapSlice[i].Time < mapSlice[j].Time
  250. // })
  251. //
  252. // funStack := []TimeMap{}
  253. //
  254. // currentNid := 1
  255. // Nid := 2
  256. // level := 2
  257. //
  258. // for _, v := range mapSlice {
  259. // // fmt.Println("SliceSliceindex", k, "value", v.Time, v.Type, v.Map.MethodName, v.Map.Nid)
  260. // if v.Type == 0 {
  261. // // 函数入口
  262. // funStack = append(funStack, v)
  263. // v.Map.Pid = currentNid
  264. // v.Map.Level = level
  265. // v.Map.Nid = Nid
  266. // currentNid = Nid
  267. // level += 1
  268. // Nid += 1
  269. // } else if v.Type == 1 {
  270. // // 函数出口
  271. // len := len(funStack)
  272. // funStack = funStack[:len-1]
  273. // if (len - 2) < 0 {
  274. // currentNid = 1
  275. // } else {
  276. // currentNid = funStack[len-2].Map.Nid
  277. // }
  278. //
  279. // level -= 1
  280. // }
  281. // }
  282. //}
  283. func buildLevelFromEvent(sdl *RootDataT) {
  284. nidMap := make(map[int]*MapInfoT)
  285. mapSlice := []TimeMap{}
  286. for i, v := range sdl.Maps {
  287. if v.ServiceType == "APPLICATION" {
  288. continue
  289. }
  290. nidMap[v.Nid] = &sdl.Maps[i]
  291. timeStartMap := TimeMap{
  292. Time: v.StartTime,
  293. Type: 0,
  294. Map: &sdl.Maps[i],
  295. }
  296. mapSlice = append(mapSlice, timeStartMap)
  297. timeEndMap := TimeMap{
  298. Time: v.EndTime,
  299. Type: 1,
  300. Map: &sdl.Maps[i],
  301. }
  302. mapSlice = append(mapSlice, timeEndMap)
  303. }
  304. sort.Slice(mapSlice, func(i, j int) bool {
  305. return mapSlice[i].Time < mapSlice[j].Time
  306. })
  307. funStack := []TimeMap{}
  308. currentNid := 1
  309. Nid := 2
  310. level := 2
  311. for _, v := range mapSlice {
  312. //klog.Debugln("SliceSliceindex", k, "value", v.Time, v.Type, v.Map.MethodName, v.Map.Nid)
  313. if v.Type == 0 {
  314. // 函数入口
  315. funStack = append(funStack, v)
  316. v.Map.Pid = currentNid
  317. v.Map.Level = level
  318. v.Map.Nid = Nid
  319. currentNid = Nid
  320. level += 1
  321. Nid += 1
  322. } else if v.Type == 1 {
  323. // 函数出口
  324. len := len(funStack)
  325. funStack = funStack[:len-1]
  326. if (len - 2) < 0 {
  327. currentNid = 1
  328. } else {
  329. currentNid = funStack[len-2].Map.Nid
  330. }
  331. level -= 1
  332. }
  333. }
  334. }
  335. //func initRootData(traceId string) RootDataT {
  336. // data := RootDataT{
  337. // AccountId: 110,
  338. // AgentId: 1011005252979954, // TODO 更新 基于 ip:port + process_name + exe路径生成
  339. // AgentVersion: "2.1.0",
  340. // AppId: 5410049101545798, // TODO 更新 基于appname生成
  341. // AppIdFrom: -1,
  342. // AppName: "eBPF-agent", // TODO 更新 ip:port || process_name
  343. // CalledId: -1,
  344. // ClientIp: "",
  345. // CollTime: 0,
  346. // Cpu: 0,
  347. // Custom: "",
  348. // HostId: 10154813500555812,
  349. // HostName: "localhost",
  350. // HttpCode: 0,
  351. // HttpMethod: "",
  352. // InstanceId: 1005051101515357, // TODO 更新 基于ip:port
  353. // InstanceIdFrom: -1,
  354. // Maps: []MapInfoT{},
  355. // MemU: 0,
  356. // MemUP: 0,
  357. // OperType: "",
  358. // Parameters: []interface{}{},
  359. // ParentTaskName: 0,
  360. // Period: -1,
  361. // RespTime: 0,
  362. // Sampling: 0,
  363. // ServiceName: "GO",
  364. // ServiceType: APP_SERVICE_TYPE,
  365. // Sip: "",
  366. // Sn: "",
  367. // SpanIdFrom: "",
  368. // Sport: 0,
  369. // TId: -1,
  370. // TName: "",
  371. // TraceId: traceId,
  372. // TransIds: []interface{}{},
  373. // TypeFrom: "",
  374. // Uri: "",
  375. // UserDir: 0,
  376. // VipIds: []interface{}{},
  377. // }
  378. // return data
  379. //}
  380. func initRootDataFromEvent() RootDataT {
  381. hostID := utils.GetHostID()
  382. accountID := utils.GetAccountID()
  383. data := RootDataT{
  384. // todo AccountId
  385. AccountId: accountID,
  386. AgentId: 0, // 基于 ip:port + process_name + exe路径生成
  387. AgentVersion: "2.1.0",
  388. AppId: 0, // 基于appname生成
  389. AppIdFrom: -1,
  390. AppName: "eBPF-agent", // server配置
  391. CalledId: -1,
  392. ClientIp: "",
  393. CollTime: 0,
  394. Cpu: 0,
  395. Custom: "",
  396. HostId: hostID,
  397. HostName: "localhost",
  398. HttpCode: 0,
  399. HttpMethod: "",
  400. InstanceId: 0, // 基于ip:port
  401. InstanceIdFrom: -1,
  402. Maps: []MapInfoT{},
  403. MemU: 0,
  404. MemUP: 0,
  405. OperType: "",
  406. Parameters: []ParamStruct{},
  407. ParentTaskName: 0,
  408. Period: -1,
  409. RespTime: 0,
  410. Sampling: 0,
  411. ServiceName: "",
  412. ServiceType: APP_SERVICE_TYPE,
  413. Sip: "",
  414. Sn: "",
  415. SpanIdFrom: "",
  416. Sport: 0,
  417. TId: -1,
  418. TName: "",
  419. TraceId: "",
  420. TransIds: []interface{}{},
  421. TypeFrom: "",
  422. Uri: "",
  423. UserDir: 0,
  424. VipIds: []interface{}{},
  425. SrcAddr: "",
  426. DestinationAddr: "",
  427. }
  428. return data
  429. }
  430. func initRootDataJava() RootDataT {
  431. data := RootDataT{
  432. AccountId: 110,
  433. AgentId: 3934815089541000, // TODO 更新 基于 ip:port + process_name + exe路径生成
  434. AgentVersion: "2.21.0",
  435. AppId: 3365853273187618, // TODO 更新 基于appname生成
  436. AppIdFrom: -1,
  437. AppName: "eBPF-javaApplication", // TODO 更新 ip:port || process_name
  438. CalledId: -1,
  439. ClientIp: "",
  440. CollTime: 0,
  441. Cpu: 0,
  442. Custom: "",
  443. HostId: 2315065183171055,
  444. HostName: "localhost",
  445. HttpCode: 0,
  446. HttpMethod: "",
  447. InstanceId: 1128864082033413, // TODO 更新 基于ip:port
  448. InstanceIdFrom: -1,
  449. Maps: []MapInfoT{},
  450. MemU: 0,
  451. MemUP: 0,
  452. OperType: "",
  453. Parameters: []ParamStruct{},
  454. ParentTaskName: 0,
  455. Period: -1,
  456. RespTime: 0,
  457. Sampling: 0,
  458. ServiceName: "TOMCAT",
  459. ServiceType: APP_SERVICE_TYPE,
  460. Sip: "",
  461. Sn: "",
  462. SpanIdFrom: "",
  463. Sport: 0,
  464. TId: -1,
  465. TName: "",
  466. TraceId: "",
  467. TransIds: []interface{}{},
  468. TypeFrom: "",
  469. Uri: "",
  470. UserDir: 0,
  471. VipIds: []interface{}{},
  472. }
  473. return data
  474. }
  475. //func initMapNode(spanSd *tracepb.Span) (MapInfoT, string) {
  476. // mNode := MapInfoT{
  477. // Exception: 0,
  478. // ExceptionMsg: "",
  479. // ExceptionStack: "",
  480. // Ip: "",
  481. // Level: 2,
  482. // Pid: 1,
  483. // Port: 0,
  484. // Ps: []string{},
  485. // ServiceName: "",
  486. // ServiceType: "",
  487. // WallTime: 0,
  488. // }
  489. // mNode.MethodName = spanSd.Name
  490. // mNode.PureTime = (spanSd.EndTimeUnixNano - spanSd.StartTimeUnixNano) / 1e3
  491. // mNode.WallTime = mNode.PureTime
  492. // mNode.StartTime = spanSd.StartTimeUnixNano
  493. // mNode.EndTime = spanSd.EndTimeUnixNano
  494. //
  495. // for _, attr := range spanSd.GetAttributes() {
  496. // fmt.Println(attr.Key, ":", attr.Value.GetValue())
  497. //
  498. // switch attr.Key {
  499. // case "nid":
  500. // mNode.Nid = int(attr.Value.GetIntValue())
  501. // case "pid":
  502. // mNode.Pid = int(attr.Value.GetIntValue())
  503. // case "level":
  504. // mNode.Level = int(attr.Value.GetIntValue())
  505. // }
  506. // }
  507. //
  508. // return mNode, spanSd.Name
  509. //}
  510. func buildMapNodeFromEvent(event tracesdk.Event) MapInfoT {
  511. mNode := MapInfoT{
  512. Exception: 0,
  513. ExceptionMsg: "",
  514. ExceptionStack: "",
  515. Ip: "",
  516. Level: 2,
  517. Pid: 1,
  518. Port: 0,
  519. Ps: []string{},
  520. ServiceName: "",
  521. ServiceType: "",
  522. WallTime: 0,
  523. }
  524. mNode.MethodName = event.Name
  525. //mNode.PureTime = (event.EndTimeUnixNano - event.StartTimeUnixNano) / 1e3
  526. //mNode.WallTime = mNode.PureTime
  527. //mNode.StartTime = spanSd.StartTimeUnixNano
  528. //mNode.EndTime = spanSd.EndTimeUnixNano
  529. for _, attr := range event.Attributes {
  530. //fmt.Println(event.Name, "--->buildMapNodeFromEvent--->", attr.Key, ":", attr.Value.AsInterface())
  531. switch attr.Key {
  532. case "nid":
  533. mNode.Nid = int(attr.Value.AsInt64())
  534. case "pid":
  535. mNode.Pid = int(attr.Value.AsInt64())
  536. case "level":
  537. mNode.Level = int(attr.Value.AsInt64())
  538. case "time.start_at":
  539. mNode.StartTime, mNode.StartTimeMs = cleanNsTime(attr.Value.AsInt64())
  540. case "time.end_at":
  541. mNode.EndTime, mNode.EndTimeMs = cleanNsTime(attr.Value.AsInt64())
  542. case "time.duration":
  543. //mNode.PureTime = uint64(attr.Value.AsInt64()) / 1e3
  544. mNode.WallTime = uint64(attr.Value.AsInt64()) / 1e3
  545. }
  546. }
  547. return mNode
  548. }
  549. func parseURIToParams(input string) (string, []ParamStruct, error) {
  550. // 解析输入 URI
  551. parsedURL, err := url.Parse(input)
  552. if err != nil {
  553. return "", nil, fmt.Errorf("failed to parse URI: %w", err)
  554. }
  555. // 提取查询参数
  556. queryParams := parsedURL.Query()
  557. // 转换为目标结构
  558. var params []ParamStruct
  559. for key, values := range queryParams {
  560. params = append(params, ParamStruct{
  561. Name: key,
  562. Values: values,
  563. })
  564. }
  565. return parsedURL.Path, params, nil
  566. }
  567. // 构建拼装
  568. //func buildAndAssemblyMap(sd apmTraceSpan, traceRoot *TraceMapT) MapInfoT {
  569. // mNode, mapType := initMapNode(span(sd))
  570. // switch mapType {
  571. // case "APPLICATION":
  572. // buildAppMap(&mNode, traceRoot, sd)
  573. // traceRoot.TheEnd = true
  574. // case "HTTP":
  575. // buildHttpMap(&mNode, sd)
  576. // case "Mysql":
  577. // buildMysqlMap(&mNode, sd)
  578. // case "Redis":
  579. // buildRedisMap(&mNode, sd)
  580. // }
  581. // if mapType != "" {
  582. // mNode.Nid = traceRoot.Index
  583. // traceRoot.RootData.Maps = append(traceRoot.RootData.Maps, mNode)
  584. // }
  585. // return mNode
  586. //}
  587. //func buildAndAssemblyMapFromEvent(event tracesdk.Event, traceRoot *RootDataT) MapInfoT {
  588. // mNode := buildMapNodeFromEvent(event)
  589. // switch mapType {
  590. // case "HTTP":
  591. // buildHttpMapFromEvent(mNode, event)
  592. // //case "Mysql":
  593. // // buildMysqlMap(mNode, sd)
  594. // //case "Redis":
  595. // // buildRedisMap(mNode, sd)
  596. // }
  597. // if mapType != "" {
  598. // //mNode.Nid = traceRoot.Index
  599. // traceRoot.Maps = append(traceRoot.Maps, mNode)
  600. // }
  601. // return mNode
  602. //}
  603. //func buildAppMap(mNode *MapInfoT, traceRoot *TraceMapT, sd apmTraceSpan) {
  604. // mNode.ServiceName = GO_SERVICE_NAME
  605. // mNode.ServiceType = APP_SERVICE_TYPE
  606. // mNode.MethodName = "net/http.(*Transport).roundTrip()"
  607. // mNode.Level = 1
  608. // mNode.Pid = 0
  609. // mNode.Nid = 1
  610. // // 构建root节点
  611. // traceRoot.RootData.RespTime = mNode.PureTime
  612. // traceRoot.RootData.CollTime = mNode.StartTime
  613. // traceRoot.Index = 1
  614. // for _, attr := range sd.Attributes() {
  615. // fmt.Println(attr.Key, ":", attr.Value.AsInterface())
  616. // switch attr.Key {
  617. // case "http.uri":
  618. // traceRoot.RootData.Uri = attr.Value.AsString()
  619. // case "http.method":
  620. // traceRoot.RootData.HttpMethod = attr.Value.AsString()
  621. // case "http.status_code":
  622. // traceRoot.RootData.HttpCode = attr.Value.AsInt64()
  623. // case "net.peer.name":
  624. // traceRoot.RootData.ClientIp = attr.Value.AsString()
  625. // traceRoot.RootData.Sip = attr.Value.AsString()
  626. // traceRoot.RootData.Sn = attr.Value.AsString()
  627. // case "net.peer.port":
  628. // traceRoot.RootData.Sport = attr.Value.AsInt64()
  629. // traceRoot.RootData.LocalPort = attr.Value.AsInt64()
  630. // case "server.trace_id_from":
  631. // traceRoot.RootData.TraceId = attr.Value.AsString()
  632. // case "server.called_id":
  633. // traceRoot.RootData.CalledId = attr.Value.AsInt64()
  634. // case "server.instance_id_from":
  635. // traceRoot.RootData.InstanceIdFrom = attr.Value.AsInt64()
  636. // case "server.app_id_from":
  637. // traceRoot.RootData.AppIdFrom = attr.Value.AsInt64()
  638. // case "server.span_id_from":
  639. // traceRoot.RootData.SpanIdFrom = attr.Value.AsString()
  640. // case "server.type_from":
  641. // traceRoot.RootData.TypeFrom = attr.Value.AsString()
  642. // }
  643. // }
  644. //
  645. //}
  646. func buildAppMapFromEvent(traceRoot *RootDataT, sd apmTraceSpan) int {
  647. mNode := MapInfoT{
  648. Exception: 0,
  649. ExceptionMsg: "",
  650. ExceptionStack: "",
  651. Ip: "",
  652. Level: 1,
  653. Pid: 1,
  654. Port: 0,
  655. Ps: []string{},
  656. ServiceName: "",
  657. ServiceType: "",
  658. WallTime: 0,
  659. }
  660. mNode.ServiceName = GO_SERVICE_NAME
  661. mNode.ServiceType = APP_SERVICE_TYPE
  662. mNode.MethodName = "Kernel Endpoint()"
  663. mNode.Level = 1
  664. mNode.Pid = 0
  665. mNode.Nid = 1
  666. var code_type int64
  667. // 构建root节点
  668. //traceRoot.RespTime = mNode.PureTimex
  669. //traceRoot.CollTime = mNode.StartTime
  670. for _, attr := range sd.Attributes() {
  671. klog.Debugln("Appmap:", attr.Key, ":", attr.Value.AsInterface())
  672. switch attr.Key {
  673. case "http.uri":
  674. traceRoot.Uri, traceRoot.Parameters, _ = parseURIToParams(attr.Value.AsString())
  675. case "http.method":
  676. traceRoot.HttpMethod = attr.Value.AsString()
  677. case "http.status_code":
  678. traceRoot.HttpCode = attr.Value.AsInt64()
  679. case "net.peer.name":
  680. // TODO 修改 ClientIp sip获取方式
  681. traceRoot.ClientIp = attr.Value.AsString()
  682. traceRoot.Sip = attr.Value.AsString()
  683. traceRoot.Sn = attr.Value.AsString()
  684. case "net.peer.port":
  685. traceRoot.Sport = attr.Value.AsInt64()
  686. traceRoot.LocalPort = attr.Value.AsInt64()
  687. case "server.trace_id_from":
  688. traceRoot.TraceId = attr.Value.AsString()
  689. case "server.called_id":
  690. traceRoot.CalledId = attr.Value.AsInt64()
  691. case "server.instance_id_from":
  692. traceRoot.InstanceIdFrom = attr.Value.AsInt64()
  693. case "server.app_id_from":
  694. traceRoot.AppIdFrom = attr.Value.AsInt64()
  695. case "server.span_id_from":
  696. traceRoot.SpanIdFrom = attr.Value.AsString()
  697. case "server.type_from":
  698. traceRoot.TypeFrom = attr.Value.AsString()
  699. case "time.start_at":
  700. mNode.StartTime, mNode.StartTimeMs = cleanNsTime(attr.Value.AsInt64())
  701. traceRoot.CollTime = mNode.StartTimeMs
  702. case "time.end_at":
  703. mNode.EndTime, mNode.EndTimeMs = cleanNsTime(attr.Value.AsInt64())
  704. case "time.duration":
  705. traceRoot.RespTime = uint64(attr.Value.AsInt64()) / 1e3
  706. mNode.PureTime = traceRoot.RespTime
  707. mNode.WallTime = uint64(attr.Value.AsInt64()) / 1e3
  708. case "server.code_type":
  709. code_type = attr.Value.AsInt64()
  710. case "server.app_name":
  711. traceRoot.AppName = attr.Value.AsString()
  712. case "server.service_name":
  713. traceRoot.ServiceName = attr.Value.AsString()
  714. mNode.ServiceName = attr.Value.AsString()
  715. case "server.app_id":
  716. traceRoot.AppId = attr.Value.AsInt64()
  717. case "server.agent_id":
  718. traceRoot.AgentId = attr.Value.AsInt64()
  719. case "server.instance_id":
  720. traceRoot.InstanceId = attr.Value.AsInt64()
  721. case "server.src_addr":
  722. traceRoot.SrcAddr = attr.Value.AsString()
  723. case "server.dst_addr":
  724. traceRoot.DestinationAddr = attr.Value.AsString()
  725. }
  726. }
  727. traceRoot.Maps = append(traceRoot.Maps, mNode)
  728. return int(code_type)
  729. }
  730. //func buildHttpMap(mNode *MapInfoT, sd apmTraceSpan) {
  731. // mNode.ServiceName = HTTP_SERVICE_NAME
  732. // mNode.ServiceType = HTTP_SERVICE_TYPE
  733. // mNode.Schema = "http"
  734. // mNode.MethodName = "net/http.serverHandler.ServeHTTP()"
  735. // var descAddr string
  736. // for _, attr := range sd.Attributes() {
  737. // //fmt.Println(attr.Key, ":", attr.Value.AsInterface())
  738. // switch attr.Key {
  739. // case "http.ip":
  740. // mNode.Ip = attr.Value.AsString()
  741. // descAddr += mNode.Ip
  742. // case "http.port":
  743. // mNode.Port = attr.Value.AsInt64()
  744. // descAddr += ":" + attr.Value.AsString()
  745. // case "http.uri":
  746. // mNode.Uri = attr.Value.AsString()
  747. // case "http.assumed_app_id":
  748. // mNode.AssumedAppId = attr.Value.AsInt64()
  749. // case "http.span_id":
  750. // mNode.SpanId = attr.Value.AsString()
  751. // }
  752. // }
  753. // //mNode.AssumedAppId = Md5ToInt64(descAddr, 16)
  754. //}
  755. func buildHttpMapFromEvent(mNode *MapInfoT, event tracesdk.Event) {
  756. mNode.ServiceName = HTTP_SERVICE_NAME
  757. mNode.ServiceType = HTTP_SERVICE_TYPE
  758. mNode.Schema = "http"
  759. //mNode.MethodName = "HTTP"
  760. //var descAddr string
  761. var method string
  762. for _, attr := range event.Attributes {
  763. klog.Debugln("HTTP--->", attr.Key, ":", attr.Value.AsInterface())
  764. switch attr.Key {
  765. case "http.ip":
  766. mNode.Ip = attr.Value.AsString()
  767. //descAddr += mNode.Ip
  768. case "http.port":
  769. mNode.Port = attr.Value.AsInt64()
  770. case "http.method":
  771. //mNode.MethodName += " " + attr.Value.AsString()
  772. method = attr.Value.AsString()
  773. //descAddr += ":" + attr.Value.AsString()
  774. case "http.uri":
  775. mNode.Uri = attr.Value.AsString()
  776. //mNode.MethodName += " " + attr.Value.AsString()
  777. case "http.assumed_app_id":
  778. mNode.AssumedAppId = attr.Value.AsInt64()
  779. case "http.span_id":
  780. mNode.SpanId = attr.Value.AsString()
  781. case "time.start_at":
  782. mNode.StartTime = uint64(attr.Value.AsInt64())
  783. case "time.end_at":
  784. mNode.EndTime = uint64(attr.Value.AsInt64())
  785. case "time.duration":
  786. //mNode.PureTime = uint64(attr.Value.AsInt64()) / 1e3
  787. mNode.WallTime = uint64(attr.Value.AsInt64()) / 1e3
  788. case "http.src_addr":
  789. mNode.SrcAddr = attr.Value.AsString()
  790. case "http.destination_addr":
  791. mNode.DestinationAddr = attr.Value.AsString()
  792. }
  793. }
  794. mNode.MethodName = fmt.Sprintf("%s %s %s:%d%s", "HTTP", method, mNode.Ip, mNode.Port, mNode.Uri)
  795. //mNode.AssumedAppId = Md5ToInt64(descAddr, 16)
  796. }
  797. //func buildMysqlMap(mNode *MapInfoT, sd apmTraceSpan) {
  798. // mNode.Dbn = "unknown"
  799. // mNode.ServiceName = MYSQL_SERVICE_NAME
  800. // mNode.ServiceType = SQL_SERVICE_TYPE
  801. // mNode.MethodName = "database/sql.Query()"
  802. // for _, attr := range sd.Attributes() {
  803. // //fmt.Println(attr.Key, ":", attr.Value.AsInterface())
  804. // switch attr.Key {
  805. // case "net.peer.name":
  806. // mNode.Ip = attr.Value.AsString()
  807. // case "net.peer.port":
  808. // mNode.Port = attr.Value.AsInt64()
  809. // case "db.statement":
  810. // query := attr.Value.AsString()
  811. // mNode.Ps = []string{query}
  812. // words := strings.Fields(query)
  813. // if len(words) > 0 {
  814. // mNode.OperType = strings.ToUpper(words[0])
  815. // }
  816. // }
  817. // }
  818. //}
  819. func buildSQLMapEvent(mNode *MapInfoT, event tracesdk.Event) {
  820. mNode.Dbn = "-"
  821. mNode.ServiceName = l7.Protocol(event.ProtocolType).ServiceNameString()
  822. mNode.ServiceType = SQL_SERVICE_TYPE
  823. //mNode.MethodName = "database/sql.Query()"
  824. for _, attr := range event.Attributes {
  825. //fmt.Println(attr.Key, ":", attr.Value.AsInterface())
  826. switch attr.Key {
  827. case "net.peer.name":
  828. mNode.Ip = attr.Value.AsString()
  829. case "net.peer.port":
  830. mNode.Port = attr.Value.AsInt64()
  831. case "db.statement":
  832. query := attr.Value.AsString()
  833. mNode.MethodName = query
  834. mNode.Ps = []string{query}
  835. //words := strings.Fields(query)
  836. //if len(words) > 0 {
  837. // mNode.OperType = strings.ToUpper(words[0])
  838. //}
  839. case "sql.exception":
  840. if attr.Value.AsBool() {
  841. mNode.Exception = 1
  842. } else {
  843. mNode.Exception = 0
  844. }
  845. case "sql.src_addr":
  846. mNode.SrcAddr = attr.Value.AsString()
  847. case "sql.destination_addr":
  848. mNode.DestinationAddr = attr.Value.AsString()
  849. }
  850. }
  851. }
  852. func buildDNSMapEvent(mNode *MapInfoT, event tracesdk.Event) {
  853. mNode.ServiceName = l7.Protocol(event.ProtocolType).ServiceNameString()
  854. mNode.ServiceType = NET_SERVICE_TYPE
  855. var _type string
  856. var fqdn string
  857. var ips string
  858. var ttl int64
  859. for _, attr := range event.Attributes {
  860. switch attr.Key {
  861. case "dns.type":
  862. _type = attr.Value.AsString()
  863. case "dns.fqdn":
  864. fqdn = attr.Value.AsString()
  865. case "dns.ttl":
  866. ttl = attr.Value.AsInt64()
  867. case "dns.ips":
  868. if attr.Value.AsString() != "" {
  869. ips = "Addr: " + attr.Value.AsString()
  870. }
  871. }
  872. }
  873. mNode.MethodName = fmt.Sprintf("DNS Name: %s Type: %s TTL: %d %s", fqdn, _type, ttl, ips)
  874. }
  875. func buildPostGreSqlMapEvent(mNode *MapInfoT, event tracesdk.Event) {
  876. mNode.Dbn = "-"
  877. mNode.ServiceName = POSTGRESQL_SERVICE_NAME
  878. mNode.ServiceType = SQL_SERVICE_TYPE
  879. //mNode.MethodName = "database/sql.Query()"
  880. for _, attr := range event.Attributes {
  881. //fmt.Println(attr.Key, ":", attr.Value.AsInterface())
  882. switch attr.Key {
  883. case "net.peer.name":
  884. mNode.Ip = attr.Value.AsString()
  885. case "net.peer.port":
  886. mNode.Port = attr.Value.AsInt64()
  887. case "db.statement":
  888. query := attr.Value.AsString()
  889. mNode.Ps = []string{query}
  890. //words := strings.Fields(query)
  891. //if len(words) > 0 {
  892. // mNode.OperType = strings.ToUpper(words[0])
  893. //}
  894. case "sql.exception":
  895. if attr.Value.AsBool() {
  896. mNode.Exception = 1
  897. } else {
  898. mNode.Exception = 0
  899. }
  900. case "sql.src_addr":
  901. mNode.SrcAddr = attr.Value.AsString()
  902. case "sql.destination_addr":
  903. mNode.DestinationAddr = attr.Value.AsString()
  904. }
  905. }
  906. }
  907. func buildMysqlMapEvent(mNode *MapInfoT, event tracesdk.Event) {
  908. mNode.Dbn = "-"
  909. mNode.ServiceName = MYSQL_SERVICE_NAME
  910. mNode.ServiceType = SQL_SERVICE_TYPE
  911. //mNode.MethodName = "database/sql.Query()"
  912. for _, attr := range event.Attributes {
  913. //fmt.Println(attr.Key, ":", attr.Value.AsInterface())
  914. switch attr.Key {
  915. case "net.peer.name":
  916. mNode.Ip = attr.Value.AsString()
  917. case "net.peer.port":
  918. mNode.Port = attr.Value.AsInt64()
  919. case "db.statement":
  920. query := attr.Value.AsString()
  921. mNode.MethodName = query
  922. mNode.Ps = []string{query}
  923. //words := strings.Fields(query)
  924. //if len(words) > 0 {
  925. // mNode.OperType = strings.ToUpper(words[0])
  926. //}
  927. case "sql.exception":
  928. if attr.Value.AsBool() {
  929. mNode.Exception = 1
  930. } else {
  931. mNode.Exception = 0
  932. }
  933. case "sql.src_addr":
  934. mNode.SrcAddr = attr.Value.AsString()
  935. case "sql.destination_addr":
  936. mNode.DestinationAddr = attr.Value.AsString()
  937. }
  938. }
  939. }
  940. func buildDMMapEvent(mNode *MapInfoT, event tracesdk.Event) {
  941. mNode.Dbn = "TEST"
  942. mNode.ServiceName = DM_SERVICE_NAME
  943. mNode.ServiceType = SQL_SERVICE_TYPE
  944. mNode.MethodName = "database/sql.Query()"
  945. for _, attr := range event.Attributes {
  946. //fmt.Println(attr.Key, ":", attr.Value.AsInterface())
  947. switch attr.Key {
  948. case "net.peer.name":
  949. mNode.Ip = attr.Value.AsString()
  950. case "net.peer.port":
  951. mNode.Port = attr.Value.AsInt64()
  952. case "db.statement":
  953. query := attr.Value.AsString()
  954. mNode.Ps = []string{query}
  955. //words := strings.Fields(query)
  956. //if len(words) > 0 {
  957. // mNode.OperType = strings.ToUpper(words[0])
  958. //}
  959. case "sql.exception":
  960. if attr.Value.AsBool() {
  961. mNode.Exception = 1
  962. } else {
  963. mNode.Exception = 0
  964. }
  965. case "sql.src_addr":
  966. mNode.SrcAddr = attr.Value.AsString()
  967. case "sql.destination_addr":
  968. mNode.DestinationAddr = attr.Value.AsString()
  969. }
  970. }
  971. }
  972. func buildRedisMapEvent(mNode *MapInfoT, event tracesdk.Event) {
  973. mNode.ServiceName = REDIS_SERVICE_NAME
  974. mNode.ServiceType = NOSQL_SERVICE_TYPE
  975. //mNode.MethodName = span(sd).Name + " query"
  976. //mNode.MethodName = "redis.Do()"
  977. for _, attr := range event.Attributes {
  978. //fmt.Println(attr.Key, ":", attr.Value.AsInterface())
  979. switch attr.Key {
  980. case "net.peer.name":
  981. mNode.Ip = attr.Value.AsString()
  982. case "net.peer.port":
  983. mNode.Port = attr.Value.AsInt64()
  984. case "db.statement":
  985. query := attr.Value.AsString()
  986. mNode.MethodName = query
  987. mNode.Ps = []string{query}
  988. //words := strings.Fields(query)
  989. //if len(words) > 0 {
  990. // mNode.OperType = strings.ToUpper(words[0])
  991. //}
  992. case "nosql.src_addr":
  993. mNode.SrcAddr = attr.Value.AsString()
  994. case "nosql.destination_addr":
  995. mNode.DestinationAddr = attr.Value.AsString()
  996. }
  997. }
  998. }
  999. func buildMongoMapEvent(mNode *MapInfoT, event tracesdk.Event) {
  1000. mNode.ServiceName = MONGO_SERVICE_NAME
  1001. mNode.ServiceType = NOSQL_SERVICE_TYPE
  1002. for _, attr := range event.Attributes {
  1003. switch attr.Key {
  1004. case "net.peer.name":
  1005. mNode.Ip = attr.Value.AsString()
  1006. case "net.peer.port":
  1007. mNode.Port = attr.Value.AsInt64()
  1008. case "db.statement":
  1009. query := attr.Value.AsString()
  1010. mNode.MethodName = query
  1011. mNode.Ps = []string{query}
  1012. case "nosql.src_addr":
  1013. mNode.SrcAddr = attr.Value.AsString()
  1014. case "nosql.destination_addr":
  1015. mNode.DestinationAddr = attr.Value.AsString()
  1016. }
  1017. }
  1018. }
  1019. func isEnter(_type string) bool {
  1020. if _type == "APPLICATION" {
  1021. return true
  1022. }
  1023. return false
  1024. }
  1025. func span(sd apmTraceSpan) *tracepb.Span {
  1026. if sd == nil {
  1027. return nil
  1028. }
  1029. tid := sd.SpanContext().TraceID()
  1030. sid := sd.SpanContext().SpanID()
  1031. s := &tracepb.Span{
  1032. TraceId: tid[:],
  1033. SpanId: sid[:],
  1034. TraceState: sd.SpanContext().TraceState().String(),
  1035. //Status: status(sd.Status().Code, sd.Status().Description),
  1036. StartTimeUnixNano: uint64(sd.StartTime().UnixNano()),
  1037. EndTimeUnixNano: uint64(sd.EndTime().UnixNano()),
  1038. //Links: links(sd.Links()),
  1039. //Kind: spanKind(sd.SpanKind()),
  1040. Name: sd.Name(),
  1041. Attributes: tracetransform.KeyValues(sd.Attributes()),
  1042. //Events: spanEvents(sd.Events()),
  1043. DroppedAttributesCount: uint32(sd.DroppedAttributes()),
  1044. DroppedEventsCount: uint32(sd.DroppedEvents()),
  1045. DroppedLinksCount: uint32(sd.DroppedLinks()),
  1046. }
  1047. if psid := sd.Parent().SpanID(); psid.IsValid() {
  1048. s.ParentSpanId = psid[:]
  1049. }
  1050. return s
  1051. }
  1052. func Md5ToInt64(strParam string, Len int) int64 {
  1053. sign := md5.Sum([]byte(strParam))
  1054. signStr := fmt.Sprintf("%x", sign)
  1055. charArr := []rune(signStr)
  1056. var intStr string
  1057. for _, value := range charArr {
  1058. intStr += strconv.Itoa(int(value))
  1059. }
  1060. intStr = intStr[:Len]
  1061. int64Data, err := strconv.ParseInt(intStr, 10, 64)
  1062. if err != nil {
  1063. return 0
  1064. }
  1065. return int64Data
  1066. }
  1067. // ns,ms
  1068. func cleanNsTime(time int64) (uint64, uint64) {
  1069. return uint64(time), uint64(math.Round(float64(time) / 1e6))
  1070. }