tracer.go 28 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984
  1. package ebpftracer
  2. import (
  3. "bytes"
  4. debugelf "debug/elf"
  5. "encoding/binary"
  6. "encoding/hex"
  7. "errors"
  8. "fmt"
  9. "os"
  10. "strconv"
  11. "strings"
  12. "time"
  13. "github.com/coroot/coroot-node-agent/utils"
  14. "github.com/coroot/coroot-node-agent/utils/try"
  15. "github.com/cilium/ebpf"
  16. "github.com/cilium/ebpf/link"
  17. "github.com/cilium/ebpf/perf"
  18. "github.com/coroot/coroot-node-agent/common"
  19. "github.com/coroot/coroot-node-agent/ebpftracer/l7"
  20. "github.com/coroot/coroot-node-agent/ebpftracer/tracer"
  21. "github.com/coroot/coroot-node-agent/proc"
  22. . "github.com/coroot/coroot-node-agent/utils/modelse"
  23. klog "github.com/sirupsen/logrus"
  24. "golang.org/x/sys/unix"
  25. "inet.af/netaddr"
  26. )
  27. /*
  28. #define TASK_COMM_LEN 16
  29. #define BURST_DATA_BUF_SIZE 8192 // For burst send buffer
  30. #include <linux/types.h>
  31. struct __tuple_t {
  32. __u8 daddr[16];
  33. __u8 rcv_saddr[16];
  34. __u8 addr_len;
  35. __u8 l4_protocol;
  36. __u16 dport;
  37. __u16 num;
  38. };
  39. struct __socket_data {
  40. __u32 pid;
  41. __u32 tgid;
  42. __u64 coroutine_id;
  43. __u8 source;
  44. __u8 comm[TASK_COMM_LEN];
  45. __u64 socket_id;
  46. struct __tuple_t tuple;
  47. __u32 extra_data;
  48. __u32 extra_data_count;
  49. __u32 tcp_seq;
  50. __u64 thread_trace_id;
  51. __u64 timestamp;
  52. __u8 direction: 1;
  53. __u8 msg_type: 7;
  54. __u64 syscall_len;
  55. __u64 data_seq;
  56. __u16 data_type;
  57. __u16 data_len;
  58. char data[BURST_DATA_BUF_SIZE];
  59. } __attribute__((packed));
  60. struct __socket_data_buffer {
  61. __u32 events_num;
  62. __u32 len;
  63. char data[32760];
  64. };
  65. */
  66. import "C"
  67. type SocketData C.struct___socket_data
  68. type SocketDataBuffer C.struct___socket_data_buffer
  // MaxPayloadSize is the upper bound, in bytes, on the payload that
  // runEventsReader attaches to an L7 request.
  const MaxPayloadSize = 1024
  // EventType identifies the kind of an Event. The numeric values are read
  // verbatim from kernel-side records, so they must not be renumbered.
  type EventType uint32

  // EventReason qualifies an Event with an additional cause.
  type EventReason uint32

  // Event kinds.
  const (
  	EventTypeProcessStart    EventType = 1
  	EventTypeProcessExit     EventType = 2
  	EventTypeConnectionOpen  EventType = 3
  	EventTypeConnectionClose EventType = 4
  	EventTypeConnectionError EventType = 5
  	EventTypeListenOpen      EventType = 6
  	EventTypeListenClose     EventType = 7
  	EventTypeFileOpen        EventType = 8
  	EventTypeTCPRetransmit   EventType = 9
  	EventTypeL7Request       EventType = 10
  	EventTypeFunEnt          EventType = 11
  	EventTypeFunRet          EventType = 12
  	EventTypeAcceptOpen      EventType = 13
  	EventTypeAcceptClose     EventType = 14
  )

  // Event reasons.
  const (
  	EventReasonNone    EventReason = 0
  	EventReasonOOMKill EventReason = 1
  )
  // TrafficStats carries the byte counters attached to an Event when a
  // connection-close or accept-close TCP record is decoded.
  type TrafficStats struct {
  	BytesSent, BytesReceived uint64
  }
  94. type Event struct {
  95. StackEvent *StackEvent
  96. Type EventType
  97. Reason EventReason
  98. Pid uint32
  99. SrcAddr netaddr.IPPort
  100. DstAddr netaddr.IPPort
  101. Fd uint64
  102. Timestamp uint64
  103. Duration time.Duration
  104. L7Request *l7.RequestData
  105. TrafficStats *TrafficStats
  106. FirstReadTime uint64
  107. FirstWriteTime uint64
  108. NewReadTime uint64
  109. }
  // perfMapType selects which record layout runEventsReader uses to decode
  // samples coming from a perf map.
  type perfMapType uint8

  // Known perf map record layouts.
  const (
  	perfMapTypeProcEvents         perfMapType = 1
  	perfMapTypeTCPEvents          perfMapType = 2
  	perfMapTypeFileEvents         perfMapType = 3
  	perfMapTypeL7Events           perfMapType = 4
  	perfMapTypeSocketEvents       perfMapType = 5
  	perfMapTypeEventQueue         perfMapType = 6
  	perfMapTypePythonThreadEvents perfMapType = 7
  )
  120. type Tracer struct {
  121. kernelVersion string
  122. disableL7Tracing bool
  123. disableE2ETracing bool
  124. disableStackTracing bool
  125. collection *ebpf.Collection
  126. collectionSpec *ebpf.CollectionSpec
  127. readers map[string]*perf.Reader
  128. links []link.Link
  129. uprobes map[string]*ebpf.Program
  130. Symbols []debugelf.Symbol
  131. Uprobes []tracer.Uprobe
  132. UprobesMap map[string]tracer.Uprobe
  133. }
  134. func NewTracer(kernelVersion string, disableL7Tracing, disableE2ETracing, disableStackTracing bool) *Tracer {
  135. klog.Infof(
  136. "Tracing config | L7=%t, E2E=%t, L7Stack=%t",
  137. !disableL7Tracing,
  138. !disableE2ETracing,
  139. !disableStackTracing,
  140. )
  141. return &Tracer{
  142. kernelVersion: kernelVersion,
  143. disableL7Tracing: disableL7Tracing,
  144. disableE2ETracing: disableE2ETracing,
  145. disableStackTracing: disableStackTracing,
  146. readers: map[string]*perf.Reader{},
  147. uprobes: map[string]*ebpf.Program{},
  148. links: []link.Link{},
  149. }
  150. }
  151. func (t *Tracer) Run(events chan<- Event) error {
  152. if err := t.ebpf(events); err != nil {
  153. return err
  154. }
  155. if err := t.init(events); err != nil {
  156. return err
  157. }
  158. return nil
  159. }
  160. func (t *Tracer) Close() {
  161. for k, p := range t.uprobes {
  162. if p != nil {
  163. err := p.Close()
  164. klog.WithError(err).Infof("[close] uprobes %s", k)
  165. }
  166. }
  167. for _, l := range t.links {
  168. if l != nil {
  169. err := l.Close()
  170. klog.WithError(err).Infof("[close] links")
  171. }
  172. }
  173. for k, r := range t.readers {
  174. if r != nil {
  175. err := r.Close()
  176. klog.WithError(err).Infof("[close] readers %s", k)
  177. }
  178. }
  179. if t.collection != nil {
  180. t.collection.Close()
  181. }
  182. }
  183. func (t *Tracer) init(ch chan<- Event) error {
  184. pids, err := proc.ListPids()
  185. if err != nil {
  186. return fmt.Errorf("failed to list pids: %w", err)
  187. }
  188. for _, pid := range pids {
  189. ch <- Event{Type: EventTypeProcessStart, Pid: pid}
  190. }
  191. fds, sockets := readFds(pids)
  192. for _, fd := range fds {
  193. ch <- Event{Type: EventTypeFileOpen, Pid: fd.pid, Fd: fd.fd}
  194. }
  195. listens := map[uint64]bool{}
  196. for _, s := range sockets {
  197. if s.Listen {
  198. listens[uint64(s.pid)<<32|uint64(s.SAddr.Port())] = true
  199. }
  200. }
  201. ebpfConnectionsMap := t.collection.Maps["active_connections"]
  202. timestamp := uint64(time.Now().UnixNano())
  203. for _, s := range sockets {
  204. typ := EventTypeConnectionOpen
  205. if s.Listen {
  206. typ = EventTypeListenOpen
  207. //} else if listens[uint64(s.pid)<<32|uint64(s.SAddr.Port())] || s.DAddr.Port() > s.SAddr.Port() { // inbound
  208. } else if listens[uint64(s.pid)<<32|uint64(s.SAddr.Port())] { // 存在误判
  209. continue
  210. }
  211. ch <- Event{
  212. Type: typ,
  213. Pid: s.pid,
  214. Timestamp: timestamp,
  215. Fd: s.fd,
  216. SrcAddr: s.SAddr,
  217. DstAddr: s.DAddr,
  218. }
  219. if typ == EventTypeConnectionOpen {
  220. id := ConnectionId{FD: s.fd, PID: s.pid}
  221. sip := s.SAddr.IP()
  222. sipbytes := sip.As16()
  223. dip := s.DAddr.IP()
  224. dipbytes := dip.As16()
  225. conn := Connection{Timestamp: timestamp, Saddr: sipbytes, Sport: s.SAddr.Port(), Daddr: dipbytes, Dport: s.DAddr.Port()}
  226. if err := ebpfConnectionsMap.Update(id, conn, ebpf.UpdateNoExist); err != nil {
  227. klog.Warningln(err)
  228. }
  229. }
  230. }
  231. return nil
  232. }
  233. func (t *Tracer) ActiveConnectionsIterator() *ebpf.MapIterator {
  234. return t.collection.Maps["active_connections"].Iterate()
  235. }
  236. func (t *Tracer) ActiveAcceptsIterator() *ebpf.MapIterator {
  237. return t.collection.Maps["active_accepts"].Iterate()
  238. }
  239. type perfMap struct {
  240. name string
  241. perCPUBufferSizePages int
  242. typ perfMapType
  243. }
  244. func (t *Tracer) ebpf(ch chan<- Event) error {
  245. kv := "v" + common.KernelMajorMinor(t.kernelVersion)
  246. path, prg, err := EbpfCode(kv)
  247. klog.Infof("kv is [%s], kernel version: [%s] path: [%s]", kv, t.kernelVersion, path)
  248. if len(prg) == 0 || err != nil {
  249. return fmt.Errorf("kv is %s, unsupported kernel version: [%s] path: [%s] err:<%v>", kv, t.kernelVersion, path, err)
  250. }
  251. _, debugFsErr := os.Stat("/sys/kernel/debug/tracing")
  252. _, traceFsErr := os.Stat("/sys/kernel/tracing")
  253. if debugFsErr != nil && traceFsErr != nil {
  254. return fmt.Errorf("kernel tracing is not available: debugfs or tracefs must be mounted")
  255. }
  256. collectionSpec, err := ebpf.LoadCollectionSpecFromReader(bytes.NewReader(prg))
  257. if err != nil {
  258. return fmt.Errorf("failed to load collection spec: %w", err)
  259. }
  260. _ = unix.Setrlimit(unix.RLIMIT_MEMLOCK, &unix.Rlimit{Cur: unix.RLIM_INFINITY, Max: unix.RLIM_INFINITY})
  261. tracer.PidFilter(collectionSpec)
  262. opts := &ebpf.CollectionOptions{MapReplacements: make(map[string]*ebpf.Map)}
  263. klog.Infof("[start] Look eBPF .maps")
  264. for _, spec := range collectionSpec.Maps {
  265. klog.Infoln(spec.Name)
  266. }
  267. klog.Infof("[end] Look eBPF .maps")
  268. tracer.MapInit(collectionSpec, opts)
  269. // TODO 多进程
  270. // tracer.SetConstants(collectionSpec)
  271. c, err := ebpf.NewCollectionWithOptions(collectionSpec, *opts)
  272. if err != nil {
  273. var verr *ebpf.VerifierError
  274. if errors.As(err, &verr) {
  275. klog.Errorf("----%+v", verr)
  276. }
  277. return fmt.Errorf("failed to load collection: %w", err)
  278. }
  279. tracer.Offset()
  280. t.collectionSpec = collectionSpec
  281. t.collection = c
  282. perfMaps := []perfMap{
  283. {name: "proc_events", typ: perfMapTypeProcEvents, perCPUBufferSizePages: 4},
  284. {name: "tcp_listen_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 4},
  285. {name: "tcp_connect_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 8},
  286. {name: "tcp_accept_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 8},
  287. {name: "tcp_retransmit_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 4},
  288. {name: "file_events", typ: perfMapTypeFileEvents, perCPUBufferSizePages: 4},
  289. {name: "event_queue", typ: perfMapTypeEventQueue, perCPUBufferSizePages: 32},
  290. {name: "python_thread_events", typ: perfMapTypePythonThreadEvents, perCPUBufferSizePages: 4},
  291. }
  292. tracer.MapInsert(c)
  293. if !t.DisableL7Tracing() {
  294. perfMaps = append(perfMaps, perfMap{name: "l7_events", typ: perfMapTypeL7Events, perCPUBufferSizePages: 32})
  295. }
  296. perfMaps = append(perfMaps, perfMap{name: MAP_PERF_SOCKET_DATA_NAME, typ: perfMapTypeSocketEvents, perCPUBufferSizePages: 64})
  297. klog.Infof("[start] Look eBPF perf_maps")
  298. for _, pm := range perfMaps {
  299. klog.Infoln(pm.name)
  300. m, ok := t.collection.Maps[pm.name]
  301. if ok {
  302. r, err := perf.NewReader(m, pm.perCPUBufferSizePages*os.Getpagesize())
  303. if err != nil {
  304. t.Close()
  305. return fmt.Errorf("failed to create ebpf reader: %w", err)
  306. }
  307. t.readers[pm.name] = r
  308. // event监听
  309. //go runEventsReader(pm.name, r, ch, pm.typ)
  310. try.GoParams(runEventsReader, utils.CatchFn, pm.name, r, ch, pm.typ)
  311. }
  312. }
  313. klog.Infof("[end] Look eBPF perf_maps")
  314. klog.Infof("[start] Look eBPF specPrograms")
  315. if err = t.LinkEbpfProg(); err != nil {
  316. return err
  317. }
  318. klog.Infof("[end] Look eBPF specPrograms")
  319. return nil
  320. }
  321. func (t *Tracer) LinkEbpfProg() error {
  322. klog.Infof("[start] Look eBPF specPrograms")
  323. var (
  324. l link.Link
  325. err error
  326. lastErr error
  327. )
  328. for _, programSpec := range t.collectionSpec.Programs {
  329. program := t.collection.Programs[programSpec.Name]
  330. klog.Infof("%s:[%s]", programSpec.SectionName, programSpec.Name)
  331. if t.DisableL7Tracing() {
  332. switch programSpec.Name {
  333. case "sys_enter_writev", "sys_enter_write", "sys_enter_sendto", "sys_enter_sendmsg", "sys_enter_sendmmsg":
  334. continue
  335. case "sys_enter_read", "sys_enter_readv", "sys_enter_recvfrom", "sys_enter_recvmsg":
  336. continue
  337. case "sys_exit_read", "sys_exit_readv", "sys_exit_recvfrom", "sys_exit_recvmsg":
  338. continue
  339. }
  340. }
  341. switch programSpec.Type {
  342. case ebpf.TracePoint:
  343. if strings.Contains(programSpec.SectionName, "prog") {
  344. continue
  345. }
  346. parts := strings.SplitN(programSpec.AttachTo, "/", 2)
  347. l, err = link.Tracepoint(parts[0], parts[1], program, nil)
  348. case ebpf.Kprobe:
  349. if strings.HasPrefix(programSpec.SectionName, "uprobe/") {
  350. t.uprobes[programSpec.Name] = program
  351. continue
  352. }
  353. if strings.HasPrefix(programSpec.SectionName, "kretprobe/") {
  354. l, err = link.Kretprobe(programSpec.AttachTo, program, nil)
  355. if err == nil {
  356. t.links = append(t.links, l)
  357. }
  358. continue
  359. }
  360. l, err = link.Kprobe(programSpec.AttachTo, program, nil)
  361. }
  362. if err != nil {
  363. lastErr = err
  364. t.Close()
  365. klog.Errorf("LinkEbpfProg failed to program[%s] link program: %s", programSpec.Name, err)
  366. //return fmt.Errorf("failed to link program: %w", err)
  367. } else {
  368. t.links = append(t.links, l)
  369. }
  370. }
  371. klog.Infof("[end] Look eBPF specPrograms")
  372. if lastErr != nil {
  373. return fmt.Errorf("failed to link program: %w", lastErr)
  374. }
  375. return nil
  376. }
  377. func (t *Tracer) UnlinkEbpfProg() error {
  378. var (
  379. lastErr error
  380. err error
  381. )
  382. /* 此处不应该处理 t.uprobes 中 ebpf-program
  383. for pName, p := range t.uprobes {
  384. if err = p.Close(); err != nil {
  385. lastErr = err
  386. klog.Errorf("UnlinkEbpfProg close program[%s] uprobe occurs error: %s", pName, err.Error())
  387. }
  388. }
  389. */
  390. for _, l := range t.links {
  391. if err = l.Close(); err != nil {
  392. lastErr = err
  393. klog.Errorf("UnlinkEbpfProg close link occurs error: %s", err.Error())
  394. }
  395. }
  396. return lastErr
  397. }
  398. func (t EventType) Int() int {
  399. return int(t)
  400. }
  401. func (t EventType) String() string {
  402. switch t {
  403. case EventTypeProcessStart:
  404. return "process-start"
  405. case EventTypeProcessExit:
  406. return "process-exit"
  407. case EventTypeConnectionOpen:
  408. return "connection-open"
  409. case EventTypeConnectionClose:
  410. return "connection-close"
  411. case EventTypeConnectionError:
  412. return "connection-error"
  413. case EventTypeListenOpen:
  414. return "listen-open"
  415. case EventTypeListenClose:
  416. return "listen-close"
  417. case EventTypeFileOpen:
  418. return "file-open"
  419. case EventTypeTCPRetransmit:
  420. return "tcp-retransmit"
  421. case EventTypeL7Request:
  422. return "l7-request"
  423. }
  424. return "unknown: " + strconv.Itoa(int(t))
  425. }
  426. func (t EventReason) String() string {
  427. switch t {
  428. case EventReasonNone:
  429. return "none"
  430. case EventReasonOOMKill:
  431. return "oom-kill"
  432. }
  433. return "unknown: " + strconv.Itoa(int(t))
  434. }
  435. type procEvent struct {
  436. Type EventType
  437. Pid uint32
  438. Reason uint32
  439. }
  440. type tcpEvent struct {
  441. Fd uint64
  442. Timestamp uint64
  443. Duration uint64
  444. FirstReadTime uint64
  445. FirstWriteTime uint64
  446. NewReadTime uint64
  447. Type EventType
  448. Pid uint32
  449. BytesSent uint64
  450. BytesReceived uint64
  451. SPort uint16
  452. DPort uint16
  453. SAddr [16]byte
  454. DAddr [16]byte
  455. }
  456. type fileEvent struct {
  457. Type EventType
  458. Pid uint32
  459. Fd uint64
  460. }
  461. // struct l7_event in l7.c
  462. type l7Event struct {
  463. Fd uint64
  464. ConnectionTimestamp uint64
  465. Pid uint32
  466. Status uint32
  467. Duration uint64
  468. Protocol uint8
  469. Method uint8
  470. Padding uint16
  471. StatementId uint32
  472. PayloadSize uint64
  473. TraceId uint64
  474. StartAt uint64 // ns
  475. EndtAt uint64 // ns
  476. TraceStart uint32
  477. TraceEnd uint32
  478. TraceType uint32
  479. EventCount uint32
  480. Sport uint16
  481. Dport uint16
  482. SAddr HashByte16
  483. DAddr HashByte16
  484. ComponentSport uint16
  485. ComponentDport uint16
  486. IsTls uint16
  487. ComponentSAddr HashByte16
  488. ComponentDAddr HashByte16
  489. AssumedAppId HashByte
  490. SpanId HashByte
  491. TraceIdFrom HashByte16
  492. CalledId HashByte
  493. InstanceIdFrom HashByte
  494. AppIdFrom HashByte
  495. SpanIdFrom HashByte
  496. TypeFrom [1]byte
  497. SysvcFrom [76]byte // apmother header value: NN:ParentServiceName:MM:ParentServiceSysTag
  498. RPCTarget [64]byte
  499. ErrorMsg HashByte128
  500. MQ struct {
  501. Topic [256]byte // MQ topic (e.g., Kafka topic)
  502. Key [256]byte // MQ key (e.g., Kafka message key)
  503. }
  504. }
  // SocketDataBufferddd has the same fields as the C struct
  // __socket_data_buffer from the cgo preamble, expressed with plain Go
  // types. It is not referenced by the code visible in this file.
  type SocketDataBufferddd struct {
  	EventsNum, Len uint32
  	Data           [32760]byte
  }
  // Go-side copies of the sizes #define'd in the cgo preamble.
  const (
  	// TASK_COMM_LEN is the length of the comm field in the socket data
  	// record.
  	TASK_COMM_LEN = 16
  	// BURST_DATA_BUF_SIZE is the length of the data field in the socket
  	// data record.
  	BURST_DATA_BUF_SIZE = 8192
  )
  // Tuple has the same fields as the C struct __tuple_t from the cgo
  // preamble: two 16-byte addresses followed by address length, L4
  // protocol and two 16-bit numbers.
  type Tuple struct {
  	Daddr, RcvSaddr     [16]uint8
  	AddrLen, L4Protocol uint8
  	Dport, Num          uint16
  }
  522. type SocketDatadddd struct {
  523. Pid uint32 // 表示线程号 如果'pid == tgid'表示一个进程, 否则是线程
  524. Tgid uint32 // 进程号
  525. CoroutineID uint64
  526. Source uint8
  527. Comm [TASK_COMM_LEN]byte
  528. SocketID uint64
  529. Tuple Tuple
  530. ExtraData uint32
  531. ExtraDataCount uint32
  532. TcpSeq uint32
  533. ThreadTraceID uint64
  534. Timestamp uint64
  535. Direction uint8
  536. MsgType uint8
  537. SyscallLen uint64
  538. DataSeq uint64
  539. DataType uint16
  540. DataLen uint16
  541. Data [BURST_DATA_BUF_SIZE]byte
  542. }
  // StackEvent is the record layout of "event_queue" perf samples. It is
  // decoded field by field, little-endian, so the field order and sizes
  // must not change.
  type StackEvent struct {
  	Type, Pid, TraceId, Goid uint64
  	Ip, Bp                   uint64
  	CallerIp, CallerBp       uint64
  	TimeNsStart, TimeNsEnd   uint64
  	// Disabled fields, kept for reference:
  	// Nid   uint64
  	// Fpid  uint64
  	// Level uint64
  	Location   byte
  	ClassName  [100]byte
  	MethedName [100]byte
  }
  561. type StackFunEvent struct {
  562. StackEvent StackEvent
  563. Uprobe *tracer.Uprobe
  564. }
  565. type pythonThreadEvent struct {
  566. Type EventType
  567. Pid uint32
  568. Duration uint64
  569. }
  570. func runEventsReader(name string, r *perf.Reader, ch chan<- Event, typ perfMapType) {
  571. for {
  572. rec, err := r.Read()
  573. if err != nil {
  574. if errors.Is(err, perf.ErrClosed) {
  575. break
  576. }
  577. continue
  578. }
  579. if rec.LostSamples > 0 {
  580. klog.Errorln(name, "lost samples:", rec.LostSamples)
  581. continue
  582. }
  583. var event Event
  584. switch typ {
  585. case perfMapTypeSocketEvents:
  586. //fmt.Println("perfMapTypeSocketEvents")
  587. // 假设 rec.RawSample 包含数据,类型为 []byte
  588. //rawData := rec.RawSample
  589. //fmt.Println("perfMapTypeSocketEvents2")
  590. //
  591. //// 创建一个 SocketDataBuffer 结构体实例
  592. //var buffer SocketDataBuffer
  593. //
  594. //// 创建一个字节缓冲区,并将数据填充到其中
  595. //reader := bytes.NewReader(rawData)
  596. //fmt.Println("perfMapTypeSocketEvents3")
  597. //fmt.Println(len(rawData))
  598. //// 使用 binary.Read 函数读取数据并解析为 SocketDataBuffer 结构体实例
  599. //if err := binary.Read(reader, binary.LittleEndian, &buffer); err != nil {
  600. // fmt.Println(reader.Len())
  601. // fmt.Println("Failed to read data:", err)
  602. // continue
  603. //}
  604. //fmt.Println("perfMapTypeSocketEvents4")
  605. //
  606. //// 打印解析后的数据
  607. //fmt.Println("EventsNum:", buffer.EventsNum)
  608. //fmt.Println("Len:", buffer.Len)
  609. //
  610. //// 打印 char data 的内容
  611. //fmt.Printf("Data: %s\n", string(buffer.Data[:buffer.Len])) // 仅打印实际长度的数据
  612. //socketDataBuffer := rec.RawSample
  613. // 解析 __socket_data_buffer
  614. //buf := (*SocketDataBuffer)(unsafe.Pointer(&rec.RawSample[0])) //nolint:gosec
  615. //
  616. //// 获取 char data[32760];
  617. //socketData := (*SocketData)(unsafe.Pointer(&buf.data[0])) //nolint:gosec
  618. //// todo
  619. //fmt.Printf("socketData.DataType:%d \n", (socketData.data_type))
  620. //fmt.Printf("socketData.DataLen:%d \n", (socketData.data_len))
  621. //
  622. //// 解析C结构体中的data字段
  623. //dataSlice := C.GoBytes(unsafe.Pointer(&socketData.data[0]), C.int(socketData.data_len))
  624. //// 打印或处理包含的数据
  625. //fmt.Printf("socketData.Payload:%v \n", string(dataSlice))
  626. /*todo */
  627. //socketData := (*(*[128]byte)(unsafe.Pointer(&eventC.line)))
  628. //dataPtr := unsafe.Pointer(&buf.data[0])
  629. //socketData := (*SocketData)(dataPtr)
  630. //reader2 := bytes.NewBuffer(rec.RawSample)
  631. // 222222
  632. //fmt.Println("socketData.Pid:", socketData.pid)
  633. //fmt.Println("socketData.Tgid:", socketData.tgid)
  634. //fmt.Println("socketData.CoroutineID:", socketData.coroutine_id)
  635. //fmt.Println("socketData.Source:", socketData.source)
  636. //
  637. //fmt.Printf("socketData.Comm: %s \n", socketData.comm)
  638. //fmt.Printf("socketData.SocketID :%v \n", socketData.socket_id)
  639. //fmt.Println("socketData.Tuple:", socketData.Tuple)
  640. //fmt.Println("socketData.ExtraData:", socketData.ExtraData)
  641. //fmt.Println("socketData.ExtraDataCount:", socketData.ExtraDataCount)
  642. //fmt.Println("socketData.TCPSeq:", socketData.TcpSeq)
  643. //fmt.Println("socketData.ThreadTraceID:", socketData.ThreadTraceID)
  644. //fmt.Println("socketData.Timestamp:", socketData.Timestamp)
  645. //fmt.Println("socketData.Direction:", socketData.Direction)
  646. //fmt.Println("socketData.MsgType:", socketData.MsgType)
  647. //fmt.Println("socketData.SyscallLen:", socketData.SyscallLen)
  648. //fmt.Println("socketData.DataSeq:", socketData.DataSeq)
  649. //socketData := &SocketData{}
  650. //reader := bytes.NewBuffer(rec.RawSample)
  651. //if err := binary.Read(reader, binary.LittleEndian, v); err != nil {
  652. // klog.Warningln("failed1 to read msg:", err)
  653. // continue
  654. //}
  655. //
  656. //var data []byte
  657. //payload := reader.Bytes()
  658. //switch {
  659. //case v.Len == 0:
  660. //case v.Len > 32760:
  661. // data = payload[:32760]
  662. //default:
  663. // data = payload[:v.Len]
  664. //}
  665. //////data2 := data[:v.Len]
  666. ////fmt.Println("perfMapTypeSocketEvents")
  667. //fmt.Println(v.EventsNum)
  668. //fmt.Println(v.Len)
  669. //fmt.Println(string(data))
  670. //
  671. //var data2 SocketData
  672. //reader2 := bytes.NewBuffer(data)
  673. //if err := binary.Read(reader2, binary.LittleEndian, data2); err != nil {
  674. // klog.Warningln("failed2 to read msg:", err)
  675. // continue
  676. //}
  677. //
  678. //fmt.Println(data2.Pid)
  679. //fmt.Println(data2.Tgid)
  680. //fmt.Println(string(v.Data))
  681. //continue
  682. case perfMapTypeL7Events:
  683. v := &l7Event{}
  684. reader := bytes.NewBuffer(rec.RawSample)
  685. if err := binary.Read(reader, binary.LittleEndian, v); err != nil {
  686. klog.Warningln("failed to read msg:", err)
  687. continue
  688. }
  689. //fmt.Println("v.TraceIdFrom")
  690. //fmt.Println(v.TraceIdFrom)
  691. //a := hex.EncodeToString(v.TraceIdFrom[:])
  692. //for _, b := range v.AssumedAppId {
  693. // fmt.Printf("v.AssumedAppId- %02\n", b)
  694. //}
  695. //fmt.Println(a)
  696. payload := reader.Bytes()
  697. req := &l7.RequestData{
  698. Protocol: l7.Protocol(v.Protocol),
  699. Pid: v.Pid,
  700. Status: l7.Status(v.Status),
  701. Duration: time.Duration(v.Duration),
  702. Method: l7.Method(v.Method),
  703. StatementId: v.StatementId,
  704. TraceId: v.TraceId,
  705. TraceStart: v.TraceStart,
  706. TraceEnd: v.TraceEnd,
  707. TraceType: v.TraceType,
  708. EventCount: v.EventCount,
  709. AssumedAppId: hex.EncodeToString(v.AssumedAppId[:]),
  710. SpanId: hex.EncodeToString(v.SpanId[:]),
  711. StartAt: v.StartAt,
  712. EndAt: v.EndtAt,
  713. ComponentSAddr: ipPort(v.ComponentSAddr, v.ComponentSport),
  714. ComponentDAddr: ipPort(v.ComponentDAddr, v.ComponentDport),
  715. DestAddrString: utils.BytesToString(v.RPCTarget[:]),
  716. ErrorMsg: utils.BytesToString(v.ErrorMsg[:]),
  717. IsTls: v.IsTls > 0,
  718. MQTopic: utils.BytesToString(v.MQ.Topic[:]),
  719. MQKey: utils.BytesToString(v.MQ.Key[:]),
  720. }
  721. if req.Protocol == l7.ProtocolHTTP {
  722. klog.Debugf("runEventsReader ComponentSAddr.String %s", req.ComponentSAddr.String())
  723. klog.Debugf("runEventsReader ComponentDAddr.String %s", req.ComponentDAddr.String())
  724. }
  725. if v.TraceEnd == TRACE_STATUS {
  726. req.ParentSpanContext.TraceIdFrom = hex.EncodeToString(v.TraceIdFrom[:])
  727. req.ParentSpanContext.CalledId = hex.EncodeToString(v.CalledId[:])
  728. req.ParentSpanContext.InstanceIdFrom = hex.EncodeToString(v.InstanceIdFrom[:])
  729. req.ParentSpanContext.AppIdFrom = hex.EncodeToString(v.AppIdFrom[:])
  730. req.ParentSpanContext.SpanIdFrom = hex.EncodeToString(v.SpanIdFrom[:])
  731. req.ParentSpanContext.TypeFrom = l7.TypeFrom(hex.EncodeToString(v.TypeFrom[:]))
  732. req.SysvcFrom = utils.BytesToString(v.SysvcFrom[:])
  733. klog.Debugf("apmother '%s'", req.SysvcFrom)
  734. // klog.Debugf("req.ParentSpanContext.TraceIdFrom %s", req.ParentSpanContext.TraceIdFrom)
  735. // klog.Debugf("req.ParentSpanContext.TypeFrom %s", req.ParentSpanContext.TypeFrom)
  736. req.SAddr = ipPort(v.SAddr, v.Sport)
  737. req.DAddr = ipPort(v.DAddr, v.Dport)
  738. // klog.Debugf("runEventsReader SAddr.String %s", req.SAddr.String())
  739. // klog.Debugf("runEventsReader DAddr.String %s", req.DAddr.String())
  740. }
  741. switch {
  742. case v.PayloadSize == 0:
  743. case v.PayloadSize > MaxPayloadSize:
  744. req.Payload = payload[:MaxPayloadSize]
  745. default:
  746. req.Payload = payload[:v.PayloadSize]
  747. }
  748. //fmt.Println("==========")
  749. //fmt.Println("req.Payload:", string(req.Payload))
  750. //fmt.Println("==========")
  751. event = Event{Type: EventTypeL7Request, Pid: v.Pid, Fd: v.Fd, Timestamp: v.ConnectionTimestamp, L7Request: req}
  752. case perfMapTypeFileEvents:
  753. v := &fileEvent{}
  754. if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
  755. klog.Warningln("failed to read msg:", err)
  756. continue
  757. }
  758. event = Event{Type: v.Type, Pid: v.Pid, Fd: v.Fd}
  759. case perfMapTypeProcEvents:
  760. v := &procEvent{}
  761. if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
  762. klog.Warningln("failed to read msg:", err)
  763. continue
  764. }
  765. event = Event{Type: v.Type, Reason: EventReason(v.Reason), Pid: v.Pid}
  766. case perfMapTypeTCPEvents:
  767. v := &tcpEvent{}
  768. if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
  769. klog.Warningln("failed to read msg:", err)
  770. continue
  771. }
  772. event = Event{
  773. Type: v.Type,
  774. Pid: v.Pid,
  775. SrcAddr: ipPort(v.SAddr, v.SPort),
  776. DstAddr: ipPort(v.DAddr, v.DPort),
  777. Fd: v.Fd,
  778. Timestamp: v.Timestamp,
  779. Duration: time.Duration(v.Duration),
  780. }
  781. if v.Type == EventTypeConnectionClose {
  782. event.TrafficStats = &TrafficStats{
  783. BytesSent: v.BytesSent,
  784. BytesReceived: v.BytesReceived,
  785. }
  786. }
  787. event.FirstReadTime = v.FirstReadTime
  788. event.FirstWriteTime = v.FirstWriteTime
  789. event.NewReadTime = v.NewReadTime
  790. if v.Type == EventTypeAcceptClose {
  791. event.TrafficStats = &TrafficStats{
  792. BytesSent: v.BytesSent,
  793. BytesReceived: v.BytesReceived,
  794. }
  795. }
  796. case perfMapTypePythonThreadEvents:
  797. v := &pythonThreadEvent{}
  798. if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
  799. klog.Warningln("failed to read msg:", err)
  800. continue
  801. }
  802. event = Event{
  803. Type: v.Type,
  804. Pid: v.Pid,
  805. Duration: time.Duration(v.Duration),
  806. }
  807. case perfMapTypeEventQueue:
  808. v := &StackEvent{}
  809. if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
  810. klog.Warningln("failed to read msg:", err)
  811. continue
  812. }
  813. event = Event{
  814. Type: EventTypeFunEnt,
  815. StackEvent: v,
  816. }
  817. default:
  818. continue
  819. }
  820. ch <- event
  821. }
  822. }
  823. func ipPort(ip [16]byte, port uint16) netaddr.IPPort {
  824. i, _ := netaddr.FromStdIP(ip[:])
  825. return netaddr.IPPortFrom(i, port)
  826. }
  827. func (t *Tracer) InitKProcInfo(pid uint32, appInfo *AppInfo) error {
  828. fmt.Println("InitKProcInfo111111111111111111111111111111111")
  829. var err error
  830. var info EbpfProcInfo
  831. if appInfo.EBPFProcInfo == nil {
  832. info = EbpfProcInfo{
  833. InstanceId: appInfo.InstanceIdHash.HashtVal,
  834. AppId: appInfo.AppIdHash.HashtVal,
  835. CodeType: uint16(appInfo.CodeType),
  836. }
  837. } else {
  838. info = *appInfo.EBPFProcInfo
  839. info.AppId = appInfo.AppIdHash.HashtVal
  840. }
  841. // 构建 sysvc 值:{app_name_len}:app_name:{appServiceType_len}:appServiceType:{SysTagLen}[:SysTag]
  842. // appServiceType 固定为 "APPLICATION"
  843. appName := appInfo.AppName
  844. nodeInfo := NODE_INFO.GetNodeInfo()
  845. sysTag := ""
  846. if nodeInfo != nil {
  847. sysTag = nodeInfo.SysTag
  848. }
  849. appServiceType := "APPLICATION"
  850. appNameLen := len(appName)
  851. appServiceTypeLen := len(appServiceType)
  852. sysTagLen := len(sysTag)
  853. // 限制长度,避免超出数组大小
  854. if appNameLen > 32 {
  855. appNameLen = 32
  856. appName = appName[:32]
  857. }
  858. if sysTagLen > 32 {
  859. sysTagLen = 32
  860. sysTag = sysTag[:32]
  861. }
  862. // 格式:{app_name_len}:app_name:{appServiceType_len}:appServiceType[:{SysTagLen}:SysTag]
  863. // 如果 sysTag 为空,则为 {app_name_len}:app_name:{appServiceType_len}:appServiceType
  864. // 如果 sysTag 不为空,则为 {app_name_len}:app_name:{appServiceType_len}:appServiceType:{SysTagLen}:SysTag
  865. // 长度不足2位时前面补0,例如:08:service:12:APPLICATION 或 08:service:12:APPLICATION:03:tag
  866. var sysvcStr string
  867. if sysTagLen == 0 {
  868. sysvcStr = fmt.Sprintf("%02d:%s:%02d:%s", appNameLen, appName, appServiceTypeLen, appServiceType)
  869. } else {
  870. sysvcStr = fmt.Sprintf("%02d:%s:%02d:%s:%02d:%s", appNameLen, appName, appServiceTypeLen, appServiceType, sysTagLen, sysTag)
  871. }
  872. // 复制到字节数组,确保不超过 76 字节
  873. sysvcBytes := []byte(sysvcStr)
  874. if len(sysvcBytes) > 76 {
  875. sysvcBytes = sysvcBytes[:76]
  876. }
  877. copy(info.Sysvc[:], sysvcBytes)
  878. _, err = tracer.UpdateProcInfoToMap(t.collection, pid, info)
  879. if err != nil {
  880. klog.Error("failed to update program info", err)
  881. }
  882. appInfo.EBPFProcInfo = &info
  883. return err
  884. }
  885. func (t *Tracer) DelKProcInfo(pid uint32) error {
  886. _, err := tracer.DelProcInfoFromMap(t.collection, pid)
  887. if err != nil {
  888. klog.WithField("pid", pid).Error("failed to delete proc info", err)
  889. }
  890. return err
  891. }
  892. // TODO check language
  893. func (t *Tracer) DisableL7Tracing() bool {
  894. return t.disableL7Tracing
  895. }
  896. func (t *Tracer) DisableE2ETracing() bool {
  897. return t.disableE2ETracing
  898. }
  899. func (t *Tracer) DisableStackTracing() bool {
  900. return t.disableStackTracing
  901. }