tracer.go 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916
  1. package ebpftracer
  2. import (
  3. "bytes"
  4. debugelf "debug/elf"
  5. "encoding/binary"
  6. "encoding/hex"
  7. "errors"
  8. "fmt"
  9. "github.com/coroot/coroot-node-agent/utils"
  10. "github.com/coroot/coroot-node-agent/utils/try"
  11. "os"
  12. "strconv"
  13. "strings"
  14. "time"
  15. "github.com/cilium/ebpf"
  16. "github.com/cilium/ebpf/link"
  17. "github.com/cilium/ebpf/perf"
  18. "github.com/coroot/coroot-node-agent/common"
  19. "github.com/coroot/coroot-node-agent/ebpftracer/l7"
  20. "github.com/coroot/coroot-node-agent/ebpftracer/tracer"
  21. "github.com/coroot/coroot-node-agent/proc"
  22. . "github.com/coroot/coroot-node-agent/utils/modelse"
  23. klog "github.com/sirupsen/logrus"
  24. "golang.org/x/sys/unix"
  25. "inet.af/netaddr"
  26. )
  27. /*
  28. #define TASK_COMM_LEN 16
  29. #define BURST_DATA_BUF_SIZE 8192 // For brust send buffer
  30. #include <linux/types.h>
  31. struct __tuple_t {
  32. __u8 daddr[16];
  33. __u8 rcv_saddr[16];
  34. __u8 addr_len;
  35. __u8 l4_protocol;
  36. __u16 dport;
  37. __u16 num;
  38. };
  39. struct __socket_data {
  40. __u32 pid;
  41. __u32 tgid;
  42. __u64 coroutine_id;
  43. __u8 source;
  44. __u8 comm[TASK_COMM_LEN];
  45. __u64 socket_id;
  46. struct __tuple_t tuple;
  47. __u32 extra_data;
  48. __u32 extra_data_count;
  49. __u32 tcp_seq;
  50. __u64 thread_trace_id;
  51. __u64 timestamp;
  52. __u8 direction: 1;
  53. __u8 msg_type: 7;
  54. __u64 syscall_len;
  55. __u64 data_seq;
  56. __u16 data_type;
  57. __u16 data_len;
  58. char data[BURST_DATA_BUF_SIZE];
  59. } __attribute__((packed));
  60. struct __socket_data_buffer {
  61. __u32 events_num;
  62. __u32 len;
  63. char data[32760];
  64. };
  65. */
  66. import "C"
  67. type SocketData C.struct___socket_data
  68. type SocketDataBuffer C.struct___socket_data_buffer
  69. const MaxPayloadSize = 1024
  70. type EventType uint32
  71. type EventReason uint32
  72. const (
  73. EventTypeProcessStart EventType = 1
  74. EventTypeProcessExit EventType = 2
  75. EventTypeConnectionOpen EventType = 3
  76. EventTypeConnectionClose EventType = 4
  77. EventTypeConnectionError EventType = 5
  78. EventTypeListenOpen EventType = 6
  79. EventTypeListenClose EventType = 7
  80. EventTypeFileOpen EventType = 8
  81. EventTypeTCPRetransmit EventType = 9
  82. EventTypeL7Request EventType = 10
  83. EventTypeFunEnt EventType = 11
  84. EventTypeFunRet EventType = 12
  85. EventTypeAcceptOpen EventType = 13
  86. EventTypeAcceptClose EventType = 14
  87. EventReasonNone EventReason = 0
  88. EventReasonOOMKill EventReason = 1
  89. )
  90. type TrafficStats struct {
  91. BytesSent uint64
  92. BytesReceived uint64
  93. }
  94. type Event struct {
  95. StackEvent *StackEvent
  96. Type EventType
  97. Reason EventReason
  98. Pid uint32
  99. SrcAddr netaddr.IPPort
  100. DstAddr netaddr.IPPort
  101. Fd uint64
  102. Timestamp uint64
  103. Duration time.Duration
  104. L7Request *l7.RequestData
  105. TrafficStats *TrafficStats
  106. FirstReadTime uint64
  107. FirstWriteTime uint64
  108. NewReadTime uint64
  109. }
  110. type perfMapType uint8
  111. const (
  112. perfMapTypeProcEvents perfMapType = 1
  113. perfMapTypeTCPEvents perfMapType = 2
  114. perfMapTypeFileEvents perfMapType = 3
  115. perfMapTypeL7Events perfMapType = 4
  116. perfMapTypeSocketEvents perfMapType = 5
  117. perfMapTypeEventQueue perfMapType = 6
  118. perfMapTypePythonThreadEvents perfMapType = 7
  119. )
  120. type Tracer struct {
  121. kernelVersion string
  122. disableL7Tracing bool
  123. disableE2ETracing bool
  124. disableStackTracing bool
  125. collection *ebpf.Collection
  126. collectionSpec *ebpf.CollectionSpec
  127. readers map[string]*perf.Reader
  128. links []link.Link
  129. uprobes map[string]*ebpf.Program
  130. Symbols []debugelf.Symbol
  131. Uprobes []tracer.Uprobe
  132. UprobesMap map[string]tracer.Uprobe
  133. }
  134. func NewTracer(kernelVersion string, disableL7Tracing, disableE2ETracing, disableStackTracing bool) *Tracer {
  135. if disableL7Tracing {
  136. klog.Infoln("L7 tracing is disabled")
  137. } else {
  138. klog.Infoln("L7 tracing is enabled")
  139. }
  140. if disableE2ETracing {
  141. klog.Infoln("e2e is disabled")
  142. } else {
  143. klog.Infoln("e2e is enabled")
  144. }
  145. if disableStackTracing {
  146. klog.Infoln("L7 stack is disabled")
  147. } else {
  148. klog.Infoln("L7 stack is enabled")
  149. }
  150. return &Tracer{
  151. kernelVersion: kernelVersion,
  152. disableL7Tracing: disableL7Tracing,
  153. disableE2ETracing: disableE2ETracing,
  154. disableStackTracing: disableStackTracing,
  155. readers: map[string]*perf.Reader{},
  156. uprobes: map[string]*ebpf.Program{},
  157. }
  158. }
  159. func (t *Tracer) Run(events chan<- Event) error {
  160. if err := t.ebpf(events); err != nil {
  161. return err
  162. }
  163. if err := t.init(events); err != nil {
  164. return err
  165. }
  166. return nil
  167. }
  168. func (t *Tracer) Close() {
  169. for k, p := range t.uprobes {
  170. err := p.Close()
  171. klog.WithError(err).Infof("[close] uprobes %s", k)
  172. }
  173. for _, l := range t.links {
  174. err := l.Close()
  175. klog.WithError(err).Infof("[close] links")
  176. }
  177. for k, r := range t.readers {
  178. err := r.Close()
  179. klog.WithError(err).Infof("[close] readers %s", k)
  180. }
  181. t.collection.Close()
  182. }
  183. func (t *Tracer) init(ch chan<- Event) error {
  184. pids, err := proc.ListPids()
  185. if err != nil {
  186. return fmt.Errorf("failed to list pids: %w", err)
  187. }
  188. for _, pid := range pids {
  189. ch <- Event{Type: EventTypeProcessStart, Pid: pid}
  190. }
  191. fds, sockets := readFds(pids)
  192. for _, fd := range fds {
  193. ch <- Event{Type: EventTypeFileOpen, Pid: fd.pid, Fd: fd.fd}
  194. }
  195. listens := map[uint64]bool{}
  196. for _, s := range sockets {
  197. if s.Listen {
  198. listens[uint64(s.pid)<<32|uint64(s.SAddr.Port())] = true
  199. }
  200. }
  201. ebpfConnectionsMap := t.collection.Maps["active_connections"]
  202. timestamp := uint64(time.Now().UnixNano())
  203. for _, s := range sockets {
  204. typ := EventTypeConnectionOpen
  205. if s.Listen {
  206. typ = EventTypeListenOpen
  207. } else if listens[uint64(s.pid)<<32|uint64(s.SAddr.Port())] || s.DAddr.Port() > s.SAddr.Port() { // inbound
  208. continue
  209. }
  210. ch <- Event{
  211. Type: typ,
  212. Pid: s.pid,
  213. Timestamp: timestamp,
  214. Fd: s.fd,
  215. SrcAddr: s.SAddr,
  216. DstAddr: s.DAddr,
  217. }
  218. if typ == EventTypeConnectionOpen {
  219. id := ConnectionId{FD: s.fd, PID: s.pid}
  220. sip := s.SAddr.IP()
  221. sipbytes := sip.As16()
  222. dip := s.DAddr.IP()
  223. dipbytes := dip.As16()
  224. conn := Connection{Timestamp: timestamp, Saddr: sipbytes, Sport: s.SAddr.Port(), Daddr: dipbytes, Dport: s.DAddr.Port()}
  225. if err := ebpfConnectionsMap.Update(id, conn, ebpf.UpdateNoExist); err != nil {
  226. klog.Warningln(err)
  227. }
  228. }
  229. }
  230. return nil
  231. }
  232. func (t *Tracer) ActiveConnectionsIterator() *ebpf.MapIterator {
  233. return t.collection.Maps["active_connections"].Iterate()
  234. }
  235. func (t *Tracer) ActiveAcceptsIterator() *ebpf.MapIterator {
  236. return t.collection.Maps["active_accepts"].Iterate()
  237. }
  238. type perfMap struct {
  239. name string
  240. perCPUBufferSizePages int
  241. typ perfMapType
  242. }
  243. func (t *Tracer) ebpf(ch chan<- Event) error {
  244. kv := "v" + common.KernelMajorMinor(t.kernelVersion)
  245. path, prg, err := EbpfCode(kv)
  246. klog.Infof("kv is [%s], kernel version: [%s] path: [%s]", kv, t.kernelVersion, path)
  247. if len(prg) == 0 || err != nil {
  248. return fmt.Errorf("kv is %s, unsupported kernel version: [%s] path: [%s] err:<%v>", kv, t.kernelVersion, path, err)
  249. }
  250. _, debugFsErr := os.Stat("/sys/kernel/debug/tracing")
  251. _, traceFsErr := os.Stat("/sys/kernel/tracing")
  252. if debugFsErr != nil && traceFsErr != nil {
  253. return fmt.Errorf("kernel tracing is not available: debugfs or tracefs must be mounted")
  254. }
  255. collectionSpec, err := ebpf.LoadCollectionSpecFromReader(bytes.NewReader(prg))
  256. if err != nil {
  257. return fmt.Errorf("failed to load collection spec: %w", err)
  258. }
  259. _ = unix.Setrlimit(unix.RLIMIT_MEMLOCK, &unix.Rlimit{Cur: unix.RLIM_INFINITY, Max: unix.RLIM_INFINITY})
  260. tracer.PidFilter(collectionSpec)
  261. opts := &ebpf.CollectionOptions{MapReplacements: make(map[string]*ebpf.Map)}
  262. klog.Infof("[start] Look eBPF .maps")
  263. for _, spec := range collectionSpec.Maps {
  264. klog.Infoln(spec.Name)
  265. }
  266. klog.Infof("[end] Look eBPF .maps")
  267. tracer.MapInit(collectionSpec, opts)
  268. // TODO 多进程
  269. // tracer.SetConstants(collectionSpec)
  270. c, err := ebpf.NewCollectionWithOptions(collectionSpec, *opts)
  271. if err != nil {
  272. var verr *ebpf.VerifierError
  273. if errors.As(err, &verr) {
  274. klog.Errorf("----%+v", verr)
  275. }
  276. return fmt.Errorf("failed to load collection: %w", err)
  277. }
  278. tracer.Offset()
  279. t.collectionSpec = collectionSpec
  280. t.collection = c
  281. perfMaps := []perfMap{
  282. {name: "proc_events", typ: perfMapTypeProcEvents, perCPUBufferSizePages: 4},
  283. {name: "tcp_listen_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 4},
  284. {name: "tcp_connect_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 8},
  285. {name: "tcp_accept_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 8},
  286. {name: "tcp_retransmit_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 4},
  287. {name: "file_events", typ: perfMapTypeFileEvents, perCPUBufferSizePages: 4},
  288. {name: "event_queue", typ: perfMapTypeEventQueue, perCPUBufferSizePages: 32},
  289. {name: "python_thread_events", typ: perfMapTypePythonThreadEvents, perCPUBufferSizePages: 4},
  290. }
  291. tracer.MapInsert(c)
  292. if !t.DisableL7Tracing() {
  293. perfMaps = append(perfMaps, perfMap{name: "l7_events", typ: perfMapTypeL7Events, perCPUBufferSizePages: 32})
  294. }
  295. perfMaps = append(perfMaps, perfMap{name: MAP_PERF_SOCKET_DATA_NAME, typ: perfMapTypeSocketEvents, perCPUBufferSizePages: 64})
  296. klog.Infof("[start] Look eBPF perf_maps")
  297. for _, pm := range perfMaps {
  298. klog.Infoln(pm.name)
  299. m, ok := t.collection.Maps[pm.name]
  300. if ok {
  301. r, err := perf.NewReader(m, pm.perCPUBufferSizePages*os.Getpagesize())
  302. if err != nil {
  303. t.Close()
  304. return fmt.Errorf("failed to create ebpf reader: %w", err)
  305. }
  306. t.readers[pm.name] = r
  307. // event监听
  308. //go runEventsReader(pm.name, r, ch, pm.typ)
  309. try.GoParams(runEventsReader, utils.CatchFn, pm.name, r, ch, pm.typ)
  310. }
  311. }
  312. klog.Infof("[end] Look eBPF perf_maps")
  313. klog.Infof("[start] Look eBPF specPrograms")
  314. if err = t.LinkEbpfProg(); err != nil {
  315. return err
  316. }
  317. klog.Infof("[end] Look eBPF specPrograms")
  318. return nil
  319. }
  320. func (t *Tracer) LinkEbpfProg() error {
  321. klog.Infof("[start] Look eBPF specPrograms")
  322. var (
  323. l link.Link
  324. err error
  325. lastErr error
  326. )
  327. for _, programSpec := range t.collectionSpec.Programs {
  328. program := t.collection.Programs[programSpec.Name]
  329. klog.Infof("%s:[%s]", programSpec.SectionName, programSpec.Name)
  330. if t.DisableL7Tracing() {
  331. switch programSpec.Name {
  332. case "sys_enter_writev", "sys_enter_write", "sys_enter_sendto", "sys_enter_sendmsg", "sys_enter_sendmmsg":
  333. continue
  334. case "sys_enter_read", "sys_enter_readv", "sys_enter_recvfrom", "sys_enter_recvmsg":
  335. continue
  336. case "sys_exit_read", "sys_exit_readv", "sys_exit_recvfrom", "sys_exit_recvmsg":
  337. continue
  338. }
  339. }
  340. switch programSpec.Type {
  341. case ebpf.TracePoint:
  342. if strings.Contains(programSpec.SectionName, "prog") {
  343. continue
  344. }
  345. parts := strings.SplitN(programSpec.AttachTo, "/", 2)
  346. l, err = link.Tracepoint(parts[0], parts[1], program, nil)
  347. case ebpf.Kprobe:
  348. if strings.HasPrefix(programSpec.SectionName, "uprobe/") {
  349. t.uprobes[programSpec.Name] = program
  350. continue
  351. }
  352. if strings.HasPrefix(programSpec.SectionName, "kretprobe/") {
  353. l, err = link.Kretprobe(programSpec.AttachTo, program, nil)
  354. t.links = append(t.links, l)
  355. continue
  356. }
  357. l, err = link.Kprobe(programSpec.AttachTo, program, nil)
  358. }
  359. if err != nil {
  360. lastErr = err
  361. t.Close()
  362. klog.Errorf("LinkEbpfProg failed to program[%s] link program: %s", programSpec.Name, err)
  363. //return fmt.Errorf("failed to link program: %w", err)
  364. } else {
  365. t.links = append(t.links, l)
  366. }
  367. }
  368. klog.Infof("[end] Look eBPF specPrograms")
  369. if lastErr != nil {
  370. return fmt.Errorf("failed to link program: %w", lastErr)
  371. }
  372. return nil
  373. }
  374. func (t *Tracer) UnlinkEbpfProg() error {
  375. var (
  376. lastErr error
  377. err error
  378. )
  379. /* 此处不应该处理 t.uprobes 中 ebpf-program
  380. for pName, p := range t.uprobes {
  381. if err = p.Close(); err != nil {
  382. lastErr = err
  383. klog.Errorf("UnlinkEbpfProg close program[%s] uprobe occurs error: %s", pName, err.Error())
  384. }
  385. }
  386. */
  387. for _, l := range t.links {
  388. if err = l.Close(); err != nil {
  389. lastErr = err
  390. klog.Errorf("UnlinkEbpfProg close link occurs error: %s", err.Error())
  391. }
  392. }
  393. return lastErr
  394. }
  395. func (t EventType) Int() int {
  396. return int(t)
  397. }
  398. func (t EventType) String() string {
  399. switch t {
  400. case EventTypeProcessStart:
  401. return "process-start"
  402. case EventTypeProcessExit:
  403. return "process-exit"
  404. case EventTypeConnectionOpen:
  405. return "connection-open"
  406. case EventTypeConnectionClose:
  407. return "connection-close"
  408. case EventTypeConnectionError:
  409. return "connection-error"
  410. case EventTypeListenOpen:
  411. return "listen-open"
  412. case EventTypeListenClose:
  413. return "listen-close"
  414. case EventTypeFileOpen:
  415. return "file-open"
  416. case EventTypeTCPRetransmit:
  417. return "tcp-retransmit"
  418. case EventTypeL7Request:
  419. return "l7-request"
  420. }
  421. return "unknown: " + strconv.Itoa(int(t))
  422. }
  423. func (t EventReason) String() string {
  424. switch t {
  425. case EventReasonNone:
  426. return "none"
  427. case EventReasonOOMKill:
  428. return "oom-kill"
  429. }
  430. return "unknown: " + strconv.Itoa(int(t))
  431. }
  432. type procEvent struct {
  433. Type EventType
  434. Pid uint32
  435. Reason uint32
  436. }
  437. type tcpEvent struct {
  438. Fd uint64
  439. Timestamp uint64
  440. Duration uint64
  441. FirstReadTime uint64
  442. FirstWriteTime uint64
  443. NewReadTime uint64
  444. Type EventType
  445. Pid uint32
  446. BytesSent uint64
  447. BytesReceived uint64
  448. SPort uint16
  449. DPort uint16
  450. SAddr [16]byte
  451. DAddr [16]byte
  452. }
  453. type fileEvent struct {
  454. Type EventType
  455. Pid uint32
  456. Fd uint64
  457. }
  458. // struct l7_event in l7.c
  459. type l7Event struct {
  460. Fd uint64
  461. ConnectionTimestamp uint64
  462. Pid uint32
  463. Status uint32
  464. Duration uint64
  465. Protocol uint8
  466. Method uint8
  467. Padding uint16
  468. StatementId uint32
  469. PayloadSize uint64
  470. TraceId uint64
  471. StartAt uint64 // ns
  472. EndtAt uint64 // ns
  473. TraceStart uint32
  474. TraceEnd uint32
  475. EventCount uint32
  476. Sport uint16
  477. Dport uint16
  478. SAddr [16]byte
  479. DAddr [16]byte
  480. ComponentSport uint16
  481. ComponentDport uint16
  482. ComponentSAddr [16]byte
  483. ComponentDAddr [16]byte
  484. AssumedAppId HashByte
  485. SpanId HashByte
  486. TraceIdFrom HashByte16
  487. CalledId HashByte
  488. InstanceIdFrom HashByte
  489. AppIdFrom HashByte
  490. SpanIdFrom HashByte
  491. }
  492. type SocketDataBufferddd struct {
  493. EventsNum uint32
  494. Len uint32
  495. Data [32760]byte
  496. }
  497. const (
  498. TASK_COMM_LEN = 16
  499. BURST_DATA_BUF_SIZE = 8192
  500. )
  501. type Tuple struct {
  502. Daddr [16]uint8
  503. RcvSaddr [16]uint8
  504. AddrLen uint8
  505. L4Protocol uint8
  506. Dport uint16
  507. Num uint16
  508. }
  509. type SocketDatadddd struct {
  510. Pid uint32 // 表示线程号 如果'pid == tgid'表示一个进程, 否则是线程
  511. Tgid uint32 // 进程号
  512. CoroutineID uint64
  513. Source uint8
  514. Comm [TASK_COMM_LEN]byte
  515. SocketID uint64
  516. Tuple Tuple
  517. ExtraData uint32
  518. ExtraDataCount uint32
  519. TcpSeq uint32
  520. ThreadTraceID uint64
  521. Timestamp uint64
  522. Direction uint8
  523. MsgType uint8
  524. SyscallLen uint64
  525. DataSeq uint64
  526. DataType uint16
  527. DataLen uint16
  528. Data [BURST_DATA_BUF_SIZE]byte
  529. }
  530. type StackEvent struct {
  531. Type uint64
  532. Pid uint64
  533. TraceId uint64
  534. Goid uint64
  535. Ip uint64
  536. Bp uint64
  537. CallerIp uint64
  538. CallerBp uint64
  539. TimeNsStart uint64
  540. TimeNsEnd uint64
  541. // Nid uint64
  542. // Fpid uint64
  543. // Level uint64
  544. Location byte
  545. ClassName [100]byte
  546. MethedName [100]byte
  547. }
  548. type StackFunEvent struct {
  549. StackEvent StackEvent
  550. Uprobe *tracer.Uprobe
  551. }
  552. type pythonThreadEvent struct {
  553. Type EventType
  554. Pid uint32
  555. Duration uint64
  556. }
  557. func runEventsReader(name string, r *perf.Reader, ch chan<- Event, typ perfMapType) {
  558. for {
  559. rec, err := r.Read()
  560. if err != nil {
  561. if errors.Is(err, perf.ErrClosed) {
  562. break
  563. }
  564. continue
  565. }
  566. if rec.LostSamples > 0 {
  567. klog.Errorln(name, "lost samples:", rec.LostSamples)
  568. continue
  569. }
  570. var event Event
  571. switch typ {
  572. case perfMapTypeSocketEvents:
  573. //fmt.Println("perfMapTypeSocketEvents")
  574. // 假设 rec.RawSample 包含数据,类型为 []byte
  575. //rawData := rec.RawSample
  576. //fmt.Println("perfMapTypeSocketEvents2")
  577. //
  578. //// 创建一个 SocketDataBuffer 结构体实例
  579. //var buffer SocketDataBuffer
  580. //
  581. //// 创建一个字节缓冲区,并将数据填充到其中
  582. //reader := bytes.NewReader(rawData)
  583. //fmt.Println("perfMapTypeSocketEvents3")
  584. //fmt.Println(len(rawData))
  585. //// 使用 binary.Read 函数读取数据并解析为 SocketDataBuffer 结构体实例
  586. //if err := binary.Read(reader, binary.LittleEndian, &buffer); err != nil {
  587. // fmt.Println(reader.Len())
  588. // fmt.Println("Failed to read data:", err)
  589. // continue
  590. //}
  591. //fmt.Println("perfMapTypeSocketEvents4")
  592. //
  593. //// 打印解析后的数据
  594. //fmt.Println("EventsNum:", buffer.EventsNum)
  595. //fmt.Println("Len:", buffer.Len)
  596. //
  597. //// 打印 char data 的内容
  598. //fmt.Printf("Data: %s\n", string(buffer.Data[:buffer.Len])) // 仅打印实际长度的数据
  599. //socketDataBuffer := rec.RawSample
  600. // 解析 __socket_data_buffer
  601. //buf := (*SocketDataBuffer)(unsafe.Pointer(&rec.RawSample[0])) //nolint:gosec
  602. //
  603. //// 获取 char data[32760];
  604. //socketData := (*SocketData)(unsafe.Pointer(&buf.data[0])) //nolint:gosec
  605. //// todo
  606. //fmt.Printf("socketData.DataType:%d \n", (socketData.data_type))
  607. //fmt.Printf("socketData.DataLen:%d \n", (socketData.data_len))
  608. //
  609. //// 解析C结构体中的data字段
  610. //dataSlice := C.GoBytes(unsafe.Pointer(&socketData.data[0]), C.int(socketData.data_len))
  611. //// 打印或处理包含的数据
  612. //fmt.Printf("socketData.Payload:%v \n", string(dataSlice))
  613. /*todo */
  614. //socketData := (*(*[128]byte)(unsafe.Pointer(&eventC.line)))
  615. //dataPtr := unsafe.Pointer(&buf.data[0])
  616. //socketData := (*SocketData)(dataPtr)
  617. //reader2 := bytes.NewBuffer(rec.RawSample)
  618. // 222222
  619. //fmt.Println("socketData.Pid:", socketData.pid)
  620. //fmt.Println("socketData.Tgid:", socketData.tgid)
  621. //fmt.Println("socketData.CoroutineID:", socketData.coroutine_id)
  622. //fmt.Println("socketData.Source:", socketData.source)
  623. //
  624. //fmt.Printf("socketData.Comm: %s \n", socketData.comm)
  625. //fmt.Printf("socketData.SocketID :%v \n", socketData.socket_id)
  626. //fmt.Println("socketData.Tuple:", socketData.Tuple)
  627. //fmt.Println("socketData.ExtraData:", socketData.ExtraData)
  628. //fmt.Println("socketData.ExtraDataCount:", socketData.ExtraDataCount)
  629. //fmt.Println("socketData.TCPSeq:", socketData.TcpSeq)
  630. //fmt.Println("socketData.ThreadTraceID:", socketData.ThreadTraceID)
  631. //fmt.Println("socketData.Timestamp:", socketData.Timestamp)
  632. //fmt.Println("socketData.Direction:", socketData.Direction)
  633. //fmt.Println("socketData.MsgType:", socketData.MsgType)
  634. //fmt.Println("socketData.SyscallLen:", socketData.SyscallLen)
  635. //fmt.Println("socketData.DataSeq:", socketData.DataSeq)
  636. //socketData := &SocketData{}
  637. //reader := bytes.NewBuffer(rec.RawSample)
  638. //if err := binary.Read(reader, binary.LittleEndian, v); err != nil {
  639. // klog.Warningln("failed1 to read msg:", err)
  640. // continue
  641. //}
  642. //
  643. //var data []byte
  644. //payload := reader.Bytes()
  645. //switch {
  646. //case v.Len == 0:
  647. //case v.Len > 32760:
  648. // data = payload[:32760]
  649. //default:
  650. // data = payload[:v.Len]
  651. //}
  652. //////data2 := data[:v.Len]
  653. ////fmt.Println("perfMapTypeSocketEvents")
  654. //fmt.Println(v.EventsNum)
  655. //fmt.Println(v.Len)
  656. //fmt.Println(string(data))
  657. //
  658. //var data2 SocketData
  659. //reader2 := bytes.NewBuffer(data)
  660. //if err := binary.Read(reader2, binary.LittleEndian, data2); err != nil {
  661. // klog.Warningln("failed2 to read msg:", err)
  662. // continue
  663. //}
  664. //
  665. //fmt.Println(data2.Pid)
  666. //fmt.Println(data2.Tgid)
  667. //fmt.Println(string(v.Data))
  668. //continue
  669. case perfMapTypeL7Events:
  670. v := &l7Event{}
  671. reader := bytes.NewBuffer(rec.RawSample)
  672. if err := binary.Read(reader, binary.LittleEndian, v); err != nil {
  673. klog.Warningln("failed to read msg:", err)
  674. continue
  675. }
  676. //fmt.Println("v.TraceIdFrom")
  677. //fmt.Println(v.TraceIdFrom)
  678. //a := hex.EncodeToString(v.TraceIdFrom[:])
  679. //for _, b := range v.AssumedAppId {
  680. // fmt.Printf("v.AssumedAppId- %02\n", b)
  681. //}
  682. //fmt.Println(a)
  683. payload := reader.Bytes()
  684. req := &l7.RequestData{
  685. Protocol: l7.Protocol(v.Protocol),
  686. Status: l7.Status(v.Status),
  687. Duration: time.Duration(v.Duration),
  688. Method: l7.Method(v.Method),
  689. StatementId: v.StatementId,
  690. TraceId: v.TraceId,
  691. TraceStart: v.TraceStart,
  692. TraceEnd: v.TraceEnd,
  693. EventCount: v.EventCount,
  694. AssumedAppId: hex.EncodeToString(v.AssumedAppId[:]),
  695. SpanId: hex.EncodeToString(v.SpanId[:]),
  696. StartAt: v.StartAt,
  697. EndAt: v.EndtAt,
  698. ComponentSAddr: ipPort(v.ComponentSAddr, v.ComponentSport),
  699. ComponentDAddr: ipPort(v.ComponentDAddr, v.ComponentDport),
  700. }
  701. if req.Protocol == l7.ProtocolHTTP {
  702. klog.Debugf("runEventsReader ComponentSAddr.String %s", req.ComponentSAddr.String())
  703. klog.Debugf("runEventsReader ComponentDAddr.String %s", req.ComponentDAddr.String())
  704. }
  705. if v.TraceEnd == 1 {
  706. req.ParentSpanContext.TraceIdFrom = hex.EncodeToString(v.TraceIdFrom[:])
  707. req.ParentSpanContext.CalledId = hex.EncodeToString(v.CalledId[:])
  708. req.ParentSpanContext.InstanceIdFrom = hex.EncodeToString(v.InstanceIdFrom[:])
  709. req.ParentSpanContext.AppIdFrom = hex.EncodeToString(v.AppIdFrom[:])
  710. req.ParentSpanContext.SpanIdFrom = hex.EncodeToString(v.SpanIdFrom[:])
  711. req.SAddr = ipPort(v.SAddr, v.Sport)
  712. req.DAddr = ipPort(v.DAddr, v.Dport)
  713. klog.Debugf("runEventsReader SAddr.String %s", req.SAddr.String())
  714. klog.Debugf("runEventsReader DAddr.String %s", req.DAddr.String())
  715. }
  716. switch {
  717. case v.PayloadSize == 0:
  718. case v.PayloadSize > MaxPayloadSize:
  719. req.Payload = payload[:MaxPayloadSize]
  720. default:
  721. req.Payload = payload[:v.PayloadSize]
  722. }
  723. //fmt.Println("==========")
  724. //fmt.Println("req.Payload:", string(req.Payload))
  725. //fmt.Println("==========")
  726. event = Event{Type: EventTypeL7Request, Pid: v.Pid, Fd: v.Fd, Timestamp: v.ConnectionTimestamp, L7Request: req}
  727. case perfMapTypeFileEvents:
  728. v := &fileEvent{}
  729. if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
  730. klog.Warningln("failed to read msg:", err)
  731. continue
  732. }
  733. event = Event{Type: v.Type, Pid: v.Pid, Fd: v.Fd}
  734. case perfMapTypeProcEvents:
  735. v := &procEvent{}
  736. if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
  737. klog.Warningln("failed to read msg:", err)
  738. continue
  739. }
  740. event = Event{Type: v.Type, Reason: EventReason(v.Reason), Pid: v.Pid}
  741. case perfMapTypeTCPEvents:
  742. v := &tcpEvent{}
  743. if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
  744. klog.Warningln("failed to read msg:", err)
  745. continue
  746. }
  747. event = Event{
  748. Type: v.Type,
  749. Pid: v.Pid,
  750. SrcAddr: ipPort(v.SAddr, v.SPort),
  751. DstAddr: ipPort(v.DAddr, v.DPort),
  752. Fd: v.Fd,
  753. Timestamp: v.Timestamp,
  754. Duration: time.Duration(v.Duration),
  755. }
  756. if v.Type == EventTypeConnectionClose {
  757. event.TrafficStats = &TrafficStats{
  758. BytesSent: v.BytesSent,
  759. BytesReceived: v.BytesReceived,
  760. }
  761. }
  762. event.FirstReadTime = v.FirstReadTime
  763. event.FirstWriteTime = v.FirstWriteTime
  764. event.NewReadTime = v.NewReadTime
  765. if v.Type == EventTypeAcceptClose {
  766. event.TrafficStats = &TrafficStats{
  767. BytesSent: v.BytesSent,
  768. BytesReceived: v.BytesReceived,
  769. }
  770. }
  771. case perfMapTypePythonThreadEvents:
  772. v := &pythonThreadEvent{}
  773. if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
  774. klog.Warningln("failed to read msg:", err)
  775. continue
  776. }
  777. event = Event{
  778. Type: v.Type,
  779. Pid: v.Pid,
  780. Duration: time.Duration(v.Duration),
  781. }
  782. case perfMapTypeEventQueue:
  783. v := &StackEvent{}
  784. if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
  785. klog.Warningln("failed to read msg:", err)
  786. continue
  787. }
  788. event = Event{
  789. Type: EventTypeFunEnt,
  790. StackEvent: v,
  791. }
  792. default:
  793. continue
  794. }
  795. ch <- event
  796. }
  797. }
  798. func ipPort(ip [16]byte, port uint16) netaddr.IPPort {
  799. i, _ := netaddr.FromStdIP(ip[:])
  800. return netaddr.IPPortFrom(i, port)
  801. }
  802. func (t *Tracer) InitKProcInfo(pid uint32, appInfo *AppInfo) error {
  803. var err error
  804. var info EbpfProcInfo
  805. if appInfo.EBPFProcInfo == nil {
  806. info = EbpfProcInfo{
  807. InstanceId: appInfo.InstanceIdHash.HashtVal,
  808. AppId: appInfo.AppIdHash.HashtVal,
  809. CodeType: uint16(appInfo.CodeType),
  810. }
  811. } else {
  812. info = *appInfo.EBPFProcInfo
  813. info.AppId = appInfo.AppIdHash.HashtVal
  814. }
  815. _, err = tracer.UpdateProcInfoToMap(t.collection, pid, info)
  816. if err != nil {
  817. klog.Error("failed to update program info", err)
  818. }
  819. appInfo.EBPFProcInfo = &info
  820. return err
  821. }
  822. func (t *Tracer) DelKProcInfo(pid uint32) error {
  823. _, err := tracer.DelProcInfoFromMap(t.collection, pid)
  824. if err != nil {
  825. klog.Error("failed to delete proc info", err)
  826. }
  827. return err
  828. }
  829. // TODO check language
  830. func (t *Tracer) DisableL7Tracing() bool {
  831. return t.disableL7Tracing
  832. }
  833. func (t *Tracer) DisableE2ETracing() bool {
  834. return t.disableE2ETracing
  835. }
  836. func (t *Tracer) DisableStackTracing() bool {
  837. return t.disableStackTracing
  838. }