tracer.go 27 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977
  1. package ebpftracer
  2. import (
  3. "bytes"
  4. debugelf "debug/elf"
  5. "encoding/binary"
  6. "encoding/hex"
  7. "errors"
  8. "fmt"
  9. "github.com/coroot/coroot-node-agent/utils"
  10. "github.com/coroot/coroot-node-agent/utils/try"
  11. "os"
  12. "strconv"
  13. "strings"
  14. "time"
  15. "github.com/cilium/ebpf"
  16. "github.com/cilium/ebpf/link"
  17. "github.com/cilium/ebpf/perf"
  18. "github.com/coroot/coroot-node-agent/common"
  19. "github.com/coroot/coroot-node-agent/ebpftracer/l7"
  20. "github.com/coroot/coroot-node-agent/ebpftracer/tracer"
  21. "github.com/coroot/coroot-node-agent/proc"
  22. . "github.com/coroot/coroot-node-agent/utils/modelse"
  23. klog "github.com/sirupsen/logrus"
  24. "golang.org/x/sys/unix"
  25. "inet.af/netaddr"
  26. )
/*
#define TASK_COMM_LEN 16
#define BURST_DATA_BUF_SIZE 8192 // size of the payload buffer used for burst sends
#include <linux/types.h>

// Socket tuple recorded by the eBPF side.
// NOTE(review): both addresses occupy 16 bytes; addr_len presumably tells
// IPv4 from IPv6 — confirm against the eBPF source.
struct __tuple_t {
__u8 daddr[16];
__u8 rcv_saddr[16];
__u8 addr_len;
__u8 l4_protocol;
__u16 dport;
__u16 num;
};

// One captured socket data record. The struct is packed (no padding between
// fields); direction and msg_type are bitfields that share a single byte.
struct __socket_data {
__u32 pid;
__u32 tgid;
__u64 coroutine_id;
__u8 source;
__u8 comm[TASK_COMM_LEN];
__u64 socket_id;
struct __tuple_t tuple;
__u32 extra_data;
__u32 extra_data_count;
__u32 tcp_seq;
__u64 thread_trace_id;
__u64 timestamp;
__u8 direction: 1;
__u8 msg_type: 7;
__u64 syscall_len;
__u64 data_seq;
__u16 data_type;
__u16 data_len;
char data[BURST_DATA_BUF_SIZE];
} __attribute__((packed));

// A buffer of socket data records.
// NOTE(review): events_num / len semantics inferred from the names only.
struct __socket_data_buffer {
__u32 events_num;
__u32 len;
char data[32760];
};
*/
import "C"

// SocketData is the cgo-generated Go view of C struct __socket_data.
// NOTE(review): cgo omits C fields it cannot express in Go (bit fields and
// misaligned fields), replacing them with padding. Because the C struct is
// packed, e.g. socket_id lands at byte offset 33 and is misaligned, so several
// fields may not be accessible through this type.
type SocketData C.struct___socket_data

// SocketDataBuffer is the cgo-generated Go view of C struct __socket_data_buffer.
type SocketDataBuffer C.struct___socket_data_buffer
// MaxPayloadSize caps how many payload bytes are copied from an L7 or
// off-CPU perf record into the resulting event.
const MaxPayloadSize = 1024

// EventType identifies what an Event describes. Values are decoded directly
// from perf records (e.g. procEvent.Type, tcpEvent.Type), so they presumably
// mirror constants in the eBPF C code and must not be renumbered.
type EventType uint32

// EventReason carries extra detail for an event; in this file it is only set
// for process events.
type EventReason uint32

const (
	EventTypeProcessStart    EventType = 1
	EventTypeProcessExit     EventType = 2
	EventTypeConnectionOpen  EventType = 3
	EventTypeConnectionClose EventType = 4
	EventTypeConnectionError EventType = 5
	EventTypeListenOpen      EventType = 6
	EventTypeListenClose     EventType = 7
	EventTypeFileOpen        EventType = 8
	EventTypeTCPRetransmit   EventType = 9
	EventTypeL7Request       EventType = 10
	// Function entry/return events; entries are read from the "event_queue" perf map.
	EventTypeFunEnt EventType = 11
	EventTypeFunRet EventType = 12
	// Accept-side open/close events; close events carry TrafficStats.
	EventTypeAcceptOpen  EventType = 13
	EventTypeAcceptClose EventType = 14
	// Off-CPU time sample. The exported name keeps its original "TIme" spelling.
	EventTypeOffCpuTIme EventType = 15

	EventReasonNone    EventReason = 0
	EventReasonOOMKill EventReason = 1
)
// TrafficStats holds the byte counters attached to connection-close and
// accept-close TCP events.
type TrafficStats struct {
	BytesSent     uint64
	BytesReceived uint64
}

// Event is the Go-side representation of everything the tracer reports:
// records decoded from the eBPF perf maps and the synthetic events init emits
// for state that existed before tracing started. Which fields are populated
// depends on Type.
type Event struct {
	// StackEvent is set for events read from the "event_queue" perf map.
	StackEvent *StackEvent
	Type       EventType
	// Reason is set for process events.
	Reason EventReason
	Pid    uint32
	// SrcAddr/DstAddr are the source and destination addresses of TCP events;
	// for init's synthetic socket events SrcAddr is the local address.
	SrcAddr netaddr.IPPort
	DstAddr netaddr.IPPort
	Fd      uint64
	// Timestamp: init uses time.Now().UnixNano(); TCP/L7 events carry the value
	// reported by the eBPF side. NOTE(review): confirm both use a comparable clock.
	Timestamp uint64
	Duration  time.Duration
	// L7Request is set for EventTypeL7Request.
	L7Request *l7.RequestData
	// OffCPU is set for EventTypeOffCpuTIme.
	OffCPU *l7.OffCPUData
	// TrafficStats is set for connection-close and accept-close TCP events.
	TrafficStats *TrafficStats
	// The three fields below are copied verbatim from TCP events.
	// NOTE(review): their exact semantics are defined by the eBPF side.
	FirstReadTime  uint64
	FirstWriteTime uint64
	NewReadTime    uint64
}
  108. FirstReadTime uint64
  109. FirstWriteTime uint64
  110. NewReadTime uint64
  111. }
  112. type perfMapType uint8
  113. const (
  114. perfMapTypeProcEvents perfMapType = 1
  115. perfMapTypeTCPEvents perfMapType = 2
  116. perfMapTypeFileEvents perfMapType = 3
  117. perfMapTypeL7Events perfMapType = 4
  118. perfMapTypeSocketEvents perfMapType = 5
  119. perfMapTypeEventQueue perfMapType = 6
  120. perfMapTypePythonThreadEvents perfMapType = 7
  121. perfMapTypeOffCpuEvents perfMapType = 8
  122. )
  123. type Tracer struct {
  124. kernelVersion string
  125. disableL7Tracing bool
  126. disableE2ETracing bool
  127. disableStackTracing bool
  128. collection *ebpf.Collection
  129. collectionSpec *ebpf.CollectionSpec
  130. readers map[string]*perf.Reader
  131. links []link.Link
  132. uprobes map[string]*ebpf.Program
  133. Symbols []debugelf.Symbol
  134. Uprobes []tracer.Uprobe
  135. UprobesMap map[string]tracer.Uprobe
  136. }
  137. func NewTracer(kernelVersion string, disableL7Tracing, disableE2ETracing, disableStackTracing bool) *Tracer {
  138. if disableL7Tracing {
  139. klog.Infoln("L7 tracing is disabled")
  140. }
  141. if disableE2ETracing {
  142. klog.Infoln("L7 tracing is disabled")
  143. }
  144. if disableStackTracing {
  145. klog.Infoln("L7 stack is disabled")
  146. }
  147. return &Tracer{
  148. kernelVersion: kernelVersion,
  149. disableL7Tracing: disableL7Tracing,
  150. disableE2ETracing: disableE2ETracing,
  151. disableStackTracing: disableStackTracing,
  152. readers: map[string]*perf.Reader{},
  153. uprobes: map[string]*ebpf.Program{},
  154. }
  155. }
  156. func (t *Tracer) Run(events chan<- Event) error {
  157. if err := t.ebpf(events); err != nil {
  158. return err
  159. }
  160. if err := t.init(events); err != nil {
  161. return err
  162. }
  163. return nil
  164. }
  165. func (t *Tracer) Close() {
  166. for k, p := range t.uprobes {
  167. err := p.Close()
  168. klog.WithError(err).Infof("[close] uprobes %s", k)
  169. }
  170. for _, l := range t.links {
  171. err := l.Close()
  172. klog.WithError(err).Infof("[close] links")
  173. }
  174. for k, r := range t.readers {
  175. err := r.Close()
  176. klog.WithError(err).Infof("[close] readers %s", k)
  177. }
  178. t.collection.Close()
  179. }
  180. func (t *Tracer) init(ch chan<- Event) error {
  181. pids, err := proc.ListPids()
  182. if err != nil {
  183. return fmt.Errorf("failed to list pids: %w", err)
  184. }
  185. for _, pid := range pids {
  186. ch <- Event{Type: EventTypeProcessStart, Pid: pid}
  187. }
  188. fds, sockets := readFds(pids)
  189. for _, fd := range fds {
  190. ch <- Event{Type: EventTypeFileOpen, Pid: fd.pid, Fd: fd.fd}
  191. }
  192. listens := map[uint64]bool{}
  193. for _, s := range sockets {
  194. if s.Listen {
  195. listens[uint64(s.pid)<<32|uint64(s.SAddr.Port())] = true
  196. }
  197. }
  198. ebpfConnectionsMap := t.collection.Maps["active_connections"]
  199. timestamp := uint64(time.Now().UnixNano())
  200. for _, s := range sockets {
  201. typ := EventTypeConnectionOpen
  202. if s.Listen {
  203. typ = EventTypeListenOpen
  204. } else if listens[uint64(s.pid)<<32|uint64(s.SAddr.Port())] || s.DAddr.Port() > s.SAddr.Port() { // inbound
  205. continue
  206. }
  207. ch <- Event{
  208. Type: typ,
  209. Pid: s.pid,
  210. Timestamp: timestamp,
  211. Fd: s.fd,
  212. SrcAddr: s.SAddr,
  213. DstAddr: s.DAddr,
  214. }
  215. if typ == EventTypeConnectionOpen {
  216. id := ConnectionId{FD: s.fd, PID: s.pid}
  217. sip := s.SAddr.IP()
  218. sipbytes := sip.As16()
  219. dip := s.DAddr.IP()
  220. dipbytes := dip.As16()
  221. conn := Connection{Timestamp: timestamp, Saddr: sipbytes, Sport: s.SAddr.Port(), Daddr: dipbytes, Dport: s.DAddr.Port()}
  222. if err := ebpfConnectionsMap.Update(id, conn, ebpf.UpdateNoExist); err != nil {
  223. klog.Warningln(err)
  224. }
  225. }
  226. }
  227. return nil
  228. }
  229. func (t *Tracer) ActiveConnectionsIterator() *ebpf.MapIterator {
  230. return t.collection.Maps["active_connections"].Iterate()
  231. }
  232. func (t *Tracer) ActiveAcceptsIterator() *ebpf.MapIterator {
  233. return t.collection.Maps["active_accepts"].Iterate()
  234. }
  235. type perfMap struct {
  236. name string
  237. perCPUBufferSizePages int
  238. typ perfMapType
  239. }
  240. func (t *Tracer) ebpf(ch chan<- Event) error {
  241. kv := "v" + common.KernelMajorMinor(t.kernelVersion)
  242. path, prg, err := EbpfCode(kv)
  243. klog.Infof("kv is [%s], kernel version: [%s] path: [%s]", kv, t.kernelVersion, path)
  244. if len(prg) == 0 || err != nil {
  245. return fmt.Errorf("kv is %s, unsupported kernel version: [%s] path: [%s] err:<%v>", kv, t.kernelVersion, path, err)
  246. }
  247. _, debugFsErr := os.Stat("/sys/kernel/debug/tracing")
  248. _, traceFsErr := os.Stat("/sys/kernel/tracing")
  249. if debugFsErr != nil && traceFsErr != nil {
  250. return fmt.Errorf("kernel tracing is not available: debugfs or tracefs must be mounted")
  251. }
  252. collectionSpec, err := ebpf.LoadCollectionSpecFromReader(bytes.NewReader(prg))
  253. if err != nil {
  254. return fmt.Errorf("failed to load collection spec: %w", err)
  255. }
  256. _ = unix.Setrlimit(unix.RLIMIT_MEMLOCK, &unix.Rlimit{Cur: unix.RLIM_INFINITY, Max: unix.RLIM_INFINITY})
  257. tracer.PidFilter(collectionSpec)
  258. opts := &ebpf.CollectionOptions{MapReplacements: make(map[string]*ebpf.Map)}
  259. klog.Infof("[start] Look eBPF .maps")
  260. for _, spec := range collectionSpec.Maps {
  261. klog.Infoln(spec.Name)
  262. }
  263. klog.Infof("[end] Look eBPF .maps")
  264. tracer.MapInit(collectionSpec, opts)
  265. // TODO 多进程
  266. // tracer.SetConstants(collectionSpec)
  267. c, err := ebpf.NewCollectionWithOptions(collectionSpec, *opts)
  268. if err != nil {
  269. var verr *ebpf.VerifierError
  270. if errors.As(err, &verr) {
  271. klog.Errorf("----%+v", verr)
  272. }
  273. return fmt.Errorf("failed to load collection: %w", err)
  274. }
  275. tracer.Offset()
  276. t.collectionSpec = collectionSpec
  277. t.collection = c
  278. perfMaps := []perfMap{
  279. {name: "proc_events", typ: perfMapTypeProcEvents, perCPUBufferSizePages: 4},
  280. {name: "tcp_listen_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 4},
  281. {name: "tcp_connect_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 8},
  282. {name: "tcp_accept_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 8},
  283. {name: "tcp_retransmit_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 4},
  284. {name: "file_events", typ: perfMapTypeFileEvents, perCPUBufferSizePages: 4},
  285. {name: "event_queue", typ: perfMapTypeEventQueue, perCPUBufferSizePages: 32},
  286. {name: "python_thread_events", typ: perfMapTypePythonThreadEvents, perCPUBufferSizePages: 4},
  287. }
  288. tracer.MapInsert(c)
  289. if !t.DisableL7Tracing() {
  290. perfMaps = append(perfMaps, perfMap{name: "l7_events", typ: perfMapTypeL7Events, perCPUBufferSizePages: 32})
  291. perfMaps = append(perfMaps, perfMap{name: "offcpu_events", typ: perfMapTypeOffCpuEvents, perCPUBufferSizePages: 32})
  292. }
  293. perfMaps = append(perfMaps, perfMap{name: MAP_PERF_SOCKET_DATA_NAME, typ: perfMapTypeSocketEvents, perCPUBufferSizePages: 64})
  294. klog.Infof("[start] Look eBPF perf_maps")
  295. for _, pm := range perfMaps {
  296. klog.Infoln(pm.name)
  297. m, ok := t.collection.Maps[pm.name]
  298. if ok {
  299. r, err := perf.NewReader(m, pm.perCPUBufferSizePages*os.Getpagesize())
  300. if err != nil {
  301. t.Close()
  302. return fmt.Errorf("failed to create ebpf reader: %w", err)
  303. }
  304. t.readers[pm.name] = r
  305. // event监听
  306. //go runEventsReader(pm.name, r, ch, pm.typ)
  307. try.GoParams(runEventsReader, utils.CatchFn, pm.name, r, ch, pm.typ)
  308. }
  309. }
  310. klog.Infof("[end] Look eBPF perf_maps")
  311. klog.Infof("[start] Look eBPF specPrograms")
  312. if err = t.LinkEbpfProg(); err != nil {
  313. return err
  314. }
  315. klog.Infof("[end] Look eBPF specPrograms")
  316. return nil
  317. }
  318. func (t *Tracer) LinkEbpfProg() error {
  319. klog.Infof("[start] Look eBPF specPrograms")
  320. var (
  321. l link.Link
  322. err error
  323. lastErr error
  324. )
  325. for _, programSpec := range t.collectionSpec.Programs {
  326. program := t.collection.Programs[programSpec.Name]
  327. klog.Infof("%s:[%s]", programSpec.SectionName, programSpec.Name)
  328. if t.DisableL7Tracing() {
  329. switch programSpec.Name {
  330. case "sys_enter_writev", "sys_enter_write", "sys_enter_sendto", "sys_enter_sendmsg", "sys_enter_sendmmsg":
  331. continue
  332. case "sys_enter_read", "sys_enter_readv", "sys_enter_recvfrom", "sys_enter_recvmsg":
  333. continue
  334. case "sys_exit_read", "sys_exit_readv", "sys_exit_recvfrom", "sys_exit_recvmsg":
  335. continue
  336. }
  337. }
  338. switch programSpec.Type {
  339. case ebpf.TracePoint:
  340. if strings.Contains(programSpec.SectionName, "prog") {
  341. continue
  342. }
  343. parts := strings.SplitN(programSpec.AttachTo, "/", 2)
  344. l, err = link.Tracepoint(parts[0], parts[1], program, nil)
  345. case ebpf.Kprobe:
  346. if strings.HasPrefix(programSpec.SectionName, "uprobe/") {
  347. t.uprobes[programSpec.Name] = program
  348. continue
  349. }
  350. if strings.HasPrefix(programSpec.SectionName, "kretprobe/") {
  351. l, err = link.Kretprobe(programSpec.AttachTo, program, nil)
  352. t.links = append(t.links, l)
  353. continue
  354. }
  355. l, err = link.Kprobe(programSpec.AttachTo, program, nil)
  356. }
  357. if err != nil {
  358. lastErr = err
  359. t.Close()
  360. klog.Errorf("LinkEbpfProg failed to program[%s] link program: %s", programSpec.Name, err)
  361. //return fmt.Errorf("failed to link program: %w", err)
  362. } else {
  363. t.links = append(t.links, l)
  364. }
  365. }
  366. klog.Infof("[end] Look eBPF specPrograms")
  367. if lastErr != nil {
  368. return fmt.Errorf("failed to link program: %w", lastErr)
  369. }
  370. return nil
  371. }
  372. func (t *Tracer) UnlinkEbpfProg() error {
  373. var (
  374. lastErr error
  375. err error
  376. )
  377. /* 此处不应该处理 t.uprobes 中 ebpf-program
  378. for pName, p := range t.uprobes {
  379. if err = p.Close(); err != nil {
  380. lastErr = err
  381. klog.Errorf("UnlinkEbpfProg close program[%s] uprobe occurs error: %s", pName, err.Error())
  382. }
  383. }
  384. */
  385. for _, l := range t.links {
  386. if err = l.Close(); err != nil {
  387. lastErr = err
  388. klog.Errorf("UnlinkEbpfProg close link occurs error: %s", err.Error())
  389. }
  390. }
  391. return lastErr
  392. }
  393. func (t EventType) Int() int {
  394. return int(t)
  395. }
  396. func (t EventType) String() string {
  397. switch t {
  398. case EventTypeProcessStart:
  399. return "process-start"
  400. case EventTypeProcessExit:
  401. return "process-exit"
  402. case EventTypeConnectionOpen:
  403. return "connection-open"
  404. case EventTypeConnectionClose:
  405. return "connection-close"
  406. case EventTypeConnectionError:
  407. return "connection-error"
  408. case EventTypeListenOpen:
  409. return "listen-open"
  410. case EventTypeListenClose:
  411. return "listen-close"
  412. case EventTypeFileOpen:
  413. return "file-open"
  414. case EventTypeTCPRetransmit:
  415. return "tcp-retransmit"
  416. case EventTypeL7Request:
  417. return "l7-request"
  418. }
  419. return "unknown: " + strconv.Itoa(int(t))
  420. }
  421. func (t EventReason) String() string {
  422. switch t {
  423. case EventReasonNone:
  424. return "none"
  425. case EventReasonOOMKill:
  426. return "oom-kill"
  427. }
  428. return "unknown: " + strconv.Itoa(int(t))
  429. }
  430. type procEvent struct {
  431. Type EventType
  432. Pid uint32
  433. Reason uint32
  434. }
  435. type tcpEvent struct {
  436. Fd uint64
  437. Timestamp uint64
  438. Duration uint64
  439. FirstReadTime uint64
  440. FirstWriteTime uint64
  441. NewReadTime uint64
  442. Type EventType
  443. Pid uint32
  444. BytesSent uint64
  445. BytesReceived uint64
  446. SPort uint16
  447. DPort uint16
  448. SAddr [16]byte
  449. DAddr [16]byte
  450. }
  451. type fileEvent struct {
  452. Type EventType
  453. Pid uint32
  454. Fd uint64
  455. }
  456. // struct l7_event in l7.c
  457. type l7Event struct {
  458. Fd uint64
  459. ConnectionTimestamp uint64
  460. Pid uint32
  461. Status uint32
  462. Duration uint64
  463. Protocol uint8
  464. Method uint8
  465. Padding uint16
  466. StatementId uint32
  467. PayloadSize uint64
  468. TraceId uint64
  469. StartAt uint64 // ns
  470. EndtAt uint64 // ns
  471. TraceStart uint32
  472. TraceEnd uint32
  473. EventCount uint32
  474. Sport uint16
  475. Dport uint16
  476. SAddr [16]byte
  477. DAddr [16]byte
  478. ComponentSport uint16
  479. ComponentDport uint16
  480. ComponentSAddr [16]byte
  481. ComponentDAddr [16]byte
  482. AssumedAppId HashByte
  483. SpanId HashByte
  484. TraceIdFrom HashByte16
  485. CalledId HashByte
  486. InstanceIdFrom HashByte
  487. AppIdFrom HashByte
  488. SpanIdFrom HashByte
  489. }
  490. // struct offcpu_event in offcpu.c
  491. type OffCpuEvent struct{
  492. Tid uint32
  493. Pid uint32
  494. OffCpuTime uint64
  495. OffNetTime uint64
  496. OffEpollTime uint64
  497. OffFileTime uint64
  498. OffFutexTime uint64
  499. OffMemTime uint64
  500. OffMemmapTime uint64
  501. OffMmapTime uint64
  502. RunqTime uint64
  503. CPUOnTime uint64
  504. TraceID uint64
  505. Sport uint16
  506. SAddr [16]byte
  507. PayloadSize uint64
  508. // Payload []byte
  509. }
  510. type SocketDataBufferddd struct {
  511. EventsNum uint32
  512. Len uint32
  513. Data [32760]byte
  514. }
  515. const (
  516. TASK_COMM_LEN = 16
  517. BURST_DATA_BUF_SIZE = 8192
  518. )
  519. type Tuple struct {
  520. Daddr [16]uint8
  521. RcvSaddr [16]uint8
  522. AddrLen uint8
  523. L4Protocol uint8
  524. Dport uint16
  525. Num uint16
  526. }
  527. type SocketDatadddd struct {
  528. Pid uint32 // 表示线程号 如果'pid == tgid'表示一个进程, 否则是线程
  529. Tgid uint32 // 进程号
  530. CoroutineID uint64
  531. Source uint8
  532. Comm [TASK_COMM_LEN]byte
  533. SocketID uint64
  534. Tuple Tuple
  535. ExtraData uint32
  536. ExtraDataCount uint32
  537. TcpSeq uint32
  538. ThreadTraceID uint64
  539. Timestamp uint64
  540. Direction uint8
  541. MsgType uint8
  542. SyscallLen uint64
  543. DataSeq uint64
  544. DataType uint16
  545. DataLen uint16
  546. Data [BURST_DATA_BUF_SIZE]byte
  547. }
  548. type StackEvent struct {
  549. Type uint64
  550. Pid uint64
  551. TraceId uint64
  552. Goid uint64
  553. Ip uint64
  554. Bp uint64
  555. CallerIp uint64
  556. CallerBp uint64
  557. TimeNsStart uint64
  558. TimeNsEnd uint64
  559. // Nid uint64
  560. // Fpid uint64
  561. // Level uint64
  562. Location byte
  563. ClassName [100]byte
  564. MethedName [100]byte
  565. }
  566. type StackFunEvent struct {
  567. StackEvent StackEvent
  568. Uprobe *tracer.Uprobe
  569. }
  570. type pythonThreadEvent struct {
  571. Type EventType
  572. Pid uint32
  573. Duration uint64
  574. }
  575. func runEventsReader(name string, r *perf.Reader, ch chan<- Event, typ perfMapType) {
  576. for {
  577. rec, err := r.Read()
  578. if err != nil {
  579. if errors.Is(err, perf.ErrClosed) {
  580. break
  581. }
  582. continue
  583. }
  584. if rec.LostSamples > 0 {
  585. klog.Errorln(name, "lost samples:", rec.LostSamples)
  586. continue
  587. }
  588. var event Event
  589. switch typ {
  590. case perfMapTypeSocketEvents:
  591. //fmt.Println("perfMapTypeSocketEvents")
  592. // 假设 rec.RawSample 包含数据,类型为 []byte
  593. //rawData := rec.RawSample
  594. //fmt.Println("perfMapTypeSocketEvents2")
  595. //
  596. //// 创建一个 SocketDataBuffer 结构体实例
  597. //var buffer SocketDataBuffer
  598. //
  599. //// 创建一个字节缓冲区,并将数据填充到其中
  600. //reader := bytes.NewReader(rawData)
  601. //fmt.Println("perfMapTypeSocketEvents3")
  602. //fmt.Println(len(rawData))
  603. //// 使用 binary.Read 函数读取数据并解析为 SocketDataBuffer 结构体实例
  604. //if err := binary.Read(reader, binary.LittleEndian, &buffer); err != nil {
  605. // fmt.Println(reader.Len())
  606. // fmt.Println("Failed to read data:", err)
  607. // continue
  608. //}
  609. //fmt.Println("perfMapTypeSocketEvents4")
  610. //
  611. //// 打印解析后的数据
  612. //fmt.Println("EventsNum:", buffer.EventsNum)
  613. //fmt.Println("Len:", buffer.Len)
  614. //
  615. //// 打印 char data 的内容
  616. //fmt.Printf("Data: %s\n", string(buffer.Data[:buffer.Len])) // 仅打印实际长度的数据
  617. //socketDataBuffer := rec.RawSample
  618. // 解析 __socket_data_buffer
  619. //buf := (*SocketDataBuffer)(unsafe.Pointer(&rec.RawSample[0])) //nolint:gosec
  620. //
  621. //// 获取 char data[32760];
  622. //socketData := (*SocketData)(unsafe.Pointer(&buf.data[0])) //nolint:gosec
  623. //// todo
  624. //fmt.Printf("socketData.DataType:%d \n", (socketData.data_type))
  625. //fmt.Printf("socketData.DataLen:%d \n", (socketData.data_len))
  626. //
  627. //// 解析C结构体中的data字段
  628. //dataSlice := C.GoBytes(unsafe.Pointer(&socketData.data[0]), C.int(socketData.data_len))
  629. //// 打印或处理包含的数据
  630. //fmt.Printf("socketData.Payload:%v \n", string(dataSlice))
  631. /*todo */
  632. //socketData := (*(*[128]byte)(unsafe.Pointer(&eventC.line)))
  633. //dataPtr := unsafe.Pointer(&buf.data[0])
  634. //socketData := (*SocketData)(dataPtr)
  635. //reader2 := bytes.NewBuffer(rec.RawSample)
  636. // 222222
  637. //fmt.Println("socketData.Pid:", socketData.pid)
  638. //fmt.Println("socketData.Tgid:", socketData.tgid)
  639. //fmt.Println("socketData.CoroutineID:", socketData.coroutine_id)
  640. //fmt.Println("socketData.Source:", socketData.source)
  641. //
  642. //fmt.Printf("socketData.Comm: %s \n", socketData.comm)
  643. //fmt.Printf("socketData.SocketID :%v \n", socketData.socket_id)
  644. //fmt.Println("socketData.Tuple:", socketData.Tuple)
  645. //fmt.Println("socketData.ExtraData:", socketData.ExtraData)
  646. //fmt.Println("socketData.ExtraDataCount:", socketData.ExtraDataCount)
  647. //fmt.Println("socketData.TCPSeq:", socketData.TcpSeq)
  648. //fmt.Println("socketData.ThreadTraceID:", socketData.ThreadTraceID)
  649. //fmt.Println("socketData.Timestamp:", socketData.Timestamp)
  650. //fmt.Println("socketData.Direction:", socketData.Direction)
  651. //fmt.Println("socketData.MsgType:", socketData.MsgType)
  652. //fmt.Println("socketData.SyscallLen:", socketData.SyscallLen)
  653. //fmt.Println("socketData.DataSeq:", socketData.DataSeq)
  654. //socketData := &SocketData{}
  655. //reader := bytes.NewBuffer(rec.RawSample)
  656. //if err := binary.Read(reader, binary.LittleEndian, v); err != nil {
  657. // klog.Warningln("failed1 to read msg:", err)
  658. // continue
  659. //}
  660. //
  661. //var data []byte
  662. //payload := reader.Bytes()
  663. //switch {
  664. //case v.Len == 0:
  665. //case v.Len > 32760:
  666. // data = payload[:32760]
  667. //default:
  668. // data = payload[:v.Len]
  669. //}
  670. //////data2 := data[:v.Len]
  671. ////fmt.Println("perfMapTypeSocketEvents")
  672. //fmt.Println(v.EventsNum)
  673. //fmt.Println(v.Len)
  674. //fmt.Println(string(data))
  675. //
  676. //var data2 SocketData
  677. //reader2 := bytes.NewBuffer(data)
  678. //if err := binary.Read(reader2, binary.LittleEndian, data2); err != nil {
  679. // klog.Warningln("failed2 to read msg:", err)
  680. // continue
  681. //}
  682. //
  683. //fmt.Println(data2.Pid)
  684. //fmt.Println(data2.Tgid)
  685. //fmt.Println(string(v.Data))
  686. //continue
  687. case perfMapTypeOffCpuEvents:
  688. //接收 offcpu数据
  689. v := &OffCpuEvent{}
  690. reader := bytes.NewBuffer(rec.RawSample)
  691. if err := binary.Read(reader, binary.LittleEndian, v); err != nil {
  692. klog.Warningln("failed to read msg:", err)
  693. continue
  694. }
  695. klog.Infoln("read the OffCpuEvent data is : ", v)
  696. payload := reader.Bytes()
  697. offcpu := &l7.OffCPUData{
  698. Tid: v.Tid,
  699. Pid: v.Pid,
  700. OffCpuTime: v.OffCpuTime,
  701. OffNetTime: v.OffNetTime,
  702. OffEpollTime: v.OffEpollTime,
  703. OffFileTime: v.OffFileTime,
  704. OffFutexTime: v.OffFutexTime,
  705. OffMemTime: v.OffMemTime,
  706. OffMemmapTime: v.OffMemmapTime,
  707. OffMmapTime: v.OffMmapTime,
  708. RunqTime: v.RunqTime,
  709. CPUOnTime: v.CPUOnTime,
  710. TraceID: v.TraceID,
  711. SAddr: ipPort(v.SAddr, v.Sport),
  712. }
  713. offcpu.PayloadSize = v.PayloadSize
  714. switch {
  715. case v.PayloadSize == 0:
  716. case v.PayloadSize > MaxPayloadSize:
  717. offcpu.Payload = payload[:MaxPayloadSize]
  718. default:
  719. offcpu.Payload = payload[:v.PayloadSize]
  720. }
  721. // fmt.Println("==========")
  722. klog.Infoln("read the OffCpuEvent Payload size is :", v.PayloadSize)
  723. klog.Infoln("read the OffCpuEvent Payload:", string(offcpu.Payload))
  724. klog.Infoln("read the OffCpuEvent saddr is ", offcpu.SAddr.String())
  725. // _, path := l7.ParseHttpOffCPU(offcpu.Payload)
  726. // klog.Infoln("read the OffCpuEvent url is ", path)
  727. // fmt.Println("==========")
  728. event = Event{Type: EventTypeOffCpuTIme, Pid: v.Pid, OffCPU: offcpu}
  729. case perfMapTypeL7Events:
  730. v := &l7Event{}
  731. reader := bytes.NewBuffer(rec.RawSample)
  732. if err := binary.Read(reader, binary.LittleEndian, v); err != nil {
  733. klog.Warningln("failed to read msg:", err)
  734. continue
  735. }
  736. //fmt.Println("v.TraceIdFrom")
  737. //fmt.Println(v.TraceIdFrom)
  738. //a := hex.EncodeToString(v.TraceIdFrom[:])
  739. //for _, b := range v.AssumedAppId {
  740. // fmt.Printf("v.AssumedAppId- %02\n", b)
  741. //}
  742. //fmt.Println(a)
  743. payload := reader.Bytes()
  744. req := &l7.RequestData{
  745. Protocol: l7.Protocol(v.Protocol),
  746. Status: l7.Status(v.Status),
  747. Duration: time.Duration(v.Duration),
  748. Method: l7.Method(v.Method),
  749. StatementId: v.StatementId,
  750. TraceId: v.TraceId,
  751. TraceStart: v.TraceStart,
  752. TraceEnd: v.TraceEnd,
  753. EventCount: v.EventCount,
  754. AssumedAppId: hex.EncodeToString(v.AssumedAppId[:]),
  755. SpanId: hex.EncodeToString(v.SpanId[:]),
  756. StartAt: v.StartAt,
  757. EndAt: v.EndtAt,
  758. ComponentSAddr: ipPort(v.ComponentSAddr, v.ComponentSport),
  759. ComponentDAddr: ipPort(v.ComponentDAddr, v.ComponentDport),
  760. }
  761. if req.Protocol == l7.ProtocolHTTP {
  762. klog.Infof("runEventsReader ComponentSAddr.String %s", req.ComponentSAddr.String())
  763. klog.Infof("runEventsReader ComponentDAddr.String %s", req.ComponentDAddr.String())
  764. }
  765. if v.TraceEnd == 1 {
  766. req.ParentSpanContext.TraceIdFrom = hex.EncodeToString(v.TraceIdFrom[:])
  767. req.ParentSpanContext.CalledId = hex.EncodeToString(v.CalledId[:])
  768. req.ParentSpanContext.InstanceIdFrom = hex.EncodeToString(v.InstanceIdFrom[:])
  769. req.ParentSpanContext.AppIdFrom = hex.EncodeToString(v.AppIdFrom[:])
  770. req.ParentSpanContext.SpanIdFrom = hex.EncodeToString(v.SpanIdFrom[:])
  771. req.SAddr = ipPort(v.SAddr, v.Sport)
  772. req.DAddr = ipPort(v.DAddr, v.Dport)
  773. klog.Infof("runEventsReader SAddr.String %s", req.SAddr.String())
  774. klog.Infof("runEventsReader DAddr.String %s", req.DAddr.String())
  775. }
  776. switch {
  777. case v.PayloadSize == 0:
  778. case v.PayloadSize > MaxPayloadSize:
  779. req.Payload = payload[:MaxPayloadSize]
  780. default:
  781. req.Payload = payload[:v.PayloadSize]
  782. }
  783. //fmt.Println("==========")
  784. //fmt.Println("req.Payload:", string(req.Payload))
  785. //fmt.Println("==========")
  786. event = Event{Type: EventTypeL7Request, Pid: v.Pid, Fd: v.Fd, Timestamp: v.ConnectionTimestamp, L7Request: req}
  787. case perfMapTypeFileEvents:
  788. v := &fileEvent{}
  789. if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
  790. klog.Warningln("failed to read msg:", err)
  791. continue
  792. }
  793. event = Event{Type: v.Type, Pid: v.Pid, Fd: v.Fd}
  794. case perfMapTypeProcEvents:
  795. v := &procEvent{}
  796. if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
  797. klog.Warningln("failed to read msg:", err)
  798. continue
  799. }
  800. event = Event{Type: v.Type, Reason: EventReason(v.Reason), Pid: v.Pid}
  801. case perfMapTypeTCPEvents:
  802. v := &tcpEvent{}
  803. if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
  804. klog.Warningln("failed to read msg:", err)
  805. continue
  806. }
  807. event = Event{
  808. Type: v.Type,
  809. Pid: v.Pid,
  810. SrcAddr: ipPort(v.SAddr, v.SPort),
  811. DstAddr: ipPort(v.DAddr, v.DPort),
  812. Fd: v.Fd,
  813. Timestamp: v.Timestamp,
  814. Duration: time.Duration(v.Duration),
  815. }
  816. if v.Type == EventTypeConnectionClose {
  817. event.TrafficStats = &TrafficStats{
  818. BytesSent: v.BytesSent,
  819. BytesReceived: v.BytesReceived,
  820. }
  821. }
  822. event.FirstReadTime = v.FirstReadTime
  823. event.FirstWriteTime = v.FirstWriteTime
  824. event.NewReadTime = v.NewReadTime
  825. if v.Type == EventTypeAcceptClose {
  826. event.TrafficStats = &TrafficStats{
  827. BytesSent: v.BytesSent,
  828. BytesReceived: v.BytesReceived,
  829. }
  830. }
  831. case perfMapTypePythonThreadEvents:
  832. v := &pythonThreadEvent{}
  833. if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
  834. klog.Warningln("failed to read msg:", err)
  835. continue
  836. }
  837. event = Event{
  838. Type: v.Type,
  839. Pid: v.Pid,
  840. Duration: time.Duration(v.Duration),
  841. }
  842. case perfMapTypeEventQueue:
  843. v := &StackEvent{}
  844. if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
  845. klog.Warningln("failed to read msg:", err)
  846. continue
  847. }
  848. event = Event{
  849. Type: EventTypeFunEnt,
  850. StackEvent: v,
  851. }
  852. default:
  853. continue
  854. }
  855. ch <- event
  856. }
  857. }
  858. func ipPort(ip [16]byte, port uint16) netaddr.IPPort {
  859. i, _ := netaddr.FromStdIP(ip[:])
  860. return netaddr.IPPortFrom(i, port)
  861. }
  862. func (t *Tracer) InitKProcInfo(pid uint32, appInfo *AppInfo) error {
  863. var err error
  864. var info EbpfProcInfo
  865. if appInfo.EBPFProcInfo == nil {
  866. info = EbpfProcInfo{
  867. InstanceId: appInfo.InstanceIdHash.HashtVal,
  868. AppId: appInfo.AppIdHash.HashtVal,
  869. CodeType: uint16(appInfo.CodeType),
  870. }
  871. } else {
  872. info = *appInfo.EBPFProcInfo
  873. info.AppId = appInfo.AppIdHash.HashtVal
  874. }
  875. _, err = tracer.UpdateProcInfoToMap(t.collection, pid, info)
  876. if err != nil {
  877. klog.Error("failed to update program info", err)
  878. }
  879. appInfo.EBPFProcInfo = &info
  880. return err
  881. }
  882. func (t *Tracer) DelKProcInfo(pid uint32) error {
  883. _, err := tracer.DelProcInfoFromMap(t.collection, pid)
  884. if err != nil {
  885. klog.Error("failed to delete proc info", err)
  886. }
  887. return err
  888. }
  889. // TODO check language
  890. func (t *Tracer) DisableL7Tracing() bool {
  891. return t.disableL7Tracing
  892. }
  893. func (t *Tracer) DisableE2ETracing() bool {
  894. return t.disableE2ETracing
  895. }
  896. func (t *Tracer) DisableStackTracing() bool {
  897. return t.disableStackTracing
  898. }