apm_exporter.go 31 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112
  1. package otlptrace
  2. import (
  3. "crypto/md5"
  4. "encoding/json"
  5. "fmt"
  6. . "github.com/coroot/coroot-node-agent/ebpftracer"
  7. "github.com/coroot/coroot-node-agent/ebpftracer/l7"
  8. "github.com/coroot/coroot-node-agent/utils"
  9. klog "github.com/sirupsen/logrus"
  10. "math"
  11. "net/url"
  12. "sort"
  13. "strconv"
  14. "strings"
  15. "sync"
  16. "go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/tracetransform"
  17. tracesdk "go.opentelemetry.io/otel/sdk/trace"
  18. tracepb "go.opentelemetry.io/proto/otlp/trace/v1"
  19. )
  // Service type identifiers stored in the ServiceType field of RootDataT and
  // MapInfoT.
  const (
  	// APP_SERVICE_TYPE marks the root record and its entry node.
  	APP_SERVICE_TYPE = "APPLICATION"

  	// SQL_SERVICE_TYPE marks nodes produced from SQL protocol events.
  	SQL_SERVICE_TYPE = "SQL"

  	// NOSQL_SERVICE_TYPE is presumably used for key-value stores such as
  	// Redis; its user is outside this part of the file.
  	NOSQL_SERVICE_TYPE = "NOSQL"

  	// HTTP_SERVICE_TYPE marks nodes produced from HTTP events.
  	HTTP_SERVICE_TYPE = "HTTP"

  	// NET_SERVICE_TYPE marks network-level nodes such as DNS lookups.
  	NET_SERVICE_TYPE = "L7_NET"
  )
  // Service name identifiers stored in the ServiceName field of RootDataT and
  // MapInfoT.
  const (
  	// GO_SERVICE_NAME is the default service name of the entry node.
  	GO_SERVICE_NAME = "GO"

  	// Database and cache back ends.
  	MYSQL_SERVICE_NAME      = "MYSQL"
  	DM_SERVICE_NAME         = "DM"
  	REDIS_SERVICE_NAME      = "REDIS"
  	POSTGRESQL_SERVICE_NAME = "POSTGRESQL"

  	// HTTP_SERVICE_NAME is the service name of outgoing HTTP call nodes.
  	HTTP_SERVICE_NAME = "HTTPCLIENT"
  )
  35. type apmTraceSpan tracesdk.ReadOnlySpan
  36. // GO:0:10154813500555812:5450531005555981:5610250100539899:ee022542c3940f1b:1001025098564810:888ceb3df1bdbe2c:110
  37. type RootDataT struct {
  38. AccountId int `json:"account_id"`
  39. AgentId int64 `json:"agent_id"`
  40. AgentVersion string `json:"agent_version"`
  41. AppId int64 `json:"app_id"`
  42. AppIdFrom int64 `json:"app_id_from"` // from header app_id
  43. AppName string `json:"app_name"`
  44. CalledId int64 `json:"called_id"` // from header assumed_app_id
  45. ClientIp string `json:"client_ip"`
  46. CollTime uint64 `json:"coll_time"`
  47. Cpu int `json:"cpu"`
  48. Custom string `json:"custom"`
  49. HostId int64 `json:"host_id"`
  50. HostName string `json:"host_name"`
  51. HttpCode int64 `json:"http_code"`
  52. HttpMethod string `json:"http_method"`
  53. InstanceId int64 `json:"instance_id"`
  54. InstanceIdFrom int64 `json:"instance_id_from"` // from header instance_id
  55. LocalPort int64 `json:"local_port"`
  56. Maps []MapInfoT `json:"maps"`
  57. MemU int `json:"mem_u"`
  58. MemUP int `json:"mem_u_p"`
  59. OperType string `json:"oper_type"`
  60. Parameters []ParamStruct `json:"parameters,omitempty"`
  61. ParentTaskName int `json:"parent_task_name"`
  62. Period int `json:"period"`
  63. RespTime uint64 `json:"resp_time"`
  64. Sampling int `json:"sampling"`
  65. ServiceName string `json:"service_name"`
  66. ServiceType string `json:"service_type"`
  67. Sip string `json:"sip"`
  68. Sn string `json:"sn"`
  69. SpanIdFrom string `json:"span_id_from"` // from header span_id
  70. Sport int64 `json:"sport"`
  71. TId int `json:"t_id"`
  72. TName string `json:"t_name"`
  73. TraceId string `json:"trace_id"` // from header trace_id
  74. TransIds []interface{} `json:"trans_ids"`
  75. TypeFrom string `json:"type_from"`
  76. Uri string `json:"uri"`
  77. UserDir int `json:"user_dir"`
  78. VipIds []interface{} `json:"vip_ids"`
  79. SrcAddr string `json:"src_addr"`
  80. DestinationAddr string `json:"destination_addr"`
  81. }
  // ParamStruct is the target structure for one query parameter: its name and
  // every value supplied for it.
  type ParamStruct struct {
  	// Name is the query parameter key.
  	Name string `json:"name"`
  	// Values lists all values given for Name.
  	Values []string `json:"values"`
  }
  // MapInfoT is one node of the call map attached to a RootDataT. Field order
  // and JSON tags define the wire format and must not be changed.
  type MapInfoT struct {
  	Dbn            string `json:"dbn,omitempty"`
  	Exception      int    `json:"exception,omitempty"`
  	ExceptionMsg   string `json:"exception_msg,omitempty"`
  	ExceptionStack string `json:"exception_stack,omitempty"`
  	Ip             string `json:"ip,omitempty"`

  	// Level, Nid and Pid position the node in the call tree; they are
  	// assigned by buildLevelFromEvent.
  	Level      int    `json:"level"`
  	MethodDesc string `json:"method_desc,omitempty"`
  	MethodName string `json:"method_name"`
  	Nid        int    `json:"nid"`
  	OperType   string `json:"oper_type,omitempty"`
  	Pid        int    `json:"pid"`

  	Port        int64    `json:"port,omitempty"`
  	Ps          []string `json:"ps,omitempty"`
  	PureTime    uint64   `json:"pure_time"`
  	ServiceName string   `json:"service_name"`
  	ServiceType string   `json:"service_type"`

  	// StartTime and EndTime are used internally for ordering and are never
  	// serialized; the *Ms variants are what goes on the wire.
  	StartTime   uint64 `json:"-"`
  	EndTime     uint64 `json:"-"`
  	StartTimeMs uint64 `json:"start_time"`
  	EndTimeMs   uint64 `json:"end_time"`
  	WallTime    uint64 `json:"wall_time"`

  	Schema          string `json:"schema,omitempty"`
  	AssumedAppId    int64  `json:"assumed_app_id,omitempty"`
  	Uri             string `json:"uri,omitempty"`
  	SpanId          string `json:"span_id,omitempty"`
  	SrcAddr         string `json:"src_addr,omitempty"`
  	DestinationAddr string `json:"destination_addr,omitempty"`
  }
  116. type TraceMapT struct {
  117. RootData RootDataT
  118. Index int
  119. lock *sync.RWMutex
  120. TheEnd bool
  121. }
  122. var TraceRootMap map[string]*TraceMapT
  123. func init() {
  124. TraceRootMap = make(map[string]*TraceMapT)
  125. //go func() {
  126. // for {
  127. // //fmt.Println(G_sdl)
  128. // time.Sleep(5 * time.Second)
  129. // }
  130. //}()
  131. }
  132. var G_sdl int
  133. func tracetransformData(sdl []tracesdk.ReadOnlySpan) map[int][]RootDataT {
  134. //G_sdl += len(sdl)
  135. if len(sdl) == 0 {
  136. return nil
  137. }
  138. // 多次请求 sdl
  139. sendDataMap := make(map[int][]RootDataT)
  140. //sendData := []RootDataT{}
  141. for _, sd := range sdl {
  142. if sd == nil {
  143. continue
  144. }
  145. //traceId := sd.SpanContext().TraceID().String()
  146. //fmt.Println("------event_num---- "+sd.Name(), "--->", len(sd.Events())) // 一次请求完整数据
  147. // 构建map *RootDataT
  148. var rootData RootDataT
  149. rootData = initRootDataFromEvent()
  150. // build http入口 MapInfoT
  151. code_type := buildAppMapFromEvent(&rootData, sd)
  152. // 构建maps
  153. for _, event := range sd.Events() {
  154. //aaa, _ := json.Marshal(event)
  155. //fmt.Println("event.info", string(aaa))
  156. mNode := buildMapNodeFromEvent(event)
  157. switch EventType(event.EventType) {
  158. // stack
  159. case EventTypeFunEnt:
  160. // l7 event
  161. case EventTypeL7Request:
  162. switch l7.Protocol(event.ProtocolType) {
  163. case l7.ProtocolHTTP:
  164. buildHttpMapFromEvent(&mNode, event)
  165. case l7.ProtocolDNS:
  166. buildDNSMapEvent(&mNode, event)
  167. case l7.ProtocolMysql:
  168. buildSQLMapEvent(&mNode, event)
  169. case l7.ProtocolRedis:
  170. buildRedisMapEvent(&mNode, event)
  171. // dm
  172. case l7.ProtocolDM:
  173. buildSQLMapEvent(&mNode, event)
  174. case l7.ProtocolPostgres:
  175. buildSQLMapEvent(&mNode, event)
  176. }
  177. }
  178. rootData.Maps = append(rootData.Maps, mNode)
  179. //fmt.Println(event.Name)
  180. //buildAndAssemblyMapFromEvent(event, rootData)
  181. }
  182. buildLevelFromEvent(&rootData)
  183. sendDataMap[code_type] = append(sendDataMap[code_type], rootData)
  184. //a, _ := json.Marshal(rootData)
  185. //fmt.Println(string(a))
  186. //sendData = append(sendData, rootData)
  187. //if _, ok := TraceRootMap[traceId]; !ok {
  188. //TraceRootMap[traceId] = &TraceMapT{RootData: initRootData(traceId), Index: 1}
  189. //}
  190. //TraceRootMap[traceId].Index++
  191. //buildAndAssemblyMap(sd, TraceRootMap[traceId])
  192. }
  193. // 发送完整数据 | 大量长耗时请求会增加内存占用
  194. //sendData := []RootDataT{}
  195. //for traceId, v := range TraceRootMap {
  196. // if v.TheEnd {
  197. // buildLevel(v)
  198. // sendData = append(sendData, v.RootData)
  199. // delete(TraceRootMap, traceId)
  200. // //fmt.Println("the end!")
  201. // } else {
  202. // //fmt.Println("not end!")
  203. // }
  204. //}
  205. //Transform the categorized map into a slice
  206. data, _ := json.Marshal(sendDataMap)
  207. klog.Debug(string(data))
  208. //fmt.Println(len(sendData))
  209. //fmt.Println("sdl len:", len(sdl))
  210. return sendDataMap
  211. }
  212. type TimeMap struct {
  213. Time uint64
  214. Type int
  215. Map *MapInfoT
  216. }
  217. //type TraceMapT struct {
  218. // RootData RootDataT
  219. // Index int
  220. // lock *sync.RWMutex
  221. // TheEnd bool
  222. //}
  223. //func buildLevel(sdl *TraceMapT) {
  224. // nidMap := make(map[int]*MapInfoT)
  225. //
  226. // mapSlice := []TimeMap{}
  227. //
  228. // for i, v := range sdl.RootData.Maps {
  229. // if v.ServiceType == "APPLICATION" {
  230. // continue
  231. // }
  232. // nidMap[v.Nid] = &sdl.RootData.Maps[i]
  233. // timeStartMap := TimeMap{
  234. // Time: v.StartTime,
  235. // Type: 0,
  236. // Map: &sdl.RootData.Maps[i],
  237. // }
  238. // mapSlice = append(mapSlice, timeStartMap)
  239. // timeEndMap := TimeMap{
  240. // Time: v.EndTime,
  241. // Type: 1,
  242. // Map: &sdl.RootData.Maps[i],
  243. // }
  244. // mapSlice = append(mapSlice, timeEndMap)
  245. // }
  246. // sort.Slice(mapSlice, func(i, j int) bool {
  247. // return mapSlice[i].Time < mapSlice[j].Time
  248. // })
  249. //
  250. // funStack := []TimeMap{}
  251. //
  252. // currentNid := 1
  253. // Nid := 2
  254. // level := 2
  255. //
  256. // for _, v := range mapSlice {
  257. // // fmt.Println("SliceSliceindex", k, "value", v.Time, v.Type, v.Map.MethodName, v.Map.Nid)
  258. // if v.Type == 0 {
  259. // // 函数入口
  260. // funStack = append(funStack, v)
  261. // v.Map.Pid = currentNid
  262. // v.Map.Level = level
  263. // v.Map.Nid = Nid
  264. // currentNid = Nid
  265. // level += 1
  266. // Nid += 1
  267. // } else if v.Type == 1 {
  268. // // 函数出口
  269. // len := len(funStack)
  270. // funStack = funStack[:len-1]
  271. // if (len - 2) < 0 {
  272. // currentNid = 1
  273. // } else {
  274. // currentNid = funStack[len-2].Map.Nid
  275. // }
  276. //
  277. // level -= 1
  278. // }
  279. // }
  280. //}
  281. func buildLevelFromEvent(sdl *RootDataT) {
  282. nidMap := make(map[int]*MapInfoT)
  283. mapSlice := []TimeMap{}
  284. for i, v := range sdl.Maps {
  285. if v.ServiceType == "APPLICATION" {
  286. continue
  287. }
  288. nidMap[v.Nid] = &sdl.Maps[i]
  289. timeStartMap := TimeMap{
  290. Time: v.StartTime,
  291. Type: 0,
  292. Map: &sdl.Maps[i],
  293. }
  294. mapSlice = append(mapSlice, timeStartMap)
  295. timeEndMap := TimeMap{
  296. Time: v.EndTime,
  297. Type: 1,
  298. Map: &sdl.Maps[i],
  299. }
  300. mapSlice = append(mapSlice, timeEndMap)
  301. }
  302. sort.Slice(mapSlice, func(i, j int) bool {
  303. return mapSlice[i].Time < mapSlice[j].Time
  304. })
  305. funStack := []TimeMap{}
  306. currentNid := 1
  307. Nid := 2
  308. level := 2
  309. for _, v := range mapSlice {
  310. //klog.Debugln("SliceSliceindex", k, "value", v.Time, v.Type, v.Map.MethodName, v.Map.Nid)
  311. if v.Type == 0 {
  312. // 函数入口
  313. funStack = append(funStack, v)
  314. v.Map.Pid = currentNid
  315. v.Map.Level = level
  316. v.Map.Nid = Nid
  317. currentNid = Nid
  318. level += 1
  319. Nid += 1
  320. } else if v.Type == 1 {
  321. // 函数出口
  322. len := len(funStack)
  323. funStack = funStack[:len-1]
  324. if (len - 2) < 0 {
  325. currentNid = 1
  326. } else {
  327. currentNid = funStack[len-2].Map.Nid
  328. }
  329. level -= 1
  330. }
  331. }
  332. }
  333. //func initRootData(traceId string) RootDataT {
  334. // data := RootDataT{
  335. // AccountId: 110,
  336. // AgentId: 1011005252979954, // TODO 更新 基于 ip:port + process_name + exe路径生成
  337. // AgentVersion: "2.1.0",
  338. // AppId: 5410049101545798, // TODO 更新 基于appname生成
  339. // AppIdFrom: -1,
  340. // AppName: "eBPF-agent", // TODO 更新 ip:port || process_name
  341. // CalledId: -1,
  342. // ClientIp: "",
  343. // CollTime: 0,
  344. // Cpu: 0,
  345. // Custom: "",
  346. // HostId: 10154813500555812,
  347. // HostName: "localhost",
  348. // HttpCode: 0,
  349. // HttpMethod: "",
  350. // InstanceId: 1005051101515357, // TODO 更新 基于ip:port
  351. // InstanceIdFrom: -1,
  352. // Maps: []MapInfoT{},
  353. // MemU: 0,
  354. // MemUP: 0,
  355. // OperType: "",
  356. // Parameters: []interface{}{},
  357. // ParentTaskName: 0,
  358. // Period: -1,
  359. // RespTime: 0,
  360. // Sampling: 0,
  361. // ServiceName: "GO",
  362. // ServiceType: APP_SERVICE_TYPE,
  363. // Sip: "",
  364. // Sn: "",
  365. // SpanIdFrom: "",
  366. // Sport: 0,
  367. // TId: -1,
  368. // TName: "",
  369. // TraceId: traceId,
  370. // TransIds: []interface{}{},
  371. // TypeFrom: "",
  372. // Uri: "",
  373. // UserDir: 0,
  374. // VipIds: []interface{}{},
  375. // }
  376. // return data
  377. //}
  378. func initRootDataFromEvent() RootDataT {
  379. hostID := utils.GetHostID()
  380. accountID := utils.GetAccountID()
  381. data := RootDataT{
  382. // todo AccountId
  383. AccountId: accountID,
  384. AgentId: 0, // 基于 ip:port + process_name + exe路径生成
  385. AgentVersion: "2.1.0",
  386. AppId: 0, // 基于appname生成
  387. AppIdFrom: -1,
  388. AppName: "eBPF-agent", // server配置
  389. CalledId: -1,
  390. ClientIp: "",
  391. CollTime: 0,
  392. Cpu: 0,
  393. Custom: "",
  394. HostId: hostID,
  395. HostName: "localhost",
  396. HttpCode: 0,
  397. HttpMethod: "",
  398. InstanceId: 0, // 基于ip:port
  399. InstanceIdFrom: -1,
  400. Maps: []MapInfoT{},
  401. MemU: 0,
  402. MemUP: 0,
  403. OperType: "",
  404. Parameters: []ParamStruct{},
  405. ParentTaskName: 0,
  406. Period: -1,
  407. RespTime: 0,
  408. Sampling: 0,
  409. ServiceName: "",
  410. ServiceType: APP_SERVICE_TYPE,
  411. Sip: "",
  412. Sn: "",
  413. SpanIdFrom: "",
  414. Sport: 0,
  415. TId: -1,
  416. TName: "",
  417. TraceId: "",
  418. TransIds: []interface{}{},
  419. TypeFrom: "",
  420. Uri: "",
  421. UserDir: 0,
  422. VipIds: []interface{}{},
  423. SrcAddr: "",
  424. DestinationAddr: "",
  425. }
  426. return data
  427. }
  428. func initRootDataJava() RootDataT {
  429. data := RootDataT{
  430. AccountId: 110,
  431. AgentId: 3934815089541000, // TODO 更新 基于 ip:port + process_name + exe路径生成
  432. AgentVersion: "2.21.0",
  433. AppId: 3365853273187618, // TODO 更新 基于appname生成
  434. AppIdFrom: -1,
  435. AppName: "eBPF-javaApplication", // TODO 更新 ip:port || process_name
  436. CalledId: -1,
  437. ClientIp: "",
  438. CollTime: 0,
  439. Cpu: 0,
  440. Custom: "",
  441. HostId: 2315065183171055,
  442. HostName: "localhost",
  443. HttpCode: 0,
  444. HttpMethod: "",
  445. InstanceId: 1128864082033413, // TODO 更新 基于ip:port
  446. InstanceIdFrom: -1,
  447. Maps: []MapInfoT{},
  448. MemU: 0,
  449. MemUP: 0,
  450. OperType: "",
  451. Parameters: []ParamStruct{},
  452. ParentTaskName: 0,
  453. Period: -1,
  454. RespTime: 0,
  455. Sampling: 0,
  456. ServiceName: "TOMCAT",
  457. ServiceType: APP_SERVICE_TYPE,
  458. Sip: "",
  459. Sn: "",
  460. SpanIdFrom: "",
  461. Sport: 0,
  462. TId: -1,
  463. TName: "",
  464. TraceId: "",
  465. TransIds: []interface{}{},
  466. TypeFrom: "",
  467. Uri: "",
  468. UserDir: 0,
  469. VipIds: []interface{}{},
  470. }
  471. return data
  472. }
  473. //func initMapNode(spanSd *tracepb.Span) (MapInfoT, string) {
  474. // mNode := MapInfoT{
  475. // Exception: 0,
  476. // ExceptionMsg: "",
  477. // ExceptionStack: "",
  478. // Ip: "",
  479. // Level: 2,
  480. // Pid: 1,
  481. // Port: 0,
  482. // Ps: []string{},
  483. // ServiceName: "",
  484. // ServiceType: "",
  485. // WallTime: 0,
  486. // }
  487. // mNode.MethodName = spanSd.Name
  488. // mNode.PureTime = (spanSd.EndTimeUnixNano - spanSd.StartTimeUnixNano) / 1e3
  489. // mNode.WallTime = mNode.PureTime
  490. // mNode.StartTime = spanSd.StartTimeUnixNano
  491. // mNode.EndTime = spanSd.EndTimeUnixNano
  492. //
  493. // for _, attr := range spanSd.GetAttributes() {
  494. // fmt.Println(attr.Key, ":", attr.Value.GetValue())
  495. //
  496. // switch attr.Key {
  497. // case "nid":
  498. // mNode.Nid = int(attr.Value.GetIntValue())
  499. // case "pid":
  500. // mNode.Pid = int(attr.Value.GetIntValue())
  501. // case "level":
  502. // mNode.Level = int(attr.Value.GetIntValue())
  503. // }
  504. // }
  505. //
  506. // return mNode, spanSd.Name
  507. //}
  508. func buildMapNodeFromEvent(event tracesdk.Event) MapInfoT {
  509. mNode := MapInfoT{
  510. Exception: 0,
  511. ExceptionMsg: "",
  512. ExceptionStack: "",
  513. Ip: "",
  514. Level: 2,
  515. Pid: 1,
  516. Port: 0,
  517. Ps: []string{},
  518. ServiceName: "",
  519. ServiceType: "",
  520. WallTime: 0,
  521. }
  522. mNode.MethodName = event.Name
  523. //mNode.PureTime = (event.EndTimeUnixNano - event.StartTimeUnixNano) / 1e3
  524. //mNode.WallTime = mNode.PureTime
  525. //mNode.StartTime = spanSd.StartTimeUnixNano
  526. //mNode.EndTime = spanSd.EndTimeUnixNano
  527. for _, attr := range event.Attributes {
  528. //fmt.Println(event.Name, "--->buildMapNodeFromEvent--->", attr.Key, ":", attr.Value.AsInterface())
  529. switch attr.Key {
  530. case "nid":
  531. mNode.Nid = int(attr.Value.AsInt64())
  532. case "pid":
  533. mNode.Pid = int(attr.Value.AsInt64())
  534. case "level":
  535. mNode.Level = int(attr.Value.AsInt64())
  536. case "time.start_at":
  537. mNode.StartTime, mNode.StartTimeMs = cleanNsTime(attr.Value.AsInt64())
  538. case "time.end_at":
  539. mNode.EndTime, mNode.EndTimeMs = cleanNsTime(attr.Value.AsInt64())
  540. case "time.duration":
  541. //mNode.PureTime = uint64(attr.Value.AsInt64()) / 1e3
  542. mNode.WallTime = uint64(attr.Value.AsInt64()) / 1e3
  543. }
  544. }
  545. return mNode
  546. }
  547. func parseURIToParams(input string) (string, []ParamStruct, error) {
  548. // 解析输入 URI
  549. parsedURL, err := url.Parse(input)
  550. if err != nil {
  551. return "", nil, fmt.Errorf("failed to parse URI: %w", err)
  552. }
  553. // 提取查询参数
  554. queryParams := parsedURL.Query()
  555. // 转换为目标结构
  556. var params []ParamStruct
  557. for key, values := range queryParams {
  558. params = append(params, ParamStruct{
  559. Name: key,
  560. Values: values,
  561. })
  562. }
  563. return parsedURL.Path, params, nil
  564. }
  565. // Build and assemble (legacy helpers, kept commented out below)
  566. //func buildAndAssemblyMap(sd apmTraceSpan, traceRoot *TraceMapT) MapInfoT {
  567. // mNode, mapType := initMapNode(span(sd))
  568. // switch mapType {
  569. // case "APPLICATION":
  570. // buildAppMap(&mNode, traceRoot, sd)
  571. // traceRoot.TheEnd = true
  572. // case "HTTP":
  573. // buildHttpMap(&mNode, sd)
  574. // case "Mysql":
  575. // buildMysqlMap(&mNode, sd)
  576. // case "Redis":
  577. // buildRedisMap(&mNode, sd)
  578. // }
  579. // if mapType != "" {
  580. // mNode.Nid = traceRoot.Index
  581. // traceRoot.RootData.Maps = append(traceRoot.RootData.Maps, mNode)
  582. // }
  583. // return mNode
  584. //}
  585. //func buildAndAssemblyMapFromEvent(event tracesdk.Event, traceRoot *RootDataT) MapInfoT {
  586. // mNode := buildMapNodeFromEvent(event)
  587. // switch mapType {
  588. // case "HTTP":
  589. // buildHttpMapFromEvent(mNode, event)
  590. // //case "Mysql":
  591. // // buildMysqlMap(mNode, sd)
  592. // //case "Redis":
  593. // // buildRedisMap(mNode, sd)
  594. // }
  595. // if mapType != "" {
  596. // //mNode.Nid = traceRoot.Index
  597. // traceRoot.Maps = append(traceRoot.Maps, mNode)
  598. // }
  599. // return mNode
  600. //}
  601. //func buildAppMap(mNode *MapInfoT, traceRoot *TraceMapT, sd apmTraceSpan) {
  602. // mNode.ServiceName = GO_SERVICE_NAME
  603. // mNode.ServiceType = APP_SERVICE_TYPE
  604. // mNode.MethodName = "net/http.(*Transport).roundTrip()"
  605. // mNode.Level = 1
  606. // mNode.Pid = 0
  607. // mNode.Nid = 1
  608. // // 构建root节点
  609. // traceRoot.RootData.RespTime = mNode.PureTime
  610. // traceRoot.RootData.CollTime = mNode.StartTime
  611. // traceRoot.Index = 1
  612. // for _, attr := range sd.Attributes() {
  613. // fmt.Println(attr.Key, ":", attr.Value.AsInterface())
  614. // switch attr.Key {
  615. // case "http.uri":
  616. // traceRoot.RootData.Uri = attr.Value.AsString()
  617. // case "http.method":
  618. // traceRoot.RootData.HttpMethod = attr.Value.AsString()
  619. // case "http.status_code":
  620. // traceRoot.RootData.HttpCode = attr.Value.AsInt64()
  621. // case "net.peer.name":
  622. // traceRoot.RootData.ClientIp = attr.Value.AsString()
  623. // traceRoot.RootData.Sip = attr.Value.AsString()
  624. // traceRoot.RootData.Sn = attr.Value.AsString()
  625. // case "net.peer.port":
  626. // traceRoot.RootData.Sport = attr.Value.AsInt64()
  627. // traceRoot.RootData.LocalPort = attr.Value.AsInt64()
  628. // case "server.trace_id_from":
  629. // traceRoot.RootData.TraceId = attr.Value.AsString()
  630. // case "server.called_id":
  631. // traceRoot.RootData.CalledId = attr.Value.AsInt64()
  632. // case "server.instance_id_from":
  633. // traceRoot.RootData.InstanceIdFrom = attr.Value.AsInt64()
  634. // case "server.app_id_from":
  635. // traceRoot.RootData.AppIdFrom = attr.Value.AsInt64()
  636. // case "server.span_id_from":
  637. // traceRoot.RootData.SpanIdFrom = attr.Value.AsString()
  638. // case "server.type_from":
  639. // traceRoot.RootData.TypeFrom = attr.Value.AsString()
  640. // }
  641. // }
  642. //
  643. //}
  644. func buildAppMapFromEvent(traceRoot *RootDataT, sd apmTraceSpan) int {
  645. mNode := MapInfoT{
  646. Exception: 0,
  647. ExceptionMsg: "",
  648. ExceptionStack: "",
  649. Ip: "",
  650. Level: 1,
  651. Pid: 1,
  652. Port: 0,
  653. Ps: []string{},
  654. ServiceName: "",
  655. ServiceType: "",
  656. WallTime: 0,
  657. }
  658. mNode.ServiceName = GO_SERVICE_NAME
  659. mNode.ServiceType = APP_SERVICE_TYPE
  660. mNode.MethodName = "Kernel Endpoint()"
  661. mNode.Level = 1
  662. mNode.Pid = 0
  663. mNode.Nid = 1
  664. var code_type int64
  665. // 构建root节点
  666. //traceRoot.RespTime = mNode.PureTimex
  667. //traceRoot.CollTime = mNode.StartTime
  668. for _, attr := range sd.Attributes() {
  669. klog.Debugln("Appmap:", attr.Key, ":", attr.Value.AsInterface())
  670. switch attr.Key {
  671. case "http.uri":
  672. traceRoot.Uri, traceRoot.Parameters, _ = parseURIToParams(attr.Value.AsString())
  673. case "http.method":
  674. traceRoot.HttpMethod = attr.Value.AsString()
  675. case "http.status_code":
  676. traceRoot.HttpCode = attr.Value.AsInt64()
  677. case "net.peer.name":
  678. // TODO 修改 ClientIp sip获取方式
  679. traceRoot.ClientIp = attr.Value.AsString()
  680. traceRoot.Sip = attr.Value.AsString()
  681. traceRoot.Sn = attr.Value.AsString()
  682. case "net.peer.port":
  683. traceRoot.Sport = attr.Value.AsInt64()
  684. traceRoot.LocalPort = attr.Value.AsInt64()
  685. case "server.trace_id_from":
  686. traceRoot.TraceId = attr.Value.AsString()
  687. case "server.called_id":
  688. traceRoot.CalledId = attr.Value.AsInt64()
  689. case "server.instance_id_from":
  690. traceRoot.InstanceIdFrom = attr.Value.AsInt64()
  691. case "server.app_id_from":
  692. traceRoot.AppIdFrom = attr.Value.AsInt64()
  693. case "server.span_id_from":
  694. traceRoot.SpanIdFrom = attr.Value.AsString()
  695. case "server.type_from":
  696. traceRoot.TypeFrom = attr.Value.AsString()
  697. case "time.start_at":
  698. mNode.StartTime, mNode.StartTimeMs = cleanNsTime(attr.Value.AsInt64())
  699. traceRoot.CollTime = mNode.StartTimeMs
  700. case "time.end_at":
  701. mNode.EndTime, mNode.EndTimeMs = cleanNsTime(attr.Value.AsInt64())
  702. case "time.duration":
  703. traceRoot.RespTime = uint64(attr.Value.AsInt64()) / 1e3
  704. mNode.PureTime = traceRoot.RespTime
  705. mNode.WallTime = uint64(attr.Value.AsInt64()) / 1e3
  706. case "server.code_type":
  707. code_type = attr.Value.AsInt64()
  708. case "server.app_name":
  709. traceRoot.AppName = attr.Value.AsString()
  710. case "server.service_name":
  711. traceRoot.ServiceName = attr.Value.AsString()
  712. mNode.ServiceName = attr.Value.AsString()
  713. case "server.app_id":
  714. traceRoot.AppId = attr.Value.AsInt64()
  715. case "server.agent_id":
  716. traceRoot.AgentId = attr.Value.AsInt64()
  717. case "server.instance_id":
  718. traceRoot.InstanceId = attr.Value.AsInt64()
  719. case "server.src_addr":
  720. traceRoot.SrcAddr = attr.Value.AsString()
  721. case "server.dst_addr":
  722. traceRoot.DestinationAddr = attr.Value.AsString()
  723. }
  724. }
  725. traceRoot.Maps = append(traceRoot.Maps, mNode)
  726. return int(code_type)
  727. }
  728. //func buildHttpMap(mNode *MapInfoT, sd apmTraceSpan) {
  729. // mNode.ServiceName = HTTP_SERVICE_NAME
  730. // mNode.ServiceType = HTTP_SERVICE_TYPE
  731. // mNode.Schema = "http"
  732. // mNode.MethodName = "net/http.serverHandler.ServeHTTP()"
  733. // var descAddr string
  734. // for _, attr := range sd.Attributes() {
  735. // //fmt.Println(attr.Key, ":", attr.Value.AsInterface())
  736. // switch attr.Key {
  737. // case "http.ip":
  738. // mNode.Ip = attr.Value.AsString()
  739. // descAddr += mNode.Ip
  740. // case "http.port":
  741. // mNode.Port = attr.Value.AsInt64()
  742. // descAddr += ":" + attr.Value.AsString()
  743. // case "http.uri":
  744. // mNode.Uri = attr.Value.AsString()
  745. // case "http.assumed_app_id":
  746. // mNode.AssumedAppId = attr.Value.AsInt64()
  747. // case "http.span_id":
  748. // mNode.SpanId = attr.Value.AsString()
  749. // }
  750. // }
  751. // //mNode.AssumedAppId = Md5ToInt64(descAddr, 16)
  752. //}
  753. func buildHttpMapFromEvent(mNode *MapInfoT, event tracesdk.Event) {
  754. mNode.ServiceName = HTTP_SERVICE_NAME
  755. mNode.ServiceType = HTTP_SERVICE_TYPE
  756. mNode.Schema = "http"
  757. //mNode.MethodName = "HTTP"
  758. //var descAddr string
  759. var method string
  760. for _, attr := range event.Attributes {
  761. klog.Debugln("HTTP--->", attr.Key, ":", attr.Value.AsInterface())
  762. switch attr.Key {
  763. case "http.ip":
  764. mNode.Ip = attr.Value.AsString()
  765. //descAddr += mNode.Ip
  766. case "http.port":
  767. mNode.Port = attr.Value.AsInt64()
  768. case "http.method":
  769. //mNode.MethodName += " " + attr.Value.AsString()
  770. method = attr.Value.AsString()
  771. //descAddr += ":" + attr.Value.AsString()
  772. case "http.uri":
  773. mNode.Uri = attr.Value.AsString()
  774. //mNode.MethodName += " " + attr.Value.AsString()
  775. case "http.assumed_app_id":
  776. mNode.AssumedAppId = attr.Value.AsInt64()
  777. case "http.span_id":
  778. mNode.SpanId = attr.Value.AsString()
  779. case "time.start_at":
  780. mNode.StartTime = uint64(attr.Value.AsInt64())
  781. case "time.end_at":
  782. mNode.EndTime = uint64(attr.Value.AsInt64())
  783. case "time.duration":
  784. //mNode.PureTime = uint64(attr.Value.AsInt64()) / 1e3
  785. mNode.WallTime = uint64(attr.Value.AsInt64()) / 1e3
  786. case "http.src_addr":
  787. mNode.SrcAddr = attr.Value.AsString()
  788. case "http.destination_addr":
  789. mNode.DestinationAddr = attr.Value.AsString()
  790. }
  791. }
  792. mNode.MethodName = fmt.Sprintf("%s %s %s:%d%s", "HTTP", method, mNode.Ip, mNode.Port, mNode.Uri)
  793. //mNode.AssumedAppId = Md5ToInt64(descAddr, 16)
  794. }
  795. //func buildMysqlMap(mNode *MapInfoT, sd apmTraceSpan) {
  796. // mNode.Dbn = "unknown"
  797. // mNode.ServiceName = MYSQL_SERVICE_NAME
  798. // mNode.ServiceType = SQL_SERVICE_TYPE
  799. // mNode.MethodName = "database/sql.Query()"
  800. // for _, attr := range sd.Attributes() {
  801. // //fmt.Println(attr.Key, ":", attr.Value.AsInterface())
  802. // switch attr.Key {
  803. // case "net.peer.name":
  804. // mNode.Ip = attr.Value.AsString()
  805. // case "net.peer.port":
  806. // mNode.Port = attr.Value.AsInt64()
  807. // case "db.statement":
  808. // query := attr.Value.AsString()
  809. // mNode.Ps = []string{query}
  810. // words := strings.Fields(query)
  811. // if len(words) > 0 {
  812. // mNode.OperType = strings.ToUpper(words[0])
  813. // }
  814. // }
  815. // }
  816. //}
  817. func buildSQLMapEvent(mNode *MapInfoT, event tracesdk.Event) {
  818. mNode.Dbn = "-"
  819. mNode.ServiceName = l7.Protocol(event.ProtocolType).ServiceNameString()
  820. mNode.ServiceType = SQL_SERVICE_TYPE
  821. //mNode.MethodName = "database/sql.Query()"
  822. for _, attr := range event.Attributes {
  823. //fmt.Println(attr.Key, ":", attr.Value.AsInterface())
  824. switch attr.Key {
  825. case "net.peer.name":
  826. mNode.Ip = attr.Value.AsString()
  827. case "net.peer.port":
  828. mNode.Port = attr.Value.AsInt64()
  829. case "db.statement":
  830. query := attr.Value.AsString()
  831. mNode.MethodName = query
  832. mNode.Ps = []string{query}
  833. words := strings.Fields(query)
  834. if len(words) > 0 {
  835. mNode.OperType = strings.ToUpper(words[0])
  836. }
  837. case "sql.exception":
  838. if attr.Value.AsBool() {
  839. mNode.Exception = 1
  840. } else {
  841. mNode.Exception = 0
  842. }
  843. case "sql.src_addr":
  844. mNode.SrcAddr = attr.Value.AsString()
  845. case "sql.destination_addr":
  846. mNode.DestinationAddr = attr.Value.AsString()
  847. }
  848. }
  849. }
  850. func buildDNSMapEvent(mNode *MapInfoT, event tracesdk.Event) {
  851. mNode.ServiceName = l7.Protocol(event.ProtocolType).ServiceNameString()
  852. mNode.ServiceType = NET_SERVICE_TYPE
  853. var _type string
  854. var fqdn string
  855. var ips string
  856. var ttl int64
  857. for _, attr := range event.Attributes {
  858. switch attr.Key {
  859. case "dns.type":
  860. _type = attr.Value.AsString()
  861. case "dns.fqdn":
  862. fqdn = attr.Value.AsString()
  863. case "dns.ttl":
  864. ttl = attr.Value.AsInt64()
  865. case "dns.ips":
  866. if attr.Value.AsString() != "" {
  867. ips = "Addr: " + attr.Value.AsString()
  868. }
  869. }
  870. }
  871. mNode.MethodName = fmt.Sprintf("DNS Name: %s Type: %s TTL: %d %s", fqdn, _type, ttl, ips)
  872. }
  873. func buildPostGreSqlMapEvent(mNode *MapInfoT, event tracesdk.Event) {
  874. mNode.Dbn = "-"
  875. mNode.ServiceName = POSTGRESQL_SERVICE_NAME
  876. mNode.ServiceType = SQL_SERVICE_TYPE
  877. //mNode.MethodName = "database/sql.Query()"
  878. for _, attr := range event.Attributes {
  879. //fmt.Println(attr.Key, ":", attr.Value.AsInterface())
  880. switch attr.Key {
  881. case "net.peer.name":
  882. mNode.Ip = attr.Value.AsString()
  883. case "net.peer.port":
  884. mNode.Port = attr.Value.AsInt64()
  885. case "db.statement":
  886. query := attr.Value.AsString()
  887. mNode.Ps = []string{query}
  888. words := strings.Fields(query)
  889. if len(words) > 0 {
  890. mNode.OperType = strings.ToUpper(words[0])
  891. }
  892. case "sql.exception":
  893. if attr.Value.AsBool() {
  894. mNode.Exception = 1
  895. } else {
  896. mNode.Exception = 0
  897. }
  898. case "sql.src_addr":
  899. mNode.SrcAddr = attr.Value.AsString()
  900. case "sql.destination_addr":
  901. mNode.DestinationAddr = attr.Value.AsString()
  902. }
  903. }
  904. }
  905. func buildMysqlMapEvent(mNode *MapInfoT, event tracesdk.Event) {
  906. mNode.Dbn = "-"
  907. mNode.ServiceName = MYSQL_SERVICE_NAME
  908. mNode.ServiceType = SQL_SERVICE_TYPE
  909. //mNode.MethodName = "database/sql.Query()"
  910. for _, attr := range event.Attributes {
  911. //fmt.Println(attr.Key, ":", attr.Value.AsInterface())
  912. switch attr.Key {
  913. case "net.peer.name":
  914. mNode.Ip = attr.Value.AsString()
  915. case "net.peer.port":
  916. mNode.Port = attr.Value.AsInt64()
  917. case "db.statement":
  918. query := attr.Value.AsString()
  919. mNode.MethodName = query
  920. mNode.Ps = []string{query}
  921. words := strings.Fields(query)
  922. if len(words) > 0 {
  923. mNode.OperType = strings.ToUpper(words[0])
  924. }
  925. case "sql.exception":
  926. if attr.Value.AsBool() {
  927. mNode.Exception = 1
  928. } else {
  929. mNode.Exception = 0
  930. }
  931. case "sql.src_addr":
  932. mNode.SrcAddr = attr.Value.AsString()
  933. case "sql.destination_addr":
  934. mNode.DestinationAddr = attr.Value.AsString()
  935. }
  936. }
  937. }
  938. func buildDMMapEvent(mNode *MapInfoT, event tracesdk.Event) {
  939. mNode.Dbn = "TEST"
  940. mNode.ServiceName = DM_SERVICE_NAME
  941. mNode.ServiceType = SQL_SERVICE_TYPE
  942. mNode.MethodName = "database/sql.Query()"
  943. for _, attr := range event.Attributes {
  944. //fmt.Println(attr.Key, ":", attr.Value.AsInterface())
  945. switch attr.Key {
  946. case "net.peer.name":
  947. mNode.Ip = attr.Value.AsString()
  948. case "net.peer.port":
  949. mNode.Port = attr.Value.AsInt64()
  950. case "db.statement":
  951. query := attr.Value.AsString()
  952. mNode.Ps = []string{query}
  953. words := strings.Fields(query)
  954. if len(words) > 0 {
  955. mNode.OperType = strings.ToUpper(words[0])
  956. }
  957. case "sql.exception":
  958. if attr.Value.AsBool() {
  959. mNode.Exception = 1
  960. } else {
  961. mNode.Exception = 0
  962. }
  963. case "sql.src_addr":
  964. mNode.SrcAddr = attr.Value.AsString()
  965. case "sql.destination_addr":
  966. mNode.DestinationAddr = attr.Value.AsString()
  967. }
  968. }
  969. }
  970. func buildRedisMapEvent(mNode *MapInfoT, event tracesdk.Event) {
  971. mNode.ServiceName = REDIS_SERVICE_NAME
  972. mNode.ServiceType = NOSQL_SERVICE_TYPE
  973. //mNode.MethodName = span(sd).Name + " query"
  974. //mNode.MethodName = "redis.Do()"
  975. for _, attr := range event.Attributes {
  976. //fmt.Println(attr.Key, ":", attr.Value.AsInterface())
  977. switch attr.Key {
  978. case "net.peer.name":
  979. mNode.Ip = attr.Value.AsString()
  980. case "net.peer.port":
  981. mNode.Port = attr.Value.AsInt64()
  982. case "db.statement":
  983. query := attr.Value.AsString()
  984. mNode.MethodName = query
  985. mNode.Ps = []string{query}
  986. words := strings.Fields(query)
  987. if len(words) > 0 {
  988. mNode.OperType = strings.ToUpper(words[0])
  989. }
  990. case "nosql.src_addr":
  991. mNode.SrcAddr = attr.Value.AsString()
  992. case "nosql.destination_addr":
  993. mNode.DestinationAddr = attr.Value.AsString()
  994. }
  995. }
  996. }
  997. func isEnter(_type string) bool {
  998. if _type == "APPLICATION" {
  999. return true
  1000. }
  1001. return false
  1002. }
  1003. func span(sd apmTraceSpan) *tracepb.Span {
  1004. if sd == nil {
  1005. return nil
  1006. }
  1007. tid := sd.SpanContext().TraceID()
  1008. sid := sd.SpanContext().SpanID()
  1009. s := &tracepb.Span{
  1010. TraceId: tid[:],
  1011. SpanId: sid[:],
  1012. TraceState: sd.SpanContext().TraceState().String(),
  1013. //Status: status(sd.Status().Code, sd.Status().Description),
  1014. StartTimeUnixNano: uint64(sd.StartTime().UnixNano()),
  1015. EndTimeUnixNano: uint64(sd.EndTime().UnixNano()),
  1016. //Links: links(sd.Links()),
  1017. //Kind: spanKind(sd.SpanKind()),
  1018. Name: sd.Name(),
  1019. Attributes: tracetransform.KeyValues(sd.Attributes()),
  1020. //Events: spanEvents(sd.Events()),
  1021. DroppedAttributesCount: uint32(sd.DroppedAttributes()),
  1022. DroppedEventsCount: uint32(sd.DroppedEvents()),
  1023. DroppedLinksCount: uint32(sd.DroppedLinks()),
  1024. }
  1025. if psid := sd.Parent().SpanID(); psid.IsValid() {
  1026. s.ParentSpanId = psid[:]
  1027. }
  1028. return s
  1029. }
  1030. func Md5ToInt64(strParam string, Len int) int64 {
  1031. sign := md5.Sum([]byte(strParam))
  1032. signStr := fmt.Sprintf("%x", sign)
  1033. charArr := []rune(signStr)
  1034. var intStr string
  1035. for _, value := range charArr {
  1036. intStr += strconv.Itoa(int(value))
  1037. }
  1038. intStr = intStr[:Len]
  1039. int64Data, err := strconv.ParseInt(intStr, 10, 64)
  1040. if err != nil {
  1041. return 0
  1042. }
  1043. return int64Data
  1044. }
  1045. // ns,ms
  1046. func cleanNsTime(time int64) (uint64, uint64) {
  1047. return uint64(time), uint64(math.Round(float64(time) / 1e6))
  1048. }