apm_exporter.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430
  1. package otlptrace
  2. import (
  3. "crypto/md5"
  4. "encoding/json"
  5. "fmt"
  6. "strconv"
  7. "strings"
  8. "sync"
  9. "go.opentelemetry.io/otel/exporters/otlp/otlptrace/internal/tracetransform"
  10. tracesdk "go.opentelemetry.io/otel/sdk/trace"
  11. tracepb "go.opentelemetry.io/proto/otlp/trace/v1"
  12. "time"
  13. )
  14. const (
  15. APP_SERVICE_TYPE = "APPLICATION"
  16. SQL_SERVICE_TYPE = "SQL"
  17. NOSQL_SERVICE_TYPE = "NOSQL"
  18. HTTP_SERVICE_TYPE = "HTTP"
  19. )
  20. const (
  21. GO_SERVICE_NAME = "GO"
  22. MYSQL_SERVICE_NAME = "MYSQL"
  23. REDIS_SERVICE_NAME = "REDIS"
  24. HTTP_SERVICE_NAME = "HTTPCLIENT"
  25. )
// apmTraceSpan is a package-local name for the SDK's read-only span
// interface; the build* helpers and span() in this file take it as their
// span parameter.
type apmTraceSpan tracesdk.ReadOnlySpan
// Sample value kept from the original source (looks like a colon-separated
// propagation header — TODO confirm what it documents):
// GO:0:10154813500555812:5450531005555981:5610250100539899:ee022542c3940f1b:1001025098564810:888ceb3df1bdbe2c:110

// RootDataT is the per-trace JSON payload returned by tracetransformData.
// initRootData fills in the defaults, buildAppMap overwrites the
// request-level fields from the APPLICATION span, and every named span of
// the trace is appended to Maps.
type RootDataT struct {
	AccountId    int    `json:"account_id"`
	AgentId      int64  `json:"agent_id"`
	AgentVersion string `json:"agent_version"`
	AppId        int64  `json:"app_id"`
	// from header app_id
	AppIdFrom int64  `json:"app_id_from"`
	AppName   string `json:"app_name"`
	// from header assumed_app_id
	CalledId int64 `json:"called_id"`
	// Set from the "net.peer.name" attribute (same value as Sip and Sn).
	ClientIp string `json:"client_ip"`
	// Start time of the APPLICATION span (nanoseconds / 1e6).
	CollTime   uint64 `json:"coll_time"`
	Cpu        int    `json:"cpu"`
	Custom     string `json:"custom"`
	HostId     int64  `json:"host_id"`
	HostName   string `json:"host_name"`
	HttpCode   int64  `json:"http_code"`
	HttpMethod string `json:"http_method"`
	InstanceId int64  `json:"instance_id"`
	// from header instance_id
	InstanceIdFrom int64 `json:"instance_id_from"`
	// Set from the "net.peer.port" attribute (same value as Sport).
	LocalPort int64 `json:"local_port"`
	// One entry per named span of the trace, in arrival order.
	Maps           []MapInfoT    `json:"maps"`
	MemU           int           `json:"mem_u"`
	MemUP          int           `json:"mem_u_p"`
	OperType       string        `json:"oper_type"`
	Parameters     []interface{} `json:"parameters"`
	ParentTaskName int           `json:"parent_task_name"`
	Period         int           `json:"period"`
	// Duration of the APPLICATION span (nanoseconds / 1e3).
	RespTime    uint64 `json:"resp_time"`
	Sampling    int    `json:"sampling"`
	ServiceName string `json:"service_name"`
	ServiceType string `json:"service_type"`
	Sip         string `json:"sip"`
	Sn          string `json:"sn"`
	// from header span_id
	SpanIdFrom string `json:"span_id_from"`
	Sport      int64  `json:"sport"`
	TId        int    `json:"t_id"`
	TName      string `json:"t_name"`
	// from header trace_id; initialised to the span's own trace ID and
	// overwritten when the "server.trace_id_from" attribute is present.
	TraceId  string        `json:"trace_id"`
	TransIds []interface{} `json:"trans_ids"`
	TypeFrom string        `json:"type_from"`
	Uri      string        `json:"uri"`
	UserDir  int           `json:"user_dir"`
	VipIds   []interface{} `json:"vip_ids"`
}
// MapInfoT is one node of RootDataT.Maps and describes a single span.
// initMapNode fills in the timing fields and the nid/pid/level attributes;
// the build*Map helpers then add the service-specific fields. Fields tagged
// omitempty are dropped from the JSON when they hold their zero value.
type MapInfoT struct {
	Dbn            string `json:"dbn,omitempty"`
	Exception      int    `json:"exception,omitempty"`
	ExceptionMsg   string `json:"exception_msg,omitempty"`
	ExceptionStack string `json:"exception_stack,omitempty"`
	Ip             string `json:"ip,omitempty"`
	// Defaults to 2; overridden by the "level" attribute, and forced to 1
	// for the APPLICATION node.
	Level      int    `json:"level"`
	MethodDesc string `json:"method_desc,omitempty"`
	MethodName string `json:"method_name"`
	// Taken from the "nid" span attribute.
	Nid      int    `json:"nid"`
	OperType string `json:"oper_type,omitempty"`
	// Defaults to 1; overridden by the "pid" attribute, and forced to 0 for
	// the APPLICATION node.
	Pid  int   `json:"pid"`
	Port int64 `json:"port,omitempty"`
	// For MySQL/Redis nodes: the single "db.statement" value.
	Ps []string `json:"ps,omitempty"`
	// Span duration, nanoseconds / 1e3.
	PureTime    uint64 `json:"pure_time"`
	ServiceName string `json:"service_name"`
	ServiceType string `json:"service_type"`
	// Span start, Unix nanoseconds / 1e6.
	StartTime uint64 `json:"start_time"`
	// Always set equal to PureTime by initMapNode.
	WallTime     uint64 `json:"wall_time"`
	Schema       string `json:"schema,omitempty"`
	AssumedAppId int64  `json:"assumed_app_id,omitempty"`
	Uri          string `json:"uri,omitempty"`
	SpanId       string `json:"span_id,omitempty"`
}
// TraceMapT is the per-trace accumulator stored in TraceRootMap while the
// spans of a trace are still arriving.
type TraceMapT struct {
	// Payload being assembled for this trace.
	RootData RootDataT
	// Span counter: created as 1, incremented once per span by
	// tracetransformData, and reset to 1 by buildAppMap.
	Index int
	// NOTE(review): never initialised or locked anywhere in this file.
	lock *sync.RWMutex
	// Set when the APPLICATION span has been processed; tracetransformData
	// then emits RootData and deletes the entry.
	TheEnd bool
}
  101. var TraceRootMap map[string]*TraceMapT
  102. func init() {
  103. TraceRootMap = make(map[string]*TraceMapT)
  104. go func() {
  105. for {
  106. //fmt.Println(G_sdl)
  107. time.Sleep(5 * time.Second)
  108. }
  109. }()
  110. }
  111. var G_sdl int
  112. func tracetransformData(sdl []tracesdk.ReadOnlySpan) []RootDataT {
  113. G_sdl += len(sdl)
  114. if len(sdl) == 0 {
  115. return nil
  116. }
  117. for _, sd := range sdl {
  118. if sd == nil {
  119. continue
  120. }
  121. traceId := sd.SpanContext().TraceID().String()
  122. if _, ok := TraceRootMap[traceId]; !ok {
  123. TraceRootMap[traceId] = &TraceMapT{RootData: initRootData(traceId), Index: 1}
  124. }
  125. TraceRootMap[traceId].Index++
  126. buildAndAssemblyMap(sd, TraceRootMap[traceId])
  127. }
  128. // 发送完整数据 | 大量长耗时请求会增加内存占用
  129. sendData := []RootDataT{}
  130. for traceId, v := range TraceRootMap {
  131. if v.TheEnd {
  132. sendData = append(sendData, v.RootData)
  133. delete(TraceRootMap, traceId)
  134. //fmt.Println("the end!")
  135. } else {
  136. //fmt.Println("not end!")
  137. }
  138. }
  139. // Transform the categorized map into a slice
  140. aa, err := json.Marshal(sendData)
  141. fmt.Println(err)
  142. fmt.Println(string(aa))
  143. fmt.Println(len(sendData))
  144. fmt.Println(len(sdl))
  145. return sendData
  146. }
  147. func initRootData(traceId string) RootDataT {
  148. data := RootDataT{
  149. AccountId: 110,
  150. AgentId: 1011005252979954, // TODO 更新 基于 ip:port + process_name + exe路径生成
  151. AgentVersion: "2.1.0",
  152. AppId: 5410049101545798, // TODO 更新 基于appname生成
  153. AppIdFrom: -1,
  154. AppName: "eBPF-agent", // TODO 更新 ip:port || process_name
  155. CalledId: -1,
  156. ClientIp: "",
  157. CollTime: 0,
  158. Cpu: 0,
  159. Custom: "",
  160. HostId: 10154813500555812,
  161. HostName: "localhost",
  162. HttpCode: 0,
  163. HttpMethod: "",
  164. InstanceId: 1005051101515357, // TODO 更新 基于ip:port
  165. InstanceIdFrom: -1,
  166. Maps: []MapInfoT{},
  167. MemU: 0,
  168. MemUP: 0,
  169. OperType: "",
  170. Parameters: []interface{}{},
  171. ParentTaskName: 0,
  172. Period: -1,
  173. RespTime: 0,
  174. Sampling: 0,
  175. ServiceName: "GO",
  176. ServiceType: APP_SERVICE_TYPE,
  177. Sip: "",
  178. Sn: "",
  179. SpanIdFrom: "",
  180. Sport: 0,
  181. TId: -1,
  182. TName: "",
  183. TraceId: traceId,
  184. TransIds: []interface{}{},
  185. TypeFrom: "",
  186. Uri: "",
  187. UserDir: 0,
  188. VipIds: []interface{}{},
  189. }
  190. return data
  191. }
  192. func initMapNode(spanSd *tracepb.Span) (MapInfoT, string) {
  193. mNode := MapInfoT{
  194. Exception: 0,
  195. ExceptionMsg: "",
  196. ExceptionStack: "",
  197. Ip: "",
  198. Level: 2,
  199. Pid: 1,
  200. Port: 0,
  201. Ps: []string{},
  202. ServiceName: "",
  203. ServiceType: "",
  204. WallTime: 0,
  205. }
  206. mNode.MethodName = spanSd.Name
  207. mNode.PureTime = (spanSd.EndTimeUnixNano - spanSd.StartTimeUnixNano) / 1e3
  208. mNode.WallTime = mNode.PureTime
  209. mNode.StartTime = spanSd.StartTimeUnixNano / 1e6
  210. for _, attr := range spanSd.GetAttributes() {
  211. switch attr.Key {
  212. case "nid":
  213. mNode.Nid = int(attr.Value.GetIntValue())
  214. case "pid":
  215. mNode.Pid = int(attr.Value.GetIntValue())
  216. case "level":
  217. mNode.Level = int(attr.Value.GetIntValue())
  218. }
  219. }
  220. return mNode, spanSd.Name
  221. }
  222. // 构建拼装
  223. func buildAndAssemblyMap(sd apmTraceSpan, traceRoot *TraceMapT) MapInfoT {
  224. mNode, mapType := initMapNode(span(sd))
  225. switch mapType {
  226. case "APPLICATION":
  227. buildAppMap(&mNode, traceRoot, sd)
  228. traceRoot.TheEnd = true
  229. case "HTTP":
  230. buildHttpMap(&mNode, sd)
  231. case "Mysql":
  232. buildMysqlMap(&mNode, sd)
  233. case "Redis":
  234. buildRedisMap(&mNode, sd)
  235. }
  236. if mapType != "" {
  237. // mNode.Nid = traceRoot.Index
  238. traceRoot.RootData.Maps = append(traceRoot.RootData.Maps, mNode)
  239. }
  240. return mNode
  241. }
  242. func buildAppMap(mNode *MapInfoT, traceRoot *TraceMapT, sd apmTraceSpan) {
  243. mNode.ServiceName = GO_SERVICE_NAME
  244. mNode.ServiceType = APP_SERVICE_TYPE
  245. mNode.MethodName = "net/http.(*Transport).roundTrip()"
  246. mNode.Level = 1
  247. mNode.Pid = 0
  248. // 构建root节点
  249. traceRoot.RootData.RespTime = mNode.PureTime
  250. traceRoot.RootData.CollTime = mNode.StartTime
  251. traceRoot.Index = 1
  252. for _, attr := range sd.Attributes() {
  253. //fmt.Println(attr.Key, ":", attr.Value.AsInterface())
  254. switch attr.Key {
  255. case "http.uri":
  256. traceRoot.RootData.Uri = attr.Value.AsString()
  257. case "http.method":
  258. traceRoot.RootData.HttpMethod = attr.Value.AsString()
  259. case "http.status_code":
  260. traceRoot.RootData.HttpCode = attr.Value.AsInt64()
  261. case "net.peer.name":
  262. traceRoot.RootData.ClientIp = attr.Value.AsString()
  263. traceRoot.RootData.Sip = attr.Value.AsString()
  264. traceRoot.RootData.Sn = attr.Value.AsString()
  265. case "net.peer.port":
  266. traceRoot.RootData.Sport = attr.Value.AsInt64()
  267. traceRoot.RootData.LocalPort = attr.Value.AsInt64()
  268. case "server.trace_id_from":
  269. traceRoot.RootData.TraceId = attr.Value.AsString()
  270. case "server.called_id":
  271. traceRoot.RootData.CalledId = attr.Value.AsInt64()
  272. case "server.instance_id_from":
  273. traceRoot.RootData.InstanceIdFrom = attr.Value.AsInt64()
  274. case "server.app_id_from":
  275. traceRoot.RootData.AppIdFrom = attr.Value.AsInt64()
  276. case "server.span_id_from":
  277. traceRoot.RootData.SpanIdFrom = attr.Value.AsString()
  278. case "server.type_from":
  279. traceRoot.RootData.TypeFrom = attr.Value.AsString()
  280. }
  281. }
  282. }
  283. func buildHttpMap(mNode *MapInfoT, sd apmTraceSpan) {
  284. mNode.ServiceName = HTTP_SERVICE_NAME
  285. mNode.ServiceType = HTTP_SERVICE_TYPE
  286. mNode.Schema = "http"
  287. mNode.MethodName = "net/http.serverHandler.ServeHTTP()"
  288. var descAddr string
  289. for _, attr := range sd.Attributes() {
  290. //fmt.Println(attr.Key, ":", attr.Value.AsInterface())
  291. switch attr.Key {
  292. case "http.ip":
  293. mNode.Ip = attr.Value.AsString()
  294. descAddr += mNode.Ip
  295. case "http.port":
  296. mNode.Port = attr.Value.AsInt64()
  297. descAddr += ":" + attr.Value.AsString()
  298. case "http.uri":
  299. mNode.Uri = attr.Value.AsString()
  300. case "http.assumed_app_id":
  301. mNode.AssumedAppId = attr.Value.AsInt64()
  302. case "http.span_id":
  303. mNode.SpanId = attr.Value.AsString()
  304. }
  305. }
  306. //mNode.AssumedAppId = Md5ToInt64(descAddr, 16)
  307. }
  308. func buildMysqlMap(mNode *MapInfoT, sd apmTraceSpan) {
  309. mNode.Dbn = "unknown"
  310. mNode.ServiceName = MYSQL_SERVICE_NAME
  311. mNode.ServiceType = SQL_SERVICE_TYPE
  312. mNode.MethodName = "database/sql.Query()"
  313. for _, attr := range sd.Attributes() {
  314. //fmt.Println(attr.Key, ":", attr.Value.AsInterface())
  315. switch attr.Key {
  316. case "net.peer.name":
  317. mNode.Ip = attr.Value.AsString()
  318. case "net.peer.port":
  319. mNode.Port = attr.Value.AsInt64()
  320. case "db.statement":
  321. query := attr.Value.AsString()
  322. mNode.Ps = []string{query}
  323. words := strings.Fields(query)
  324. if len(words) > 0 {
  325. mNode.OperType = strings.ToUpper(words[0])
  326. }
  327. }
  328. }
  329. }
  330. func buildRedisMap(mNode *MapInfoT, sd apmTraceSpan) {
  331. mNode.ServiceName = REDIS_SERVICE_NAME
  332. mNode.ServiceType = NOSQL_SERVICE_TYPE
  333. //mNode.MethodName = span(sd).Name + " query"
  334. mNode.MethodName = "redis.Do()"
  335. for _, attr := range sd.Attributes() {
  336. //fmt.Println(attr.Key, ":", attr.Value.AsInterface())
  337. switch attr.Key {
  338. case "net.peer.name":
  339. mNode.Ip = attr.Value.AsString()
  340. case "net.peer.port":
  341. mNode.Port = attr.Value.AsInt64()
  342. case "db.statement":
  343. query := attr.Value.AsString()
  344. mNode.Ps = []string{query}
  345. words := strings.Fields(query)
  346. if len(words) > 0 {
  347. mNode.OperType = strings.ToUpper(words[0])
  348. }
  349. }
  350. }
  351. }
  352. func isEnter(_type string) bool {
  353. if _type == "APPLICATION" {
  354. return true
  355. }
  356. return false
  357. }
  358. func span(sd apmTraceSpan) *tracepb.Span {
  359. if sd == nil {
  360. return nil
  361. }
  362. tid := sd.SpanContext().TraceID()
  363. sid := sd.SpanContext().SpanID()
  364. s := &tracepb.Span{
  365. TraceId: tid[:],
  366. SpanId: sid[:],
  367. TraceState: sd.SpanContext().TraceState().String(),
  368. //Status: status(sd.Status().Code, sd.Status().Description),
  369. StartTimeUnixNano: uint64(sd.StartTime().UnixNano()),
  370. EndTimeUnixNano: uint64(sd.EndTime().UnixNano()),
  371. //Links: links(sd.Links()),
  372. //Kind: spanKind(sd.SpanKind()),
  373. Name: sd.Name(),
  374. Attributes: tracetransform.KeyValues(sd.Attributes()),
  375. //Events: spanEvents(sd.Events()),
  376. DroppedAttributesCount: uint32(sd.DroppedAttributes()),
  377. DroppedEventsCount: uint32(sd.DroppedEvents()),
  378. DroppedLinksCount: uint32(sd.DroppedLinks()),
  379. }
  380. if psid := sd.Parent().SpanID(); psid.IsValid() {
  381. s.ParentSpanId = psid[:]
  382. }
  383. return s
  384. }
  385. func Md5ToInt64(strParam string, Len int) int64 {
  386. sign := md5.Sum([]byte(strParam))
  387. signStr := fmt.Sprintf("%x", sign)
  388. charArr := []rune(signStr)
  389. var intStr string
  390. for _, value := range charArr {
  391. intStr += strconv.Itoa(int(value))
  392. }
  393. intStr = intStr[:Len]
  394. int64Data, err := strconv.ParseInt(intStr, 10, 64)
  395. if err != nil {
  396. return 0
  397. }
  398. return int64Data
  399. }