tracer.go 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856
  1. package ebpftracer
  2. import (
  3. "bytes"
  4. debugelf "debug/elf"
  5. "encoding/binary"
  6. "encoding/hex"
  7. "errors"
  8. "fmt"
  9. "os"
  10. "strconv"
  11. "strings"
  12. "time"
  13. "github.com/cilium/ebpf"
  14. "github.com/cilium/ebpf/link"
  15. "github.com/cilium/ebpf/perf"
  16. "github.com/coroot/coroot-node-agent/common"
  17. "github.com/coroot/coroot-node-agent/ebpftracer/l7"
  18. "github.com/coroot/coroot-node-agent/ebpftracer/tracer"
  19. "github.com/coroot/coroot-node-agent/proc"
  20. . "github.com/coroot/coroot-node-agent/utils/modelse"
  21. klog "github.com/sirupsen/logrus"
  22. "golang.org/x/sys/unix"
  23. "inet.af/netaddr"
  24. )
  25. /*
  26. #define TASK_COMM_LEN 16
  27. #define BURST_DATA_BUF_SIZE 8192 // For brust send buffer
  28. #include <linux/types.h>
  29. struct __tuple_t {
  30. __u8 daddr[16];
  31. __u8 rcv_saddr[16];
  32. __u8 addr_len;
  33. __u8 l4_protocol;
  34. __u16 dport;
  35. __u16 num;
  36. };
  37. struct __socket_data {
  38. __u32 pid;
  39. __u32 tgid;
  40. __u64 coroutine_id;
  41. __u8 source;
  42. __u8 comm[TASK_COMM_LEN];
  43. __u64 socket_id;
  44. struct __tuple_t tuple;
  45. __u32 extra_data;
  46. __u32 extra_data_count;
  47. __u32 tcp_seq;
  48. __u64 thread_trace_id;
  49. __u64 timestamp;
  50. __u8 direction: 1;
  51. __u8 msg_type: 7;
  52. __u64 syscall_len;
  53. __u64 data_seq;
  54. __u16 data_type;
  55. __u16 data_len;
  56. char data[BURST_DATA_BUF_SIZE];
  57. } __attribute__((packed));
  58. struct __socket_data_buffer {
  59. __u32 events_num;
  60. __u32 len;
  61. char data[32760];
  62. };
  63. */
  64. import "C"
  65. type SocketData C.struct___socket_data
  66. type SocketDataBuffer C.struct___socket_data_buffer
  67. const MaxPayloadSize = 1024
  68. type EventType uint32
  69. type EventReason uint32
  70. const (
  71. EventTypeProcessStart EventType = 1
  72. EventTypeProcessExit EventType = 2
  73. EventTypeConnectionOpen EventType = 3
  74. EventTypeConnectionClose EventType = 4
  75. EventTypeConnectionError EventType = 5
  76. EventTypeListenOpen EventType = 6
  77. EventTypeListenClose EventType = 7
  78. EventTypeFileOpen EventType = 8
  79. EventTypeTCPRetransmit EventType = 9
  80. EventTypeL7Request EventType = 10
  81. EventTypeFunEnt EventType = 11
  82. EventTypeFunRet EventType = 12
  83. EventTypeAcceptOpen EventType = 13
  84. EventTypeAcceptClose EventType = 14
  85. EventReasonNone EventReason = 0
  86. EventReasonOOMKill EventReason = 1
  87. )
  88. type TrafficStats struct {
  89. BytesSent uint64
  90. BytesReceived uint64
  91. }
  92. type Event struct {
  93. StackEvent *StackEvent
  94. Type EventType
  95. Reason EventReason
  96. Pid uint32
  97. SrcAddr netaddr.IPPort
  98. DstAddr netaddr.IPPort
  99. Fd uint64
  100. Timestamp uint64
  101. Duration time.Duration
  102. L7Request *l7.RequestData
  103. TrafficStats *TrafficStats
  104. FirstReadTime uint64
  105. FirstWriteTime uint64
  106. NewReadTime uint64
  107. }
  108. type perfMapType uint8
  109. const (
  110. perfMapTypeProcEvents perfMapType = 1
  111. perfMapTypeTCPEvents perfMapType = 2
  112. perfMapTypeFileEvents perfMapType = 3
  113. perfMapTypeL7Events perfMapType = 4
  114. perfMapTypeSocketEvents perfMapType = 5
  115. perfMapTypeEventQueue perfMapType = 6
  116. perfMapTypePythonThreadEvents perfMapType = 7
  117. )
  118. type Tracer struct {
  119. kernelVersion string
  120. disableL7Tracing bool
  121. disableE2ETracing bool
  122. disableStackTracing bool
  123. collection *ebpf.Collection
  124. readers map[string]*perf.Reader
  125. links []link.Link
  126. uprobes map[string]*ebpf.Program
  127. Symbols []debugelf.Symbol
  128. Uprobes []tracer.Uprobe
  129. UprobesMap map[string]tracer.Uprobe
  130. }
  131. func NewTracer(kernelVersion string, disableL7Tracing, disableE2ETracing, disableStackTracing bool) *Tracer {
  132. if disableL7Tracing {
  133. klog.Infoln("L7 tracing is disabled")
  134. }
  135. if disableE2ETracing {
  136. klog.Infoln("L7 tracing is disabled")
  137. }
  138. if disableStackTracing {
  139. klog.Infoln("L7 stack is disabled")
  140. }
  141. return &Tracer{
  142. kernelVersion: kernelVersion,
  143. disableL7Tracing: disableL7Tracing,
  144. disableE2ETracing: disableE2ETracing,
  145. disableStackTracing: disableStackTracing,
  146. readers: map[string]*perf.Reader{},
  147. uprobes: map[string]*ebpf.Program{},
  148. }
  149. }
  150. func (t *Tracer) Run(events chan<- Event) error {
  151. if err := t.ebpf(events); err != nil {
  152. return err
  153. }
  154. if err := t.init(events); err != nil {
  155. return err
  156. }
  157. return nil
  158. }
  159. func (t *Tracer) Close() {
  160. for _, p := range t.uprobes {
  161. _ = p.Close()
  162. }
  163. for _, l := range t.links {
  164. _ = l.Close()
  165. }
  166. for _, r := range t.readers {
  167. _ = r.Close()
  168. }
  169. t.collection.Close()
  170. }
  171. func (t *Tracer) init(ch chan<- Event) error {
  172. pids, err := proc.ListPids()
  173. if err != nil {
  174. return fmt.Errorf("failed to list pids: %w", err)
  175. }
  176. for _, pid := range pids {
  177. ch <- Event{Type: EventTypeProcessStart, Pid: pid}
  178. }
  179. fds, sockets := readFds(pids)
  180. for _, fd := range fds {
  181. ch <- Event{Type: EventTypeFileOpen, Pid: fd.pid, Fd: fd.fd}
  182. }
  183. listens := map[uint64]bool{}
  184. for _, s := range sockets {
  185. if s.Listen {
  186. listens[uint64(s.pid)<<32|uint64(s.SAddr.Port())] = true
  187. }
  188. }
  189. ebpfConnectionsMap := t.collection.Maps["active_connections"]
  190. timestamp := uint64(time.Now().UnixNano())
  191. for _, s := range sockets {
  192. typ := EventTypeConnectionOpen
  193. if s.Listen {
  194. typ = EventTypeListenOpen
  195. } else if listens[uint64(s.pid)<<32|uint64(s.SAddr.Port())] || s.DAddr.Port() > s.SAddr.Port() { // inbound
  196. continue
  197. }
  198. ch <- Event{
  199. Type: typ,
  200. Pid: s.pid,
  201. Timestamp: timestamp,
  202. Fd: s.fd,
  203. SrcAddr: s.SAddr,
  204. DstAddr: s.DAddr,
  205. }
  206. if typ == EventTypeConnectionOpen {
  207. id := ConnectionId{FD: s.fd, PID: s.pid}
  208. conn := Connection{Timestamp: timestamp}
  209. if err := ebpfConnectionsMap.Update(id, conn, ebpf.UpdateNoExist); err != nil {
  210. klog.Warningln(err)
  211. }
  212. }
  213. }
  214. return nil
  215. }
  216. func (t *Tracer) ActiveConnectionsIterator() *ebpf.MapIterator {
  217. return t.collection.Maps["active_connections"].Iterate()
  218. }
  219. func (t *Tracer) ActiveAcceptsIterator() *ebpf.MapIterator {
  220. return t.collection.Maps["active_accepts"].Iterate()
  221. }
  222. type ConnectionId struct {
  223. FD uint64
  224. PID uint32
  225. _ uint32
  226. }
  227. type Connection struct {
  228. Timestamp uint64
  229. BytesSent uint64
  230. BytesReceived uint64
  231. }
  232. type perfMap struct {
  233. name string
  234. perCPUBufferSizePages int
  235. typ perfMapType
  236. }
  237. func (t *Tracer) ebpf(ch chan<- Event) error {
  238. kv := "v" + common.KernelMajorMinor(t.kernelVersion)
  239. path, prg, err := EbpfCode(kv)
  240. klog.Infof("kv is [%s], kernel version: [%s] path: [%s]", kv, t.kernelVersion, path)
  241. if len(prg) == 0 || err != nil {
  242. return fmt.Errorf("kv is %s, unsupported kernel version: [%s] path: [%s] err:<%v>", kv, t.kernelVersion, path, err)
  243. }
  244. _, debugFsErr := os.Stat("/sys/kernel/debug/tracing")
  245. _, traceFsErr := os.Stat("/sys/kernel/tracing")
  246. if debugFsErr != nil && traceFsErr != nil {
  247. return fmt.Errorf("kernel tracing is not available: debugfs or tracefs must be mounted")
  248. }
  249. collectionSpec, err := ebpf.LoadCollectionSpecFromReader(bytes.NewReader(prg))
  250. if err != nil {
  251. return fmt.Errorf("failed to load collection spec: %w", err)
  252. }
  253. _ = unix.Setrlimit(unix.RLIMIT_MEMLOCK, &unix.Rlimit{Cur: unix.RLIM_INFINITY, Max: unix.RLIM_INFINITY})
  254. tracer.PidFilter(collectionSpec)
  255. opts := &ebpf.CollectionOptions{MapReplacements: make(map[string]*ebpf.Map)}
  256. klog.Infof("[start] Look eBPF .maps")
  257. for _, spec := range collectionSpec.Maps {
  258. klog.Infoln(spec.Name)
  259. }
  260. klog.Infof("[end] Look eBPF .maps")
  261. tracer.MapInit(collectionSpec, opts)
  262. // TODO 多进程
  263. // tracer.SetConstants(collectionSpec)
  264. c, err := ebpf.NewCollectionWithOptions(collectionSpec, *opts)
  265. if err != nil {
  266. var verr *ebpf.VerifierError
  267. if errors.As(err, &verr) {
  268. klog.Errorf("----%+v", verr)
  269. }
  270. return fmt.Errorf("failed to load collection: %w", err)
  271. }
  272. tracer.Offset()
  273. t.collection = c
  274. perfMaps := []perfMap{
  275. {name: "proc_events", typ: perfMapTypeProcEvents, perCPUBufferSizePages: 4},
  276. {name: "tcp_listen_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 4},
  277. {name: "tcp_connect_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 8},
  278. {name: "tcp_accept_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 8},
  279. {name: "tcp_retransmit_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 4},
  280. {name: "file_events", typ: perfMapTypeFileEvents, perCPUBufferSizePages: 4},
  281. {name: "event_queue", typ: perfMapTypeEventQueue, perCPUBufferSizePages: 32},
  282. {name: "python_thread_events", typ: perfMapTypePythonThreadEvents, perCPUBufferSizePages: 4},
  283. }
  284. tracer.MapInsert(c)
  285. if !t.DisableL7Tracing() {
  286. perfMaps = append(perfMaps, perfMap{name: "l7_events", typ: perfMapTypeL7Events, perCPUBufferSizePages: 32})
  287. }
  288. perfMaps = append(perfMaps, perfMap{name: MAP_PERF_SOCKET_DATA_NAME, typ: perfMapTypeSocketEvents, perCPUBufferSizePages: 64})
  289. klog.Infof("[start] Look eBPF perf_maps")
  290. for _, pm := range perfMaps {
  291. klog.Infoln(pm.name)
  292. m, ok := t.collection.Maps[pm.name]
  293. if ok {
  294. r, err := perf.NewReader(m, pm.perCPUBufferSizePages*os.Getpagesize())
  295. if err != nil {
  296. t.Close()
  297. return fmt.Errorf("failed to create ebpf reader: %w", err)
  298. }
  299. t.readers[pm.name] = r
  300. // event监听
  301. go runEventsReader(pm.name, r, ch, pm.typ)
  302. }
  303. }
  304. klog.Infof("[end] Look eBPF perf_maps")
  305. klog.Infof("[start] Look eBPF specPrograms")
  306. for _, programSpec := range collectionSpec.Programs {
  307. program := t.collection.Programs[programSpec.Name]
  308. klog.Infof("%s:[%s]", programSpec.SectionName, programSpec.Name)
  309. if t.DisableL7Tracing() {
  310. switch programSpec.Name {
  311. case "sys_enter_writev", "sys_enter_write", "sys_enter_sendto", "sys_enter_sendmsg", "sys_enter_sendmmsg":
  312. continue
  313. case "sys_enter_read", "sys_enter_readv", "sys_enter_recvfrom", "sys_enter_recvmsg":
  314. continue
  315. case "sys_exit_read", "sys_exit_readv", "sys_exit_recvfrom", "sys_exit_recvmsg":
  316. continue
  317. }
  318. }
  319. var l link.Link
  320. switch programSpec.Type {
  321. case ebpf.TracePoint:
  322. if strings.Contains(programSpec.SectionName, "prog") {
  323. continue
  324. }
  325. parts := strings.SplitN(programSpec.AttachTo, "/", 2)
  326. l, err = link.Tracepoint(parts[0], parts[1], program, nil)
  327. case ebpf.Kprobe:
  328. if strings.HasPrefix(programSpec.SectionName, "uprobe/") {
  329. t.uprobes[programSpec.Name] = program
  330. continue
  331. }
  332. if strings.HasPrefix(programSpec.SectionName, "kretprobe/") {
  333. l, err = link.Kretprobe(programSpec.AttachTo, program, nil)
  334. t.links = append(t.links, l)
  335. continue
  336. }
  337. l, err = link.Kprobe(programSpec.AttachTo, program, nil)
  338. }
  339. if err != nil {
  340. t.Close()
  341. return fmt.Errorf("failed to link program: %w", err)
  342. }
  343. t.links = append(t.links, l)
  344. }
  345. klog.Infof("[end] Look eBPF specPrograms")
  346. return nil
  347. }
  348. func (t EventType) String() string {
  349. switch t {
  350. case EventTypeProcessStart:
  351. return "process-start"
  352. case EventTypeProcessExit:
  353. return "process-exit"
  354. case EventTypeConnectionOpen:
  355. return "connection-open"
  356. case EventTypeConnectionClose:
  357. return "connection-close"
  358. case EventTypeConnectionError:
  359. return "connection-error"
  360. case EventTypeListenOpen:
  361. return "listen-open"
  362. case EventTypeListenClose:
  363. return "listen-close"
  364. case EventTypeFileOpen:
  365. return "file-open"
  366. case EventTypeTCPRetransmit:
  367. return "tcp-retransmit"
  368. case EventTypeL7Request:
  369. return "l7-request"
  370. }
  371. return "unknown: " + strconv.Itoa(int(t))
  372. }
  373. func (t EventReason) String() string {
  374. switch t {
  375. case EventReasonNone:
  376. return "none"
  377. case EventReasonOOMKill:
  378. return "oom-kill"
  379. }
  380. return "unknown: " + strconv.Itoa(int(t))
  381. }
  382. type procEvent struct {
  383. Type EventType
  384. Pid uint32
  385. Reason uint32
  386. }
  387. type tcpEvent struct {
  388. Fd uint64
  389. Timestamp uint64
  390. Duration uint64
  391. FirstReadTime uint64
  392. FirstWriteTime uint64
  393. NewReadTime uint64
  394. Type EventType
  395. Pid uint32
  396. BytesSent uint64
  397. BytesReceived uint64
  398. SPort uint16
  399. DPort uint16
  400. SAddr [16]byte
  401. DAddr [16]byte
  402. }
  403. type fileEvent struct {
  404. Type EventType
  405. Pid uint32
  406. Fd uint64
  407. }
  408. // struct l7_event in l7.c
  409. type l7Event struct {
  410. Fd uint64
  411. ConnectionTimestamp uint64
  412. Pid uint32
  413. Status uint32
  414. Duration uint64
  415. Protocol uint8
  416. Method uint8
  417. Padding uint16
  418. StatementId uint32
  419. PayloadSize uint64
  420. TraceId uint64
  421. StartAt uint64 // ns
  422. EndtAt uint64 // ns
  423. TraceStart uint32
  424. TraceEnd uint32
  425. EventCount uint32
  426. Sport uint16
  427. Dport uint16
  428. SAddr [16]byte
  429. DAddr [16]byte
  430. ComponentSport uint16
  431. ComponentDport uint16
  432. ComponentSAddr [16]byte
  433. ComponentDAddr [16]byte
  434. AssumedAppId HashByte
  435. SpanId HashByte
  436. TraceIdFrom HashByte16
  437. CalledId HashByte
  438. InstanceIdFrom HashByte
  439. AppIdFrom HashByte
  440. SpanIdFrom HashByte
  441. }
  442. type SocketDataBufferddd struct {
  443. EventsNum uint32
  444. Len uint32
  445. Data [32760]byte
  446. }
  447. const (
  448. TASK_COMM_LEN = 16
  449. BURST_DATA_BUF_SIZE = 8192
  450. )
  451. type Tuple struct {
  452. Daddr [16]uint8
  453. RcvSaddr [16]uint8
  454. AddrLen uint8
  455. L4Protocol uint8
  456. Dport uint16
  457. Num uint16
  458. }
  459. type SocketDatadddd struct {
  460. Pid uint32 // 表示线程号 如果'pid == tgid'表示一个进程, 否则是线程
  461. Tgid uint32 // 进程号
  462. CoroutineID uint64
  463. Source uint8
  464. Comm [TASK_COMM_LEN]byte
  465. SocketID uint64
  466. Tuple Tuple
  467. ExtraData uint32
  468. ExtraDataCount uint32
  469. TcpSeq uint32
  470. ThreadTraceID uint64
  471. Timestamp uint64
  472. Direction uint8
  473. MsgType uint8
  474. SyscallLen uint64
  475. DataSeq uint64
  476. DataType uint16
  477. DataLen uint16
  478. Data [BURST_DATA_BUF_SIZE]byte
  479. }
  480. type StackEvent struct {
  481. Type uint64
  482. Pid uint64
  483. TraceId uint64
  484. Goid uint64
  485. Ip uint64
  486. Bp uint64
  487. CallerIp uint64
  488. CallerBp uint64
  489. TimeNsStart uint64
  490. TimeNsEnd uint64
  491. // Nid uint64
  492. // Fpid uint64
  493. // Level uint64
  494. Location byte
  495. ClassName [100]byte
  496. MethedName [100]byte
  497. }
  498. type StackFunEvent struct {
  499. StackEvent StackEvent
  500. Uprobe *tracer.Uprobe
  501. }
  502. type pythonThreadEvent struct {
  503. Type EventType
  504. Pid uint32
  505. Duration uint64
  506. }
  507. func runEventsReader(name string, r *perf.Reader, ch chan<- Event, typ perfMapType) {
  508. for {
  509. rec, err := r.Read()
  510. if err != nil {
  511. if errors.Is(err, perf.ErrClosed) {
  512. break
  513. }
  514. continue
  515. }
  516. if rec.LostSamples > 0 {
  517. klog.Errorln(name, "lost samples:", rec.LostSamples)
  518. continue
  519. }
  520. var event Event
  521. switch typ {
  522. case perfMapTypeSocketEvents:
  523. //fmt.Println("perfMapTypeSocketEvents")
  524. // 假设 rec.RawSample 包含数据,类型为 []byte
  525. //rawData := rec.RawSample
  526. //fmt.Println("perfMapTypeSocketEvents2")
  527. //
  528. //// 创建一个 SocketDataBuffer 结构体实例
  529. //var buffer SocketDataBuffer
  530. //
  531. //// 创建一个字节缓冲区,并将数据填充到其中
  532. //reader := bytes.NewReader(rawData)
  533. //fmt.Println("perfMapTypeSocketEvents3")
  534. //fmt.Println(len(rawData))
  535. //// 使用 binary.Read 函数读取数据并解析为 SocketDataBuffer 结构体实例
  536. //if err := binary.Read(reader, binary.LittleEndian, &buffer); err != nil {
  537. // fmt.Println(reader.Len())
  538. // fmt.Println("Failed to read data:", err)
  539. // continue
  540. //}
  541. //fmt.Println("perfMapTypeSocketEvents4")
  542. //
  543. //// 打印解析后的数据
  544. //fmt.Println("EventsNum:", buffer.EventsNum)
  545. //fmt.Println("Len:", buffer.Len)
  546. //
  547. //// 打印 char data 的内容
  548. //fmt.Printf("Data: %s\n", string(buffer.Data[:buffer.Len])) // 仅打印实际长度的数据
  549. //socketDataBuffer := rec.RawSample
  550. // 解析 __socket_data_buffer
  551. //buf := (*SocketDataBuffer)(unsafe.Pointer(&rec.RawSample[0])) //nolint:gosec
  552. //
  553. //// 获取 char data[32760];
  554. //socketData := (*SocketData)(unsafe.Pointer(&buf.data[0])) //nolint:gosec
  555. //// todo
  556. //fmt.Printf("socketData.DataType:%d \n", (socketData.data_type))
  557. //fmt.Printf("socketData.DataLen:%d \n", (socketData.data_len))
  558. //
  559. //// 解析C结构体中的data字段
  560. //dataSlice := C.GoBytes(unsafe.Pointer(&socketData.data[0]), C.int(socketData.data_len))
  561. //// 打印或处理包含的数据
  562. //fmt.Printf("socketData.Payload:%v \n", string(dataSlice))
  563. /*todo */
  564. //socketData := (*(*[128]byte)(unsafe.Pointer(&eventC.line)))
  565. //dataPtr := unsafe.Pointer(&buf.data[0])
  566. //socketData := (*SocketData)(dataPtr)
  567. //reader2 := bytes.NewBuffer(rec.RawSample)
  568. // 222222
  569. //fmt.Println("socketData.Pid:", socketData.pid)
  570. //fmt.Println("socketData.Tgid:", socketData.tgid)
  571. //fmt.Println("socketData.CoroutineID:", socketData.coroutine_id)
  572. //fmt.Println("socketData.Source:", socketData.source)
  573. //
  574. //fmt.Printf("socketData.Comm: %s \n", socketData.comm)
  575. //fmt.Printf("socketData.SocketID :%v \n", socketData.socket_id)
  576. //fmt.Println("socketData.Tuple:", socketData.Tuple)
  577. //fmt.Println("socketData.ExtraData:", socketData.ExtraData)
  578. //fmt.Println("socketData.ExtraDataCount:", socketData.ExtraDataCount)
  579. //fmt.Println("socketData.TCPSeq:", socketData.TcpSeq)
  580. //fmt.Println("socketData.ThreadTraceID:", socketData.ThreadTraceID)
  581. //fmt.Println("socketData.Timestamp:", socketData.Timestamp)
  582. //fmt.Println("socketData.Direction:", socketData.Direction)
  583. //fmt.Println("socketData.MsgType:", socketData.MsgType)
  584. //fmt.Println("socketData.SyscallLen:", socketData.SyscallLen)
  585. //fmt.Println("socketData.DataSeq:", socketData.DataSeq)
  586. //socketData := &SocketData{}
  587. //reader := bytes.NewBuffer(rec.RawSample)
  588. //if err := binary.Read(reader, binary.LittleEndian, v); err != nil {
  589. // klog.Warningln("failed1 to read msg:", err)
  590. // continue
  591. //}
  592. //
  593. //var data []byte
  594. //payload := reader.Bytes()
  595. //switch {
  596. //case v.Len == 0:
  597. //case v.Len > 32760:
  598. // data = payload[:32760]
  599. //default:
  600. // data = payload[:v.Len]
  601. //}
  602. //////data2 := data[:v.Len]
  603. ////fmt.Println("perfMapTypeSocketEvents")
  604. //fmt.Println(v.EventsNum)
  605. //fmt.Println(v.Len)
  606. //fmt.Println(string(data))
  607. //
  608. //var data2 SocketData
  609. //reader2 := bytes.NewBuffer(data)
  610. //if err := binary.Read(reader2, binary.LittleEndian, data2); err != nil {
  611. // klog.Warningln("failed2 to read msg:", err)
  612. // continue
  613. //}
  614. //
  615. //fmt.Println(data2.Pid)
  616. //fmt.Println(data2.Tgid)
  617. //fmt.Println(string(v.Data))
  618. //continue
  619. case perfMapTypeL7Events:
  620. v := &l7Event{}
  621. reader := bytes.NewBuffer(rec.RawSample)
  622. if err := binary.Read(reader, binary.LittleEndian, v); err != nil {
  623. klog.Warningln("failed to read msg:", err)
  624. continue
  625. }
  626. //fmt.Println("v.TraceIdFrom")
  627. //fmt.Println(v.TraceIdFrom)
  628. //a := hex.EncodeToString(v.TraceIdFrom[:])
  629. //for _, b := range v.AssumedAppId {
  630. // fmt.Printf("v.AssumedAppId- %02\n", b)
  631. //}
  632. //fmt.Println(a)
  633. payload := reader.Bytes()
  634. req := &l7.RequestData{
  635. Protocol: l7.Protocol(v.Protocol),
  636. Status: l7.Status(v.Status),
  637. Duration: time.Duration(v.Duration),
  638. Method: l7.Method(v.Method),
  639. StatementId: v.StatementId,
  640. TraceId: v.TraceId,
  641. TraceStart: v.TraceStart,
  642. TraceEnd: v.TraceEnd,
  643. EventCount: v.EventCount,
  644. AssumedAppId: hex.EncodeToString(v.AssumedAppId[:]),
  645. SpanId: hex.EncodeToString(v.SpanId[:]),
  646. StartAt: v.StartAt,
  647. EndAt: v.EndtAt,
  648. ComponentSAddr: ipPort(v.SAddr,v.Sport),
  649. ComponentDAddr: ipPort(v.DAddr,v.Dport),
  650. }
  651. if req.Protocol == l7.ProtocolHTTP{
  652. klog.Infof("runEventsReader ComponentSAddr.String %s", req.ComponentSAddr.String())
  653. klog.Infof("runEventsReader ComponentDAddr.String %s", req.ComponentDAddr.String())
  654. }
  655. if v.TraceEnd == 1 {
  656. req.ParentSpanContext.TraceIdFrom = hex.EncodeToString(v.TraceIdFrom[:])
  657. req.ParentSpanContext.CalledId = hex.EncodeToString(v.CalledId[:])
  658. req.ParentSpanContext.InstanceIdFrom = hex.EncodeToString(v.InstanceIdFrom[:])
  659. req.ParentSpanContext.AppIdFrom = hex.EncodeToString(v.AppIdFrom[:])
  660. req.ParentSpanContext.SpanIdFrom = hex.EncodeToString(v.SpanIdFrom[:])
  661. req.SAddr = ipPort(v.SAddr,v.Sport)
  662. req.DAddr = ipPort(v.DAddr,v.Dport)
  663. klog.Infof("runEventsReader SAddr.String %s", req.SAddr.String())
  664. klog.Infof("runEventsReader DAddr.String %s", req.DAddr.String())
  665. }
  666. switch {
  667. case v.PayloadSize == 0:
  668. case v.PayloadSize > MaxPayloadSize:
  669. req.Payload = payload[:MaxPayloadSize]
  670. default:
  671. req.Payload = payload[:v.PayloadSize]
  672. }
  673. //fmt.Println("==========")
  674. //fmt.Println("req.Payload:", string(req.Payload))
  675. //fmt.Println("==========")
  676. event = Event{Type: EventTypeL7Request, Pid: v.Pid, Fd: v.Fd, Timestamp: v.ConnectionTimestamp, L7Request: req}
  677. case perfMapTypeFileEvents:
  678. v := &fileEvent{}
  679. if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
  680. klog.Warningln("failed to read msg:", err)
  681. continue
  682. }
  683. event = Event{Type: v.Type, Pid: v.Pid, Fd: v.Fd}
  684. case perfMapTypeProcEvents:
  685. v := &procEvent{}
  686. if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
  687. klog.Warningln("failed to read msg:", err)
  688. continue
  689. }
  690. event = Event{Type: v.Type, Reason: EventReason(v.Reason), Pid: v.Pid}
  691. case perfMapTypeTCPEvents:
  692. v := &tcpEvent{}
  693. if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
  694. klog.Warningln("failed to read msg:", err)
  695. continue
  696. }
  697. event = Event{
  698. Type: v.Type,
  699. Pid: v.Pid,
  700. SrcAddr: ipPort(v.SAddr, v.SPort),
  701. DstAddr: ipPort(v.DAddr, v.DPort),
  702. Fd: v.Fd,
  703. Timestamp: v.Timestamp,
  704. Duration: time.Duration(v.Duration),
  705. }
  706. if v.Type == EventTypeConnectionClose {
  707. event.TrafficStats = &TrafficStats{
  708. BytesSent: v.BytesSent,
  709. BytesReceived: v.BytesReceived,
  710. }
  711. }
  712. event.FirstReadTime = v.FirstReadTime
  713. event.FirstWriteTime = v.FirstWriteTime
  714. event.NewReadTime = v.NewReadTime
  715. if v.Type == EventTypeAcceptClose {
  716. event.TrafficStats = &TrafficStats{
  717. BytesSent: v.BytesSent,
  718. BytesReceived: v.BytesReceived,
  719. }
  720. }
  721. case perfMapTypePythonThreadEvents:
  722. v := &pythonThreadEvent{}
  723. if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
  724. klog.Warningln("failed to read msg:", err)
  725. continue
  726. }
  727. event = Event{
  728. Type: v.Type,
  729. Pid: v.Pid,
  730. Duration: time.Duration(v.Duration),
  731. }
  732. case perfMapTypeEventQueue:
  733. v := &StackEvent{}
  734. if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
  735. klog.Warningln("failed to read msg:", err)
  736. continue
  737. }
  738. event = Event{
  739. Type: EventTypeFunEnt,
  740. StackEvent: v,
  741. }
  742. default:
  743. continue
  744. }
  745. ch <- event
  746. }
  747. }
  748. func ipPort(ip [16]byte, port uint16) netaddr.IPPort {
  749. i, _ := netaddr.FromStdIP(ip[:])
  750. return netaddr.IPPortFrom(i, port)
  751. }
  752. func (t *Tracer) InitKProcInfo(pid uint32, appInfo *AppInfo) error {
  753. var err error
  754. var info EbpfProcInfo
  755. if appInfo.EBPFProcInfo == nil {
  756. info = EbpfProcInfo{
  757. InstanceId: appInfo.InstanceIdHash.HashtVal,
  758. AppId: appInfo.AppIdHash.HashtVal,
  759. CodeType: uint16(appInfo.CodeType),
  760. }
  761. } else {
  762. info = *appInfo.EBPFProcInfo
  763. info.AppId = appInfo.AppIdHash.HashtVal
  764. }
  765. _, err = tracer.UpdateProcInfoToMap(t.collection, pid, info)
  766. if err != nil {
  767. klog.Error("failed to update program info", err)
  768. }
  769. appInfo.EBPFProcInfo = &info
  770. return err
  771. }
  772. // TODO check language
  773. func (t *Tracer) DisableL7Tracing() bool {
  774. return t.disableL7Tracing
  775. }
  776. func (t *Tracer) DisableE2ETracing() bool {
  777. return t.disableE2ETracing
  778. }
  779. func (t *Tracer) DisableStackTracing() bool {
  780. return t.disableStackTracing
  781. }