tracer.go 28 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993
  1. package ebpftracer
  2. import (
  3. "bytes"
  4. debugelf "debug/elf"
  5. "encoding/binary"
  6. "encoding/hex"
  7. "errors"
  8. "fmt"
  9. "os"
  10. "strconv"
  11. "strings"
  12. "time"
  13. "github.com/coroot/coroot-node-agent/utils"
  14. "github.com/coroot/coroot-node-agent/utils/try"
  15. "github.com/cilium/ebpf"
  16. "github.com/cilium/ebpf/link"
  17. "github.com/cilium/ebpf/perf"
  18. "github.com/coroot/coroot-node-agent/common"
  19. "github.com/coroot/coroot-node-agent/ebpftracer/l7"
  20. "github.com/coroot/coroot-node-agent/ebpftracer/tracer"
  21. "github.com/coroot/coroot-node-agent/proc"
  22. . "github.com/coroot/coroot-node-agent/utils/modelse"
  23. klog "github.com/sirupsen/logrus"
  24. "golang.org/x/sys/unix"
  25. "inet.af/netaddr"
  26. )
/*
#define TASK_COMM_LEN 16
#define BURST_DATA_BUF_SIZE 8192 // For burst send buffer
#include <linux/types.h>

struct __tuple_t {
	__u8 daddr[16];
	__u8 rcv_saddr[16];
	__u8 addr_len;
	__u8 l4_protocol;
	__u16 dport;
	__u16 num;
};

struct __socket_data {
	__u32 pid;
	__u32 tgid;
	__u64 coroutine_id;
	__u8 source;
	__u8 comm[TASK_COMM_LEN];
	__u64 socket_id;
	struct __tuple_t tuple;
	__u32 extra_data;
	__u32 extra_data_count;
	__u32 tcp_seq;
	__u64 thread_trace_id;
	__u64 timestamp;
	__u8 direction: 1;
	__u8 msg_type: 7;
	__u64 syscall_len;
	__u64 data_seq;
	__u16 data_type;
	__u16 data_len;
	char data[BURST_DATA_BUF_SIZE];
} __attribute__((packed));

struct __socket_data_buffer {
	__u32 events_num;
	__u32 len;
	char data[32760];
};
*/
import "C"

// SocketData is the Go name for the C struct __socket_data declared in the
// cgo preamble above.
type SocketData C.struct___socket_data

// SocketDataBuffer is the Go name for the C struct __socket_data_buffer
// declared in the cgo preamble above.
type SocketDataBuffer C.struct___socket_data_buffer
  69. const MaxPayloadSize = 1024
  70. type EventType uint32
  71. type EventReason uint32
  72. const (
  73. EventTypeProcessStart EventType = 1
  74. EventTypeProcessExit EventType = 2
  75. EventTypeConnectionOpen EventType = 3
  76. EventTypeConnectionClose EventType = 4
  77. EventTypeConnectionError EventType = 5
  78. EventTypeListenOpen EventType = 6
  79. EventTypeListenClose EventType = 7
  80. EventTypeFileOpen EventType = 8
  81. EventTypeTCPRetransmit EventType = 9
  82. EventTypeL7Request EventType = 10
  83. EventTypeFunEnt EventType = 11
  84. EventTypeFunRet EventType = 12
  85. EventTypeAcceptOpen EventType = 13
  86. EventTypeAcceptClose EventType = 14
  87. EventReasonNone EventReason = 0
  88. EventReasonOOMKill EventReason = 1
  89. )
  90. type TrafficStats struct {
  91. BytesSent uint64
  92. BytesReceived uint64
  93. }
  94. type Event struct {
  95. StackEvent *StackEvent
  96. Type EventType
  97. Reason EventReason
  98. Pid uint32
  99. SrcAddr netaddr.IPPort
  100. DstAddr netaddr.IPPort
  101. Fd uint64
  102. Timestamp uint64
  103. Duration time.Duration
  104. L7Request *l7.RequestData
  105. TrafficStats *TrafficStats
  106. FirstReadTime uint64
  107. FirstWriteTime uint64
  108. NewReadTime uint64
  109. }
  110. type perfMapType uint8
  111. const (
  112. perfMapTypeProcEvents perfMapType = 1
  113. perfMapTypeTCPEvents perfMapType = 2
  114. perfMapTypeFileEvents perfMapType = 3
  115. perfMapTypeL7Events perfMapType = 4
  116. perfMapTypeSocketEvents perfMapType = 5
  117. perfMapTypeEventQueue perfMapType = 6
  118. perfMapTypePythonThreadEvents perfMapType = 7
  119. )
  120. type Tracer struct {
  121. kernelVersion string
  122. disableL7Tracing bool
  123. disableE2ETracing bool
  124. disableStackTracing bool
  125. collection *ebpf.Collection
  126. collectionSpec *ebpf.CollectionSpec
  127. readers map[string]*perf.Reader
  128. links []link.Link
  129. uprobes map[string]*ebpf.Program
  130. Symbols []debugelf.Symbol
  131. Uprobes []tracer.Uprobe
  132. UprobesMap map[string]tracer.Uprobe
  133. }
  134. func NewTracer(kernelVersion string, disableL7Tracing, disableE2ETracing, disableStackTracing bool) *Tracer {
  135. if disableL7Tracing {
  136. klog.Infoln("L7 tracing is disabled")
  137. } else {
  138. klog.Infoln("L7 tracing is enabled")
  139. }
  140. if disableE2ETracing {
  141. klog.Infoln("e2e is disabled")
  142. } else {
  143. klog.Infoln("e2e is enabled")
  144. }
  145. if disableStackTracing {
  146. klog.Infoln("L7 stack is disabled")
  147. } else {
  148. klog.Infoln("L7 stack is enabled")
  149. }
  150. return &Tracer{
  151. kernelVersion: kernelVersion,
  152. disableL7Tracing: disableL7Tracing,
  153. disableE2ETracing: disableE2ETracing,
  154. disableStackTracing: disableStackTracing,
  155. readers: map[string]*perf.Reader{},
  156. uprobes: map[string]*ebpf.Program{},
  157. links: []link.Link{},
  158. }
  159. }
  160. func (t *Tracer) Run(events chan<- Event) error {
  161. if err := t.ebpf(events); err != nil {
  162. return err
  163. }
  164. if err := t.init(events); err != nil {
  165. return err
  166. }
  167. return nil
  168. }
  169. func (t *Tracer) Close() {
  170. for k, p := range t.uprobes {
  171. if p != nil {
  172. err := p.Close()
  173. klog.WithError(err).Infof("[close] uprobes %s", k)
  174. }
  175. }
  176. for _, l := range t.links {
  177. if l != nil {
  178. err := l.Close()
  179. klog.WithError(err).Infof("[close] links")
  180. }
  181. }
  182. for k, r := range t.readers {
  183. if r != nil {
  184. err := r.Close()
  185. klog.WithError(err).Infof("[close] readers %s", k)
  186. }
  187. }
  188. if t.collection != nil {
  189. t.collection.Close()
  190. }
  191. }
  192. func (t *Tracer) init(ch chan<- Event) error {
  193. pids, err := proc.ListPids()
  194. if err != nil {
  195. return fmt.Errorf("failed to list pids: %w", err)
  196. }
  197. for _, pid := range pids {
  198. ch <- Event{Type: EventTypeProcessStart, Pid: pid}
  199. }
  200. fds, sockets := readFds(pids)
  201. for _, fd := range fds {
  202. ch <- Event{Type: EventTypeFileOpen, Pid: fd.pid, Fd: fd.fd}
  203. }
  204. listens := map[uint64]bool{}
  205. for _, s := range sockets {
  206. if s.Listen {
  207. listens[uint64(s.pid)<<32|uint64(s.SAddr.Port())] = true
  208. }
  209. }
  210. ebpfConnectionsMap := t.collection.Maps["active_connections"]
  211. timestamp := uint64(time.Now().UnixNano())
  212. for _, s := range sockets {
  213. typ := EventTypeConnectionOpen
  214. if s.Listen {
  215. typ = EventTypeListenOpen
  216. //} else if listens[uint64(s.pid)<<32|uint64(s.SAddr.Port())] || s.DAddr.Port() > s.SAddr.Port() { // inbound
  217. } else if listens[uint64(s.pid)<<32|uint64(s.SAddr.Port())] { // 存在误判
  218. continue
  219. }
  220. ch <- Event{
  221. Type: typ,
  222. Pid: s.pid,
  223. Timestamp: timestamp,
  224. Fd: s.fd,
  225. SrcAddr: s.SAddr,
  226. DstAddr: s.DAddr,
  227. }
  228. if typ == EventTypeConnectionOpen {
  229. id := ConnectionId{FD: s.fd, PID: s.pid}
  230. sip := s.SAddr.IP()
  231. sipbytes := sip.As16()
  232. dip := s.DAddr.IP()
  233. dipbytes := dip.As16()
  234. conn := Connection{Timestamp: timestamp, Saddr: sipbytes, Sport: s.SAddr.Port(), Daddr: dipbytes, Dport: s.DAddr.Port()}
  235. if err := ebpfConnectionsMap.Update(id, conn, ebpf.UpdateNoExist); err != nil {
  236. klog.Warningln(err)
  237. }
  238. }
  239. }
  240. return nil
  241. }
  242. func (t *Tracer) ActiveConnectionsIterator() *ebpf.MapIterator {
  243. return t.collection.Maps["active_connections"].Iterate()
  244. }
  245. func (t *Tracer) ActiveAcceptsIterator() *ebpf.MapIterator {
  246. return t.collection.Maps["active_accepts"].Iterate()
  247. }
  248. type perfMap struct {
  249. name string
  250. perCPUBufferSizePages int
  251. typ perfMapType
  252. }
// ebpf loads the eBPF object matching the running kernel, creates the
// collection, opens a perf reader for every known perf map and attaches the
// programs via LinkEbpfProg.
//
// Decoded records are delivered on ch by runEventsReader, which is started
// once per perf map through try.GoParams (presumably in its own goroutine —
// confirm against utils/try).
//
// The order of the tracer.PidFilter / tracer.MapInit / tracer.Offset /
// tracer.MapInsert calls relative to collection creation is kept exactly as
// written; their requirements are not visible from this file.
func (t *Tracer) ebpf(ch chan<- Event) error {
	// Select the embedded program by kernel "major.minor" version.
	kv := "v" + common.KernelMajorMinor(t.kernelVersion)
	path, prg, err := EbpfCode(kv)
	klog.Infof("kv is [%s], kernel version: [%s] path: [%s]", kv, t.kernelVersion, path)
	if len(prg) == 0 || err != nil {
		return fmt.Errorf("kv is %s, unsupported kernel version: [%s] path: [%s] err:<%v>", kv, t.kernelVersion, path, err)
	}
	// At least one of debugfs/tracefs must expose the tracing directory.
	_, debugFsErr := os.Stat("/sys/kernel/debug/tracing")
	_, traceFsErr := os.Stat("/sys/kernel/tracing")
	if debugFsErr != nil && traceFsErr != nil {
		return fmt.Errorf("kernel tracing is not available: debugfs or tracefs must be mounted")
	}
	collectionSpec, err := ebpf.LoadCollectionSpecFromReader(bytes.NewReader(prg))
	if err != nil {
		return fmt.Errorf("failed to load collection spec: %w", err)
	}
	// Best effort: lift the locked-memory limit; a failure here is ignored.
	_ = unix.Setrlimit(unix.RLIMIT_MEMLOCK, &unix.Rlimit{Cur: unix.RLIM_INFINITY, Max: unix.RLIM_INFINITY})
	tracer.PidFilter(collectionSpec)
	opts := &ebpf.CollectionOptions{MapReplacements: make(map[string]*ebpf.Map)}
	// Log the name of every map declared by the object file.
	klog.Infof("[start] Look eBPF .maps")
	for _, spec := range collectionSpec.Maps {
		klog.Infoln(spec.Name)
	}
	klog.Infof("[end] Look eBPF .maps")
	tracer.MapInit(collectionSpec, opts)
	// TODO: multi-process support
	// tracer.SetConstants(collectionSpec)
	c, err := ebpf.NewCollectionWithOptions(collectionSpec, *opts)
	if err != nil {
		// Log the verifier error in its verbose form when one is available.
		var verr *ebpf.VerifierError
		if errors.As(err, &verr) {
			klog.Errorf("----%+v", verr)
		}
		return fmt.Errorf("failed to load collection: %w", err)
	}
	tracer.Offset()
	t.collectionSpec = collectionSpec
	t.collection = c
	perfMaps := []perfMap{
		{name: "proc_events", typ: perfMapTypeProcEvents, perCPUBufferSizePages: 4},
		{name: "tcp_listen_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 4},
		{name: "tcp_connect_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 8},
		{name: "tcp_accept_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 8},
		{name: "tcp_retransmit_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 4},
		{name: "file_events", typ: perfMapTypeFileEvents, perCPUBufferSizePages: 4},
		{name: "event_queue", typ: perfMapTypeEventQueue, perCPUBufferSizePages: 32},
		{name: "python_thread_events", typ: perfMapTypePythonThreadEvents, perCPUBufferSizePages: 4},
	}
	tracer.MapInsert(c)
	// The L7 perf map is only read when L7 tracing is enabled.
	if !t.DisableL7Tracing() {
		perfMaps = append(perfMaps, perfMap{name: "l7_events", typ: perfMapTypeL7Events, perCPUBufferSizePages: 32})
	}
	perfMaps = append(perfMaps, perfMap{name: MAP_PERF_SOCKET_DATA_NAME, typ: perfMapTypeSocketEvents, perCPUBufferSizePages: 64})
	klog.Infof("[start] Look eBPF perf_maps")
	for _, pm := range perfMaps {
		klog.Infoln(pm.name)
		// Maps that are absent from the loaded collection are skipped silently.
		m, ok := t.collection.Maps[pm.name]
		if ok {
			r, err := perf.NewReader(m, pm.perCPUBufferSizePages*os.Getpagesize())
			if err != nil {
				t.Close()
				return fmt.Errorf("failed to create ebpf reader: %w", err)
			}
			t.readers[pm.name] = r
			// Start the event listener for this map.
			//go runEventsReader(pm.name, r, ch, pm.typ)
			try.GoParams(runEventsReader, utils.CatchFn, pm.name, r, ch, pm.typ)
		}
	}
	klog.Infof("[end] Look eBPF perf_maps")
	klog.Infof("[start] Look eBPF specPrograms")
	if err = t.LinkEbpfProg(); err != nil {
		return err
	}
	klog.Infof("[end] Look eBPF specPrograms")
	return nil
}
  330. func (t *Tracer) LinkEbpfProg() error {
  331. klog.Infof("[start] Look eBPF specPrograms")
  332. var (
  333. l link.Link
  334. err error
  335. lastErr error
  336. )
  337. for _, programSpec := range t.collectionSpec.Programs {
  338. program := t.collection.Programs[programSpec.Name]
  339. klog.Infof("%s:[%s]", programSpec.SectionName, programSpec.Name)
  340. if t.DisableL7Tracing() {
  341. switch programSpec.Name {
  342. case "sys_enter_writev", "sys_enter_write", "sys_enter_sendto", "sys_enter_sendmsg", "sys_enter_sendmmsg":
  343. continue
  344. case "sys_enter_read", "sys_enter_readv", "sys_enter_recvfrom", "sys_enter_recvmsg":
  345. continue
  346. case "sys_exit_read", "sys_exit_readv", "sys_exit_recvfrom", "sys_exit_recvmsg":
  347. continue
  348. }
  349. }
  350. switch programSpec.Type {
  351. case ebpf.TracePoint:
  352. if strings.Contains(programSpec.SectionName, "prog") {
  353. continue
  354. }
  355. parts := strings.SplitN(programSpec.AttachTo, "/", 2)
  356. l, err = link.Tracepoint(parts[0], parts[1], program, nil)
  357. case ebpf.Kprobe:
  358. if strings.HasPrefix(programSpec.SectionName, "uprobe/") {
  359. t.uprobes[programSpec.Name] = program
  360. continue
  361. }
  362. if strings.HasPrefix(programSpec.SectionName, "kretprobe/") {
  363. l, err = link.Kretprobe(programSpec.AttachTo, program, nil)
  364. if err == nil {
  365. t.links = append(t.links, l)
  366. }
  367. continue
  368. }
  369. l, err = link.Kprobe(programSpec.AttachTo, program, nil)
  370. }
  371. if err != nil {
  372. lastErr = err
  373. t.Close()
  374. klog.Errorf("LinkEbpfProg failed to program[%s] link program: %s", programSpec.Name, err)
  375. //return fmt.Errorf("failed to link program: %w", err)
  376. } else {
  377. t.links = append(t.links, l)
  378. }
  379. }
  380. klog.Infof("[end] Look eBPF specPrograms")
  381. if lastErr != nil {
  382. return fmt.Errorf("failed to link program: %w", lastErr)
  383. }
  384. return nil
  385. }
  386. func (t *Tracer) UnlinkEbpfProg() error {
  387. var (
  388. lastErr error
  389. err error
  390. )
  391. /* 此处不应该处理 t.uprobes 中 ebpf-program
  392. for pName, p := range t.uprobes {
  393. if err = p.Close(); err != nil {
  394. lastErr = err
  395. klog.Errorf("UnlinkEbpfProg close program[%s] uprobe occurs error: %s", pName, err.Error())
  396. }
  397. }
  398. */
  399. for _, l := range t.links {
  400. if err = l.Close(); err != nil {
  401. lastErr = err
  402. klog.Errorf("UnlinkEbpfProg close link occurs error: %s", err.Error())
  403. }
  404. }
  405. return lastErr
  406. }
  407. func (t EventType) Int() int {
  408. return int(t)
  409. }
  410. func (t EventType) String() string {
  411. switch t {
  412. case EventTypeProcessStart:
  413. return "process-start"
  414. case EventTypeProcessExit:
  415. return "process-exit"
  416. case EventTypeConnectionOpen:
  417. return "connection-open"
  418. case EventTypeConnectionClose:
  419. return "connection-close"
  420. case EventTypeConnectionError:
  421. return "connection-error"
  422. case EventTypeListenOpen:
  423. return "listen-open"
  424. case EventTypeListenClose:
  425. return "listen-close"
  426. case EventTypeFileOpen:
  427. return "file-open"
  428. case EventTypeTCPRetransmit:
  429. return "tcp-retransmit"
  430. case EventTypeL7Request:
  431. return "l7-request"
  432. }
  433. return "unknown: " + strconv.Itoa(int(t))
  434. }
  435. func (t EventReason) String() string {
  436. switch t {
  437. case EventReasonNone:
  438. return "none"
  439. case EventReasonOOMKill:
  440. return "oom-kill"
  441. }
  442. return "unknown: " + strconv.Itoa(int(t))
  443. }
  444. type procEvent struct {
  445. Type EventType
  446. Pid uint32
  447. Reason uint32
  448. }
  449. type tcpEvent struct {
  450. Fd uint64
  451. Timestamp uint64
  452. Duration uint64
  453. FirstReadTime uint64
  454. FirstWriteTime uint64
  455. NewReadTime uint64
  456. Type EventType
  457. Pid uint32
  458. BytesSent uint64
  459. BytesReceived uint64
  460. SPort uint16
  461. DPort uint16
  462. SAddr [16]byte
  463. DAddr [16]byte
  464. }
  465. type fileEvent struct {
  466. Type EventType
  467. Pid uint32
  468. Fd uint64
  469. }
  470. // struct l7_event in l7.c
  471. type l7Event struct {
  472. Fd uint64
  473. ConnectionTimestamp uint64
  474. Pid uint32
  475. Status uint32
  476. Duration uint64
  477. Protocol uint8
  478. Method uint8
  479. Padding uint16
  480. StatementId uint32
  481. PayloadSize uint64
  482. TraceId uint64
  483. StartAt uint64 // ns
  484. EndtAt uint64 // ns
  485. TraceStart uint32
  486. TraceEnd uint32
  487. TraceType uint32
  488. EventCount uint32
  489. Sport uint16
  490. Dport uint16
  491. SAddr HashByte16
  492. DAddr HashByte16
  493. ComponentSport uint16
  494. ComponentDport uint16
  495. IsTls uint16
  496. ComponentSAddr HashByte16
  497. ComponentDAddr HashByte16
  498. AssumedAppId HashByte
  499. SpanId HashByte
  500. TraceIdFrom HashByte16
  501. CalledId HashByte
  502. InstanceIdFrom HashByte
  503. AppIdFrom HashByte
  504. SpanIdFrom HashByte
  505. TypeFrom [1]byte
  506. SysvcFrom [76]byte // cwother header value: NN:ParentServiceName:MM:ParentServiceSysTag
  507. RPCTarget [64]byte
  508. ErrorMsg HashByte128
  509. MQ struct {
  510. Topic [256]byte // MQ topic (e.g., Kafka topic)
  511. Key [256]byte // MQ key (e.g., Kafka message key)
  512. }
  513. }
  514. type SocketDataBufferddd struct {
  515. EventsNum uint32
  516. Len uint32
  517. Data [32760]byte
  518. }
  519. const (
  520. TASK_COMM_LEN = 16
  521. BURST_DATA_BUF_SIZE = 8192
  522. )
  523. type Tuple struct {
  524. Daddr [16]uint8
  525. RcvSaddr [16]uint8
  526. AddrLen uint8
  527. L4Protocol uint8
  528. Dport uint16
  529. Num uint16
  530. }
  531. type SocketDatadddd struct {
  532. Pid uint32 // 表示线程号 如果'pid == tgid'表示一个进程, 否则是线程
  533. Tgid uint32 // 进程号
  534. CoroutineID uint64
  535. Source uint8
  536. Comm [TASK_COMM_LEN]byte
  537. SocketID uint64
  538. Tuple Tuple
  539. ExtraData uint32
  540. ExtraDataCount uint32
  541. TcpSeq uint32
  542. ThreadTraceID uint64
  543. Timestamp uint64
  544. Direction uint8
  545. MsgType uint8
  546. SyscallLen uint64
  547. DataSeq uint64
  548. DataType uint16
  549. DataLen uint16
  550. Data [BURST_DATA_BUF_SIZE]byte
  551. }
  552. type StackEvent struct {
  553. Type uint64
  554. Pid uint64
  555. TraceId uint64
  556. Goid uint64
  557. Ip uint64
  558. Bp uint64
  559. CallerIp uint64
  560. CallerBp uint64
  561. TimeNsStart uint64
  562. TimeNsEnd uint64
  563. // Nid uint64
  564. // Fpid uint64
  565. // Level uint64
  566. Location byte
  567. ClassName [100]byte
  568. MethedName [100]byte
  569. }
  570. type StackFunEvent struct {
  571. StackEvent StackEvent
  572. Uprobe *tracer.Uprobe
  573. }
  574. type pythonThreadEvent struct {
  575. Type EventType
  576. Pid uint32
  577. Duration uint64
  578. }
  579. func runEventsReader(name string, r *perf.Reader, ch chan<- Event, typ perfMapType) {
  580. for {
  581. rec, err := r.Read()
  582. if err != nil {
  583. if errors.Is(err, perf.ErrClosed) {
  584. break
  585. }
  586. continue
  587. }
  588. if rec.LostSamples > 0 {
  589. klog.Errorln(name, "lost samples:", rec.LostSamples)
  590. continue
  591. }
  592. var event Event
  593. switch typ {
  594. case perfMapTypeSocketEvents:
  595. //fmt.Println("perfMapTypeSocketEvents")
  596. // 假设 rec.RawSample 包含数据,类型为 []byte
  597. //rawData := rec.RawSample
  598. //fmt.Println("perfMapTypeSocketEvents2")
  599. //
  600. //// 创建一个 SocketDataBuffer 结构体实例
  601. //var buffer SocketDataBuffer
  602. //
  603. //// 创建一个字节缓冲区,并将数据填充到其中
  604. //reader := bytes.NewReader(rawData)
  605. //fmt.Println("perfMapTypeSocketEvents3")
  606. //fmt.Println(len(rawData))
  607. //// 使用 binary.Read 函数读取数据并解析为 SocketDataBuffer 结构体实例
  608. //if err := binary.Read(reader, binary.LittleEndian, &buffer); err != nil {
  609. // fmt.Println(reader.Len())
  610. // fmt.Println("Failed to read data:", err)
  611. // continue
  612. //}
  613. //fmt.Println("perfMapTypeSocketEvents4")
  614. //
  615. //// 打印解析后的数据
  616. //fmt.Println("EventsNum:", buffer.EventsNum)
  617. //fmt.Println("Len:", buffer.Len)
  618. //
  619. //// 打印 char data 的内容
  620. //fmt.Printf("Data: %s\n", string(buffer.Data[:buffer.Len])) // 仅打印实际长度的数据
  621. //socketDataBuffer := rec.RawSample
  622. // 解析 __socket_data_buffer
  623. //buf := (*SocketDataBuffer)(unsafe.Pointer(&rec.RawSample[0])) //nolint:gosec
  624. //
  625. //// 获取 char data[32760];
  626. //socketData := (*SocketData)(unsafe.Pointer(&buf.data[0])) //nolint:gosec
  627. //// todo
  628. //fmt.Printf("socketData.DataType:%d \n", (socketData.data_type))
  629. //fmt.Printf("socketData.DataLen:%d \n", (socketData.data_len))
  630. //
  631. //// 解析C结构体中的data字段
  632. //dataSlice := C.GoBytes(unsafe.Pointer(&socketData.data[0]), C.int(socketData.data_len))
  633. //// 打印或处理包含的数据
  634. //fmt.Printf("socketData.Payload:%v \n", string(dataSlice))
  635. /*todo */
  636. //socketData := (*(*[128]byte)(unsafe.Pointer(&eventC.line)))
  637. //dataPtr := unsafe.Pointer(&buf.data[0])
  638. //socketData := (*SocketData)(dataPtr)
  639. //reader2 := bytes.NewBuffer(rec.RawSample)
  640. // 222222
  641. //fmt.Println("socketData.Pid:", socketData.pid)
  642. //fmt.Println("socketData.Tgid:", socketData.tgid)
  643. //fmt.Println("socketData.CoroutineID:", socketData.coroutine_id)
  644. //fmt.Println("socketData.Source:", socketData.source)
  645. //
  646. //fmt.Printf("socketData.Comm: %s \n", socketData.comm)
  647. //fmt.Printf("socketData.SocketID :%v \n", socketData.socket_id)
  648. //fmt.Println("socketData.Tuple:", socketData.Tuple)
  649. //fmt.Println("socketData.ExtraData:", socketData.ExtraData)
  650. //fmt.Println("socketData.ExtraDataCount:", socketData.ExtraDataCount)
  651. //fmt.Println("socketData.TCPSeq:", socketData.TcpSeq)
  652. //fmt.Println("socketData.ThreadTraceID:", socketData.ThreadTraceID)
  653. //fmt.Println("socketData.Timestamp:", socketData.Timestamp)
  654. //fmt.Println("socketData.Direction:", socketData.Direction)
  655. //fmt.Println("socketData.MsgType:", socketData.MsgType)
  656. //fmt.Println("socketData.SyscallLen:", socketData.SyscallLen)
  657. //fmt.Println("socketData.DataSeq:", socketData.DataSeq)
  658. //socketData := &SocketData{}
  659. //reader := bytes.NewBuffer(rec.RawSample)
  660. //if err := binary.Read(reader, binary.LittleEndian, v); err != nil {
  661. // klog.Warningln("failed1 to read msg:", err)
  662. // continue
  663. //}
  664. //
  665. //var data []byte
  666. //payload := reader.Bytes()
  667. //switch {
  668. //case v.Len == 0:
  669. //case v.Len > 32760:
  670. // data = payload[:32760]
  671. //default:
  672. // data = payload[:v.Len]
  673. //}
  674. //////data2 := data[:v.Len]
  675. ////fmt.Println("perfMapTypeSocketEvents")
  676. //fmt.Println(v.EventsNum)
  677. //fmt.Println(v.Len)
  678. //fmt.Println(string(data))
  679. //
  680. //var data2 SocketData
  681. //reader2 := bytes.NewBuffer(data)
  682. //if err := binary.Read(reader2, binary.LittleEndian, data2); err != nil {
  683. // klog.Warningln("failed2 to read msg:", err)
  684. // continue
  685. //}
  686. //
  687. //fmt.Println(data2.Pid)
  688. //fmt.Println(data2.Tgid)
  689. //fmt.Println(string(v.Data))
  690. //continue
  691. case perfMapTypeL7Events:
  692. v := &l7Event{}
  693. reader := bytes.NewBuffer(rec.RawSample)
  694. if err := binary.Read(reader, binary.LittleEndian, v); err != nil {
  695. klog.Warningln("failed to read msg:", err)
  696. continue
  697. }
  698. //fmt.Println("v.TraceIdFrom")
  699. //fmt.Println(v.TraceIdFrom)
  700. //a := hex.EncodeToString(v.TraceIdFrom[:])
  701. //for _, b := range v.AssumedAppId {
  702. // fmt.Printf("v.AssumedAppId- %02\n", b)
  703. //}
  704. //fmt.Println(a)
  705. payload := reader.Bytes()
  706. req := &l7.RequestData{
  707. Protocol: l7.Protocol(v.Protocol),
  708. Pid: v.Pid,
  709. Status: l7.Status(v.Status),
  710. Duration: time.Duration(v.Duration),
  711. Method: l7.Method(v.Method),
  712. StatementId: v.StatementId,
  713. TraceId: v.TraceId,
  714. TraceStart: v.TraceStart,
  715. TraceEnd: v.TraceEnd,
  716. TraceType: v.TraceType,
  717. EventCount: v.EventCount,
  718. AssumedAppId: hex.EncodeToString(v.AssumedAppId[:]),
  719. SpanId: hex.EncodeToString(v.SpanId[:]),
  720. StartAt: v.StartAt,
  721. EndAt: v.EndtAt,
  722. ComponentSAddr: ipPort(v.ComponentSAddr, v.ComponentSport),
  723. ComponentDAddr: ipPort(v.ComponentDAddr, v.ComponentDport),
  724. DestAddrString: utils.BytesToString(v.RPCTarget[:]),
  725. ErrorMsg: utils.BytesToString(v.ErrorMsg[:]),
  726. IsTls: v.IsTls > 0,
  727. MQTopic: utils.BytesToString(v.MQ.Topic[:]),
  728. MQKey: utils.BytesToString(v.MQ.Key[:]),
  729. }
  730. if req.Protocol == l7.ProtocolHTTP {
  731. klog.Debugf("runEventsReader ComponentSAddr.String %s", req.ComponentSAddr.String())
  732. klog.Debugf("runEventsReader ComponentDAddr.String %s", req.ComponentDAddr.String())
  733. }
  734. if v.TraceEnd == TRACE_STATUS {
  735. req.ParentSpanContext.TraceIdFrom = hex.EncodeToString(v.TraceIdFrom[:])
  736. req.ParentSpanContext.CalledId = hex.EncodeToString(v.CalledId[:])
  737. req.ParentSpanContext.InstanceIdFrom = hex.EncodeToString(v.InstanceIdFrom[:])
  738. req.ParentSpanContext.AppIdFrom = hex.EncodeToString(v.AppIdFrom[:])
  739. req.ParentSpanContext.SpanIdFrom = hex.EncodeToString(v.SpanIdFrom[:])
  740. req.ParentSpanContext.TypeFrom = l7.TypeFrom(hex.EncodeToString(v.TypeFrom[:]))
  741. req.SysvcFrom = utils.BytesToString(v.SysvcFrom[:])
  742. klog.Debugf("cwother '%s'", req.SysvcFrom)
  743. // klog.Debugf("req.ParentSpanContext.TraceIdFrom %s", req.ParentSpanContext.TraceIdFrom)
  744. // klog.Debugf("req.ParentSpanContext.TypeFrom %s", req.ParentSpanContext.TypeFrom)
  745. req.SAddr = ipPort(v.SAddr, v.Sport)
  746. req.DAddr = ipPort(v.DAddr, v.Dport)
  747. // klog.Debugf("runEventsReader SAddr.String %s", req.SAddr.String())
  748. // klog.Debugf("runEventsReader DAddr.String %s", req.DAddr.String())
  749. }
  750. switch {
  751. case v.PayloadSize == 0:
  752. case v.PayloadSize > MaxPayloadSize:
  753. req.Payload = payload[:MaxPayloadSize]
  754. default:
  755. req.Payload = payload[:v.PayloadSize]
  756. }
  757. //fmt.Println("==========")
  758. //fmt.Println("req.Payload:", string(req.Payload))
  759. //fmt.Println("==========")
  760. event = Event{Type: EventTypeL7Request, Pid: v.Pid, Fd: v.Fd, Timestamp: v.ConnectionTimestamp, L7Request: req}
  761. case perfMapTypeFileEvents:
  762. v := &fileEvent{}
  763. if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
  764. klog.Warningln("failed to read msg:", err)
  765. continue
  766. }
  767. event = Event{Type: v.Type, Pid: v.Pid, Fd: v.Fd}
  768. case perfMapTypeProcEvents:
  769. v := &procEvent{}
  770. if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
  771. klog.Warningln("failed to read msg:", err)
  772. continue
  773. }
  774. event = Event{Type: v.Type, Reason: EventReason(v.Reason), Pid: v.Pid}
  775. case perfMapTypeTCPEvents:
  776. v := &tcpEvent{}
  777. if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
  778. klog.Warningln("failed to read msg:", err)
  779. continue
  780. }
  781. event = Event{
  782. Type: v.Type,
  783. Pid: v.Pid,
  784. SrcAddr: ipPort(v.SAddr, v.SPort),
  785. DstAddr: ipPort(v.DAddr, v.DPort),
  786. Fd: v.Fd,
  787. Timestamp: v.Timestamp,
  788. Duration: time.Duration(v.Duration),
  789. }
  790. if v.Type == EventTypeConnectionClose {
  791. event.TrafficStats = &TrafficStats{
  792. BytesSent: v.BytesSent,
  793. BytesReceived: v.BytesReceived,
  794. }
  795. }
  796. event.FirstReadTime = v.FirstReadTime
  797. event.FirstWriteTime = v.FirstWriteTime
  798. event.NewReadTime = v.NewReadTime
  799. if v.Type == EventTypeAcceptClose {
  800. event.TrafficStats = &TrafficStats{
  801. BytesSent: v.BytesSent,
  802. BytesReceived: v.BytesReceived,
  803. }
  804. }
  805. case perfMapTypePythonThreadEvents:
  806. v := &pythonThreadEvent{}
  807. if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
  808. klog.Warningln("failed to read msg:", err)
  809. continue
  810. }
  811. event = Event{
  812. Type: v.Type,
  813. Pid: v.Pid,
  814. Duration: time.Duration(v.Duration),
  815. }
  816. case perfMapTypeEventQueue:
  817. v := &StackEvent{}
  818. if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
  819. klog.Warningln("failed to read msg:", err)
  820. continue
  821. }
  822. event = Event{
  823. Type: EventTypeFunEnt,
  824. StackEvent: v,
  825. }
  826. default:
  827. continue
  828. }
  829. ch <- event
  830. }
  831. }
  832. func ipPort(ip [16]byte, port uint16) netaddr.IPPort {
  833. i, _ := netaddr.FromStdIP(ip[:])
  834. return netaddr.IPPortFrom(i, port)
  835. }
  836. func (t *Tracer) InitKProcInfo(pid uint32, appInfo *AppInfo) error {
  837. fmt.Println("InitKProcInfo111111111111111111111111111111111")
  838. var err error
  839. var info EbpfProcInfo
  840. if appInfo.EBPFProcInfo == nil {
  841. info = EbpfProcInfo{
  842. InstanceId: appInfo.InstanceIdHash.HashtVal,
  843. AppId: appInfo.AppIdHash.HashtVal,
  844. CodeType: uint16(appInfo.CodeType),
  845. }
  846. } else {
  847. info = *appInfo.EBPFProcInfo
  848. info.AppId = appInfo.AppIdHash.HashtVal
  849. }
  850. // 构建 sysvc 值:{app_name_len}:app_name:{appServiceType_len}:appServiceType:{SysTagLen}[:SysTag]
  851. // appServiceType 固定为 "APPLICATION"
  852. appName := appInfo.AppName
  853. nodeInfo := NODE_INFO.GetNodeInfo()
  854. sysTag := ""
  855. if nodeInfo != nil {
  856. sysTag = nodeInfo.SysTag
  857. }
  858. appServiceType := "APPLICATION"
  859. appNameLen := len(appName)
  860. appServiceTypeLen := len(appServiceType)
  861. sysTagLen := len(sysTag)
  862. // 限制长度,避免超出数组大小
  863. if appNameLen > 32 {
  864. appNameLen = 32
  865. appName = appName[:32]
  866. }
  867. if sysTagLen > 32 {
  868. sysTagLen = 32
  869. sysTag = sysTag[:32]
  870. }
  871. // 格式:{app_name_len}:app_name:{appServiceType_len}:appServiceType[:{SysTagLen}:SysTag]
  872. // 如果 sysTag 为空,则为 {app_name_len}:app_name:{appServiceType_len}:appServiceType
  873. // 如果 sysTag 不为空,则为 {app_name_len}:app_name:{appServiceType_len}:appServiceType:{SysTagLen}:SysTag
  874. // 长度不足2位时前面补0,例如:08:service:12:APPLICATION 或 08:service:12:APPLICATION:03:tag
  875. var sysvcStr string
  876. if sysTagLen == 0 {
  877. sysvcStr = fmt.Sprintf("%02d:%s:%02d:%s", appNameLen, appName, appServiceTypeLen, appServiceType)
  878. } else {
  879. sysvcStr = fmt.Sprintf("%02d:%s:%02d:%s:%02d:%s", appNameLen, appName, appServiceTypeLen, appServiceType, sysTagLen, sysTag)
  880. }
  881. // 复制到字节数组,确保不超过 76 字节
  882. sysvcBytes := []byte(sysvcStr)
  883. if len(sysvcBytes) > 76 {
  884. sysvcBytes = sysvcBytes[:76]
  885. }
  886. copy(info.Sysvc[:], sysvcBytes)
  887. _, err = tracer.UpdateProcInfoToMap(t.collection, pid, info)
  888. if err != nil {
  889. klog.Error("failed to update program info", err)
  890. }
  891. appInfo.EBPFProcInfo = &info
  892. return err
  893. }
  894. func (t *Tracer) DelKProcInfo(pid uint32) error {
  895. _, err := tracer.DelProcInfoFromMap(t.collection, pid)
  896. if err != nil {
  897. klog.WithField("pid", pid).Error("failed to delete proc info", err)
  898. }
  899. return err
  900. }
  901. // TODO check language
  902. func (t *Tracer) DisableL7Tracing() bool {
  903. return t.disableL7Tracing
  904. }
  905. func (t *Tracer) DisableE2ETracing() bool {
  906. return t.disableE2ETracing
  907. }
  908. func (t *Tracer) DisableStackTracing() bool {
  909. return t.disableStackTracing
  910. }