tracer.go 25 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914
  1. package ebpftracer
  2. import (
  3. "bytes"
  4. debugelf "debug/elf"
  5. "encoding/binary"
  6. "encoding/hex"
  7. "errors"
  8. "fmt"
  9. "github.com/coroot/coroot-node-agent/utils"
  10. "github.com/coroot/coroot-node-agent/utils/try"
  11. "os"
  12. "strconv"
  13. "strings"
  14. "time"
  15. "github.com/cilium/ebpf"
  16. "github.com/cilium/ebpf/link"
  17. "github.com/cilium/ebpf/perf"
  18. "github.com/coroot/coroot-node-agent/common"
  19. "github.com/coroot/coroot-node-agent/ebpftracer/l7"
  20. "github.com/coroot/coroot-node-agent/ebpftracer/tracer"
  21. "github.com/coroot/coroot-node-agent/proc"
  22. . "github.com/coroot/coroot-node-agent/utils/modelse"
  23. klog "github.com/sirupsen/logrus"
  24. "golang.org/x/sys/unix"
  25. "inet.af/netaddr"
  26. )
  27. /*
  28. #define TASK_COMM_LEN 16
  29. #define BURST_DATA_BUF_SIZE 8192 // For burst send buffer
  30. #include <linux/types.h>
  31. struct __tuple_t {
  32. __u8 daddr[16];
  33. __u8 rcv_saddr[16];
  34. __u8 addr_len;
  35. __u8 l4_protocol;
  36. __u16 dport;
  37. __u16 num;
  38. };
  39. struct __socket_data {
  40. __u32 pid;
  41. __u32 tgid;
  42. __u64 coroutine_id;
  43. __u8 source;
  44. __u8 comm[TASK_COMM_LEN];
  45. __u64 socket_id;
  46. struct __tuple_t tuple;
  47. __u32 extra_data;
  48. __u32 extra_data_count;
  49. __u32 tcp_seq;
  50. __u64 thread_trace_id;
  51. __u64 timestamp;
  52. __u8 direction: 1;
  53. __u8 msg_type: 7;
  54. __u64 syscall_len;
  55. __u64 data_seq;
  56. __u16 data_type;
  57. __u16 data_len;
  58. char data[BURST_DATA_BUF_SIZE];
  59. } __attribute__((packed));
  60. struct __socket_data_buffer {
  61. __u32 events_num;
  62. __u32 len;
  63. char data[32760];
  64. };
  65. */
  66. import "C"
  67. type SocketData C.struct___socket_data
  68. type SocketDataBuffer C.struct___socket_data_buffer
  69. const MaxPayloadSize = 1024
  70. type EventType uint32
  71. type EventReason uint32
  72. const (
  73. EventTypeProcessStart EventType = 1
  74. EventTypeProcessExit EventType = 2
  75. EventTypeConnectionOpen EventType = 3
  76. EventTypeConnectionClose EventType = 4
  77. EventTypeConnectionError EventType = 5
  78. EventTypeListenOpen EventType = 6
  79. EventTypeListenClose EventType = 7
  80. EventTypeFileOpen EventType = 8
  81. EventTypeTCPRetransmit EventType = 9
  82. EventTypeL7Request EventType = 10
  83. EventTypeFunEnt EventType = 11
  84. EventTypeFunRet EventType = 12
  85. EventTypeAcceptOpen EventType = 13
  86. EventTypeAcceptClose EventType = 14
  87. EventReasonNone EventReason = 0
  88. EventReasonOOMKill EventReason = 1
  89. )
  90. type TrafficStats struct {
  91. BytesSent uint64
  92. BytesReceived uint64
  93. }
  94. type Event struct {
  95. StackEvent *StackEvent
  96. Type EventType
  97. Reason EventReason
  98. Pid uint32
  99. SrcAddr netaddr.IPPort
  100. DstAddr netaddr.IPPort
  101. Fd uint64
  102. Timestamp uint64
  103. Duration time.Duration
  104. L7Request *l7.RequestData
  105. TrafficStats *TrafficStats
  106. FirstReadTime uint64
  107. FirstWriteTime uint64
  108. NewReadTime uint64
  109. }
  110. type perfMapType uint8
  111. const (
  112. perfMapTypeProcEvents perfMapType = 1
  113. perfMapTypeTCPEvents perfMapType = 2
  114. perfMapTypeFileEvents perfMapType = 3
  115. perfMapTypeL7Events perfMapType = 4
  116. perfMapTypeSocketEvents perfMapType = 5
  117. perfMapTypeEventQueue perfMapType = 6
  118. perfMapTypePythonThreadEvents perfMapType = 7
  119. )
  120. type Tracer struct {
  121. kernelVersion string
  122. disableL7Tracing bool
  123. disableE2ETracing bool
  124. disableStackTracing bool
  125. collection *ebpf.Collection
  126. collectionSpec *ebpf.CollectionSpec
  127. readers map[string]*perf.Reader
  128. links []link.Link
  129. uprobes map[string]*ebpf.Program
  130. Symbols []debugelf.Symbol
  131. Uprobes []tracer.Uprobe
  132. UprobesMap map[string]tracer.Uprobe
  133. }
  134. func NewTracer(kernelVersion string, disableL7Tracing, disableE2ETracing, disableStackTracing bool) *Tracer {
  135. if disableL7Tracing {
  136. klog.Infoln("L7 tracing is disabled")
  137. }
  138. if disableE2ETracing {
  139. klog.Infoln("L7 tracing is disabled")
  140. }
  141. if disableStackTracing {
  142. klog.Infoln("L7 stack is disabled")
  143. }
  144. return &Tracer{
  145. kernelVersion: kernelVersion,
  146. disableL7Tracing: disableL7Tracing,
  147. disableE2ETracing: disableE2ETracing,
  148. disableStackTracing: disableStackTracing,
  149. readers: map[string]*perf.Reader{},
  150. uprobes: map[string]*ebpf.Program{},
  151. }
  152. }
  153. func (t *Tracer) Run(events chan<- Event) error {
  154. if err := t.ebpf(events); err != nil {
  155. return err
  156. }
  157. if err := t.init(events); err != nil {
  158. return err
  159. }
  160. return nil
  161. }
  162. func (t *Tracer) Close() {
  163. for k, p := range t.uprobes {
  164. err := p.Close()
  165. klog.WithError(err).Infof("[close] uprobes %s", k)
  166. }
  167. for _, l := range t.links {
  168. err := l.Close()
  169. klog.WithError(err).Infof("[close] links")
  170. }
  171. for k, r := range t.readers {
  172. err := r.Close()
  173. klog.WithError(err).Infof("[close] readers %s", k)
  174. }
  175. t.collection.Close()
  176. }
  177. func (t *Tracer) init(ch chan<- Event) error {
  178. pids, err := proc.ListPids()
  179. if err != nil {
  180. return fmt.Errorf("failed to list pids: %w", err)
  181. }
  182. for _, pid := range pids {
  183. ch <- Event{Type: EventTypeProcessStart, Pid: pid}
  184. }
  185. fds, sockets := readFds(pids)
  186. for _, fd := range fds {
  187. ch <- Event{Type: EventTypeFileOpen, Pid: fd.pid, Fd: fd.fd}
  188. }
  189. listens := map[uint64]bool{}
  190. for _, s := range sockets {
  191. if s.Listen {
  192. listens[uint64(s.pid)<<32|uint64(s.SAddr.Port())] = true
  193. }
  194. }
  195. ebpfConnectionsMap := t.collection.Maps["active_connections"]
  196. timestamp := uint64(time.Now().UnixNano())
  197. for _, s := range sockets {
  198. typ := EventTypeConnectionOpen
  199. if s.Listen {
  200. typ = EventTypeListenOpen
  201. } else if listens[uint64(s.pid)<<32|uint64(s.SAddr.Port())] || s.DAddr.Port() > s.SAddr.Port() { // inbound
  202. continue
  203. }
  204. ch <- Event{
  205. Type: typ,
  206. Pid: s.pid,
  207. Timestamp: timestamp,
  208. Fd: s.fd,
  209. SrcAddr: s.SAddr,
  210. DstAddr: s.DAddr,
  211. }
  212. if typ == EventTypeConnectionOpen {
  213. id := ConnectionId{FD: s.fd, PID: s.pid}
  214. sip := s.SAddr.IP()
  215. sipbytes := sip.As16()
  216. dip := s.DAddr.IP()
  217. dipbytes := dip.As16()
  218. conn := Connection{Timestamp: timestamp, Saddr: sipbytes, Sport: s.SAddr.Port(), Daddr: dipbytes, Dport: s.DAddr.Port()}
  219. if err := ebpfConnectionsMap.Update(id, conn, ebpf.UpdateNoExist); err != nil {
  220. klog.Warningln(err)
  221. }
  222. }
  223. }
  224. return nil
  225. }
  226. func (t *Tracer) ActiveConnectionsIterator() *ebpf.MapIterator {
  227. return t.collection.Maps["active_connections"].Iterate()
  228. }
  229. func (t *Tracer) ActiveAcceptsIterator() *ebpf.MapIterator {
  230. return t.collection.Maps["active_accepts"].Iterate()
  231. }
  232. type perfMap struct {
  233. name string
  234. perCPUBufferSizePages int
  235. typ perfMapType
  236. }
  237. func (t *Tracer) ebpf(ch chan<- Event) error {
  238. kv := "v" + common.KernelMajorMinor(t.kernelVersion)
  239. path, prg, err := EbpfCode(kv)
  240. klog.Infof("kv is [%s], kernel version: [%s] path: [%s]", kv, t.kernelVersion, path)
  241. if len(prg) == 0 || err != nil {
  242. return fmt.Errorf("kv is %s, unsupported kernel version: [%s] path: [%s] err:<%v>", kv, t.kernelVersion, path, err)
  243. }
  244. _, debugFsErr := os.Stat("/sys/kernel/debug/tracing")
  245. _, traceFsErr := os.Stat("/sys/kernel/tracing")
  246. if debugFsErr != nil && traceFsErr != nil {
  247. return fmt.Errorf("kernel tracing is not available: debugfs or tracefs must be mounted")
  248. }
  249. collectionSpec, err := ebpf.LoadCollectionSpecFromReader(bytes.NewReader(prg))
  250. if err != nil {
  251. return fmt.Errorf("failed to load collection spec: %w", err)
  252. }
  253. _ = unix.Setrlimit(unix.RLIMIT_MEMLOCK, &unix.Rlimit{Cur: unix.RLIM_INFINITY, Max: unix.RLIM_INFINITY})
  254. tracer.PidFilter(collectionSpec)
  255. opts := &ebpf.CollectionOptions{MapReplacements: make(map[string]*ebpf.Map)}
  256. klog.Infof("[start] Look eBPF .maps")
  257. for _, spec := range collectionSpec.Maps {
  258. klog.Infoln(spec.Name)
  259. }
  260. klog.Infof("[end] Look eBPF .maps")
  261. tracer.MapInit(collectionSpec, opts)
  262. // TODO 多进程
  263. // tracer.SetConstants(collectionSpec)
  264. c, err := ebpf.NewCollectionWithOptions(collectionSpec, *opts)
  265. if err != nil {
  266. var verr *ebpf.VerifierError
  267. if errors.As(err, &verr) {
  268. klog.Errorf("----%+v", verr)
  269. }
  270. return fmt.Errorf("failed to load collection: %w", err)
  271. }
  272. tracer.Offset()
  273. t.collectionSpec = collectionSpec
  274. t.collection = c
  275. perfMaps := []perfMap{
  276. {name: "proc_events", typ: perfMapTypeProcEvents, perCPUBufferSizePages: 4},
  277. {name: "tcp_listen_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 4},
  278. {name: "tcp_connect_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 8},
  279. {name: "tcp_accept_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 8},
  280. {name: "tcp_retransmit_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 4},
  281. {name: "file_events", typ: perfMapTypeFileEvents, perCPUBufferSizePages: 4},
  282. {name: "event_queue", typ: perfMapTypeEventQueue, perCPUBufferSizePages: 32},
  283. {name: "python_thread_events", typ: perfMapTypePythonThreadEvents, perCPUBufferSizePages: 4},
  284. }
  285. tracer.MapInsert(c)
  286. if !t.DisableL7Tracing() {
  287. perfMaps = append(perfMaps, perfMap{name: "l7_events", typ: perfMapTypeL7Events, perCPUBufferSizePages: 32})
  288. }
  289. perfMaps = append(perfMaps, perfMap{name: MAP_PERF_SOCKET_DATA_NAME, typ: perfMapTypeSocketEvents, perCPUBufferSizePages: 64})
  290. klog.Infof("[start] Look eBPF perf_maps")
  291. for _, pm := range perfMaps {
  292. klog.Infoln(pm.name)
  293. m, ok := t.collection.Maps[pm.name]
  294. if ok {
  295. r, err := perf.NewReader(m, pm.perCPUBufferSizePages*os.Getpagesize())
  296. if err != nil {
  297. t.Close()
  298. return fmt.Errorf("failed to create ebpf reader: %w", err)
  299. }
  300. t.readers[pm.name] = r
  301. // event监听
  302. //go runEventsReader(pm.name, r, ch, pm.typ)
  303. try.GoParams(runEventsReader, utils.CatchFn, pm.name, r, ch, pm.typ)
  304. }
  305. }
  306. klog.Infof("[end] Look eBPF perf_maps")
  307. klog.Infof("[start] Look eBPF specPrograms")
  308. if err = t.LinkEbpfProg(); err != nil {
  309. return err
  310. }
  311. klog.Infof("[end] Look eBPF specPrograms")
  312. return nil
  313. }
  314. func (t *Tracer) LinkEbpfProg() error {
  315. klog.Infof("[start] Look eBPF specPrograms")
  316. var (
  317. l link.Link
  318. err error
  319. lastErr error
  320. )
  321. for _, programSpec := range t.collectionSpec.Programs {
  322. program := t.collection.Programs[programSpec.Name]
  323. klog.Infof("%s:[%s]", programSpec.SectionName, programSpec.Name)
  324. if t.DisableL7Tracing() {
  325. switch programSpec.Name {
  326. case "sys_enter_writev", "sys_enter_write", "sys_enter_sendto", "sys_enter_sendmsg", "sys_enter_sendmmsg":
  327. continue
  328. case "sys_enter_read", "sys_enter_readv", "sys_enter_recvfrom", "sys_enter_recvmsg":
  329. continue
  330. case "sys_exit_read", "sys_exit_readv", "sys_exit_recvfrom", "sys_exit_recvmsg":
  331. continue
  332. }
  333. }
  334. switch programSpec.Type {
  335. case ebpf.TracePoint:
  336. if strings.Contains(programSpec.SectionName, "prog") {
  337. continue
  338. }
  339. parts := strings.SplitN(programSpec.AttachTo, "/", 2)
  340. l, err = link.Tracepoint(parts[0], parts[1], program, nil)
  341. case ebpf.Kprobe:
  342. if strings.HasPrefix(programSpec.SectionName, "uprobe/") {
  343. t.uprobes[programSpec.Name] = program
  344. continue
  345. }
  346. if strings.HasPrefix(programSpec.SectionName, "kretprobe/") {
  347. l, err = link.Kretprobe(programSpec.AttachTo, program, nil)
  348. t.links = append(t.links, l)
  349. continue
  350. }
  351. l, err = link.Kprobe(programSpec.AttachTo, program, nil)
  352. }
  353. if err != nil {
  354. lastErr = err
  355. t.Close()
  356. klog.Errorf("LinkEbpfProg failed to program[%s] link program: %s", programSpec.Name, err)
  357. //return fmt.Errorf("failed to link program: %w", err)
  358. } else {
  359. t.links = append(t.links, l)
  360. }
  361. }
  362. klog.Infof("[end] Look eBPF specPrograms")
  363. if lastErr != nil {
  364. return fmt.Errorf("failed to link program: %w", lastErr)
  365. }
  366. return nil
  367. }
  368. func (t *Tracer) UnlinkEbpfProg() error {
  369. var (
  370. lastErr error
  371. err error
  372. )
  373. /* 此处不应该处理 t.uprobes 中 ebpf-program
  374. for pName, p := range t.uprobes {
  375. if err = p.Close(); err != nil {
  376. lastErr = err
  377. klog.Errorf("UnlinkEbpfProg close program[%s] uprobe occurs error: %s", pName, err.Error())
  378. }
  379. }
  380. */
  381. for _, l := range t.links {
  382. if err = l.Close(); err != nil {
  383. lastErr = err
  384. klog.Errorf("UnlinkEbpfProg close link occurs error: %s", err.Error())
  385. }
  386. }
  387. return lastErr
  388. }
  389. func (t EventType) Int() int {
  390. return int(t)
  391. }
  392. func (t EventType) String() string {
  393. switch t {
  394. case EventTypeProcessStart:
  395. return "process-start"
  396. case EventTypeProcessExit:
  397. return "process-exit"
  398. case EventTypeConnectionOpen:
  399. return "connection-open"
  400. case EventTypeConnectionClose:
  401. return "connection-close"
  402. case EventTypeConnectionError:
  403. return "connection-error"
  404. case EventTypeListenOpen:
  405. return "listen-open"
  406. case EventTypeListenClose:
  407. return "listen-close"
  408. case EventTypeFileOpen:
  409. return "file-open"
  410. case EventTypeTCPRetransmit:
  411. return "tcp-retransmit"
  412. case EventTypeL7Request:
  413. return "l7-request"
  414. }
  415. return "unknown: " + strconv.Itoa(int(t))
  416. }
  417. func (t EventReason) String() string {
  418. switch t {
  419. case EventReasonNone:
  420. return "none"
  421. case EventReasonOOMKill:
  422. return "oom-kill"
  423. }
  424. return "unknown: " + strconv.Itoa(int(t))
  425. }
  426. type procEvent struct {
  427. Type EventType
  428. Pid uint32
  429. Reason uint32
  430. }
  431. type tcpEvent struct {
  432. Fd uint64
  433. Timestamp uint64
  434. Duration uint64
  435. FirstReadTime uint64
  436. FirstWriteTime uint64
  437. NewReadTime uint64
  438. Type EventType
  439. Pid uint32
  440. BytesSent uint64
  441. BytesReceived uint64
  442. SPort uint16
  443. DPort uint16
  444. SAddr [16]byte
  445. DAddr [16]byte
  446. }
  447. type fileEvent struct {
  448. Type EventType
  449. Pid uint32
  450. Fd uint64
  451. }
  452. // struct l7_event in l7.c
  453. type l7Event struct {
  454. Fd uint64
  455. ConnectionTimestamp uint64
  456. Pid uint32
  457. Status uint32
  458. Duration uint64
  459. Protocol uint8
  460. Method uint8
  461. Padding uint16
  462. StatementId uint32
  463. PayloadSize uint64
  464. TraceId uint64
  465. StartAt uint64 // ns
  466. EndtAt uint64 // ns
  467. TraceStart uint32
  468. TraceEnd uint32
  469. EventCount uint32
  470. Sport uint16
  471. Dport uint16
  472. SAddr [16]byte
  473. DAddr [16]byte
  474. ComponentSport uint16
  475. ComponentDport uint16
  476. ComponentSAddr [16]byte
  477. ComponentDAddr [16]byte
  478. AssumedAppId HashByte
  479. SpanId HashByte
  480. TraceIdFrom HashByte16
  481. CalledId HashByte
  482. InstanceIdFrom HashByte
  483. AppIdFrom HashByte
  484. SpanIdFrom HashByte
  485. }
  486. type SocketDataBufferddd struct {
  487. EventsNum uint32
  488. Len uint32
  489. Data [32760]byte
  490. }
  491. const (
  492. TASK_COMM_LEN = 16
  493. BURST_DATA_BUF_SIZE = 8192
  494. )
  495. type Tuple struct {
  496. Daddr [16]uint8
  497. RcvSaddr [16]uint8
  498. AddrLen uint8
  499. L4Protocol uint8
  500. Dport uint16
  501. Num uint16
  502. }
  503. type SocketDatadddd struct {
  504. Pid uint32 // 表示线程号 如果'pid == tgid'表示一个进程, 否则是线程
  505. Tgid uint32 // 进程号
  506. CoroutineID uint64
  507. Source uint8
  508. Comm [TASK_COMM_LEN]byte
  509. SocketID uint64
  510. Tuple Tuple
  511. ExtraData uint32
  512. ExtraDataCount uint32
  513. TcpSeq uint32
  514. ThreadTraceID uint64
  515. Timestamp uint64
  516. Direction uint8
  517. MsgType uint8
  518. SyscallLen uint64
  519. DataSeq uint64
  520. DataType uint16
  521. DataLen uint16
  522. Data [BURST_DATA_BUF_SIZE]byte
  523. }
  524. type StackEvent struct {
  525. Type uint64
  526. Pid uint64
  527. TraceId uint64
  528. Goid uint64
  529. Ip uint64
  530. Bp uint64
  531. CallerIp uint64
  532. CallerBp uint64
  533. TimeNsStart uint64
  534. TimeNsEnd uint64
  535. // Nid uint64
  536. // Fpid uint64
  537. // Level uint64
  538. Location byte
  539. ClassName [100]byte
  540. MethedName [100]byte
  541. }
  542. type StackFunEvent struct {
  543. StackEvent StackEvent
  544. Uprobe *tracer.Uprobe
  545. }
  546. type pythonThreadEvent struct {
  547. Type EventType
  548. Pid uint32
  549. Duration uint64
  550. }
  551. func runEventsReader(name string, r *perf.Reader, ch chan<- Event, typ perfMapType) {
  552. for {
  553. rec, err := r.Read()
  554. if err != nil {
  555. if errors.Is(err, perf.ErrClosed) {
  556. break
  557. }
  558. continue
  559. }
  560. if rec.LostSamples > 0 {
  561. klog.Errorln(name, "lost samples:", rec.LostSamples)
  562. continue
  563. }
  564. var event Event
  565. switch typ {
  566. case perfMapTypeSocketEvents:
  567. //fmt.Println("perfMapTypeSocketEvents")
  568. // 假设 rec.RawSample 包含数据,类型为 []byte
  569. //rawData := rec.RawSample
  570. //fmt.Println("perfMapTypeSocketEvents2")
  571. //
  572. //// 创建一个 SocketDataBuffer 结构体实例
  573. //var buffer SocketDataBuffer
  574. //
  575. //// 创建一个字节缓冲区,并将数据填充到其中
  576. //reader := bytes.NewReader(rawData)
  577. //fmt.Println("perfMapTypeSocketEvents3")
  578. //fmt.Println(len(rawData))
  579. //// 使用 binary.Read 函数读取数据并解析为 SocketDataBuffer 结构体实例
  580. //if err := binary.Read(reader, binary.LittleEndian, &buffer); err != nil {
  581. // fmt.Println(reader.Len())
  582. // fmt.Println("Failed to read data:", err)
  583. // continue
  584. //}
  585. //fmt.Println("perfMapTypeSocketEvents4")
  586. //
  587. //// 打印解析后的数据
  588. //fmt.Println("EventsNum:", buffer.EventsNum)
  589. //fmt.Println("Len:", buffer.Len)
  590. //
  591. //// 打印 char data 的内容
  592. //fmt.Printf("Data: %s\n", string(buffer.Data[:buffer.Len])) // 仅打印实际长度的数据
  593. //socketDataBuffer := rec.RawSample
  594. // 解析 __socket_data_buffer
  595. //buf := (*SocketDataBuffer)(unsafe.Pointer(&rec.RawSample[0])) //nolint:gosec
  596. //
  597. //// 获取 char data[32760];
  598. //socketData := (*SocketData)(unsafe.Pointer(&buf.data[0])) //nolint:gosec
  599. //// todo
  600. //fmt.Printf("socketData.DataType:%d \n", (socketData.data_type))
  601. //fmt.Printf("socketData.DataLen:%d \n", (socketData.data_len))
  602. //
  603. //// 解析C结构体中的data字段
  604. //dataSlice := C.GoBytes(unsafe.Pointer(&socketData.data[0]), C.int(socketData.data_len))
  605. //// 打印或处理包含的数据
  606. //fmt.Printf("socketData.Payload:%v \n", string(dataSlice))
  607. /*todo */
  608. //socketData := (*(*[128]byte)(unsafe.Pointer(&eventC.line)))
  609. //dataPtr := unsafe.Pointer(&buf.data[0])
  610. //socketData := (*SocketData)(dataPtr)
  611. //reader2 := bytes.NewBuffer(rec.RawSample)
  612. // 222222
  613. //fmt.Println("socketData.Pid:", socketData.pid)
  614. //fmt.Println("socketData.Tgid:", socketData.tgid)
  615. //fmt.Println("socketData.CoroutineID:", socketData.coroutine_id)
  616. //fmt.Println("socketData.Source:", socketData.source)
  617. //
  618. //fmt.Printf("socketData.Comm: %s \n", socketData.comm)
  619. //fmt.Printf("socketData.SocketID :%v \n", socketData.socket_id)
  620. //fmt.Println("socketData.Tuple:", socketData.Tuple)
  621. //fmt.Println("socketData.ExtraData:", socketData.ExtraData)
  622. //fmt.Println("socketData.ExtraDataCount:", socketData.ExtraDataCount)
  623. //fmt.Println("socketData.TCPSeq:", socketData.TcpSeq)
  624. //fmt.Println("socketData.ThreadTraceID:", socketData.ThreadTraceID)
  625. //fmt.Println("socketData.Timestamp:", socketData.Timestamp)
  626. //fmt.Println("socketData.Direction:", socketData.Direction)
  627. //fmt.Println("socketData.MsgType:", socketData.MsgType)
  628. //fmt.Println("socketData.SyscallLen:", socketData.SyscallLen)
  629. //fmt.Println("socketData.DataSeq:", socketData.DataSeq)
  630. //socketData := &SocketData{}
  631. //reader := bytes.NewBuffer(rec.RawSample)
  632. //if err := binary.Read(reader, binary.LittleEndian, v); err != nil {
  633. // klog.Warningln("failed1 to read msg:", err)
  634. // continue
  635. //}
  636. //
  637. //var data []byte
  638. //payload := reader.Bytes()
  639. //switch {
  640. //case v.Len == 0:
  641. //case v.Len > 32760:
  642. // data = payload[:32760]
  643. //default:
  644. // data = payload[:v.Len]
  645. //}
  646. //////data2 := data[:v.Len]
  647. ////fmt.Println("perfMapTypeSocketEvents")
  648. //fmt.Println(v.EventsNum)
  649. //fmt.Println(v.Len)
  650. //fmt.Println(string(data))
  651. //
  652. //var data2 SocketData
  653. //reader2 := bytes.NewBuffer(data)
  654. //if err := binary.Read(reader2, binary.LittleEndian, data2); err != nil {
  655. // klog.Warningln("failed2 to read msg:", err)
  656. // continue
  657. //}
  658. //
  659. //fmt.Println(data2.Pid)
  660. //fmt.Println(data2.Tgid)
  661. //fmt.Println(string(v.Data))
  662. //continue
  663. case perfMapTypeL7Events:
  664. v := &l7Event{}
  665. reader := bytes.NewBuffer(rec.RawSample)
  666. if err := binary.Read(reader, binary.LittleEndian, v); err != nil {
  667. klog.Warningln("failed to read msg:", err)
  668. continue
  669. }
  670. //fmt.Println("v.TraceIdFrom")
  671. //fmt.Println(v.TraceIdFrom)
  672. //a := hex.EncodeToString(v.TraceIdFrom[:])
  673. //for _, b := range v.AssumedAppId {
  674. // fmt.Printf("v.AssumedAppId- %02\n", b)
  675. //}
  676. //fmt.Println(a)
  677. payload := reader.Bytes()
  678. req := &l7.RequestData{
  679. Protocol: l7.Protocol(v.Protocol),
  680. Status: l7.Status(v.Status),
  681. Duration: time.Duration(v.Duration),
  682. Method: l7.Method(v.Method),
  683. StatementId: v.StatementId,
  684. TraceId: v.TraceId,
  685. TraceStart: v.TraceStart,
  686. TraceEnd: v.TraceEnd,
  687. EventCount: v.EventCount,
  688. AssumedAppId: hex.EncodeToString(v.AssumedAppId[:]),
  689. SpanId: hex.EncodeToString(v.SpanId[:]),
  690. StartAt: v.StartAt,
  691. EndAt: v.EndtAt,
  692. ComponentSAddr: ipPort(v.ComponentSAddr, v.ComponentSport),
  693. ComponentDAddr: ipPort(v.ComponentDAddr, v.ComponentDport),
  694. }
  695. // if req.Protocol == l7.ProtocolHTTP {
  696. // klog.Infof("runEventsReader ComponentSAddr.String %s", req.ComponentSAddr.String())
  697. // klog.Infof("runEventsReader ComponentDAddr.String %s", req.ComponentDAddr.String())
  698. // }
  699. if req.Protocol == l7.ProtocolKafka {
  700. klog.Infof("runEventsReader ComponentSAddr.String %s", req.ComponentSAddr.String())
  701. klog.Infof("runEventsReader ComponentDAddr.String %s", req.ComponentDAddr.String())
  702. }
  703. if v.TraceEnd == 1 {
  704. req.ParentSpanContext.TraceIdFrom = hex.EncodeToString(v.TraceIdFrom[:])
  705. req.ParentSpanContext.CalledId = hex.EncodeToString(v.CalledId[:])
  706. req.ParentSpanContext.InstanceIdFrom = hex.EncodeToString(v.InstanceIdFrom[:])
  707. req.ParentSpanContext.AppIdFrom = hex.EncodeToString(v.AppIdFrom[:])
  708. req.ParentSpanContext.SpanIdFrom = hex.EncodeToString(v.SpanIdFrom[:])
  709. req.SAddr = ipPort(v.SAddr, v.Sport)
  710. req.DAddr = ipPort(v.DAddr, v.Dport)
  711. // klog.Infof("runEventsReader SAddr.String %s", req.SAddr.String())
  712. // klog.Infof("runEventsReader DAddr.String %s", req.DAddr.String())
  713. }
  714. switch {
  715. case v.PayloadSize == 0:
  716. case v.PayloadSize > MaxPayloadSize:
  717. req.Payload = payload[:MaxPayloadSize]
  718. default:
  719. req.Payload = payload[:v.PayloadSize]
  720. }
  721. //fmt.Println("==========")
  722. //fmt.Println("req.Payload:", string(req.Payload))
  723. //fmt.Println("==========")
  724. event = Event{Type: EventTypeL7Request, Pid: v.Pid, Fd: v.Fd, Timestamp: v.ConnectionTimestamp, L7Request: req}
  725. case perfMapTypeFileEvents:
  726. v := &fileEvent{}
  727. if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
  728. klog.Warningln("failed to read msg:", err)
  729. continue
  730. }
  731. event = Event{Type: v.Type, Pid: v.Pid, Fd: v.Fd}
  732. case perfMapTypeProcEvents:
  733. v := &procEvent{}
  734. if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
  735. klog.Warningln("failed to read msg:", err)
  736. continue
  737. }
  738. event = Event{Type: v.Type, Reason: EventReason(v.Reason), Pid: v.Pid}
  739. case perfMapTypeTCPEvents:
  740. v := &tcpEvent{}
  741. if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
  742. klog.Warningln("failed to read msg:", err)
  743. continue
  744. }
  745. event = Event{
  746. Type: v.Type,
  747. Pid: v.Pid,
  748. SrcAddr: ipPort(v.SAddr, v.SPort),
  749. DstAddr: ipPort(v.DAddr, v.DPort),
  750. Fd: v.Fd,
  751. Timestamp: v.Timestamp,
  752. Duration: time.Duration(v.Duration),
  753. }
  754. if v.Type == EventTypeConnectionClose {
  755. event.TrafficStats = &TrafficStats{
  756. BytesSent: v.BytesSent,
  757. BytesReceived: v.BytesReceived,
  758. }
  759. }
  760. event.FirstReadTime = v.FirstReadTime
  761. event.FirstWriteTime = v.FirstWriteTime
  762. event.NewReadTime = v.NewReadTime
  763. if v.Type == EventTypeAcceptClose {
  764. event.TrafficStats = &TrafficStats{
  765. BytesSent: v.BytesSent,
  766. BytesReceived: v.BytesReceived,
  767. }
  768. }
  769. case perfMapTypePythonThreadEvents:
  770. v := &pythonThreadEvent{}
  771. if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
  772. klog.Warningln("failed to read msg:", err)
  773. continue
  774. }
  775. event = Event{
  776. Type: v.Type,
  777. Pid: v.Pid,
  778. Duration: time.Duration(v.Duration),
  779. }
  780. case perfMapTypeEventQueue:
  781. v := &StackEvent{}
  782. if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
  783. klog.Warningln("failed to read msg:", err)
  784. continue
  785. }
  786. event = Event{
  787. Type: EventTypeFunEnt,
  788. StackEvent: v,
  789. }
  790. default:
  791. continue
  792. }
  793. ch <- event
  794. }
  795. }
  796. func ipPort(ip [16]byte, port uint16) netaddr.IPPort {
  797. i, _ := netaddr.FromStdIP(ip[:])
  798. return netaddr.IPPortFrom(i, port)
  799. }
  800. func (t *Tracer) InitKProcInfo(pid uint32, appInfo *AppInfo) error {
  801. var err error
  802. var info EbpfProcInfo
  803. if appInfo.EBPFProcInfo == nil {
  804. info = EbpfProcInfo{
  805. InstanceId: appInfo.InstanceIdHash.HashtVal,
  806. AppId: appInfo.AppIdHash.HashtVal,
  807. CodeType: uint16(appInfo.CodeType),
  808. }
  809. } else {
  810. info = *appInfo.EBPFProcInfo
  811. info.AppId = appInfo.AppIdHash.HashtVal
  812. }
  813. _, err = tracer.UpdateProcInfoToMap(t.collection, pid, info)
  814. if err != nil {
  815. klog.Error("failed to update program info", err)
  816. }
  817. appInfo.EBPFProcInfo = &info
  818. return err
  819. }
  820. func (t *Tracer) DelKProcInfo(pid uint32) error {
  821. _, err := tracer.DelProcInfoFromMap(t.collection, pid)
  822. if err != nil {
  823. klog.Error("failed to delete proc info", err)
  824. }
  825. return err
  826. }
  827. // TODO check language
  828. func (t *Tracer) DisableL7Tracing() bool {
  829. return t.disableL7Tracing
  830. }
  831. func (t *Tracer) DisableE2ETracing() bool {
  832. return t.disableE2ETracing
  833. }
  834. func (t *Tracer) DisableStackTracing() bool {
  835. return t.disableStackTracing
  836. }