tracer.go 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811
  1. package ebpftracer
  2. import (
  3. "bytes"
  4. debugelf "debug/elf"
  5. "encoding/binary"
  6. "encoding/hex"
  7. "errors"
  8. "fmt"
  9. "os"
  10. "strconv"
  11. "strings"
  12. "time"
  13. "github.com/cilium/ebpf"
  14. "github.com/cilium/ebpf/link"
  15. "github.com/cilium/ebpf/perf"
  16. "github.com/coroot/coroot-node-agent/common"
  17. "github.com/coroot/coroot-node-agent/ebpftracer/l7"
  18. "github.com/coroot/coroot-node-agent/ebpftracer/tracer"
  19. "github.com/coroot/coroot-node-agent/proc"
  20. . "github.com/coroot/coroot-node-agent/utils/modelse"
  21. klog "github.com/sirupsen/logrus"
  22. "golang.org/x/sys/unix"
  23. "inet.af/netaddr"
  24. )
  25. /*
  26. #define TASK_COMM_LEN 16
  27. #define BURST_DATA_BUF_SIZE 8192 // For brust send buffer
  28. #include <linux/types.h>
  29. struct __tuple_t {
  30. __u8 daddr[16];
  31. __u8 rcv_saddr[16];
  32. __u8 addr_len;
  33. __u8 l4_protocol;
  34. __u16 dport;
  35. __u16 num;
  36. };
  37. struct __socket_data {
  38. __u32 pid;
  39. __u32 tgid;
  40. __u64 coroutine_id;
  41. __u8 source;
  42. __u8 comm[TASK_COMM_LEN];
  43. __u64 socket_id;
  44. struct __tuple_t tuple;
  45. __u32 extra_data;
  46. __u32 extra_data_count;
  47. __u32 tcp_seq;
  48. __u64 thread_trace_id;
  49. __u64 timestamp;
  50. __u8 direction: 1;
  51. __u8 msg_type: 7;
  52. __u64 syscall_len;
  53. __u64 data_seq;
  54. __u16 data_type;
  55. __u16 data_len;
  56. char data[BURST_DATA_BUF_SIZE];
  57. } __attribute__((packed));
  58. struct __socket_data_buffer {
  59. __u32 events_num;
  60. __u32 len;
  61. char data[32760];
  62. };
  63. */
  64. import "C"
  65. type SocketData C.struct___socket_data
  66. type SocketDataBuffer C.struct___socket_data_buffer
  67. const MaxPayloadSize = 1024
  68. type EventType uint32
  69. type EventReason uint32
  70. const (
  71. EventTypeProcessStart EventType = 1
  72. EventTypeProcessExit EventType = 2
  73. EventTypeConnectionOpen EventType = 3
  74. EventTypeConnectionClose EventType = 4
  75. EventTypeConnectionError EventType = 5
  76. EventTypeListenOpen EventType = 6
  77. EventTypeListenClose EventType = 7
  78. EventTypeFileOpen EventType = 8
  79. EventTypeTCPRetransmit EventType = 9
  80. EventTypeL7Request EventType = 10
  81. EventTypeFunEnt EventType = 11
  82. EventTypeFunRet EventType = 12
  83. EventReasonNone EventReason = 0
  84. EventReasonOOMKill EventReason = 1
  85. )
  86. type TrafficStats struct {
  87. BytesSent uint64
  88. BytesReceived uint64
  89. }
  90. type Event struct {
  91. StackEvent *StackEvent
  92. Type EventType
  93. Reason EventReason
  94. Pid uint32
  95. SrcAddr netaddr.IPPort
  96. DstAddr netaddr.IPPort
  97. Fd uint64
  98. Timestamp uint64
  99. Duration time.Duration
  100. L7Request *l7.RequestData
  101. TrafficStats *TrafficStats
  102. }
  103. type perfMapType uint8
  104. const (
  105. perfMapTypeProcEvents perfMapType = 1
  106. perfMapTypeTCPEvents perfMapType = 2
  107. perfMapTypeFileEvents perfMapType = 3
  108. perfMapTypeL7Events perfMapType = 4
  109. perfMapTypeSocketEvents perfMapType = 5
  110. perfMapTypeEventQueue perfMapType = 6
  111. perfMapTypePythonThreadEvents perfMapType = 7
  112. )
  113. type Tracer struct {
  114. kernelVersion string
  115. disableL7Tracing bool
  116. disableE2ETracing bool
  117. disableStackTracing bool
  118. collection *ebpf.Collection
  119. readers map[string]*perf.Reader
  120. links []link.Link
  121. uprobes map[string]*ebpf.Program
  122. Symbols []debugelf.Symbol
  123. Uprobes []tracer.Uprobe
  124. UprobesMap map[string]tracer.Uprobe
  125. }
  126. func NewTracer(kernelVersion string, disableL7Tracing, disableE2ETracing, disableStackTracing bool) *Tracer {
  127. if disableL7Tracing {
  128. klog.Infoln("L7 tracing is disabled")
  129. }
  130. if disableE2ETracing {
  131. klog.Infoln("L7 tracing is disabled")
  132. }
  133. if disableStackTracing {
  134. klog.Infoln("L7 stack is disabled")
  135. }
  136. return &Tracer{
  137. kernelVersion: kernelVersion,
  138. disableL7Tracing: disableL7Tracing,
  139. disableE2ETracing: disableE2ETracing,
  140. disableStackTracing: disableStackTracing,
  141. readers: map[string]*perf.Reader{},
  142. uprobes: map[string]*ebpf.Program{},
  143. }
  144. }
  145. func (t *Tracer) Run(events chan<- Event) error {
  146. if err := t.ebpf(events); err != nil {
  147. return err
  148. }
  149. if err := t.init(events); err != nil {
  150. return err
  151. }
  152. return nil
  153. }
  154. func (t *Tracer) Close() {
  155. for _, p := range t.uprobes {
  156. _ = p.Close()
  157. }
  158. for _, l := range t.links {
  159. _ = l.Close()
  160. }
  161. for _, r := range t.readers {
  162. _ = r.Close()
  163. }
  164. t.collection.Close()
  165. }
  166. func (t *Tracer) init(ch chan<- Event) error {
  167. pids, err := proc.ListPids()
  168. if err != nil {
  169. return fmt.Errorf("failed to list pids: %w", err)
  170. }
  171. for _, pid := range pids {
  172. ch <- Event{Type: EventTypeProcessStart, Pid: pid}
  173. }
  174. fds, sockets := readFds(pids)
  175. for _, fd := range fds {
  176. ch <- Event{Type: EventTypeFileOpen, Pid: fd.pid, Fd: fd.fd}
  177. }
  178. listens := map[uint64]bool{}
  179. for _, s := range sockets {
  180. if s.Listen {
  181. listens[uint64(s.pid)<<32|uint64(s.SAddr.Port())] = true
  182. }
  183. }
  184. ebpfConnectionsMap := t.collection.Maps["active_connections"]
  185. timestamp := uint64(time.Now().UnixNano())
  186. for _, s := range sockets {
  187. typ := EventTypeConnectionOpen
  188. if s.Listen {
  189. typ = EventTypeListenOpen
  190. } else if listens[uint64(s.pid)<<32|uint64(s.SAddr.Port())] || s.DAddr.Port() > s.SAddr.Port() { // inbound
  191. continue
  192. }
  193. ch <- Event{
  194. Type: typ,
  195. Pid: s.pid,
  196. Timestamp: timestamp,
  197. Fd: s.fd,
  198. SrcAddr: s.SAddr,
  199. DstAddr: s.DAddr,
  200. }
  201. if typ == EventTypeConnectionOpen {
  202. id := ConnectionId{FD: s.fd, PID: s.pid}
  203. conn := Connection{Timestamp: timestamp}
  204. if err := ebpfConnectionsMap.Update(id, conn, ebpf.UpdateNoExist); err != nil {
  205. klog.Warningln(err)
  206. }
  207. }
  208. }
  209. return nil
  210. }
  211. func (t *Tracer) ActiveConnectionsIterator() *ebpf.MapIterator {
  212. return t.collection.Maps["active_connections"].Iterate()
  213. }
  214. type ConnectionId struct {
  215. FD uint64
  216. PID uint32
  217. _ uint32
  218. }
  219. type Connection struct {
  220. Timestamp uint64
  221. BytesSent uint64
  222. BytesReceived uint64
  223. }
  224. type perfMap struct {
  225. name string
  226. perCPUBufferSizePages int
  227. typ perfMapType
  228. }
  229. func (t *Tracer) ebpf(ch chan<- Event) error {
  230. kv := "v" + common.KernelMajorMinor(t.kernelVersion)
  231. path, prg, err := EbpfCode(kv)
  232. klog.Infof("kv is %s, kernel version: %s path: %s", kv, t.kernelVersion, path)
  233. if len(prg) == 0 || err != nil {
  234. return fmt.Errorf("kv is %s, unsupported kernel version: %s path: %s err:<%v>", kv, t.kernelVersion, path, err)
  235. }
  236. _, debugFsErr := os.Stat("/sys/kernel/debug/tracing")
  237. _, traceFsErr := os.Stat("/sys/kernel/tracing")
  238. if debugFsErr != nil && traceFsErr != nil {
  239. return fmt.Errorf("kernel tracing is not available: debugfs or tracefs must be mounted")
  240. }
  241. collectionSpec, err := ebpf.LoadCollectionSpecFromReader(bytes.NewReader(prg))
  242. if err != nil {
  243. return fmt.Errorf("failed to load collection spec: %w", err)
  244. }
  245. _ = unix.Setrlimit(unix.RLIMIT_MEMLOCK, &unix.Rlimit{Cur: unix.RLIM_INFINITY, Max: unix.RLIM_INFINITY})
  246. tracer.PidFilter(collectionSpec)
  247. opts := &ebpf.CollectionOptions{MapReplacements: make(map[string]*ebpf.Map)}
  248. klog.Infof("[start] Look eBPF .maps")
  249. for _, spec := range collectionSpec.Maps {
  250. klog.Infoln(spec.Name)
  251. }
  252. klog.Infof("[end] Look eBPF .maps")
  253. tracer.MapInit(collectionSpec, opts)
  254. // TODO 多进程
  255. // tracer.SetConstants(collectionSpec)
  256. c, err := ebpf.NewCollectionWithOptions(collectionSpec, *opts)
  257. if err != nil {
  258. var verr *ebpf.VerifierError
  259. if errors.As(err, &verr) {
  260. klog.Errorf("----%+v", verr)
  261. }
  262. return fmt.Errorf("failed to load collection: %w", err)
  263. }
  264. tracer.Offset()
  265. t.collection = c
  266. perfMaps := []perfMap{
  267. {name: "proc_events", typ: perfMapTypeProcEvents, perCPUBufferSizePages: 4},
  268. {name: "tcp_listen_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 4},
  269. {name: "tcp_connect_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 8},
  270. {name: "tcp_retransmit_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 4},
  271. {name: "file_events", typ: perfMapTypeFileEvents, perCPUBufferSizePages: 4},
  272. {name: "event_queue", typ: perfMapTypeEventQueue, perCPUBufferSizePages: 32},
  273. {name: "python_thread_events", typ: perfMapTypePythonThreadEvents, perCPUBufferSizePages: 4},
  274. }
  275. tracer.MapInsert(c)
  276. if !t.DisableL7Tracing() {
  277. perfMaps = append(perfMaps, perfMap{name: "l7_events", typ: perfMapTypeL7Events, perCPUBufferSizePages: 32})
  278. }
  279. perfMaps = append(perfMaps, perfMap{name: MAP_PERF_SOCKET_DATA_NAME, typ: perfMapTypeSocketEvents, perCPUBufferSizePages: 64})
  280. klog.Infof("[start] Look eBPF perf_maps")
  281. for _, pm := range perfMaps {
  282. klog.Infoln(pm.name)
  283. m, ok := t.collection.Maps[pm.name]
  284. if ok {
  285. r, err := perf.NewReader(m, pm.perCPUBufferSizePages*os.Getpagesize())
  286. if err != nil {
  287. t.Close()
  288. return fmt.Errorf("failed to create ebpf reader: %w", err)
  289. }
  290. t.readers[pm.name] = r
  291. // event监听
  292. go runEventsReader(pm.name, r, ch, pm.typ)
  293. }
  294. }
  295. klog.Infof("[end] Look eBPF perf_maps")
  296. klog.Infof("[start] Look eBPF specPrograms")
  297. for _, programSpec := range collectionSpec.Programs {
  298. program := t.collection.Programs[programSpec.Name]
  299. klog.Infof("%s:[%s]", programSpec.SectionName, programSpec.Name)
  300. if t.DisableL7Tracing() {
  301. switch programSpec.Name {
  302. case "sys_enter_writev", "sys_enter_write", "sys_enter_sendto", "sys_enter_sendmsg", "sys_enter_sendmmsg":
  303. continue
  304. case "sys_enter_read", "sys_enter_readv", "sys_enter_recvfrom", "sys_enter_recvmsg":
  305. continue
  306. case "sys_exit_read", "sys_exit_readv", "sys_exit_recvfrom", "sys_exit_recvmsg":
  307. continue
  308. }
  309. }
  310. var l link.Link
  311. switch programSpec.Type {
  312. case ebpf.TracePoint:
  313. if strings.Contains(programSpec.SectionName, "prog") {
  314. continue
  315. }
  316. parts := strings.SplitN(programSpec.AttachTo, "/", 2)
  317. l, err = link.Tracepoint(parts[0], parts[1], program, nil)
  318. case ebpf.Kprobe:
  319. if strings.HasPrefix(programSpec.SectionName, "uprobe/") {
  320. t.uprobes[programSpec.Name] = program
  321. continue
  322. }
  323. l, err = link.Kprobe(programSpec.AttachTo, program, nil)
  324. }
  325. if err != nil {
  326. t.Close()
  327. return fmt.Errorf("failed to link program: %w", err)
  328. }
  329. t.links = append(t.links, l)
  330. }
  331. klog.Infof("[end] Look eBPF specPrograms")
  332. return nil
  333. }
  334. func (t EventType) String() string {
  335. switch t {
  336. case EventTypeProcessStart:
  337. return "process-start"
  338. case EventTypeProcessExit:
  339. return "process-exit"
  340. case EventTypeConnectionOpen:
  341. return "connection-open"
  342. case EventTypeConnectionClose:
  343. return "connection-close"
  344. case EventTypeConnectionError:
  345. return "connection-error"
  346. case EventTypeListenOpen:
  347. return "listen-open"
  348. case EventTypeListenClose:
  349. return "listen-close"
  350. case EventTypeFileOpen:
  351. return "file-open"
  352. case EventTypeTCPRetransmit:
  353. return "tcp-retransmit"
  354. case EventTypeL7Request:
  355. return "l7-request"
  356. }
  357. return "unknown: " + strconv.Itoa(int(t))
  358. }
  359. func (t EventReason) String() string {
  360. switch t {
  361. case EventReasonNone:
  362. return "none"
  363. case EventReasonOOMKill:
  364. return "oom-kill"
  365. }
  366. return "unknown: " + strconv.Itoa(int(t))
  367. }
  368. type procEvent struct {
  369. Type EventType
  370. Pid uint32
  371. Reason uint32
  372. }
  373. type tcpEvent struct {
  374. Fd uint64
  375. Timestamp uint64
  376. Duration uint64
  377. Type EventType
  378. Pid uint32
  379. BytesSent uint64
  380. BytesReceived uint64
  381. SPort uint16
  382. DPort uint16
  383. SAddr [16]byte
  384. DAddr [16]byte
  385. }
  386. type fileEvent struct {
  387. Type EventType
  388. Pid uint32
  389. Fd uint64
  390. }
  391. // struct l7_event in l7.c
  392. type l7Event struct {
  393. Fd uint64
  394. ConnectionTimestamp uint64
  395. Pid uint32
  396. Status uint32
  397. Duration uint64
  398. Protocol uint8
  399. Method uint8
  400. Padding uint16
  401. StatementId uint32
  402. PayloadSize uint64
  403. TraceId uint64
  404. StartAt uint64 // ns
  405. EndtAt uint64 // ns
  406. TraceStart uint32
  407. TraceEnd uint32
  408. EventCount uint32
  409. AssumedAppId HashByte
  410. SpanId HashByte
  411. TraceIdFrom HashByte16
  412. CalledId HashByte
  413. InstanceIdFrom HashByte
  414. AppIdFrom HashByte
  415. SpanIdFrom HashByte
  416. }
  417. type SocketDataBufferddd struct {
  418. EventsNum uint32
  419. Len uint32
  420. Data [32760]byte
  421. }
  422. const (
  423. TASK_COMM_LEN = 16
  424. BURST_DATA_BUF_SIZE = 8192
  425. )
  426. type Tuple struct {
  427. Daddr [16]uint8
  428. RcvSaddr [16]uint8
  429. AddrLen uint8
  430. L4Protocol uint8
  431. Dport uint16
  432. Num uint16
  433. }
  434. type SocketDatadddd struct {
  435. Pid uint32 // 表示线程号 如果'pid == tgid'表示一个进程, 否则是线程
  436. Tgid uint32 // 进程号
  437. CoroutineID uint64
  438. Source uint8
  439. Comm [TASK_COMM_LEN]byte
  440. SocketID uint64
  441. Tuple Tuple
  442. ExtraData uint32
  443. ExtraDataCount uint32
  444. TcpSeq uint32
  445. ThreadTraceID uint64
  446. Timestamp uint64
  447. Direction uint8
  448. MsgType uint8
  449. SyscallLen uint64
  450. DataSeq uint64
  451. DataType uint16
  452. DataLen uint16
  453. Data [BURST_DATA_BUF_SIZE]byte
  454. }
  455. type StackEvent struct {
  456. Type uint64
  457. Pid uint64
  458. TraceId uint64
  459. Goid uint64
  460. Ip uint64
  461. Bp uint64
  462. CallerIp uint64
  463. CallerBp uint64
  464. TimeNsStart uint64
  465. TimeNsEnd uint64
  466. // Nid uint64
  467. // Fpid uint64
  468. // Level uint64
  469. Location byte
  470. ClassName [100]byte
  471. MethedName [100]byte
  472. }
  473. type StackFunEvent struct {
  474. StackEvent StackEvent
  475. Uprobe *tracer.Uprobe
  476. }
  477. type pythonThreadEvent struct {
  478. Type EventType
  479. Pid uint32
  480. Duration uint64
  481. }
  482. func runEventsReader(name string, r *perf.Reader, ch chan<- Event, typ perfMapType) {
  483. for {
  484. rec, err := r.Read()
  485. if err != nil {
  486. if errors.Is(err, perf.ErrClosed) {
  487. break
  488. }
  489. continue
  490. }
  491. if rec.LostSamples > 0 {
  492. klog.Errorln(name, "lost samples:", rec.LostSamples)
  493. continue
  494. }
  495. var event Event
  496. switch typ {
  497. case perfMapTypeSocketEvents:
  498. //fmt.Println("perfMapTypeSocketEvents")
  499. // 假设 rec.RawSample 包含数据,类型为 []byte
  500. //rawData := rec.RawSample
  501. //fmt.Println("perfMapTypeSocketEvents2")
  502. //
  503. //// 创建一个 SocketDataBuffer 结构体实例
  504. //var buffer SocketDataBuffer
  505. //
  506. //// 创建一个字节缓冲区,并将数据填充到其中
  507. //reader := bytes.NewReader(rawData)
  508. //fmt.Println("perfMapTypeSocketEvents3")
  509. //fmt.Println(len(rawData))
  510. //// 使用 binary.Read 函数读取数据并解析为 SocketDataBuffer 结构体实例
  511. //if err := binary.Read(reader, binary.LittleEndian, &buffer); err != nil {
  512. // fmt.Println(reader.Len())
  513. // fmt.Println("Failed to read data:", err)
  514. // continue
  515. //}
  516. //fmt.Println("perfMapTypeSocketEvents4")
  517. //
  518. //// 打印解析后的数据
  519. //fmt.Println("EventsNum:", buffer.EventsNum)
  520. //fmt.Println("Len:", buffer.Len)
  521. //
  522. //// 打印 char data 的内容
  523. //fmt.Printf("Data: %s\n", string(buffer.Data[:buffer.Len])) // 仅打印实际长度的数据
  524. //socketDataBuffer := rec.RawSample
  525. // 解析 __socket_data_buffer
  526. //buf := (*SocketDataBuffer)(unsafe.Pointer(&rec.RawSample[0])) //nolint:gosec
  527. //
  528. //// 获取 char data[32760];
  529. //socketData := (*SocketData)(unsafe.Pointer(&buf.data[0])) //nolint:gosec
  530. //// todo
  531. //fmt.Printf("socketData.DataType:%d \n", (socketData.data_type))
  532. //fmt.Printf("socketData.DataLen:%d \n", (socketData.data_len))
  533. //
  534. //// 解析C结构体中的data字段
  535. //dataSlice := C.GoBytes(unsafe.Pointer(&socketData.data[0]), C.int(socketData.data_len))
  536. //// 打印或处理包含的数据
  537. //fmt.Printf("socketData.Payload:%v \n", string(dataSlice))
  538. /*todo */
  539. //socketData := (*(*[128]byte)(unsafe.Pointer(&eventC.line)))
  540. //dataPtr := unsafe.Pointer(&buf.data[0])
  541. //socketData := (*SocketData)(dataPtr)
  542. //reader2 := bytes.NewBuffer(rec.RawSample)
  543. // 222222
  544. //fmt.Println("socketData.Pid:", socketData.pid)
  545. //fmt.Println("socketData.Tgid:", socketData.tgid)
  546. //fmt.Println("socketData.CoroutineID:", socketData.coroutine_id)
  547. //fmt.Println("socketData.Source:", socketData.source)
  548. //
  549. //fmt.Printf("socketData.Comm: %s \n", socketData.comm)
  550. //fmt.Printf("socketData.SocketID :%v \n", socketData.socket_id)
  551. //fmt.Println("socketData.Tuple:", socketData.Tuple)
  552. //fmt.Println("socketData.ExtraData:", socketData.ExtraData)
  553. //fmt.Println("socketData.ExtraDataCount:", socketData.ExtraDataCount)
  554. //fmt.Println("socketData.TCPSeq:", socketData.TcpSeq)
  555. //fmt.Println("socketData.ThreadTraceID:", socketData.ThreadTraceID)
  556. //fmt.Println("socketData.Timestamp:", socketData.Timestamp)
  557. //fmt.Println("socketData.Direction:", socketData.Direction)
  558. //fmt.Println("socketData.MsgType:", socketData.MsgType)
  559. //fmt.Println("socketData.SyscallLen:", socketData.SyscallLen)
  560. //fmt.Println("socketData.DataSeq:", socketData.DataSeq)
  561. //socketData := &SocketData{}
  562. //reader := bytes.NewBuffer(rec.RawSample)
  563. //if err := binary.Read(reader, binary.LittleEndian, v); err != nil {
  564. // klog.Warningln("failed1 to read msg:", err)
  565. // continue
  566. //}
  567. //
  568. //var data []byte
  569. //payload := reader.Bytes()
  570. //switch {
  571. //case v.Len == 0:
  572. //case v.Len > 32760:
  573. // data = payload[:32760]
  574. //default:
  575. // data = payload[:v.Len]
  576. //}
  577. //////data2 := data[:v.Len]
  578. ////fmt.Println("perfMapTypeSocketEvents")
  579. //fmt.Println(v.EventsNum)
  580. //fmt.Println(v.Len)
  581. //fmt.Println(string(data))
  582. //
  583. //var data2 SocketData
  584. //reader2 := bytes.NewBuffer(data)
  585. //if err := binary.Read(reader2, binary.LittleEndian, data2); err != nil {
  586. // klog.Warningln("failed2 to read msg:", err)
  587. // continue
  588. //}
  589. //
  590. //fmt.Println(data2.Pid)
  591. //fmt.Println(data2.Tgid)
  592. //fmt.Println(string(v.Data))
  593. //continue
  594. case perfMapTypeL7Events:
  595. v := &l7Event{}
  596. reader := bytes.NewBuffer(rec.RawSample)
  597. if err := binary.Read(reader, binary.LittleEndian, v); err != nil {
  598. klog.Warningln("failed to read msg:", err)
  599. continue
  600. }
  601. //fmt.Println("v.TraceIdFrom")
  602. //fmt.Println(v.TraceIdFrom)
  603. //a := hex.EncodeToString(v.TraceIdFrom[:])
  604. //for _, b := range v.AssumedAppId {
  605. // fmt.Printf("v.AssumedAppId- %02\n", b)
  606. //}
  607. //fmt.Println(a)
  608. payload := reader.Bytes()
  609. req := &l7.RequestData{
  610. Protocol: l7.Protocol(v.Protocol),
  611. Status: l7.Status(v.Status),
  612. Duration: time.Duration(v.Duration),
  613. Method: l7.Method(v.Method),
  614. StatementId: v.StatementId,
  615. TraceId: v.TraceId,
  616. TraceStart: v.TraceStart,
  617. TraceEnd: v.TraceEnd,
  618. EventCount: v.EventCount,
  619. AssumedAppId: hex.EncodeToString(v.AssumedAppId[:]),
  620. SpanId: hex.EncodeToString(v.SpanId[:]),
  621. StartAt: v.StartAt,
  622. EndAt: v.EndtAt,
  623. }
  624. if v.TraceEnd == 1 {
  625. req.ParentSpanContext.TraceIdFrom = hex.EncodeToString(v.TraceIdFrom[:])
  626. req.ParentSpanContext.CalledId = hex.EncodeToString(v.CalledId[:])
  627. req.ParentSpanContext.InstanceIdFrom = hex.EncodeToString(v.InstanceIdFrom[:])
  628. req.ParentSpanContext.AppIdFrom = hex.EncodeToString(v.AppIdFrom[:])
  629. req.ParentSpanContext.SpanIdFrom = hex.EncodeToString(v.SpanIdFrom[:])
  630. }
  631. switch {
  632. case v.PayloadSize == 0:
  633. case v.PayloadSize > MaxPayloadSize:
  634. req.Payload = payload[:MaxPayloadSize]
  635. default:
  636. req.Payload = payload[:v.PayloadSize]
  637. }
  638. //fmt.Println("==========")
  639. //fmt.Println("req.Payload:", string(req.Payload))
  640. //fmt.Println("==========")
  641. event = Event{Type: EventTypeL7Request, Pid: v.Pid, Fd: v.Fd, Timestamp: v.ConnectionTimestamp, L7Request: req}
  642. case perfMapTypeFileEvents:
  643. v := &fileEvent{}
  644. if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
  645. klog.Warningln("failed to read msg:", err)
  646. continue
  647. }
  648. event = Event{Type: v.Type, Pid: v.Pid, Fd: v.Fd}
  649. case perfMapTypeProcEvents:
  650. v := &procEvent{}
  651. if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
  652. klog.Warningln("failed to read msg:", err)
  653. continue
  654. }
  655. event = Event{Type: v.Type, Reason: EventReason(v.Reason), Pid: v.Pid}
  656. case perfMapTypeTCPEvents:
  657. v := &tcpEvent{}
  658. if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
  659. klog.Warningln("failed to read msg:", err)
  660. continue
  661. }
  662. event = Event{
  663. Type: v.Type,
  664. Pid: v.Pid,
  665. SrcAddr: ipPort(v.SAddr, v.SPort),
  666. DstAddr: ipPort(v.DAddr, v.DPort),
  667. Fd: v.Fd,
  668. Timestamp: v.Timestamp,
  669. Duration: time.Duration(v.Duration),
  670. }
  671. if v.Type == EventTypeConnectionClose {
  672. event.TrafficStats = &TrafficStats{
  673. BytesSent: v.BytesSent,
  674. BytesReceived: v.BytesReceived,
  675. }
  676. }
  677. case perfMapTypePythonThreadEvents:
  678. v := &pythonThreadEvent{}
  679. if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
  680. klog.Warningln("failed to read msg:", err)
  681. continue
  682. }
  683. event = Event{
  684. Type: v.Type,
  685. Pid: v.Pid,
  686. Duration: time.Duration(v.Duration),
  687. }
  688. case perfMapTypeEventQueue:
  689. v := &StackEvent{}
  690. if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
  691. klog.Warningln("failed to read msg:", err)
  692. continue
  693. }
  694. event = Event{
  695. Type: EventTypeFunEnt,
  696. StackEvent: v,
  697. }
  698. default:
  699. continue
  700. }
  701. ch <- event
  702. }
  703. }
  704. func ipPort(ip [16]byte, port uint16) netaddr.IPPort {
  705. i, _ := netaddr.FromStdIP(ip[:])
  706. return netaddr.IPPortFrom(i, port)
  707. }
  708. func (t *Tracer) InitKProcInfo(pid uint32, appInfo *AppInfo) error {
  709. var err error
  710. var info EbpfProcInfo
  711. if appInfo.EBPFProcInfo == nil {
  712. info = EbpfProcInfo{
  713. InstanceId: appInfo.InstanceIdHash.HashtVal,
  714. AppId: appInfo.AppIdHash.HashtVal,
  715. CodeType: uint16(appInfo.CodeType),
  716. }
  717. } else {
  718. info = *appInfo.EBPFProcInfo
  719. info.AppId = appInfo.AppIdHash.HashtVal
  720. }
  721. _, err = tracer.UpdateProcInfoToMap(t.collection, pid, info)
  722. if err != nil {
  723. klog.Error("failed to update program info", err)
  724. }
  725. appInfo.EBPFProcInfo = &info
  726. return err
  727. }
  728. // TODO check language
  729. func (t *Tracer) DisableL7Tracing() bool {
  730. return t.disableL7Tracing
  731. }
  732. func (t *Tracer) DisableE2ETracing() bool {
  733. return t.disableE2ETracing
  734. }
  735. func (t *Tracer) DisableStackTracing() bool {
  736. return t.disableStackTracing
  737. }