tracer.go 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846
  1. package ebpftracer
  2. import (
  3. "bytes"
  4. debugelf "debug/elf"
  5. "encoding/binary"
  6. "encoding/hex"
  7. "errors"
  8. "fmt"
  9. "os"
  10. "strconv"
  11. "strings"
  12. "time"
  13. "github.com/cilium/ebpf"
  14. "github.com/cilium/ebpf/link"
  15. "github.com/cilium/ebpf/perf"
  16. "github.com/coroot/coroot-node-agent/common"
  17. "github.com/coroot/coroot-node-agent/ebpftracer/l7"
  18. "github.com/coroot/coroot-node-agent/ebpftracer/tracer"
  19. "github.com/coroot/coroot-node-agent/proc"
  20. . "github.com/coroot/coroot-node-agent/utils/modelse"
  21. klog "github.com/sirupsen/logrus"
  22. "golang.org/x/sys/unix"
  23. "inet.af/netaddr"
  24. )
  25. /*
  26. #define TASK_COMM_LEN 16
  27. #define BURST_DATA_BUF_SIZE 8192 // For brust send buffer
  28. #include <linux/types.h>
  29. struct __tuple_t {
  30. __u8 daddr[16];
  31. __u8 rcv_saddr[16];
  32. __u8 addr_len;
  33. __u8 l4_protocol;
  34. __u16 dport;
  35. __u16 num;
  36. };
  37. struct __socket_data {
  38. __u32 pid;
  39. __u32 tgid;
  40. __u64 coroutine_id;
  41. __u8 source;
  42. __u8 comm[TASK_COMM_LEN];
  43. __u64 socket_id;
  44. struct __tuple_t tuple;
  45. __u32 extra_data;
  46. __u32 extra_data_count;
  47. __u32 tcp_seq;
  48. __u64 thread_trace_id;
  49. __u64 timestamp;
  50. __u8 direction: 1;
  51. __u8 msg_type: 7;
  52. __u64 syscall_len;
  53. __u64 data_seq;
  54. __u16 data_type;
  55. __u16 data_len;
  56. char data[BURST_DATA_BUF_SIZE];
  57. } __attribute__((packed));
  58. struct __socket_data_buffer {
  59. __u32 events_num;
  60. __u32 len;
  61. char data[32760];
  62. };
  63. */
  64. import "C"
  65. type SocketData C.struct___socket_data
  66. type SocketDataBuffer C.struct___socket_data_buffer
  67. const MaxPayloadSize = 1024
  68. type EventType uint32
  69. type EventReason uint32
  70. const (
  71. EventTypeProcessStart EventType = 1
  72. EventTypeProcessExit EventType = 2
  73. EventTypeConnectionOpen EventType = 3
  74. EventTypeConnectionClose EventType = 4
  75. EventTypeConnectionError EventType = 5
  76. EventTypeListenOpen EventType = 6
  77. EventTypeListenClose EventType = 7
  78. EventTypeFileOpen EventType = 8
  79. EventTypeTCPRetransmit EventType = 9
  80. EventTypeL7Request EventType = 10
  81. EventTypeFunEnt EventType = 11
  82. EventTypeFunRet EventType = 12
  83. EventTypeAcceptOpen EventType = 13
  84. EventTypeAcceptClose EventType = 14
  85. EventReasonNone EventReason = 0
  86. EventReasonOOMKill EventReason = 1
  87. )
  88. type TrafficStats struct {
  89. BytesSent uint64
  90. BytesReceived uint64
  91. }
  92. type Event struct {
  93. StackEvent *StackEvent
  94. Type EventType
  95. Reason EventReason
  96. Pid uint32
  97. SrcAddr netaddr.IPPort
  98. DstAddr netaddr.IPPort
  99. Fd uint64
  100. Timestamp uint64
  101. Duration time.Duration
  102. L7Request *l7.RequestData
  103. TrafficStats *TrafficStats
  104. FirstReadTime uint64
  105. FirstWriteTime uint64
  106. NewReadTime uint64
  107. }
  108. type perfMapType uint8
  109. const (
  110. perfMapTypeProcEvents perfMapType = 1
  111. perfMapTypeTCPEvents perfMapType = 2
  112. perfMapTypeFileEvents perfMapType = 3
  113. perfMapTypeL7Events perfMapType = 4
  114. perfMapTypeSocketEvents perfMapType = 5
  115. perfMapTypeEventQueue perfMapType = 6
  116. perfMapTypePythonThreadEvents perfMapType = 7
  117. )
  118. type Tracer struct {
  119. kernelVersion string
  120. disableL7Tracing bool
  121. disableE2ETracing bool
  122. disableStackTracing bool
  123. collection *ebpf.Collection
  124. readers map[string]*perf.Reader
  125. links []link.Link
  126. uprobes map[string]*ebpf.Program
  127. Symbols []debugelf.Symbol
  128. Uprobes []tracer.Uprobe
  129. UprobesMap map[string]tracer.Uprobe
  130. }
  131. func NewTracer(kernelVersion string, disableL7Tracing, disableE2ETracing, disableStackTracing bool) *Tracer {
  132. if disableL7Tracing {
  133. klog.Infoln("L7 tracing is disabled")
  134. }
  135. if disableE2ETracing {
  136. klog.Infoln("L7 tracing is disabled")
  137. }
  138. if disableStackTracing {
  139. klog.Infoln("L7 stack is disabled")
  140. }
  141. return &Tracer{
  142. kernelVersion: kernelVersion,
  143. disableL7Tracing: disableL7Tracing,
  144. disableE2ETracing: disableE2ETracing,
  145. disableStackTracing: disableStackTracing,
  146. readers: map[string]*perf.Reader{},
  147. uprobes: map[string]*ebpf.Program{},
  148. }
  149. }
  150. func (t *Tracer) Run(events chan<- Event) error {
  151. if err := t.ebpf(events); err != nil {
  152. return err
  153. }
  154. if err := t.init(events); err != nil {
  155. return err
  156. }
  157. return nil
  158. }
  159. func (t *Tracer) Close() {
  160. for _, p := range t.uprobes {
  161. _ = p.Close()
  162. }
  163. for _, l := range t.links {
  164. _ = l.Close()
  165. }
  166. for _, r := range t.readers {
  167. _ = r.Close()
  168. }
  169. t.collection.Close()
  170. }
  171. func (t *Tracer) init(ch chan<- Event) error {
  172. pids, err := proc.ListPids()
  173. if err != nil {
  174. return fmt.Errorf("failed to list pids: %w", err)
  175. }
  176. for _, pid := range pids {
  177. ch <- Event{Type: EventTypeProcessStart, Pid: pid}
  178. }
  179. fds, sockets := readFds(pids)
  180. for _, fd := range fds {
  181. ch <- Event{Type: EventTypeFileOpen, Pid: fd.pid, Fd: fd.fd}
  182. }
  183. listens := map[uint64]bool{}
  184. for _, s := range sockets {
  185. if s.Listen {
  186. listens[uint64(s.pid)<<32|uint64(s.SAddr.Port())] = true
  187. }
  188. }
  189. ebpfConnectionsMap := t.collection.Maps["active_connections"]
  190. timestamp := uint64(time.Now().UnixNano())
  191. for _, s := range sockets {
  192. typ := EventTypeConnectionOpen
  193. if s.Listen {
  194. typ = EventTypeListenOpen
  195. } else if listens[uint64(s.pid)<<32|uint64(s.SAddr.Port())] || s.DAddr.Port() > s.SAddr.Port() { // inbound
  196. continue
  197. }
  198. ch <- Event{
  199. Type: typ,
  200. Pid: s.pid,
  201. Timestamp: timestamp,
  202. Fd: s.fd,
  203. SrcAddr: s.SAddr,
  204. DstAddr: s.DAddr,
  205. }
  206. if typ == EventTypeConnectionOpen {
  207. id := ConnectionId{FD: s.fd, PID: s.pid}
  208. conn := Connection{Timestamp: timestamp}
  209. if err := ebpfConnectionsMap.Update(id, conn, ebpf.UpdateNoExist); err != nil {
  210. klog.Warningln(err)
  211. }
  212. }
  213. }
  214. return nil
  215. }
  216. func (t *Tracer) ActiveConnectionsIterator() *ebpf.MapIterator {
  217. return t.collection.Maps["active_connections"].Iterate()
  218. }
  219. func (t *Tracer) ActiveAcceptsIterator() *ebpf.MapIterator {
  220. return t.collection.Maps["active_accepts"].Iterate()
  221. }
  222. type ConnectionId struct {
  223. FD uint64
  224. PID uint32
  225. _ uint32
  226. }
  227. type Connection struct {
  228. Timestamp uint64
  229. BytesSent uint64
  230. BytesReceived uint64
  231. }
  232. type perfMap struct {
  233. name string
  234. perCPUBufferSizePages int
  235. typ perfMapType
  236. }
  237. func (t *Tracer) ebpf(ch chan<- Event) error {
  238. kv := "v" + common.KernelMajorMinor(t.kernelVersion)
  239. path, prg, err := EbpfCode(kv)
  240. klog.Infof("kv is [%s], kernel version: [%s] path: [%s]", kv, t.kernelVersion, path)
  241. if len(prg) == 0 || err != nil {
  242. return fmt.Errorf("kv is %s, unsupported kernel version: [%s] path: [%s] err:<%v>", kv, t.kernelVersion, path, err)
  243. }
  244. _, debugFsErr := os.Stat("/sys/kernel/debug/tracing")
  245. _, traceFsErr := os.Stat("/sys/kernel/tracing")
  246. if debugFsErr != nil && traceFsErr != nil {
  247. return fmt.Errorf("kernel tracing is not available: debugfs or tracefs must be mounted")
  248. }
  249. collectionSpec, err := ebpf.LoadCollectionSpecFromReader(bytes.NewReader(prg))
  250. if err != nil {
  251. return fmt.Errorf("failed to load collection spec: %w", err)
  252. }
  253. _ = unix.Setrlimit(unix.RLIMIT_MEMLOCK, &unix.Rlimit{Cur: unix.RLIM_INFINITY, Max: unix.RLIM_INFINITY})
  254. tracer.PidFilter(collectionSpec)
  255. opts := &ebpf.CollectionOptions{MapReplacements: make(map[string]*ebpf.Map)}
  256. klog.Infof("[start] Look eBPF .maps")
  257. for _, spec := range collectionSpec.Maps {
  258. klog.Infoln(spec.Name)
  259. }
  260. klog.Infof("[end] Look eBPF .maps")
  261. tracer.MapInit(collectionSpec, opts)
  262. // TODO 多进程
  263. // tracer.SetConstants(collectionSpec)
  264. c, err := ebpf.NewCollectionWithOptions(collectionSpec, *opts)
  265. if err != nil {
  266. var verr *ebpf.VerifierError
  267. if errors.As(err, &verr) {
  268. klog.Errorf("----%+v", verr)
  269. }
  270. return fmt.Errorf("failed to load collection: %w", err)
  271. }
  272. tracer.Offset()
  273. t.collection = c
  274. perfMaps := []perfMap{
  275. {name: "proc_events", typ: perfMapTypeProcEvents, perCPUBufferSizePages: 4},
  276. {name: "tcp_listen_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 4},
  277. {name: "tcp_connect_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 8},
  278. {name: "tcp_accept_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 8},
  279. {name: "tcp_retransmit_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 4},
  280. {name: "file_events", typ: perfMapTypeFileEvents, perCPUBufferSizePages: 4},
  281. {name: "event_queue", typ: perfMapTypeEventQueue, perCPUBufferSizePages: 32},
  282. {name: "python_thread_events", typ: perfMapTypePythonThreadEvents, perCPUBufferSizePages: 4},
  283. }
  284. tracer.MapInsert(c)
  285. if !t.DisableL7Tracing() {
  286. perfMaps = append(perfMaps, perfMap{name: "l7_events", typ: perfMapTypeL7Events, perCPUBufferSizePages: 32})
  287. }
  288. perfMaps = append(perfMaps, perfMap{name: MAP_PERF_SOCKET_DATA_NAME, typ: perfMapTypeSocketEvents, perCPUBufferSizePages: 64})
  289. klog.Infof("[start] Look eBPF perf_maps")
  290. for _, pm := range perfMaps {
  291. klog.Infoln(pm.name)
  292. m, ok := t.collection.Maps[pm.name]
  293. if ok {
  294. r, err := perf.NewReader(m, pm.perCPUBufferSizePages*os.Getpagesize())
  295. if err != nil {
  296. t.Close()
  297. return fmt.Errorf("failed to create ebpf reader: %w", err)
  298. }
  299. t.readers[pm.name] = r
  300. // event监听
  301. go runEventsReader(pm.name, r, ch, pm.typ)
  302. }
  303. }
  304. klog.Infof("[end] Look eBPF perf_maps")
  305. klog.Infof("[start] Look eBPF specPrograms")
  306. for _, programSpec := range collectionSpec.Programs {
  307. program := t.collection.Programs[programSpec.Name]
  308. klog.Infof("%s:[%s]", programSpec.SectionName, programSpec.Name)
  309. if t.DisableL7Tracing() {
  310. switch programSpec.Name {
  311. case "sys_enter_writev", "sys_enter_write", "sys_enter_sendto", "sys_enter_sendmsg", "sys_enter_sendmmsg":
  312. continue
  313. case "sys_enter_read", "sys_enter_readv", "sys_enter_recvfrom", "sys_enter_recvmsg":
  314. continue
  315. case "sys_exit_read", "sys_exit_readv", "sys_exit_recvfrom", "sys_exit_recvmsg":
  316. continue
  317. }
  318. }
  319. var l link.Link
  320. switch programSpec.Type {
  321. case ebpf.TracePoint:
  322. if strings.Contains(programSpec.SectionName, "prog") {
  323. continue
  324. }
  325. parts := strings.SplitN(programSpec.AttachTo, "/", 2)
  326. l, err = link.Tracepoint(parts[0], parts[1], program, nil)
  327. case ebpf.Kprobe:
  328. if strings.HasPrefix(programSpec.SectionName, "uprobe/") {
  329. t.uprobes[programSpec.Name] = program
  330. continue
  331. }
  332. if strings.HasPrefix(programSpec.SectionName, "kretprobe/") {
  333. l, err = link.Kretprobe(programSpec.AttachTo, program, nil)
  334. t.links = append(t.links, l)
  335. continue
  336. }
  337. l, err = link.Kprobe(programSpec.AttachTo, program, nil)
  338. }
  339. if err != nil {
  340. t.Close()
  341. return fmt.Errorf("failed to link program: %w", err)
  342. }
  343. t.links = append(t.links, l)
  344. }
  345. klog.Infof("[end] Look eBPF specPrograms")
  346. return nil
  347. }
  348. func (t EventType) String() string {
  349. switch t {
  350. case EventTypeProcessStart:
  351. return "process-start"
  352. case EventTypeProcessExit:
  353. return "process-exit"
  354. case EventTypeConnectionOpen:
  355. return "connection-open"
  356. case EventTypeConnectionClose:
  357. return "connection-close"
  358. case EventTypeConnectionError:
  359. return "connection-error"
  360. case EventTypeListenOpen:
  361. return "listen-open"
  362. case EventTypeListenClose:
  363. return "listen-close"
  364. case EventTypeFileOpen:
  365. return "file-open"
  366. case EventTypeTCPRetransmit:
  367. return "tcp-retransmit"
  368. case EventTypeL7Request:
  369. return "l7-request"
  370. }
  371. return "unknown: " + strconv.Itoa(int(t))
  372. }
  373. func (t EventReason) String() string {
  374. switch t {
  375. case EventReasonNone:
  376. return "none"
  377. case EventReasonOOMKill:
  378. return "oom-kill"
  379. }
  380. return "unknown: " + strconv.Itoa(int(t))
  381. }
  382. type procEvent struct {
  383. Type EventType
  384. Pid uint32
  385. Reason uint32
  386. }
  387. type tcpEvent struct {
  388. Fd uint64
  389. Timestamp uint64
  390. Duration uint64
  391. FirstReadTime uint64
  392. FirstWriteTime uint64
  393. NewReadTime uint64
  394. Type EventType
  395. Pid uint32
  396. BytesSent uint64
  397. BytesReceived uint64
  398. SPort uint16
  399. DPort uint16
  400. SAddr [16]byte
  401. DAddr [16]byte
  402. }
  403. type fileEvent struct {
  404. Type EventType
  405. Pid uint32
  406. Fd uint64
  407. }
  408. // struct l7_event in l7.c
  409. type l7Event struct {
  410. Fd uint64
  411. ConnectionTimestamp uint64
  412. Pid uint32
  413. Status uint32
  414. Duration uint64
  415. Protocol uint8
  416. Method uint8
  417. Padding uint16
  418. StatementId uint32
  419. PayloadSize uint64
  420. TraceId uint64
  421. StartAt uint64 // ns
  422. EndtAt uint64 // ns
  423. TraceStart uint32
  424. TraceEnd uint32
  425. EventCount uint32
  426. Sport uint16
  427. Dport uint16
  428. SAddr [16]byte
  429. DAddr [16]byte
  430. AssumedAppId HashByte
  431. SpanId HashByte
  432. TraceIdFrom HashByte16
  433. CalledId HashByte
  434. InstanceIdFrom HashByte
  435. AppIdFrom HashByte
  436. SpanIdFrom HashByte
  437. }
  438. type SocketDataBufferddd struct {
  439. EventsNum uint32
  440. Len uint32
  441. Data [32760]byte
  442. }
  443. const (
  444. TASK_COMM_LEN = 16
  445. BURST_DATA_BUF_SIZE = 8192
  446. )
  447. type Tuple struct {
  448. Daddr [16]uint8
  449. RcvSaddr [16]uint8
  450. AddrLen uint8
  451. L4Protocol uint8
  452. Dport uint16
  453. Num uint16
  454. }
  455. type SocketDatadddd struct {
  456. Pid uint32 // 表示线程号 如果'pid == tgid'表示一个进程, 否则是线程
  457. Tgid uint32 // 进程号
  458. CoroutineID uint64
  459. Source uint8
  460. Comm [TASK_COMM_LEN]byte
  461. SocketID uint64
  462. Tuple Tuple
  463. ExtraData uint32
  464. ExtraDataCount uint32
  465. TcpSeq uint32
  466. ThreadTraceID uint64
  467. Timestamp uint64
  468. Direction uint8
  469. MsgType uint8
  470. SyscallLen uint64
  471. DataSeq uint64
  472. DataType uint16
  473. DataLen uint16
  474. Data [BURST_DATA_BUF_SIZE]byte
  475. }
  476. type StackEvent struct {
  477. Type uint64
  478. Pid uint64
  479. TraceId uint64
  480. Goid uint64
  481. Ip uint64
  482. Bp uint64
  483. CallerIp uint64
  484. CallerBp uint64
  485. TimeNsStart uint64
  486. TimeNsEnd uint64
  487. // Nid uint64
  488. // Fpid uint64
  489. // Level uint64
  490. Location byte
  491. ClassName [100]byte
  492. MethedName [100]byte
  493. }
  494. type StackFunEvent struct {
  495. StackEvent StackEvent
  496. Uprobe *tracer.Uprobe
  497. }
  498. type pythonThreadEvent struct {
  499. Type EventType
  500. Pid uint32
  501. Duration uint64
  502. }
  503. func runEventsReader(name string, r *perf.Reader, ch chan<- Event, typ perfMapType) {
  504. for {
  505. rec, err := r.Read()
  506. if err != nil {
  507. if errors.Is(err, perf.ErrClosed) {
  508. break
  509. }
  510. continue
  511. }
  512. if rec.LostSamples > 0 {
  513. klog.Errorln(name, "lost samples:", rec.LostSamples)
  514. continue
  515. }
  516. var event Event
  517. switch typ {
  518. case perfMapTypeSocketEvents:
  519. //fmt.Println("perfMapTypeSocketEvents")
  520. // 假设 rec.RawSample 包含数据,类型为 []byte
  521. //rawData := rec.RawSample
  522. //fmt.Println("perfMapTypeSocketEvents2")
  523. //
  524. //// 创建一个 SocketDataBuffer 结构体实例
  525. //var buffer SocketDataBuffer
  526. //
  527. //// 创建一个字节缓冲区,并将数据填充到其中
  528. //reader := bytes.NewReader(rawData)
  529. //fmt.Println("perfMapTypeSocketEvents3")
  530. //fmt.Println(len(rawData))
  531. //// 使用 binary.Read 函数读取数据并解析为 SocketDataBuffer 结构体实例
  532. //if err := binary.Read(reader, binary.LittleEndian, &buffer); err != nil {
  533. // fmt.Println(reader.Len())
  534. // fmt.Println("Failed to read data:", err)
  535. // continue
  536. //}
  537. //fmt.Println("perfMapTypeSocketEvents4")
  538. //
  539. //// 打印解析后的数据
  540. //fmt.Println("EventsNum:", buffer.EventsNum)
  541. //fmt.Println("Len:", buffer.Len)
  542. //
  543. //// 打印 char data 的内容
  544. //fmt.Printf("Data: %s\n", string(buffer.Data[:buffer.Len])) // 仅打印实际长度的数据
  545. //socketDataBuffer := rec.RawSample
  546. // 解析 __socket_data_buffer
  547. //buf := (*SocketDataBuffer)(unsafe.Pointer(&rec.RawSample[0])) //nolint:gosec
  548. //
  549. //// 获取 char data[32760];
  550. //socketData := (*SocketData)(unsafe.Pointer(&buf.data[0])) //nolint:gosec
  551. //// todo
  552. //fmt.Printf("socketData.DataType:%d \n", (socketData.data_type))
  553. //fmt.Printf("socketData.DataLen:%d \n", (socketData.data_len))
  554. //
  555. //// 解析C结构体中的data字段
  556. //dataSlice := C.GoBytes(unsafe.Pointer(&socketData.data[0]), C.int(socketData.data_len))
  557. //// 打印或处理包含的数据
  558. //fmt.Printf("socketData.Payload:%v \n", string(dataSlice))
  559. /*todo */
  560. //socketData := (*(*[128]byte)(unsafe.Pointer(&eventC.line)))
  561. //dataPtr := unsafe.Pointer(&buf.data[0])
  562. //socketData := (*SocketData)(dataPtr)
  563. //reader2 := bytes.NewBuffer(rec.RawSample)
  564. // 222222
  565. //fmt.Println("socketData.Pid:", socketData.pid)
  566. //fmt.Println("socketData.Tgid:", socketData.tgid)
  567. //fmt.Println("socketData.CoroutineID:", socketData.coroutine_id)
  568. //fmt.Println("socketData.Source:", socketData.source)
  569. //
  570. //fmt.Printf("socketData.Comm: %s \n", socketData.comm)
  571. //fmt.Printf("socketData.SocketID :%v \n", socketData.socket_id)
  572. //fmt.Println("socketData.Tuple:", socketData.Tuple)
  573. //fmt.Println("socketData.ExtraData:", socketData.ExtraData)
  574. //fmt.Println("socketData.ExtraDataCount:", socketData.ExtraDataCount)
  575. //fmt.Println("socketData.TCPSeq:", socketData.TcpSeq)
  576. //fmt.Println("socketData.ThreadTraceID:", socketData.ThreadTraceID)
  577. //fmt.Println("socketData.Timestamp:", socketData.Timestamp)
  578. //fmt.Println("socketData.Direction:", socketData.Direction)
  579. //fmt.Println("socketData.MsgType:", socketData.MsgType)
  580. //fmt.Println("socketData.SyscallLen:", socketData.SyscallLen)
  581. //fmt.Println("socketData.DataSeq:", socketData.DataSeq)
  582. //socketData := &SocketData{}
  583. //reader := bytes.NewBuffer(rec.RawSample)
  584. //if err := binary.Read(reader, binary.LittleEndian, v); err != nil {
  585. // klog.Warningln("failed1 to read msg:", err)
  586. // continue
  587. //}
  588. //
  589. //var data []byte
  590. //payload := reader.Bytes()
  591. //switch {
  592. //case v.Len == 0:
  593. //case v.Len > 32760:
  594. // data = payload[:32760]
  595. //default:
  596. // data = payload[:v.Len]
  597. //}
  598. //////data2 := data[:v.Len]
  599. ////fmt.Println("perfMapTypeSocketEvents")
  600. //fmt.Println(v.EventsNum)
  601. //fmt.Println(v.Len)
  602. //fmt.Println(string(data))
  603. //
  604. //var data2 SocketData
  605. //reader2 := bytes.NewBuffer(data)
  606. //if err := binary.Read(reader2, binary.LittleEndian, data2); err != nil {
  607. // klog.Warningln("failed2 to read msg:", err)
  608. // continue
  609. //}
  610. //
  611. //fmt.Println(data2.Pid)
  612. //fmt.Println(data2.Tgid)
  613. //fmt.Println(string(v.Data))
  614. //continue
  615. case perfMapTypeL7Events:
  616. v := &l7Event{}
  617. reader := bytes.NewBuffer(rec.RawSample)
  618. if err := binary.Read(reader, binary.LittleEndian, v); err != nil {
  619. klog.Warningln("failed to read msg:", err)
  620. continue
  621. }
  622. //fmt.Println("v.TraceIdFrom")
  623. //fmt.Println(v.TraceIdFrom)
  624. //a := hex.EncodeToString(v.TraceIdFrom[:])
  625. //for _, b := range v.AssumedAppId {
  626. // fmt.Printf("v.AssumedAppId- %02\n", b)
  627. //}
  628. //fmt.Println(a)
  629. payload := reader.Bytes()
  630. req := &l7.RequestData{
  631. Protocol: l7.Protocol(v.Protocol),
  632. Status: l7.Status(v.Status),
  633. Duration: time.Duration(v.Duration),
  634. Method: l7.Method(v.Method),
  635. StatementId: v.StatementId,
  636. TraceId: v.TraceId,
  637. TraceStart: v.TraceStart,
  638. TraceEnd: v.TraceEnd,
  639. EventCount: v.EventCount,
  640. AssumedAppId: hex.EncodeToString(v.AssumedAppId[:]),
  641. SpanId: hex.EncodeToString(v.SpanId[:]),
  642. StartAt: v.StartAt,
  643. EndAt: v.EndtAt,
  644. }
  645. if v.TraceEnd == 1 {
  646. req.ParentSpanContext.TraceIdFrom = hex.EncodeToString(v.TraceIdFrom[:])
  647. req.ParentSpanContext.CalledId = hex.EncodeToString(v.CalledId[:])
  648. req.ParentSpanContext.InstanceIdFrom = hex.EncodeToString(v.InstanceIdFrom[:])
  649. req.ParentSpanContext.AppIdFrom = hex.EncodeToString(v.AppIdFrom[:])
  650. req.ParentSpanContext.SpanIdFrom = hex.EncodeToString(v.SpanIdFrom[:])
  651. req.SAddr = ipPort(v.SAddr,v.Sport)
  652. req.DAddr = ipPort(v.DAddr,v.Dport)
  653. klog.Infof("runEventsReader SAddr.String %s", req.SAddr.String())
  654. klog.Infof("runEventsReader DAddr.String %s", req.DAddr.String())
  655. }
  656. switch {
  657. case v.PayloadSize == 0:
  658. case v.PayloadSize > MaxPayloadSize:
  659. req.Payload = payload[:MaxPayloadSize]
  660. default:
  661. req.Payload = payload[:v.PayloadSize]
  662. }
  663. //fmt.Println("==========")
  664. //fmt.Println("req.Payload:", string(req.Payload))
  665. //fmt.Println("==========")
  666. event = Event{Type: EventTypeL7Request, Pid: v.Pid, Fd: v.Fd, Timestamp: v.ConnectionTimestamp, L7Request: req}
  667. case perfMapTypeFileEvents:
  668. v := &fileEvent{}
  669. if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
  670. klog.Warningln("failed to read msg:", err)
  671. continue
  672. }
  673. event = Event{Type: v.Type, Pid: v.Pid, Fd: v.Fd}
  674. case perfMapTypeProcEvents:
  675. v := &procEvent{}
  676. if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
  677. klog.Warningln("failed to read msg:", err)
  678. continue
  679. }
  680. event = Event{Type: v.Type, Reason: EventReason(v.Reason), Pid: v.Pid}
  681. case perfMapTypeTCPEvents:
  682. v := &tcpEvent{}
  683. if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
  684. klog.Warningln("failed to read msg:", err)
  685. continue
  686. }
  687. event = Event{
  688. Type: v.Type,
  689. Pid: v.Pid,
  690. SrcAddr: ipPort(v.SAddr, v.SPort),
  691. DstAddr: ipPort(v.DAddr, v.DPort),
  692. Fd: v.Fd,
  693. Timestamp: v.Timestamp,
  694. Duration: time.Duration(v.Duration),
  695. }
  696. if v.Type == EventTypeConnectionClose {
  697. event.TrafficStats = &TrafficStats{
  698. BytesSent: v.BytesSent,
  699. BytesReceived: v.BytesReceived,
  700. }
  701. }
  702. event.FirstReadTime = v.FirstReadTime
  703. event.FirstWriteTime = v.FirstWriteTime
  704. event.NewReadTime = v.NewReadTime
  705. if v.Type == EventTypeAcceptClose {
  706. event.TrafficStats = &TrafficStats{
  707. BytesSent: v.BytesSent,
  708. BytesReceived: v.BytesReceived,
  709. }
  710. }
  711. case perfMapTypePythonThreadEvents:
  712. v := &pythonThreadEvent{}
  713. if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
  714. klog.Warningln("failed to read msg:", err)
  715. continue
  716. }
  717. event = Event{
  718. Type: v.Type,
  719. Pid: v.Pid,
  720. Duration: time.Duration(v.Duration),
  721. }
  722. case perfMapTypeEventQueue:
  723. v := &StackEvent{}
  724. if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
  725. klog.Warningln("failed to read msg:", err)
  726. continue
  727. }
  728. event = Event{
  729. Type: EventTypeFunEnt,
  730. StackEvent: v,
  731. }
  732. default:
  733. continue
  734. }
  735. ch <- event
  736. }
  737. }
  738. func ipPort(ip [16]byte, port uint16) netaddr.IPPort {
  739. i, _ := netaddr.FromStdIP(ip[:])
  740. return netaddr.IPPortFrom(i, port)
  741. }
  742. func (t *Tracer) InitKProcInfo(pid uint32, appInfo *AppInfo) error {
  743. var err error
  744. var info EbpfProcInfo
  745. if appInfo.EBPFProcInfo == nil {
  746. info = EbpfProcInfo{
  747. InstanceId: appInfo.InstanceIdHash.HashtVal,
  748. AppId: appInfo.AppIdHash.HashtVal,
  749. CodeType: uint16(appInfo.CodeType),
  750. }
  751. } else {
  752. info = *appInfo.EBPFProcInfo
  753. info.AppId = appInfo.AppIdHash.HashtVal
  754. }
  755. _, err = tracer.UpdateProcInfoToMap(t.collection, pid, info)
  756. if err != nil {
  757. klog.Error("failed to update program info", err)
  758. }
  759. appInfo.EBPFProcInfo = &info
  760. return err
  761. }
  762. // TODO check language
  763. func (t *Tracer) DisableL7Tracing() bool {
  764. return t.disableL7Tracing
  765. }
  766. func (t *Tracer) DisableE2ETracing() bool {
  767. return t.disableE2ETracing
  768. }
  769. func (t *Tracer) DisableStackTracing() bool {
  770. return t.disableStackTracing
  771. }