tracer.go 24 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871
  1. package ebpftracer
  2. import (
  3. "bytes"
  4. debugelf "debug/elf"
  5. "encoding/binary"
  6. "encoding/hex"
  7. "errors"
  8. "fmt"
  9. "os"
  10. "strconv"
  11. "strings"
  12. "time"
  13. "github.com/cilium/ebpf"
  14. "github.com/cilium/ebpf/link"
  15. "github.com/cilium/ebpf/perf"
  16. "github.com/coroot/coroot-node-agent/common"
  17. "github.com/coroot/coroot-node-agent/ebpftracer/l7"
  18. "github.com/coroot/coroot-node-agent/ebpftracer/tracer"
  19. "github.com/coroot/coroot-node-agent/proc"
  20. . "github.com/coroot/coroot-node-agent/utils/modelse"
  21. klog "github.com/sirupsen/logrus"
  22. "golang.org/x/sys/unix"
  23. "inet.af/netaddr"
  24. )
/*
// C declarations exposed to Go through cgo; the Go types SocketData and
// SocketDataBuffer are defined directly on top of these structs.
// NOTE(review): presumably these must stay in sync with the eBPF-side
// definitions — confirm against the C sources.
#define TASK_COMM_LEN 16
#define BURST_DATA_BUF_SIZE 8192 // For burst send buffer
#include <linux/types.h>
// Address/port tuple embedded in every __socket_data record.
struct __tuple_t {
__u8 daddr[16];
__u8 rcv_saddr[16];
__u8 addr_len;
__u8 l4_protocol;
__u16 dport;
__u16 num;
};
// One socket data record: metadata followed by up to BURST_DATA_BUF_SIZE
// bytes of data. The struct is packed, so there is no padding between fields.
struct __socket_data {
__u32 pid;
__u32 tgid;
__u64 coroutine_id;
__u8 source;
__u8 comm[TASK_COMM_LEN];
__u64 socket_id;
struct __tuple_t tuple;
__u32 extra_data;
__u32 extra_data_count;
__u32 tcp_seq;
__u64 thread_trace_id;
__u64 timestamp;
__u8 direction: 1;
__u8 msg_type: 7;
__u64 syscall_len;
__u64 data_seq;
__u16 data_type;
__u16 data_len;
char data[BURST_DATA_BUF_SIZE];
} __attribute__((packed));
// Container with an event counter, a length and a raw data area.
struct __socket_data_buffer {
__u32 events_num;
__u32 len;
char data[32760];
};
*/
import "C"
// SocketData is the Go name for the cgo struct __socket_data declared in the
// preamble above.
type SocketData C.struct___socket_data

// SocketDataBuffer is the Go name for the cgo struct __socket_data_buffer
// declared in the preamble above.
type SocketDataBuffer C.struct___socket_data_buffer

// MaxPayloadSize is the maximum number of payload bytes runEventsReader keeps
// from an L7 event.
const MaxPayloadSize = 1024
  68. type EventType uint32
  69. type EventReason uint32
  70. const (
  71. EventTypeProcessStart EventType = 1
  72. EventTypeProcessExit EventType = 2
  73. EventTypeConnectionOpen EventType = 3
  74. EventTypeConnectionClose EventType = 4
  75. EventTypeConnectionError EventType = 5
  76. EventTypeListenOpen EventType = 6
  77. EventTypeListenClose EventType = 7
  78. EventTypeFileOpen EventType = 8
  79. EventTypeTCPRetransmit EventType = 9
  80. EventTypeL7Request EventType = 10
  81. EventTypeFunEnt EventType = 11
  82. EventTypeFunRet EventType = 12
  83. EventTypeAcceptOpen EventType = 13
  84. EventTypeAcceptClose EventType = 14
  85. EventReasonNone EventReason = 0
  86. EventReasonOOMKill EventReason = 1
  87. )
// TrafficStats holds the sent/received byte counters that runEventsReader
// attaches to connection-close and accept-close events.
type TrafficStats struct {
	BytesSent     uint64
	BytesReceived uint64
}
// Event is the message delivered on the tracer's events channel. Which fields
// are populated depends on Type; unused fields keep their zero value.
type Event struct {
	StackEvent     *StackEvent // set for samples read from the "event_queue" perf map
	Type           EventType
	Reason         EventReason // set for process events
	Pid            uint32
	SrcAddr        netaddr.IPPort // set for TCP events and for sockets found by the initial scan
	DstAddr        netaddr.IPPort // set for TCP events and for sockets found by the initial scan
	Fd             uint64
	Timestamp      uint64
	Duration       time.Duration
	L7Request      *l7.RequestData // set for L7 request events
	TrafficStats   *TrafficStats   // set for connection-close and accept-close events
	FirstReadTime  uint64          // copied from the TCP event sample
	FirstWriteTime uint64          // copied from the TCP event sample
	NewReadTime    uint64          // copied from the TCP event sample
}
// perfMapType tells runEventsReader which decoder to apply to the samples of
// a perf map.
type perfMapType uint8

const (
	perfMapTypeProcEvents         perfMapType = 1 // decoded as procEvent
	perfMapTypeTCPEvents          perfMapType = 2 // decoded as tcpEvent
	perfMapTypeFileEvents         perfMapType = 3 // decoded as fileEvent
	perfMapTypeL7Events           perfMapType = 4 // decoded as l7Event followed by the payload
	perfMapTypeSocketEvents       perfMapType = 5 // socket data samples
	perfMapTypeEventQueue         perfMapType = 6 // decoded as StackEvent
	perfMapTypePythonThreadEvents perfMapType = 7 // decoded as pythonThreadEvent
)
// Tracer owns the loaded eBPF collection together with the perf readers and
// attached links created from it.
type Tracer struct {
	kernelVersion       string
	disableL7Tracing    bool
	disableE2ETracing   bool
	disableStackTracing bool
	collection          *ebpf.Collection         // loaded by ebpf(); nil until then
	collectionSpec      *ebpf.CollectionSpec     // spec the collection was loaded from
	readers             map[string]*perf.Reader  // perf readers keyed by perf map name
	links               []link.Link              // links created by LinkEbpfProg
	uprobes             map[string]*ebpf.Program // programs from "uprobe/" sections, keyed by program name
	// NOTE(review): the exported fields below are not written anywhere in
	// this part of the file; presumably they are managed by other code.
	Symbols    []debugelf.Symbol
	Uprobes    []tracer.Uprobe
	UprobesMap map[string]tracer.Uprobe
}
  132. func NewTracer(kernelVersion string, disableL7Tracing, disableE2ETracing, disableStackTracing bool) *Tracer {
  133. if disableL7Tracing {
  134. klog.Infoln("L7 tracing is disabled")
  135. }
  136. if disableE2ETracing {
  137. klog.Infoln("L7 tracing is disabled")
  138. }
  139. if disableStackTracing {
  140. klog.Infoln("L7 stack is disabled")
  141. }
  142. return &Tracer{
  143. kernelVersion: kernelVersion,
  144. disableL7Tracing: disableL7Tracing,
  145. disableE2ETracing: disableE2ETracing,
  146. disableStackTracing: disableStackTracing,
  147. readers: map[string]*perf.Reader{},
  148. uprobes: map[string]*ebpf.Program{},
  149. }
  150. }
  151. func (t *Tracer) Run(events chan<- Event) error {
  152. if err := t.ebpf(events); err != nil {
  153. return err
  154. }
  155. if err := t.init(events); err != nil {
  156. return err
  157. }
  158. return nil
  159. }
  160. func (t *Tracer) Close() {
  161. for _, p := range t.uprobes {
  162. _ = p.Close()
  163. }
  164. for _, l := range t.links {
  165. _ = l.Close()
  166. }
  167. for _, r := range t.readers {
  168. _ = r.Close()
  169. }
  170. t.collection.Close()
  171. }
  172. func (t *Tracer) init(ch chan<- Event) error {
  173. pids, err := proc.ListPids()
  174. if err != nil {
  175. return fmt.Errorf("failed to list pids: %w", err)
  176. }
  177. for _, pid := range pids {
  178. ch <- Event{Type: EventTypeProcessStart, Pid: pid}
  179. }
  180. fds, sockets := readFds(pids)
  181. for _, fd := range fds {
  182. ch <- Event{Type: EventTypeFileOpen, Pid: fd.pid, Fd: fd.fd}
  183. }
  184. listens := map[uint64]bool{}
  185. for _, s := range sockets {
  186. if s.Listen {
  187. listens[uint64(s.pid)<<32|uint64(s.SAddr.Port())] = true
  188. }
  189. }
  190. ebpfConnectionsMap := t.collection.Maps["active_connections"]
  191. timestamp := uint64(time.Now().UnixNano())
  192. for _, s := range sockets {
  193. typ := EventTypeConnectionOpen
  194. if s.Listen {
  195. typ = EventTypeListenOpen
  196. } else if listens[uint64(s.pid)<<32|uint64(s.SAddr.Port())] || s.DAddr.Port() > s.SAddr.Port() { // inbound
  197. continue
  198. }
  199. ch <- Event{
  200. Type: typ,
  201. Pid: s.pid,
  202. Timestamp: timestamp,
  203. Fd: s.fd,
  204. SrcAddr: s.SAddr,
  205. DstAddr: s.DAddr,
  206. }
  207. if typ == EventTypeConnectionOpen {
  208. id := ConnectionId{FD: s.fd, PID: s.pid}
  209. conn := Connection{Timestamp: timestamp}
  210. if err := ebpfConnectionsMap.Update(id, conn, ebpf.UpdateNoExist); err != nil {
  211. klog.Warningln(err)
  212. }
  213. }
  214. }
  215. return nil
  216. }
  217. func (t *Tracer) ActiveConnectionsIterator() *ebpf.MapIterator {
  218. return t.collection.Maps["active_connections"].Iterate()
  219. }
  220. func (t *Tracer) ActiveAcceptsIterator() *ebpf.MapIterator {
  221. return t.collection.Maps["active_accepts"].Iterate()
  222. }
// perfMap describes one perf event map to read from.
type perfMap struct {
	name                  string      // key of the map in the loaded collection
	perCPUBufferSizePages int         // per-CPU buffer size, in memory pages
	typ                   perfMapType // decoder runEventsReader applies to the samples
}
// ebpf loads the eBPF object matching the running kernel, creates a perf
// reader (and a reader goroutine feeding ch) for every known perf map present
// in the collection, and finally attaches the programs via LinkEbpfProg.
//
// It returns an error when no eBPF code is available for the kernel version,
// when neither debugfs nor tracefs tracing directories exist, or when loading
// the spec/collection, creating a reader or linking a program fails.
func (t *Tracer) ebpf(ch chan<- Event) error {
	// Select the embedded eBPF object by "v<major.minor>" kernel version.
	kv := "v" + common.KernelMajorMinor(t.kernelVersion)
	path, prg, err := EbpfCode(kv)
	klog.Infof("kv is [%s], kernel version: [%s] path: [%s]", kv, t.kernelVersion, path)
	if len(prg) == 0 || err != nil {
		return fmt.Errorf("kv is %s, unsupported kernel version: [%s] path: [%s] err:<%v>", kv, t.kernelVersion, path, err)
	}
	// At least one of the two tracing directories must exist.
	_, debugFsErr := os.Stat("/sys/kernel/debug/tracing")
	_, traceFsErr := os.Stat("/sys/kernel/tracing")
	if debugFsErr != nil && traceFsErr != nil {
		return fmt.Errorf("kernel tracing is not available: debugfs or tracefs must be mounted")
	}
	collectionSpec, err := ebpf.LoadCollectionSpecFromReader(bytes.NewReader(prg))
	if err != nil {
		return fmt.Errorf("failed to load collection spec: %w", err)
	}
	// Best effort: lift the locked-memory limit; a failure is ignored here.
	_ = unix.Setrlimit(unix.RLIMIT_MEMLOCK, &unix.Rlimit{Cur: unix.RLIM_INFINITY, Max: unix.RLIM_INFINITY})
	tracer.PidFilter(collectionSpec)
	opts := &ebpf.CollectionOptions{MapReplacements: make(map[string]*ebpf.Map)}
	// Log the names of all maps declared in the spec.
	klog.Infof("[start] Look eBPF .maps")
	for _, spec := range collectionSpec.Maps {
		klog.Infoln(spec.Name)
	}
	klog.Infof("[end] Look eBPF .maps")
	tracer.MapInit(collectionSpec, opts)
	// TODO multi-process
	// tracer.SetConstants(collectionSpec)
	c, err := ebpf.NewCollectionWithOptions(collectionSpec, *opts)
	if err != nil {
		// Log the detailed verifier output when the kernel verifier rejected a program.
		var verr *ebpf.VerifierError
		if errors.As(err, &verr) {
			klog.Errorf("----%+v", verr)
		}
		return fmt.Errorf("failed to load collection: %w", err)
	}
	tracer.Offset()
	t.collectionSpec = collectionSpec
	t.collection = c
	perfMaps := []perfMap{
		{name: "proc_events", typ: perfMapTypeProcEvents, perCPUBufferSizePages: 4},
		{name: "tcp_listen_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 4},
		{name: "tcp_connect_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 8},
		{name: "tcp_accept_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 8},
		{name: "tcp_retransmit_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 4},
		{name: "file_events", typ: perfMapTypeFileEvents, perCPUBufferSizePages: 4},
		{name: "event_queue", typ: perfMapTypeEventQueue, perCPUBufferSizePages: 32},
		{name: "python_thread_events", typ: perfMapTypePythonThreadEvents, perCPUBufferSizePages: 4},
	}
	tracer.MapInsert(c)
	// The L7 perf map is only read when L7 tracing is enabled.
	if !t.DisableL7Tracing() {
		perfMaps = append(perfMaps, perfMap{name: "l7_events", typ: perfMapTypeL7Events, perCPUBufferSizePages: 32})
	}
	perfMaps = append(perfMaps, perfMap{name: MAP_PERF_SOCKET_DATA_NAME, typ: perfMapTypeSocketEvents, perCPUBufferSizePages: 64})
	klog.Infof("[start] Look eBPF perf_maps")
	for _, pm := range perfMaps {
		klog.Infoln(pm.name)
		// Maps missing from the collection are skipped silently.
		m, ok := t.collection.Maps[pm.name]
		if ok {
			r, err := perf.NewReader(m, pm.perCPUBufferSizePages*os.Getpagesize())
			if err != nil {
				t.Close()
				return fmt.Errorf("failed to create ebpf reader: %w", err)
			}
			t.readers[pm.name] = r
			// Start listening for events on this reader.
			go runEventsReader(pm.name, r, ch, pm.typ)
		}
	}
	klog.Infof("[end] Look eBPF perf_maps")
	klog.Infof("[start] Look eBPF specPrograms")
	if err = t.LinkEbpfProg(); err != nil {
		return err
	}
	klog.Infof("[end] Look eBPF specPrograms")
	return nil
}
  304. func (t *Tracer) LinkEbpfProg() error {
  305. klog.Infof("[start] Look eBPF specPrograms")
  306. var (
  307. l link.Link
  308. err error
  309. )
  310. for _, programSpec := range t.collectionSpec.Programs {
  311. program := t.collection.Programs[programSpec.Name]
  312. klog.Infof("%s:[%s]", programSpec.SectionName, programSpec.Name)
  313. if t.DisableL7Tracing() {
  314. switch programSpec.Name {
  315. case "sys_enter_writev", "sys_enter_write", "sys_enter_sendto", "sys_enter_sendmsg", "sys_enter_sendmmsg":
  316. continue
  317. case "sys_enter_read", "sys_enter_readv", "sys_enter_recvfrom", "sys_enter_recvmsg":
  318. continue
  319. case "sys_exit_read", "sys_exit_readv", "sys_exit_recvfrom", "sys_exit_recvmsg":
  320. continue
  321. }
  322. }
  323. switch programSpec.Type {
  324. case ebpf.TracePoint:
  325. if strings.Contains(programSpec.SectionName, "prog") {
  326. continue
  327. }
  328. parts := strings.SplitN(programSpec.AttachTo, "/", 2)
  329. l, err = link.Tracepoint(parts[0], parts[1], program, nil)
  330. case ebpf.Kprobe:
  331. if strings.HasPrefix(programSpec.SectionName, "uprobe/") {
  332. t.uprobes[programSpec.Name] = program
  333. continue
  334. }
  335. if strings.HasPrefix(programSpec.SectionName, "kretprobe/") {
  336. l, err = link.Kretprobe(programSpec.AttachTo, program, nil)
  337. t.links = append(t.links, l)
  338. continue
  339. }
  340. l, err = link.Kprobe(programSpec.AttachTo, program, nil)
  341. }
  342. if err != nil {
  343. t.Close()
  344. return fmt.Errorf("failed to link program: %w", err)
  345. }
  346. t.links = append(t.links, l)
  347. }
  348. klog.Infof("[end] Look eBPF specPrograms")
  349. return nil
  350. }
  351. func (t *Tracer) UnlinkEbpfProg() {
  352. for _, p := range t.uprobes {
  353. _ = p.Close()
  354. }
  355. for _, l := range t.links {
  356. _ = l.Close()
  357. }
  358. }
// Int returns the numeric value of the event type as an int.
func (t EventType) Int() int {
	return int(t)
}
  362. func (t EventType) String() string {
  363. switch t {
  364. case EventTypeProcessStart:
  365. return "process-start"
  366. case EventTypeProcessExit:
  367. return "process-exit"
  368. case EventTypeConnectionOpen:
  369. return "connection-open"
  370. case EventTypeConnectionClose:
  371. return "connection-close"
  372. case EventTypeConnectionError:
  373. return "connection-error"
  374. case EventTypeListenOpen:
  375. return "listen-open"
  376. case EventTypeListenClose:
  377. return "listen-close"
  378. case EventTypeFileOpen:
  379. return "file-open"
  380. case EventTypeTCPRetransmit:
  381. return "tcp-retransmit"
  382. case EventTypeL7Request:
  383. return "l7-request"
  384. }
  385. return "unknown: " + strconv.Itoa(int(t))
  386. }
  387. func (t EventReason) String() string {
  388. switch t {
  389. case EventReasonNone:
  390. return "none"
  391. case EventReasonOOMKill:
  392. return "oom-kill"
  393. }
  394. return "unknown: " + strconv.Itoa(int(t))
  395. }
// procEvent is the fixed-size record runEventsReader decodes (little-endian,
// via binary.Read) from process perf samples. Field order and widths define
// the expected layout and must not be changed.
type procEvent struct {
	Type   EventType
	Pid    uint32
	Reason uint32 // converted to EventReason when the Event is built
}
// tcpEvent is the fixed-size record runEventsReader decodes (little-endian,
// via binary.Read) from TCP perf samples. Field order and widths define the
// expected layout and must not be changed.
type tcpEvent struct {
	Fd             uint64
	Timestamp      uint64
	Duration       uint64 // converted to time.Duration when the Event is built
	FirstReadTime  uint64
	FirstWriteTime uint64
	NewReadTime    uint64
	Type           EventType
	Pid            uint32
	BytesSent      uint64 // reported only for connection-close and accept-close events
	BytesReceived  uint64 // reported only for connection-close and accept-close events
	SPort          uint16
	DPort          uint16
	SAddr          [16]byte // 16-byte IP representation, converted by ipPort
	DAddr          [16]byte // 16-byte IP representation, converted by ipPort
}
// fileEvent is the fixed-size record runEventsReader decodes (little-endian,
// via binary.Read) from file perf samples. Field order and widths define the
// expected layout and must not be changed.
type fileEvent struct {
	Type EventType
	Pid  uint32
	Fd   uint64
}
// struct l7_event in l7.c
//
// l7Event is the fixed-size header runEventsReader decodes (little-endian,
// via binary.Read) from L7 perf samples; the bytes following it in the sample
// are treated as the request payload. Field order and widths define the
// expected layout and must not be changed.
type l7Event struct {
	Fd                  uint64
	ConnectionTimestamp uint64 // becomes Event.Timestamp
	Pid                 uint32
	Status              uint32
	Duration            uint64
	Protocol            uint8
	Method              uint8
	Padding             uint16
	StatementId         uint32
	PayloadSize         uint64 // payload length reported by the sample; capped at MaxPayloadSize by the reader
	TraceId             uint64
	StartAt             uint64 // ns
	EndtAt              uint64 // ns
	TraceStart          uint32
	TraceEnd            uint32 // when 1, the parent span context and SAddr/DAddr are copied to the request
	EventCount          uint32
	Sport               uint16
	Dport               uint16
	SAddr               [16]byte
	DAddr               [16]byte
	ComponentSport      uint16
	ComponentDport      uint16
	ComponentSAddr      [16]byte
	ComponentDAddr      [16]byte
	// The identifiers below are hex-encoded before being placed on the request.
	AssumedAppId   HashByte
	SpanId         HashByte
	TraceIdFrom    HashByte16
	CalledId       HashByte
	InstanceIdFrom HashByte
	AppIdFrom      HashByte
	SpanIdFrom     HashByte
}
// SocketDataBufferddd is a pure-Go struct with the same field sequence as the
// C struct __socket_data_buffer from the cgo preamble.
// NOTE(review): it is not referenced by any live code in this part of the
// file; presumably an experiment kept for binary.Read based decoding.
type SocketDataBufferddd struct {
	EventsNum uint32
	Len       uint32
	Data      [32760]byte
}
// Go-side copies of the array sizes #define'd in the cgo preamble.
const (
	TASK_COMM_LEN       = 16   // size of the Comm array in SocketDatadddd
	BURST_DATA_BUF_SIZE = 8192 // size of the Data array in SocketDatadddd
)
// Tuple is a pure-Go struct with the same field sequence as the C struct
// __tuple_t from the cgo preamble.
type Tuple struct {
	Daddr      [16]uint8
	RcvSaddr   [16]uint8
	AddrLen    uint8
	L4Protocol uint8
	Dport      uint16
	Num        uint16
}
// SocketDatadddd is a pure-Go struct that follows the field sequence of the C
// struct __socket_data from the cgo preamble.
// NOTE(review): in C, direction and msg_type are 1-bit and 7-bit bit-fields
// of type __u8, whereas here they are two separate uint8 fields, so the
// layouts do not appear to be byte-compatible — confirm before decoding raw
// samples into this type.
type SocketDatadddd struct {
	Pid            uint32 // thread ID; if pid == tgid this is a process, otherwise a thread
	Tgid           uint32 // process ID
	CoroutineID    uint64
	Source         uint8
	Comm           [TASK_COMM_LEN]byte
	SocketID       uint64
	Tuple          Tuple
	ExtraData      uint32
	ExtraDataCount uint32
	TcpSeq         uint32
	ThreadTraceID  uint64
	Timestamp      uint64
	Direction      uint8
	MsgType        uint8
	SyscallLen     uint64
	DataSeq        uint64
	DataType       uint16
	DataLen        uint16
	Data           [BURST_DATA_BUF_SIZE]byte
}
// StackEvent is the fixed-size record runEventsReader decodes (little-endian,
// via binary.Read) from "event_queue" perf samples and attaches to the Event.
// Field order and widths define the expected layout and must not be changed.
type StackEvent struct {
	Type        uint64
	Pid         uint64
	TraceId     uint64
	Goid        uint64
	Ip          uint64
	Bp          uint64
	CallerIp    uint64
	CallerBp    uint64
	TimeNsStart uint64
	TimeNsEnd   uint64
	// Nid uint64
	// Fpid uint64
	// Level uint64
	Location   byte
	ClassName  [100]byte
	MethedName [100]byte
}
// StackFunEvent pairs a decoded StackEvent with a pointer to a tracer.Uprobe.
// NOTE(review): presumably the uprobe the event originated from — confirm
// against the code that fills it in.
type StackFunEvent struct {
	StackEvent StackEvent
	Uprobe     *tracer.Uprobe
}
// pythonThreadEvent is the fixed-size record runEventsReader decodes
// (little-endian, via binary.Read) from "python_thread_events" perf samples.
// Field order and widths define the expected layout and must not be changed.
type pythonThreadEvent struct {
	Type     EventType
	Pid      uint32
	Duration uint64 // converted to time.Duration when the Event is built
}
  521. func runEventsReader(name string, r *perf.Reader, ch chan<- Event, typ perfMapType) {
  522. for {
  523. rec, err := r.Read()
  524. if err != nil {
  525. if errors.Is(err, perf.ErrClosed) {
  526. break
  527. }
  528. continue
  529. }
  530. if rec.LostSamples > 0 {
  531. klog.Errorln(name, "lost samples:", rec.LostSamples)
  532. continue
  533. }
  534. var event Event
  535. switch typ {
  536. case perfMapTypeSocketEvents:
  537. //fmt.Println("perfMapTypeSocketEvents")
  538. // 假设 rec.RawSample 包含数据,类型为 []byte
  539. //rawData := rec.RawSample
  540. //fmt.Println("perfMapTypeSocketEvents2")
  541. //
  542. //// 创建一个 SocketDataBuffer 结构体实例
  543. //var buffer SocketDataBuffer
  544. //
  545. //// 创建一个字节缓冲区,并将数据填充到其中
  546. //reader := bytes.NewReader(rawData)
  547. //fmt.Println("perfMapTypeSocketEvents3")
  548. //fmt.Println(len(rawData))
  549. //// 使用 binary.Read 函数读取数据并解析为 SocketDataBuffer 结构体实例
  550. //if err := binary.Read(reader, binary.LittleEndian, &buffer); err != nil {
  551. // fmt.Println(reader.Len())
  552. // fmt.Println("Failed to read data:", err)
  553. // continue
  554. //}
  555. //fmt.Println("perfMapTypeSocketEvents4")
  556. //
  557. //// 打印解析后的数据
  558. //fmt.Println("EventsNum:", buffer.EventsNum)
  559. //fmt.Println("Len:", buffer.Len)
  560. //
  561. //// 打印 char data 的内容
  562. //fmt.Printf("Data: %s\n", string(buffer.Data[:buffer.Len])) // 仅打印实际长度的数据
  563. //socketDataBuffer := rec.RawSample
  564. // 解析 __socket_data_buffer
  565. //buf := (*SocketDataBuffer)(unsafe.Pointer(&rec.RawSample[0])) //nolint:gosec
  566. //
  567. //// 获取 char data[32760];
  568. //socketData := (*SocketData)(unsafe.Pointer(&buf.data[0])) //nolint:gosec
  569. //// todo
  570. //fmt.Printf("socketData.DataType:%d \n", (socketData.data_type))
  571. //fmt.Printf("socketData.DataLen:%d \n", (socketData.data_len))
  572. //
  573. //// 解析C结构体中的data字段
  574. //dataSlice := C.GoBytes(unsafe.Pointer(&socketData.data[0]), C.int(socketData.data_len))
  575. //// 打印或处理包含的数据
  576. //fmt.Printf("socketData.Payload:%v \n", string(dataSlice))
  577. /*todo */
  578. //socketData := (*(*[128]byte)(unsafe.Pointer(&eventC.line)))
  579. //dataPtr := unsafe.Pointer(&buf.data[0])
  580. //socketData := (*SocketData)(dataPtr)
  581. //reader2 := bytes.NewBuffer(rec.RawSample)
  582. // 222222
  583. //fmt.Println("socketData.Pid:", socketData.pid)
  584. //fmt.Println("socketData.Tgid:", socketData.tgid)
  585. //fmt.Println("socketData.CoroutineID:", socketData.coroutine_id)
  586. //fmt.Println("socketData.Source:", socketData.source)
  587. //
  588. //fmt.Printf("socketData.Comm: %s \n", socketData.comm)
  589. //fmt.Printf("socketData.SocketID :%v \n", socketData.socket_id)
  590. //fmt.Println("socketData.Tuple:", socketData.Tuple)
  591. //fmt.Println("socketData.ExtraData:", socketData.ExtraData)
  592. //fmt.Println("socketData.ExtraDataCount:", socketData.ExtraDataCount)
  593. //fmt.Println("socketData.TCPSeq:", socketData.TcpSeq)
  594. //fmt.Println("socketData.ThreadTraceID:", socketData.ThreadTraceID)
  595. //fmt.Println("socketData.Timestamp:", socketData.Timestamp)
  596. //fmt.Println("socketData.Direction:", socketData.Direction)
  597. //fmt.Println("socketData.MsgType:", socketData.MsgType)
  598. //fmt.Println("socketData.SyscallLen:", socketData.SyscallLen)
  599. //fmt.Println("socketData.DataSeq:", socketData.DataSeq)
  600. //socketData := &SocketData{}
  601. //reader := bytes.NewBuffer(rec.RawSample)
  602. //if err := binary.Read(reader, binary.LittleEndian, v); err != nil {
  603. // klog.Warningln("failed1 to read msg:", err)
  604. // continue
  605. //}
  606. //
  607. //var data []byte
  608. //payload := reader.Bytes()
  609. //switch {
  610. //case v.Len == 0:
  611. //case v.Len > 32760:
  612. // data = payload[:32760]
  613. //default:
  614. // data = payload[:v.Len]
  615. //}
  616. //////data2 := data[:v.Len]
  617. ////fmt.Println("perfMapTypeSocketEvents")
  618. //fmt.Println(v.EventsNum)
  619. //fmt.Println(v.Len)
  620. //fmt.Println(string(data))
  621. //
  622. //var data2 SocketData
  623. //reader2 := bytes.NewBuffer(data)
  624. //if err := binary.Read(reader2, binary.LittleEndian, data2); err != nil {
  625. // klog.Warningln("failed2 to read msg:", err)
  626. // continue
  627. //}
  628. //
  629. //fmt.Println(data2.Pid)
  630. //fmt.Println(data2.Tgid)
  631. //fmt.Println(string(v.Data))
  632. //continue
  633. case perfMapTypeL7Events:
  634. v := &l7Event{}
  635. reader := bytes.NewBuffer(rec.RawSample)
  636. if err := binary.Read(reader, binary.LittleEndian, v); err != nil {
  637. klog.Warningln("failed to read msg:", err)
  638. continue
  639. }
  640. //fmt.Println("v.TraceIdFrom")
  641. //fmt.Println(v.TraceIdFrom)
  642. //a := hex.EncodeToString(v.TraceIdFrom[:])
  643. //for _, b := range v.AssumedAppId {
  644. // fmt.Printf("v.AssumedAppId- %02\n", b)
  645. //}
  646. //fmt.Println(a)
  647. payload := reader.Bytes()
  648. req := &l7.RequestData{
  649. Protocol: l7.Protocol(v.Protocol),
  650. Status: l7.Status(v.Status),
  651. Duration: time.Duration(v.Duration),
  652. Method: l7.Method(v.Method),
  653. StatementId: v.StatementId,
  654. TraceId: v.TraceId,
  655. TraceStart: v.TraceStart,
  656. TraceEnd: v.TraceEnd,
  657. EventCount: v.EventCount,
  658. AssumedAppId: hex.EncodeToString(v.AssumedAppId[:]),
  659. SpanId: hex.EncodeToString(v.SpanId[:]),
  660. StartAt: v.StartAt,
  661. EndAt: v.EndtAt,
  662. ComponentSAddr: ipPort(v.ComponentSAddr, v.ComponentSport),
  663. ComponentDAddr: ipPort(v.ComponentDAddr, v.ComponentDport),
  664. }
  665. if req.Protocol == l7.ProtocolHTTP {
  666. klog.Infof("runEventsReader ComponentSAddr.String %s", req.ComponentSAddr.String())
  667. klog.Infof("runEventsReader ComponentDAddr.String %s", req.ComponentDAddr.String())
  668. }
  669. if v.TraceEnd == 1 {
  670. req.ParentSpanContext.TraceIdFrom = hex.EncodeToString(v.TraceIdFrom[:])
  671. req.ParentSpanContext.CalledId = hex.EncodeToString(v.CalledId[:])
  672. req.ParentSpanContext.InstanceIdFrom = hex.EncodeToString(v.InstanceIdFrom[:])
  673. req.ParentSpanContext.AppIdFrom = hex.EncodeToString(v.AppIdFrom[:])
  674. req.ParentSpanContext.SpanIdFrom = hex.EncodeToString(v.SpanIdFrom[:])
  675. req.SAddr = ipPort(v.SAddr, v.Sport)
  676. req.DAddr = ipPort(v.DAddr, v.Dport)
  677. klog.Infof("runEventsReader SAddr.String %s", req.SAddr.String())
  678. klog.Infof("runEventsReader DAddr.String %s", req.DAddr.String())
  679. }
  680. switch {
  681. case v.PayloadSize == 0:
  682. case v.PayloadSize > MaxPayloadSize:
  683. req.Payload = payload[:MaxPayloadSize]
  684. default:
  685. req.Payload = payload[:v.PayloadSize]
  686. }
  687. //fmt.Println("==========")
  688. //fmt.Println("req.Payload:", string(req.Payload))
  689. //fmt.Println("==========")
  690. event = Event{Type: EventTypeL7Request, Pid: v.Pid, Fd: v.Fd, Timestamp: v.ConnectionTimestamp, L7Request: req}
  691. case perfMapTypeFileEvents:
  692. v := &fileEvent{}
  693. if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
  694. klog.Warningln("failed to read msg:", err)
  695. continue
  696. }
  697. event = Event{Type: v.Type, Pid: v.Pid, Fd: v.Fd}
  698. case perfMapTypeProcEvents:
  699. v := &procEvent{}
  700. if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
  701. klog.Warningln("failed to read msg:", err)
  702. continue
  703. }
  704. event = Event{Type: v.Type, Reason: EventReason(v.Reason), Pid: v.Pid}
  705. case perfMapTypeTCPEvents:
  706. v := &tcpEvent{}
  707. if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
  708. klog.Warningln("failed to read msg:", err)
  709. continue
  710. }
  711. event = Event{
  712. Type: v.Type,
  713. Pid: v.Pid,
  714. SrcAddr: ipPort(v.SAddr, v.SPort),
  715. DstAddr: ipPort(v.DAddr, v.DPort),
  716. Fd: v.Fd,
  717. Timestamp: v.Timestamp,
  718. Duration: time.Duration(v.Duration),
  719. }
  720. if v.Type == EventTypeConnectionClose {
  721. event.TrafficStats = &TrafficStats{
  722. BytesSent: v.BytesSent,
  723. BytesReceived: v.BytesReceived,
  724. }
  725. }
  726. event.FirstReadTime = v.FirstReadTime
  727. event.FirstWriteTime = v.FirstWriteTime
  728. event.NewReadTime = v.NewReadTime
  729. if v.Type == EventTypeAcceptClose {
  730. event.TrafficStats = &TrafficStats{
  731. BytesSent: v.BytesSent,
  732. BytesReceived: v.BytesReceived,
  733. }
  734. }
  735. case perfMapTypePythonThreadEvents:
  736. v := &pythonThreadEvent{}
  737. if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
  738. klog.Warningln("failed to read msg:", err)
  739. continue
  740. }
  741. event = Event{
  742. Type: v.Type,
  743. Pid: v.Pid,
  744. Duration: time.Duration(v.Duration),
  745. }
  746. case perfMapTypeEventQueue:
  747. v := &StackEvent{}
  748. if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
  749. klog.Warningln("failed to read msg:", err)
  750. continue
  751. }
  752. event = Event{
  753. Type: EventTypeFunEnt,
  754. StackEvent: v,
  755. }
  756. default:
  757. continue
  758. }
  759. ch <- event
  760. }
  761. }
  762. func ipPort(ip [16]byte, port uint16) netaddr.IPPort {
  763. i, _ := netaddr.FromStdIP(ip[:])
  764. return netaddr.IPPortFrom(i, port)
  765. }
  766. func (t *Tracer) InitKProcInfo(pid uint32, appInfo *AppInfo) error {
  767. var err error
  768. var info EbpfProcInfo
  769. if appInfo.EBPFProcInfo == nil {
  770. info = EbpfProcInfo{
  771. InstanceId: appInfo.InstanceIdHash.HashtVal,
  772. AppId: appInfo.AppIdHash.HashtVal,
  773. CodeType: uint16(appInfo.CodeType),
  774. }
  775. } else {
  776. info = *appInfo.EBPFProcInfo
  777. info.AppId = appInfo.AppIdHash.HashtVal
  778. }
  779. _, err = tracer.UpdateProcInfoToMap(t.collection, pid, info)
  780. if err != nil {
  781. klog.Error("failed to update program info", err)
  782. }
  783. appInfo.EBPFProcInfo = &info
  784. return err
  785. }
// TODO check language

// DisableL7Tracing reports whether L7 tracing was disabled when the Tracer
// was created.
func (t *Tracer) DisableL7Tracing() bool {
	return t.disableL7Tracing
}
// DisableE2ETracing reports whether E2E tracing was disabled when the Tracer
// was created.
func (t *Tracer) DisableE2ETracing() bool {
	return t.disableE2ETracing
}
// DisableStackTracing reports whether stack tracing was disabled when the
// Tracer was created.
func (t *Tracer) DisableStackTracing() bool {
	return t.disableStackTracing
}