tracer.go 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907
  1. package ebpftracer
  2. import (
  3. "bytes"
  4. debugelf "debug/elf"
  5. "encoding/binary"
  6. "encoding/hex"
  7. "errors"
  8. "fmt"
  9. "os"
  10. "strconv"
  11. "strings"
  12. "time"
  13. "github.com/cilium/ebpf"
  14. "github.com/cilium/ebpf/link"
  15. "github.com/cilium/ebpf/perf"
  16. "github.com/coroot/coroot-node-agent/common"
  17. "github.com/coroot/coroot-node-agent/ebpftracer/l7"
  18. "github.com/coroot/coroot-node-agent/ebpftracer/tracer"
  19. "github.com/coroot/coroot-node-agent/proc"
  20. . "github.com/coroot/coroot-node-agent/utils/modelse"
  21. klog "github.com/sirupsen/logrus"
  22. "golang.org/x/sys/unix"
  23. "inet.af/netaddr"
  24. )
  25. /*
  26. #define TASK_COMM_LEN 16
  27. #define BURST_DATA_BUF_SIZE 8192 // For brust send buffer
  28. #include <linux/types.h>
  29. struct __tuple_t {
  30. __u8 daddr[16];
  31. __u8 rcv_saddr[16];
  32. __u8 addr_len;
  33. __u8 l4_protocol;
  34. __u16 dport;
  35. __u16 num;
  36. };
  37. struct __socket_data {
  38. __u32 pid;
  39. __u32 tgid;
  40. __u64 coroutine_id;
  41. __u8 source;
  42. __u8 comm[TASK_COMM_LEN];
  43. __u64 socket_id;
  44. struct __tuple_t tuple;
  45. __u32 extra_data;
  46. __u32 extra_data_count;
  47. __u32 tcp_seq;
  48. __u64 thread_trace_id;
  49. __u64 timestamp;
  50. __u8 direction: 1;
  51. __u8 msg_type: 7;
  52. __u64 syscall_len;
  53. __u64 data_seq;
  54. __u16 data_type;
  55. __u16 data_len;
  56. char data[BURST_DATA_BUF_SIZE];
  57. } __attribute__((packed));
  58. struct __socket_data_buffer {
  59. __u32 events_num;
  60. __u32 len;
  61. char data[32760];
  62. };
  63. */
  64. import "C"
  65. type SocketData C.struct___socket_data
  66. type SocketDataBuffer C.struct___socket_data_buffer
  67. const MaxPayloadSize = 1024
  68. type EventType uint32
  69. type EventReason uint32
  70. const (
  71. EventTypeProcessStart EventType = 1
  72. EventTypeProcessExit EventType = 2
  73. EventTypeConnectionOpen EventType = 3
  74. EventTypeConnectionClose EventType = 4
  75. EventTypeConnectionError EventType = 5
  76. EventTypeListenOpen EventType = 6
  77. EventTypeListenClose EventType = 7
  78. EventTypeFileOpen EventType = 8
  79. EventTypeTCPRetransmit EventType = 9
  80. EventTypeL7Request EventType = 10
  81. EventTypeFunEnt EventType = 11
  82. EventTypeFunRet EventType = 12
  83. EventTypeAcceptOpen EventType = 13
  84. EventTypeAcceptClose EventType = 14
  85. EventReasonNone EventReason = 0
  86. EventReasonOOMKill EventReason = 1
  87. )
  88. type TrafficStats struct {
  89. BytesSent uint64
  90. BytesReceived uint64
  91. }
  92. type Event struct {
  93. StackEvent *StackEvent
  94. Type EventType
  95. Reason EventReason
  96. Pid uint32
  97. SrcAddr netaddr.IPPort
  98. DstAddr netaddr.IPPort
  99. Fd uint64
  100. Timestamp uint64
  101. Duration time.Duration
  102. L7Request *l7.RequestData
  103. TrafficStats *TrafficStats
  104. FirstReadTime uint64
  105. FirstWriteTime uint64
  106. NewReadTime uint64
  107. }
  108. type perfMapType uint8
  109. const (
  110. perfMapTypeProcEvents perfMapType = 1
  111. perfMapTypeTCPEvents perfMapType = 2
  112. perfMapTypeFileEvents perfMapType = 3
  113. perfMapTypeL7Events perfMapType = 4
  114. perfMapTypeSocketEvents perfMapType = 5
  115. perfMapTypeEventQueue perfMapType = 6
  116. perfMapTypePythonThreadEvents perfMapType = 7
  117. )
  118. type Tracer struct {
  119. kernelVersion string
  120. disableL7Tracing bool
  121. disableE2ETracing bool
  122. disableStackTracing bool
  123. collection *ebpf.Collection
  124. collectionSpec *ebpf.CollectionSpec
  125. readers map[string]*perf.Reader
  126. links []link.Link
  127. uprobes map[string]*ebpf.Program
  128. Symbols []debugelf.Symbol
  129. Uprobes []tracer.Uprobe
  130. UprobesMap map[string]tracer.Uprobe
  131. }
  132. func NewTracer(kernelVersion string, disableL7Tracing, disableE2ETracing, disableStackTracing bool) *Tracer {
  133. if disableL7Tracing {
  134. klog.Infoln("L7 tracing is disabled")
  135. }
  136. if disableE2ETracing {
  137. klog.Infoln("L7 tracing is disabled")
  138. }
  139. if disableStackTracing {
  140. klog.Infoln("L7 stack is disabled")
  141. }
  142. return &Tracer{
  143. kernelVersion: kernelVersion,
  144. disableL7Tracing: disableL7Tracing,
  145. disableE2ETracing: disableE2ETracing,
  146. disableStackTracing: disableStackTracing,
  147. readers: map[string]*perf.Reader{},
  148. uprobes: map[string]*ebpf.Program{},
  149. }
  150. }
  151. func (t *Tracer) Run(events chan<- Event) error {
  152. if err := t.ebpf(events); err != nil {
  153. return err
  154. }
  155. if err := t.init(events); err != nil {
  156. return err
  157. }
  158. return nil
  159. }
  160. func (t *Tracer) Close() {
  161. for k, p := range t.uprobes {
  162. err := p.Close()
  163. klog.WithError(err).Infof("[close] uprobes %s", k)
  164. }
  165. for _, l := range t.links {
  166. err := l.Close()
  167. klog.WithError(err).Infof("[close] links")
  168. }
  169. for k, r := range t.readers {
  170. err := r.Close()
  171. klog.WithError(err).Infof("[close] readers %s", k)
  172. }
  173. t.collection.Close()
  174. }
  175. func (t *Tracer) init(ch chan<- Event) error {
  176. pids, err := proc.ListPids()
  177. if err != nil {
  178. return fmt.Errorf("failed to list pids: %w", err)
  179. }
  180. for _, pid := range pids {
  181. ch <- Event{Type: EventTypeProcessStart, Pid: pid}
  182. }
  183. fds, sockets := readFds(pids)
  184. for _, fd := range fds {
  185. ch <- Event{Type: EventTypeFileOpen, Pid: fd.pid, Fd: fd.fd}
  186. }
  187. listens := map[uint64]bool{}
  188. for _, s := range sockets {
  189. if s.Listen {
  190. listens[uint64(s.pid)<<32|uint64(s.SAddr.Port())] = true
  191. }
  192. }
  193. ebpfConnectionsMap := t.collection.Maps["active_connections"]
  194. timestamp := uint64(time.Now().UnixNano())
  195. for _, s := range sockets {
  196. typ := EventTypeConnectionOpen
  197. if s.Listen {
  198. typ = EventTypeListenOpen
  199. } else if listens[uint64(s.pid)<<32|uint64(s.SAddr.Port())] || s.DAddr.Port() > s.SAddr.Port() { // inbound
  200. continue
  201. }
  202. ch <- Event{
  203. Type: typ,
  204. Pid: s.pid,
  205. Timestamp: timestamp,
  206. Fd: s.fd,
  207. SrcAddr: s.SAddr,
  208. DstAddr: s.DAddr,
  209. }
  210. if typ == EventTypeConnectionOpen {
  211. id := ConnectionId{FD: s.fd, PID: s.pid}
  212. sip := s.SAddr.IP()
  213. sipbytes := sip.As16()
  214. dip := s.DAddr.IP()
  215. dipbytes := dip.As16()
  216. conn := Connection{Timestamp: timestamp, Saddr: sipbytes, Sport: s.SAddr.Port(), Daddr: dipbytes, Dport: s.DAddr.Port()}
  217. if err := ebpfConnectionsMap.Update(id, conn, ebpf.UpdateNoExist); err != nil {
  218. klog.Warningln(err)
  219. }
  220. }
  221. }
  222. return nil
  223. }
  224. func (t *Tracer) ActiveConnectionsIterator() *ebpf.MapIterator {
  225. return t.collection.Maps["active_connections"].Iterate()
  226. }
  227. func (t *Tracer) ActiveAcceptsIterator() *ebpf.MapIterator {
  228. return t.collection.Maps["active_accepts"].Iterate()
  229. }
  230. type perfMap struct {
  231. name string
  232. perCPUBufferSizePages int
  233. typ perfMapType
  234. }
  235. func (t *Tracer) ebpf(ch chan<- Event) error {
  236. kv := "v" + common.KernelMajorMinor(t.kernelVersion)
  237. path, prg, err := EbpfCode(kv)
  238. klog.Infof("kv is [%s], kernel version: [%s] path: [%s]", kv, t.kernelVersion, path)
  239. if len(prg) == 0 || err != nil {
  240. return fmt.Errorf("kv is %s, unsupported kernel version: [%s] path: [%s] err:<%v>", kv, t.kernelVersion, path, err)
  241. }
  242. _, debugFsErr := os.Stat("/sys/kernel/debug/tracing")
  243. _, traceFsErr := os.Stat("/sys/kernel/tracing")
  244. if debugFsErr != nil && traceFsErr != nil {
  245. return fmt.Errorf("kernel tracing is not available: debugfs or tracefs must be mounted")
  246. }
  247. collectionSpec, err := ebpf.LoadCollectionSpecFromReader(bytes.NewReader(prg))
  248. if err != nil {
  249. return fmt.Errorf("failed to load collection spec: %w", err)
  250. }
  251. _ = unix.Setrlimit(unix.RLIMIT_MEMLOCK, &unix.Rlimit{Cur: unix.RLIM_INFINITY, Max: unix.RLIM_INFINITY})
  252. tracer.PidFilter(collectionSpec)
  253. opts := &ebpf.CollectionOptions{MapReplacements: make(map[string]*ebpf.Map)}
  254. klog.Infof("[start] Look eBPF .maps")
  255. for _, spec := range collectionSpec.Maps {
  256. klog.Infoln(spec.Name)
  257. }
  258. klog.Infof("[end] Look eBPF .maps")
  259. tracer.MapInit(collectionSpec, opts)
  260. // TODO 多进程
  261. // tracer.SetConstants(collectionSpec)
  262. c, err := ebpf.NewCollectionWithOptions(collectionSpec, *opts)
  263. if err != nil {
  264. var verr *ebpf.VerifierError
  265. if errors.As(err, &verr) {
  266. klog.Errorf("----%+v", verr)
  267. }
  268. return fmt.Errorf("failed to load collection: %w", err)
  269. }
  270. tracer.Offset()
  271. t.collectionSpec = collectionSpec
  272. t.collection = c
  273. perfMaps := []perfMap{
  274. {name: "proc_events", typ: perfMapTypeProcEvents, perCPUBufferSizePages: 4},
  275. {name: "tcp_listen_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 4},
  276. {name: "tcp_connect_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 8},
  277. {name: "tcp_accept_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 8},
  278. {name: "tcp_retransmit_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 4},
  279. {name: "file_events", typ: perfMapTypeFileEvents, perCPUBufferSizePages: 4},
  280. {name: "event_queue", typ: perfMapTypeEventQueue, perCPUBufferSizePages: 32},
  281. {name: "python_thread_events", typ: perfMapTypePythonThreadEvents, perCPUBufferSizePages: 4},
  282. }
  283. tracer.MapInsert(c)
  284. if !t.DisableL7Tracing() {
  285. perfMaps = append(perfMaps, perfMap{name: "l7_events", typ: perfMapTypeL7Events, perCPUBufferSizePages: 32})
  286. }
  287. perfMaps = append(perfMaps, perfMap{name: MAP_PERF_SOCKET_DATA_NAME, typ: perfMapTypeSocketEvents, perCPUBufferSizePages: 64})
  288. klog.Infof("[start] Look eBPF perf_maps")
  289. for _, pm := range perfMaps {
  290. klog.Infoln(pm.name)
  291. m, ok := t.collection.Maps[pm.name]
  292. if ok {
  293. r, err := perf.NewReader(m, pm.perCPUBufferSizePages*os.Getpagesize())
  294. if err != nil {
  295. t.Close()
  296. return fmt.Errorf("failed to create ebpf reader: %w", err)
  297. }
  298. t.readers[pm.name] = r
  299. // event监听
  300. go runEventsReader(pm.name, r, ch, pm.typ)
  301. }
  302. }
  303. klog.Infof("[end] Look eBPF perf_maps")
  304. klog.Infof("[start] Look eBPF specPrograms")
  305. if err = t.LinkEbpfProg(); err != nil {
  306. return err
  307. }
  308. klog.Infof("[end] Look eBPF specPrograms")
  309. return nil
  310. }
  311. func (t *Tracer) LinkEbpfProg() error {
  312. klog.Infof("[start] Look eBPF specPrograms")
  313. var (
  314. l link.Link
  315. err error
  316. lastErr error
  317. )
  318. for _, programSpec := range t.collectionSpec.Programs {
  319. program := t.collection.Programs[programSpec.Name]
  320. klog.Infof("%s:[%s]", programSpec.SectionName, programSpec.Name)
  321. if t.DisableL7Tracing() {
  322. switch programSpec.Name {
  323. case "sys_enter_writev", "sys_enter_write", "sys_enter_sendto", "sys_enter_sendmsg", "sys_enter_sendmmsg":
  324. continue
  325. case "sys_enter_read", "sys_enter_readv", "sys_enter_recvfrom", "sys_enter_recvmsg":
  326. continue
  327. case "sys_exit_read", "sys_exit_readv", "sys_exit_recvfrom", "sys_exit_recvmsg":
  328. continue
  329. }
  330. }
  331. switch programSpec.Type {
  332. case ebpf.TracePoint:
  333. if strings.Contains(programSpec.SectionName, "prog") {
  334. continue
  335. }
  336. parts := strings.SplitN(programSpec.AttachTo, "/", 2)
  337. l, err = link.Tracepoint(parts[0], parts[1], program, nil)
  338. case ebpf.Kprobe:
  339. if strings.HasPrefix(programSpec.SectionName, "uprobe/") {
  340. t.uprobes[programSpec.Name] = program
  341. continue
  342. }
  343. if strings.HasPrefix(programSpec.SectionName, "kretprobe/") {
  344. l, err = link.Kretprobe(programSpec.AttachTo, program, nil)
  345. t.links = append(t.links, l)
  346. continue
  347. }
  348. l, err = link.Kprobe(programSpec.AttachTo, program, nil)
  349. }
  350. if err != nil {
  351. lastErr = err
  352. t.Close()
  353. klog.Errorf("LinkEbpfProg failed to program[%s] link program: %s", programSpec.Name, err)
  354. //return fmt.Errorf("failed to link program: %w", err)
  355. } else {
  356. t.links = append(t.links, l)
  357. }
  358. }
  359. klog.Infof("[end] Look eBPF specPrograms")
  360. if lastErr != nil {
  361. return fmt.Errorf("failed to link program: %w", lastErr)
  362. }
  363. return nil
  364. }
  365. func (t *Tracer) UnlinkEbpfProg() error {
  366. var (
  367. lastErr error
  368. err error
  369. )
  370. /* 此处不应该处理 t.uprobes 中 ebpf-program
  371. for pName, p := range t.uprobes {
  372. if err = p.Close(); err != nil {
  373. lastErr = err
  374. klog.Errorf("UnlinkEbpfProg close program[%s] uprobe occurs error: %s", pName, err.Error())
  375. }
  376. }
  377. */
  378. for _, l := range t.links {
  379. if err = l.Close(); err != nil {
  380. lastErr = err
  381. klog.Errorf("UnlinkEbpfProg close link occurs error: %s", err.Error())
  382. }
  383. }
  384. return lastErr
  385. }
  386. func (t EventType) Int() int {
  387. return int(t)
  388. }
  389. func (t EventType) String() string {
  390. switch t {
  391. case EventTypeProcessStart:
  392. return "process-start"
  393. case EventTypeProcessExit:
  394. return "process-exit"
  395. case EventTypeConnectionOpen:
  396. return "connection-open"
  397. case EventTypeConnectionClose:
  398. return "connection-close"
  399. case EventTypeConnectionError:
  400. return "connection-error"
  401. case EventTypeListenOpen:
  402. return "listen-open"
  403. case EventTypeListenClose:
  404. return "listen-close"
  405. case EventTypeFileOpen:
  406. return "file-open"
  407. case EventTypeTCPRetransmit:
  408. return "tcp-retransmit"
  409. case EventTypeL7Request:
  410. return "l7-request"
  411. }
  412. return "unknown: " + strconv.Itoa(int(t))
  413. }
  414. func (t EventReason) String() string {
  415. switch t {
  416. case EventReasonNone:
  417. return "none"
  418. case EventReasonOOMKill:
  419. return "oom-kill"
  420. }
  421. return "unknown: " + strconv.Itoa(int(t))
  422. }
  423. type procEvent struct {
  424. Type EventType
  425. Pid uint32
  426. Reason uint32
  427. }
  428. type tcpEvent struct {
  429. Fd uint64
  430. Timestamp uint64
  431. Duration uint64
  432. FirstReadTime uint64
  433. FirstWriteTime uint64
  434. NewReadTime uint64
  435. Type EventType
  436. Pid uint32
  437. BytesSent uint64
  438. BytesReceived uint64
  439. SPort uint16
  440. DPort uint16
  441. SAddr [16]byte
  442. DAddr [16]byte
  443. }
  444. type fileEvent struct {
  445. Type EventType
  446. Pid uint32
  447. Fd uint64
  448. }
  449. // struct l7_event in l7.c
  450. type l7Event struct {
  451. Fd uint64
  452. ConnectionTimestamp uint64
  453. Pid uint32
  454. Status uint32
  455. Duration uint64
  456. Protocol uint8
  457. Method uint8
  458. Padding uint16
  459. StatementId uint32
  460. PayloadSize uint64
  461. TraceId uint64
  462. StartAt uint64 // ns
  463. EndtAt uint64 // ns
  464. TraceStart uint32
  465. TraceEnd uint32
  466. EventCount uint32
  467. Sport uint16
  468. Dport uint16
  469. SAddr [16]byte
  470. DAddr [16]byte
  471. ComponentSport uint16
  472. ComponentDport uint16
  473. ComponentSAddr [16]byte
  474. ComponentDAddr [16]byte
  475. AssumedAppId HashByte
  476. SpanId HashByte
  477. TraceIdFrom HashByte16
  478. CalledId HashByte
  479. InstanceIdFrom HashByte
  480. AppIdFrom HashByte
  481. SpanIdFrom HashByte
  482. }
  483. type SocketDataBufferddd struct {
  484. EventsNum uint32
  485. Len uint32
  486. Data [32760]byte
  487. }
  488. const (
  489. TASK_COMM_LEN = 16
  490. BURST_DATA_BUF_SIZE = 8192
  491. )
  492. type Tuple struct {
  493. Daddr [16]uint8
  494. RcvSaddr [16]uint8
  495. AddrLen uint8
  496. L4Protocol uint8
  497. Dport uint16
  498. Num uint16
  499. }
  500. type SocketDatadddd struct {
  501. Pid uint32 // 表示线程号 如果'pid == tgid'表示一个进程, 否则是线程
  502. Tgid uint32 // 进程号
  503. CoroutineID uint64
  504. Source uint8
  505. Comm [TASK_COMM_LEN]byte
  506. SocketID uint64
  507. Tuple Tuple
  508. ExtraData uint32
  509. ExtraDataCount uint32
  510. TcpSeq uint32
  511. ThreadTraceID uint64
  512. Timestamp uint64
  513. Direction uint8
  514. MsgType uint8
  515. SyscallLen uint64
  516. DataSeq uint64
  517. DataType uint16
  518. DataLen uint16
  519. Data [BURST_DATA_BUF_SIZE]byte
  520. }
  521. type StackEvent struct {
  522. Type uint64
  523. Pid uint64
  524. TraceId uint64
  525. Goid uint64
  526. Ip uint64
  527. Bp uint64
  528. CallerIp uint64
  529. CallerBp uint64
  530. TimeNsStart uint64
  531. TimeNsEnd uint64
  532. // Nid uint64
  533. // Fpid uint64
  534. // Level uint64
  535. Location byte
  536. ClassName [100]byte
  537. MethedName [100]byte
  538. }
  539. type StackFunEvent struct {
  540. StackEvent StackEvent
  541. Uprobe *tracer.Uprobe
  542. }
  543. type pythonThreadEvent struct {
  544. Type EventType
  545. Pid uint32
  546. Duration uint64
  547. }
  548. func runEventsReader(name string, r *perf.Reader, ch chan<- Event, typ perfMapType) {
  549. for {
  550. rec, err := r.Read()
  551. if err != nil {
  552. if errors.Is(err, perf.ErrClosed) {
  553. break
  554. }
  555. continue
  556. }
  557. if rec.LostSamples > 0 {
  558. klog.Errorln(name, "lost samples:", rec.LostSamples)
  559. continue
  560. }
  561. var event Event
  562. switch typ {
  563. case perfMapTypeSocketEvents:
  564. //fmt.Println("perfMapTypeSocketEvents")
  565. // 假设 rec.RawSample 包含数据,类型为 []byte
  566. //rawData := rec.RawSample
  567. //fmt.Println("perfMapTypeSocketEvents2")
  568. //
  569. //// 创建一个 SocketDataBuffer 结构体实例
  570. //var buffer SocketDataBuffer
  571. //
  572. //// 创建一个字节缓冲区,并将数据填充到其中
  573. //reader := bytes.NewReader(rawData)
  574. //fmt.Println("perfMapTypeSocketEvents3")
  575. //fmt.Println(len(rawData))
  576. //// 使用 binary.Read 函数读取数据并解析为 SocketDataBuffer 结构体实例
  577. //if err := binary.Read(reader, binary.LittleEndian, &buffer); err != nil {
  578. // fmt.Println(reader.Len())
  579. // fmt.Println("Failed to read data:", err)
  580. // continue
  581. //}
  582. //fmt.Println("perfMapTypeSocketEvents4")
  583. //
  584. //// 打印解析后的数据
  585. //fmt.Println("EventsNum:", buffer.EventsNum)
  586. //fmt.Println("Len:", buffer.Len)
  587. //
  588. //// 打印 char data 的内容
  589. //fmt.Printf("Data: %s\n", string(buffer.Data[:buffer.Len])) // 仅打印实际长度的数据
  590. //socketDataBuffer := rec.RawSample
  591. // 解析 __socket_data_buffer
  592. //buf := (*SocketDataBuffer)(unsafe.Pointer(&rec.RawSample[0])) //nolint:gosec
  593. //
  594. //// 获取 char data[32760];
  595. //socketData := (*SocketData)(unsafe.Pointer(&buf.data[0])) //nolint:gosec
  596. //// todo
  597. //fmt.Printf("socketData.DataType:%d \n", (socketData.data_type))
  598. //fmt.Printf("socketData.DataLen:%d \n", (socketData.data_len))
  599. //
  600. //// 解析C结构体中的data字段
  601. //dataSlice := C.GoBytes(unsafe.Pointer(&socketData.data[0]), C.int(socketData.data_len))
  602. //// 打印或处理包含的数据
  603. //fmt.Printf("socketData.Payload:%v \n", string(dataSlice))
  604. /*todo */
  605. //socketData := (*(*[128]byte)(unsafe.Pointer(&eventC.line)))
  606. //dataPtr := unsafe.Pointer(&buf.data[0])
  607. //socketData := (*SocketData)(dataPtr)
  608. //reader2 := bytes.NewBuffer(rec.RawSample)
  609. // 222222
  610. //fmt.Println("socketData.Pid:", socketData.pid)
  611. //fmt.Println("socketData.Tgid:", socketData.tgid)
  612. //fmt.Println("socketData.CoroutineID:", socketData.coroutine_id)
  613. //fmt.Println("socketData.Source:", socketData.source)
  614. //
  615. //fmt.Printf("socketData.Comm: %s \n", socketData.comm)
  616. //fmt.Printf("socketData.SocketID :%v \n", socketData.socket_id)
  617. //fmt.Println("socketData.Tuple:", socketData.Tuple)
  618. //fmt.Println("socketData.ExtraData:", socketData.ExtraData)
  619. //fmt.Println("socketData.ExtraDataCount:", socketData.ExtraDataCount)
  620. //fmt.Println("socketData.TCPSeq:", socketData.TcpSeq)
  621. //fmt.Println("socketData.ThreadTraceID:", socketData.ThreadTraceID)
  622. //fmt.Println("socketData.Timestamp:", socketData.Timestamp)
  623. //fmt.Println("socketData.Direction:", socketData.Direction)
  624. //fmt.Println("socketData.MsgType:", socketData.MsgType)
  625. //fmt.Println("socketData.SyscallLen:", socketData.SyscallLen)
  626. //fmt.Println("socketData.DataSeq:", socketData.DataSeq)
  627. //socketData := &SocketData{}
  628. //reader := bytes.NewBuffer(rec.RawSample)
  629. //if err := binary.Read(reader, binary.LittleEndian, v); err != nil {
  630. // klog.Warningln("failed1 to read msg:", err)
  631. // continue
  632. //}
  633. //
  634. //var data []byte
  635. //payload := reader.Bytes()
  636. //switch {
  637. //case v.Len == 0:
  638. //case v.Len > 32760:
  639. // data = payload[:32760]
  640. //default:
  641. // data = payload[:v.Len]
  642. //}
  643. //////data2 := data[:v.Len]
  644. ////fmt.Println("perfMapTypeSocketEvents")
  645. //fmt.Println(v.EventsNum)
  646. //fmt.Println(v.Len)
  647. //fmt.Println(string(data))
  648. //
  649. //var data2 SocketData
  650. //reader2 := bytes.NewBuffer(data)
  651. //if err := binary.Read(reader2, binary.LittleEndian, data2); err != nil {
  652. // klog.Warningln("failed2 to read msg:", err)
  653. // continue
  654. //}
  655. //
  656. //fmt.Println(data2.Pid)
  657. //fmt.Println(data2.Tgid)
  658. //fmt.Println(string(v.Data))
  659. //continue
  660. case perfMapTypeL7Events:
  661. v := &l7Event{}
  662. reader := bytes.NewBuffer(rec.RawSample)
  663. if err := binary.Read(reader, binary.LittleEndian, v); err != nil {
  664. klog.Warningln("failed to read msg:", err)
  665. continue
  666. }
  667. //fmt.Println("v.TraceIdFrom")
  668. //fmt.Println(v.TraceIdFrom)
  669. //a := hex.EncodeToString(v.TraceIdFrom[:])
  670. //for _, b := range v.AssumedAppId {
  671. // fmt.Printf("v.AssumedAppId- %02\n", b)
  672. //}
  673. //fmt.Println(a)
  674. payload := reader.Bytes()
  675. req := &l7.RequestData{
  676. Protocol: l7.Protocol(v.Protocol),
  677. Status: l7.Status(v.Status),
  678. Duration: time.Duration(v.Duration),
  679. Method: l7.Method(v.Method),
  680. StatementId: v.StatementId,
  681. TraceId: v.TraceId,
  682. TraceStart: v.TraceStart,
  683. TraceEnd: v.TraceEnd,
  684. EventCount: v.EventCount,
  685. AssumedAppId: hex.EncodeToString(v.AssumedAppId[:]),
  686. SpanId: hex.EncodeToString(v.SpanId[:]),
  687. StartAt: v.StartAt,
  688. EndAt: v.EndtAt,
  689. ComponentSAddr: ipPort(v.ComponentSAddr, v.ComponentSport),
  690. ComponentDAddr: ipPort(v.ComponentDAddr, v.ComponentDport),
  691. }
  692. if req.Protocol == l7.ProtocolHTTP {
  693. klog.Infof("runEventsReader ComponentSAddr.String %s", req.ComponentSAddr.String())
  694. klog.Infof("runEventsReader ComponentDAddr.String %s", req.ComponentDAddr.String())
  695. }
  696. if v.TraceEnd == 1 {
  697. req.ParentSpanContext.TraceIdFrom = hex.EncodeToString(v.TraceIdFrom[:])
  698. req.ParentSpanContext.CalledId = hex.EncodeToString(v.CalledId[:])
  699. req.ParentSpanContext.InstanceIdFrom = hex.EncodeToString(v.InstanceIdFrom[:])
  700. req.ParentSpanContext.AppIdFrom = hex.EncodeToString(v.AppIdFrom[:])
  701. req.ParentSpanContext.SpanIdFrom = hex.EncodeToString(v.SpanIdFrom[:])
  702. req.SAddr = ipPort(v.SAddr, v.Sport)
  703. req.DAddr = ipPort(v.DAddr, v.Dport)
  704. klog.Infof("runEventsReader SAddr.String %s", req.SAddr.String())
  705. klog.Infof("runEventsReader DAddr.String %s", req.DAddr.String())
  706. }
  707. switch {
  708. case v.PayloadSize == 0:
  709. case v.PayloadSize > MaxPayloadSize:
  710. req.Payload = payload[:MaxPayloadSize]
  711. default:
  712. req.Payload = payload[:v.PayloadSize]
  713. }
  714. //fmt.Println("==========")
  715. //fmt.Println("req.Payload:", string(req.Payload))
  716. //fmt.Println("==========")
  717. event = Event{Type: EventTypeL7Request, Pid: v.Pid, Fd: v.Fd, Timestamp: v.ConnectionTimestamp, L7Request: req}
  718. case perfMapTypeFileEvents:
  719. v := &fileEvent{}
  720. if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
  721. klog.Warningln("failed to read msg:", err)
  722. continue
  723. }
  724. event = Event{Type: v.Type, Pid: v.Pid, Fd: v.Fd}
  725. case perfMapTypeProcEvents:
  726. v := &procEvent{}
  727. if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
  728. klog.Warningln("failed to read msg:", err)
  729. continue
  730. }
  731. event = Event{Type: v.Type, Reason: EventReason(v.Reason), Pid: v.Pid}
  732. case perfMapTypeTCPEvents:
  733. v := &tcpEvent{}
  734. if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
  735. klog.Warningln("failed to read msg:", err)
  736. continue
  737. }
  738. event = Event{
  739. Type: v.Type,
  740. Pid: v.Pid,
  741. SrcAddr: ipPort(v.SAddr, v.SPort),
  742. DstAddr: ipPort(v.DAddr, v.DPort),
  743. Fd: v.Fd,
  744. Timestamp: v.Timestamp,
  745. Duration: time.Duration(v.Duration),
  746. }
  747. if v.Type == EventTypeConnectionClose {
  748. event.TrafficStats = &TrafficStats{
  749. BytesSent: v.BytesSent,
  750. BytesReceived: v.BytesReceived,
  751. }
  752. }
  753. event.FirstReadTime = v.FirstReadTime
  754. event.FirstWriteTime = v.FirstWriteTime
  755. event.NewReadTime = v.NewReadTime
  756. if v.Type == EventTypeAcceptClose {
  757. event.TrafficStats = &TrafficStats{
  758. BytesSent: v.BytesSent,
  759. BytesReceived: v.BytesReceived,
  760. }
  761. }
  762. case perfMapTypePythonThreadEvents:
  763. v := &pythonThreadEvent{}
  764. if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
  765. klog.Warningln("failed to read msg:", err)
  766. continue
  767. }
  768. event = Event{
  769. Type: v.Type,
  770. Pid: v.Pid,
  771. Duration: time.Duration(v.Duration),
  772. }
  773. case perfMapTypeEventQueue:
  774. v := &StackEvent{}
  775. if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
  776. klog.Warningln("failed to read msg:", err)
  777. continue
  778. }
  779. event = Event{
  780. Type: EventTypeFunEnt,
  781. StackEvent: v,
  782. }
  783. default:
  784. continue
  785. }
  786. ch <- event
  787. }
  788. }
  789. func ipPort(ip [16]byte, port uint16) netaddr.IPPort {
  790. i, _ := netaddr.FromStdIP(ip[:])
  791. return netaddr.IPPortFrom(i, port)
  792. }
  793. func (t *Tracer) InitKProcInfo(pid uint32, appInfo *AppInfo) error {
  794. var err error
  795. var info EbpfProcInfo
  796. if appInfo.EBPFProcInfo == nil {
  797. info = EbpfProcInfo{
  798. InstanceId: appInfo.InstanceIdHash.HashtVal,
  799. AppId: appInfo.AppIdHash.HashtVal,
  800. CodeType: uint16(appInfo.CodeType),
  801. }
  802. } else {
  803. info = *appInfo.EBPFProcInfo
  804. info.AppId = appInfo.AppIdHash.HashtVal
  805. }
  806. _, err = tracer.UpdateProcInfoToMap(t.collection, pid, info)
  807. if err != nil {
  808. klog.Error("failed to update program info", err)
  809. }
  810. appInfo.EBPFProcInfo = &info
  811. return err
  812. }
  813. func (t *Tracer) DelKProcInfo(pid uint32) error {
  814. _, err := tracer.DelProcInfoFromMap(t.collection, pid)
  815. if err != nil {
  816. klog.Error("failed to delete proc info", err)
  817. }
  818. return err
  819. }
  820. // TODO check language
  821. func (t *Tracer) DisableL7Tracing() bool {
  822. return t.disableL7Tracing
  823. }
  824. func (t *Tracer) DisableE2ETracing() bool {
  825. return t.disableE2ETracing
  826. }
  827. func (t *Tracer) DisableStackTracing() bool {
  828. return t.disableStackTracing
  829. }