tracer.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657
  1. package ebpftracer
  2. import (
  3. "bytes"
  4. "encoding/binary"
  5. "encoding/hex"
  6. "errors"
  7. "fmt"
  8. "github.com/cilium/ebpf"
  9. "github.com/cilium/ebpf/link"
  10. "github.com/cilium/ebpf/perf"
  11. "github.com/coroot/coroot-node-agent/common"
  12. "github.com/coroot/coroot-node-agent/ebpftracer/l7"
  13. "github.com/coroot/coroot-node-agent/ebpftracer/tracer"
  14. "github.com/coroot/coroot-node-agent/proc"
  15. "github.com/coroot/coroot-node-agent/utils"
  16. "golang.org/x/mod/semver"
  17. "golang.org/x/sys/unix"
  18. "inet.af/netaddr"
  19. "k8s.io/klog/v2"
  20. "os"
  21. "runtime"
  22. "strconv"
  23. "strings"
  24. "time"
  25. )
  26. /*
  27. #define TASK_COMM_LEN 16
  28. #define BURST_DATA_BUF_SIZE 8192 // For burst send buffer
  29. #include <linux/types.h>
  30. struct __tuple_t {
  31. __u8 daddr[16];
  32. __u8 rcv_saddr[16];
  33. __u8 addr_len;
  34. __u8 l4_protocol;
  35. __u16 dport;
  36. __u16 num;
  37. };
  38. struct __socket_data {
  39. __u32 pid;
  40. __u32 tgid;
  41. __u64 coroutine_id;
  42. __u8 source;
  43. __u8 comm[TASK_COMM_LEN];
  44. __u64 socket_id;
  45. struct __tuple_t tuple;
  46. __u32 extra_data;
  47. __u32 extra_data_count;
  48. __u32 tcp_seq;
  49. __u64 thread_trace_id;
  50. __u64 timestamp;
  51. __u8 direction: 1;
  52. __u8 msg_type: 7;
  53. __u64 syscall_len;
  54. __u64 data_seq;
  55. __u16 data_type;
  56. __u16 data_len;
  57. char data[BURST_DATA_BUF_SIZE];
  58. } __attribute__((packed));
  59. struct __socket_data_buffer {
  60. __u32 events_num;
  61. __u32 len;
  62. char data[32760];
  63. };
  64. */
  65. import "C"
  66. type SocketData C.struct___socket_data
  67. type SocketDataBuffer C.struct___socket_data_buffer
  // MaxPayloadSize is the upper bound, in bytes, on the payload kept for a
  // single L7 request.
  const MaxPayloadSize = 1024

  type (
  	// EventType identifies what an Event describes.
  	EventType uint32

  	// EventReason carries an optional qualifier for an Event.
  	EventReason uint32
  )

  // Event types.
  const (
  	EventTypeProcessStart    EventType = 1
  	EventTypeProcessExit     EventType = 2
  	EventTypeConnectionOpen  EventType = 3
  	EventTypeConnectionClose EventType = 4
  	EventTypeConnectionError EventType = 5
  	EventTypeListenOpen      EventType = 6
  	EventTypeListenClose     EventType = 7
  	EventTypeFileOpen        EventType = 8
  	EventTypeTCPRetransmit   EventType = 9
  	EventTypeL7Request       EventType = 10
  )

  // Event reasons.
  const (
  	EventReasonNone    EventReason = 0
  	EventReasonOOMKill EventReason = 1
  )
  85. type Event struct {
  86. Type EventType
  87. Reason EventReason
  88. Pid uint32
  89. SrcAddr netaddr.IPPort
  90. DstAddr netaddr.IPPort
  91. Fd uint64
  92. Timestamp uint64
  93. L7Request *l7.RequestData
  94. }
  // perfMapType tells runEventsReader which decoder to apply to the raw
  // samples read from a perf map.
  type perfMapType uint8

  // Known perf map kinds; values start at 1.
  const (
  	perfMapTypeProcEvents perfMapType = iota + 1
  	perfMapTypeTCPEvents
  	perfMapTypeFileEvents
  	perfMapTypeL7Events
  	perfMapTypeSocketEvents
  )
  103. type Tracer struct {
  104. kernelVersion string
  105. disableL7Tracing bool
  106. collection *ebpf.Collection
  107. readers map[string]*perf.Reader
  108. links []link.Link
  109. uprobes map[string]*ebpf.Program
  110. }
  111. func NewTracer(kernelVersion string, disableL7Tracing bool) *Tracer {
  112. if disableL7Tracing {
  113. klog.Infoln("L7 tracing is disabled")
  114. }
  115. return &Tracer{
  116. kernelVersion: kernelVersion,
  117. disableL7Tracing: disableL7Tracing,
  118. readers: map[string]*perf.Reader{},
  119. uprobes: map[string]*ebpf.Program{},
  120. }
  121. }
  122. func (t *Tracer) Run(events chan<- Event) error {
  123. if err := t.ebpf(events); err != nil {
  124. return err
  125. }
  126. if err := t.init(events); err != nil {
  127. return err
  128. }
  129. return nil
  130. }
  131. func (t *Tracer) Close() {
  132. for _, p := range t.uprobes {
  133. _ = p.Close()
  134. }
  135. for _, l := range t.links {
  136. _ = l.Close()
  137. }
  138. for _, r := range t.readers {
  139. _ = r.Close()
  140. }
  141. t.collection.Close()
  142. }
  143. func (t *Tracer) init(ch chan<- Event) error {
  144. pids, err := proc.ListPids()
  145. if err != nil {
  146. return fmt.Errorf("failed to list pids: %w", err)
  147. }
  148. for _, pid := range pids {
  149. ch <- Event{Type: EventTypeProcessStart, Pid: pid}
  150. }
  151. fds, sockets := readFds(pids)
  152. for _, fd := range fds {
  153. ch <- Event{Type: EventTypeFileOpen, Pid: fd.pid, Fd: fd.fd}
  154. }
  155. listens := map[uint64]bool{}
  156. for _, s := range sockets {
  157. if s.Listen {
  158. listens[uint64(s.pid)<<32|uint64(s.SAddr.Port())] = true
  159. }
  160. }
  161. for _, s := range sockets {
  162. typ := EventTypeConnectionOpen
  163. if s.Listen {
  164. typ = EventTypeListenOpen
  165. } else if listens[uint64(s.pid)<<32|uint64(s.SAddr.Port())] || s.DAddr.Port() > s.SAddr.Port() { // inbound
  166. continue
  167. }
  168. ch <- Event{
  169. Type: typ,
  170. Pid: s.pid,
  171. Fd: s.fd,
  172. SrcAddr: s.SAddr,
  173. DstAddr: s.DAddr,
  174. }
  175. }
  176. return nil
  177. }
  178. type perfMap struct {
  179. name string
  180. perCPUBufferSizePages int
  181. typ perfMapType
  182. }
  183. func (t *Tracer) ebpf(ch chan<- Event) error {
  184. if _, ok := ebpfProg[runtime.GOARCH]; !ok {
  185. return fmt.Errorf("unsupported architecture: %s", runtime.GOARCH)
  186. }
  187. kv := "v" + common.KernelMajorMinor(t.kernelVersion)
  188. var prg []byte
  189. for _, p := range ebpfProg[runtime.GOARCH] {
  190. if semver.Compare(kv, p.v) >= 0 {
  191. prg = p.p
  192. break
  193. }
  194. }
  195. if len(prg) == 0 {
  196. return fmt.Errorf("unsupported kernel version: %s", t.kernelVersion)
  197. }
  198. _, debugFsErr := os.Stat("/sys/kernel/debug/tracing")
  199. _, traceFsErr := os.Stat("/sys/kernel/tracing")
  200. if debugFsErr != nil && traceFsErr != nil {
  201. return fmt.Errorf("kernel tracing is not available: debugfs or tracefs must be mounted")
  202. }
  203. collectionSpec, err := ebpf.LoadCollectionSpecFromReader(bytes.NewReader(prg))
  204. if err != nil {
  205. return fmt.Errorf("failed to load collection spec: %w", err)
  206. }
  207. _ = unix.Setrlimit(unix.RLIMIT_MEMLOCK, &unix.Rlimit{Cur: unix.RLIM_INFINITY, Max: unix.RLIM_INFINITY})
  208. tracer.PidFilter(collectionSpec)
  209. opts := &ebpf.CollectionOptions{MapReplacements: make(map[string]*ebpf.Map)}
  210. for _, spec := range collectionSpec.Maps {
  211. fmt.Println("maps:", spec.Name)
  212. }
  213. tracer.MapInit(collectionSpec, opts)
  214. // TODO 多进程
  215. tracer.SetConstants(collectionSpec)
  216. c, err := ebpf.NewCollectionWithOptions(collectionSpec, *opts)
  217. if err != nil {
  218. var verr *ebpf.VerifierError
  219. if errors.As(err, &verr) {
  220. klog.Errorf("%+v", verr)
  221. }
  222. return fmt.Errorf("failed to load collection: %w", err)
  223. }
  224. tracer.Offset()
  225. t.collection = c
  226. perfMaps := []perfMap{
  227. {name: "proc_events", typ: perfMapTypeProcEvents, perCPUBufferSizePages: 4},
  228. {name: "tcp_listen_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 4},
  229. {name: "tcp_connect_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 8},
  230. {name: "tcp_retransmit_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 4},
  231. {name: "file_events", typ: perfMapTypeFileEvents, perCPUBufferSizePages: 4},
  232. }
  233. fmt.Println(len(collectionSpec.Programs))
  234. fmt.Println(len(c.Programs))
  235. tracer.MapInsert(c)
  236. if !t.disableL7Tracing {
  237. perfMaps = append(perfMaps, perfMap{name: "l7_events", typ: perfMapTypeL7Events, perCPUBufferSizePages: 32})
  238. }
  239. perfMaps = append(perfMaps, perfMap{name: tracer.MAP_PERF_SOCKET_DATA_NAME, typ: perfMapTypeSocketEvents, perCPUBufferSizePages: 64})
  240. fmt.Println("perfMaps start --")
  241. for _, pm := range perfMaps {
  242. fmt.Println(pm.name)
  243. m, ok := t.collection.Maps[pm.name]
  244. if ok {
  245. r, err := perf.NewReader(m, pm.perCPUBufferSizePages*os.Getpagesize())
  246. if err != nil {
  247. t.Close()
  248. return fmt.Errorf("failed to create ebpf reader: %w", err)
  249. }
  250. t.readers[pm.name] = r
  251. // event监听
  252. go runEventsReader(pm.name, r, ch, pm.typ)
  253. }
  254. }
  255. fmt.Println("perfMaps end --")
  256. for _, programSpec := range collectionSpec.Programs {
  257. program := t.collection.Programs[programSpec.Name]
  258. fmt.Println(programSpec.Name, programSpec.SectionName, programSpec.Type)
  259. if t.disableL7Tracing {
  260. switch programSpec.Name {
  261. case "sys_enter_writev", "sys_enter_write", "sys_enter_sendto", "sys_enter_sendmsg", "sys_enter_sendmmsg":
  262. continue
  263. case "sys_enter_read", "sys_enter_readv", "sys_enter_recvfrom", "sys_enter_recvmsg":
  264. continue
  265. case "sys_exit_read", "sys_exit_readv", "sys_exit_recvfrom", "sys_exit_recvmsg":
  266. continue
  267. }
  268. }
  269. var l link.Link
  270. switch programSpec.Type {
  271. case ebpf.TracePoint:
  272. if strings.Contains(programSpec.SectionName, "prog") {
  273. continue
  274. }
  275. parts := strings.SplitN(programSpec.AttachTo, "/", 2)
  276. l, err = link.Tracepoint(parts[0], parts[1], program, nil)
  277. case ebpf.Kprobe:
  278. if strings.HasPrefix(programSpec.SectionName, "uprobe/") {
  279. fmt.Println("==============uprobe s")
  280. fmt.Println(programSpec.Name, programSpec.SectionName, programSpec.Type)
  281. fmt.Println("==============uprobe e")
  282. t.uprobes[programSpec.Name] = program
  283. continue
  284. }
  285. l, err = link.Kprobe(programSpec.AttachTo, program, nil)
  286. }
  287. if err != nil {
  288. t.Close()
  289. return fmt.Errorf("failed to link program: %w", err)
  290. }
  291. t.links = append(t.links, l)
  292. }
  293. return nil
  294. }
  295. func (t EventType) String() string {
  296. switch t {
  297. case EventTypeProcessStart:
  298. return "process-start"
  299. case EventTypeProcessExit:
  300. return "process-exit"
  301. case EventTypeConnectionOpen:
  302. return "connection-open"
  303. case EventTypeConnectionClose:
  304. return "connection-close"
  305. case EventTypeConnectionError:
  306. return "connection-error"
  307. case EventTypeListenOpen:
  308. return "listen-open"
  309. case EventTypeListenClose:
  310. return "listen-close"
  311. case EventTypeFileOpen:
  312. return "file-open"
  313. case EventTypeTCPRetransmit:
  314. return "tcp-retransmit"
  315. case EventTypeL7Request:
  316. return "l7-request"
  317. }
  318. return "unknown: " + strconv.Itoa(int(t))
  319. }
  320. func (t EventReason) String() string {
  321. switch t {
  322. case EventReasonNone:
  323. return "none"
  324. case EventReasonOOMKill:
  325. return "oom-kill"
  326. }
  327. return "unknown: " + strconv.Itoa(int(t))
  328. }
  329. type procEvent struct {
  330. Type EventType
  331. Pid uint32
  332. Reason uint32
  333. }
  334. type tcpEvent struct {
  335. Fd uint64
  336. Timestamp uint64
  337. Type EventType
  338. Pid uint32
  339. SPort uint16
  340. DPort uint16
  341. SAddr [16]byte
  342. DAddr [16]byte
  343. }
  344. type fileEvent struct {
  345. Type EventType
  346. Pid uint32
  347. Fd uint64
  348. }
  349. type l7Event struct {
  350. Fd uint64
  351. ConnectionTimestamp uint64
  352. Pid uint32
  353. Status uint32
  354. Duration uint64
  355. Protocol uint8
  356. Method uint8
  357. Padding uint16
  358. StatementId uint32
  359. PayloadSize uint64
  360. TraceId uint64
  361. TraceStart uint32
  362. TraceEnd uint32
  363. AssumedAppId utils.HashByte
  364. SpanId utils.HashByte
  365. TraceIdFrom utils.HashByte16
  366. CalledId utils.HashByte
  367. InstanceIdFrom utils.HashByte
  368. AppIdFrom utils.HashByte
  369. SpanIdFrom utils.HashByte
  370. }
  // SocketDataBufferddd is a pure-Go struct with the same fields as the C
  // struct __socket_data_buffer from the cgo preamble: two 32-bit counters
  // followed by a 32760-byte data area.
  type SocketDataBufferddd struct {
  	EventsNum, Len uint32
  	Data           [32760]byte
  }
  // Sizes shared with the C definitions in the cgo preamble.
  const (
  	// TASK_COMM_LEN is the length of the Comm field.
  	TASK_COMM_LEN = 16
  	// BURST_DATA_BUF_SIZE is the length of the Data field.
  	BURST_DATA_BUF_SIZE = 8192
  )

  // Tuple is a pure-Go struct with the same fields as the C struct __tuple_t.
  type Tuple struct {
  	Daddr, RcvSaddr     [16]uint8
  	AddrLen, L4Protocol uint8
  	Dport, Num          uint16
  }

  // SocketDatadddd is a pure-Go struct modelled on the C struct __socket_data.
  //
  // NOTE(review): the C struct packs direction (1 bit) and msg_type (7 bits)
  // into a single byte, whereas this struct has two separate bytes. Confirm
  // the layout before decoding raw samples into it.
  type SocketDatadddd struct {
  	// Pid is the thread id: when Pid == Tgid it denotes a process,
  	// otherwise a thread.
  	Pid uint32
  	// Tgid is the process id.
  	Tgid        uint32
  	CoroutineID uint64
  	Source      uint8
  	Comm        [TASK_COMM_LEN]byte
  	SocketID    uint64
  	Tuple       Tuple

  	ExtraData, ExtraDataCount, TcpSeq uint32
  	ThreadTraceID, Timestamp          uint64
  	Direction, MsgType                uint8
  	SyscallLen, DataSeq               uint64
  	DataType, DataLen                 uint16
  	Data                              [BURST_DATA_BUF_SIZE]byte
  }
  409. func runEventsReader(name string, r *perf.Reader, ch chan<- Event, typ perfMapType) {
  410. for {
  411. rec, err := r.Read()
  412. if err != nil {
  413. if errors.Is(err, perf.ErrClosed) {
  414. break
  415. }
  416. continue
  417. }
  418. if rec.LostSamples > 0 {
  419. klog.Errorln(name, "lost samples:", rec.LostSamples)
  420. continue
  421. }
  422. var event Event
  423. switch typ {
  424. case perfMapTypeSocketEvents:
  425. //fmt.Println("perfMapTypeSocketEvents")
  426. //// 假设 rec.RawSample 包含数据,类型为 []byte
  427. //rawData := rec.RawSample
  428. //fmt.Println("perfMapTypeSocketEvents2")
  429. //
  430. //// 创建一个 SocketDataBuffer 结构体实例
  431. //var buffer SocketDataBuffer
  432. //
  433. //// 创建一个字节缓冲区,并将数据填充到其中
  434. //reader := bytes.NewReader(rawData)
  435. //fmt.Println("perfMapTypeSocketEvents3")
  436. //fmt.Println(len(rawData))
  437. //// 使用 binary.Read 函数读取数据并解析为 SocketDataBuffer 结构体实例
  438. //if err := binary.Read(reader, binary.LittleEndian, &buffer); err != nil {
  439. // fmt.Println(reader.Len())
  440. // fmt.Println("Failed to read data:", err)
  441. // continue
  442. //}
  443. //fmt.Println("perfMapTypeSocketEvents4")
  444. //
  445. //// 打印解析后的数据
  446. //fmt.Println("EventsNum:", buffer.EventsNum)
  447. //fmt.Println("Len:", buffer.Len)
  448. //
  449. //// 打印 char data 的内容
  450. //fmt.Printf("Data: %s\n", string(buffer.Data[:buffer.Len])) // 仅打印实际长度的数据
  451. //socketDataBuffer := rec.RawSample
  452. /*todo */
  453. //buf := (*SocketDataBuffer)(unsafe.Pointer(&rec.RawSample[0])) //nolint:gosec
  454. //socketData := (*SocketData)(unsafe.Pointer(&buf.data[0])) //nolint:gosec
  455. /*todo */
  456. //socketData := (*(*[128]byte)(unsafe.Pointer(&eventC.line)))
  457. //dataPtr := unsafe.Pointer(&buf.data[0])
  458. //socketData := (*SocketData)(dataPtr)
  459. //reader2 := bytes.NewBuffer(rec.RawSample)
  460. // 222222
  461. //fmt.Println("socketData.Pid:", socketData.pid)
  462. //fmt.Println("socketData.Tgid:", socketData.tgid)
  463. //fmt.Println("socketData.CoroutineID:", socketData.coroutine_id)
  464. //fmt.Println("socketData.Source:", socketData.source)
  465. //
  466. //fmt.Printf("socketData.Comm: %s \n", socketData.comm)
  467. //fmt.Printf("socketData.SocketID :%v \n", socketData.socket_id)
  468. //fmt.Println("socketData.Tuple:", socketData.Tuple)
  469. //fmt.Println("socketData.ExtraData:", socketData.ExtraData)
  470. //fmt.Println("socketData.ExtraDataCount:", socketData.ExtraDataCount)
  471. //fmt.Println("socketData.TCPSeq:", socketData.TcpSeq)
  472. //fmt.Println("socketData.ThreadTraceID:", socketData.ThreadTraceID)
  473. //fmt.Println("socketData.Timestamp:", socketData.Timestamp)
  474. //fmt.Println("socketData.Direction:", socketData.Direction)
  475. //fmt.Println("socketData.MsgType:", socketData.MsgType)
  476. //fmt.Println("socketData.SyscallLen:", socketData.SyscallLen)
  477. //fmt.Println("socketData.DataSeq:", socketData.DataSeq)
  478. // todo
  479. // fmt.Printf("socketData.DataType:%d \n", (socketData.data_type))
  480. // fmt.Printf("socketData.DataLen:%d \n", (socketData.data_len))
  481. //fmt.Println("socketData.Data:", len(socketData.Data))
  482. //socketData := &SocketData{}
  483. //reader := bytes.NewBuffer(rec.RawSample)
  484. //if err := binary.Read(reader, binary.LittleEndian, v); err != nil {
  485. // klog.Warningln("failed1 to read msg:", err)
  486. // continue
  487. //}
  488. //
  489. //var data []byte
  490. //payload := reader.Bytes()
  491. //switch {
  492. //case v.Len == 0:
  493. //case v.Len > 32760:
  494. // data = payload[:32760]
  495. //default:
  496. // data = payload[:v.Len]
  497. //}
  498. //////data2 := data[:v.Len]
  499. ////fmt.Println("perfMapTypeSocketEvents")
  500. //fmt.Println(v.EventsNum)
  501. //fmt.Println(v.Len)
  502. //fmt.Println(string(data))
  503. //
  504. //var data2 SocketData
  505. //reader2 := bytes.NewBuffer(data)
  506. //if err := binary.Read(reader2, binary.LittleEndian, data2); err != nil {
  507. // klog.Warningln("failed2 to read msg:", err)
  508. // continue
  509. //}
  510. //
  511. //fmt.Println(data2.Pid)
  512. //fmt.Println(data2.Tgid)
  513. //fmt.Println(string(v.Data))
  514. //continue
  515. case perfMapTypeL7Events:
  516. v := &l7Event{}
  517. reader := bytes.NewBuffer(rec.RawSample)
  518. if err := binary.Read(reader, binary.LittleEndian, v); err != nil {
  519. klog.Warningln("failed to read msg:", err)
  520. continue
  521. }
  522. fmt.Println("v.TraceIdFrom")
  523. fmt.Println(v.TraceIdFrom)
  524. a := hex.EncodeToString(v.TraceIdFrom[:])
  525. //for _, b := range v.AssumedAppId {
  526. // fmt.Printf("v.AssumedAppId- %02\n", b)
  527. //}
  528. fmt.Println(a)
  529. payload := reader.Bytes()
  530. req := &l7.RequestData{
  531. Protocol: l7.Protocol(v.Protocol),
  532. Status: l7.Status(v.Status),
  533. Duration: time.Duration(v.Duration),
  534. Method: l7.Method(v.Method),
  535. StatementId: v.StatementId,
  536. TraceId: v.TraceId,
  537. TraceStart: v.TraceStart,
  538. TraceEnd: v.TraceEnd,
  539. AssumedAppId: hex.EncodeToString(v.AssumedAppId[:]),
  540. SpanId: hex.EncodeToString(v.SpanId[:]),
  541. }
  542. if v.TraceEnd == 1 {
  543. req.ParentSpanContext.TraceIdFrom = hex.EncodeToString(v.TraceIdFrom[:])
  544. req.ParentSpanContext.CalledId = hex.EncodeToString(v.CalledId[:])
  545. req.ParentSpanContext.InstanceIdFrom = hex.EncodeToString(v.InstanceIdFrom[:])
  546. req.ParentSpanContext.AppIdFrom = hex.EncodeToString(v.AppIdFrom[:])
  547. req.ParentSpanContext.SpanIdFrom = hex.EncodeToString(v.SpanIdFrom[:])
  548. }
  549. switch {
  550. case v.PayloadSize == 0:
  551. case v.PayloadSize > MaxPayloadSize:
  552. req.Payload = payload[:MaxPayloadSize]
  553. default:
  554. req.Payload = payload[:v.PayloadSize]
  555. }
  556. fmt.Println("==========")
  557. fmt.Println("req.Payload:", string(req.Payload))
  558. fmt.Println("==========")
  559. event = Event{Type: EventTypeL7Request, Pid: v.Pid, Fd: v.Fd, Timestamp: v.ConnectionTimestamp, L7Request: req}
  560. case perfMapTypeFileEvents:
  561. v := &fileEvent{}
  562. if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
  563. klog.Warningln("failed to read msg:", err)
  564. continue
  565. }
  566. event = Event{Type: v.Type, Pid: v.Pid, Fd: v.Fd}
  567. case perfMapTypeProcEvents:
  568. v := &procEvent{}
  569. if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
  570. klog.Warningln("failed to read msg:", err)
  571. continue
  572. }
  573. event = Event{Type: v.Type, Reason: EventReason(v.Reason), Pid: v.Pid}
  574. case perfMapTypeTCPEvents:
  575. v := &tcpEvent{}
  576. if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
  577. klog.Warningln("failed to read msg:", err)
  578. continue
  579. }
  580. event = Event{
  581. Type: v.Type,
  582. Pid: v.Pid,
  583. SrcAddr: ipPort(v.SAddr, v.SPort),
  584. DstAddr: ipPort(v.DAddr, v.DPort),
  585. Fd: v.Fd,
  586. Timestamp: v.Timestamp,
  587. }
  588. default:
  589. continue
  590. }
  591. ch <- event
  592. }
  593. }
  594. func ipPort(ip [16]byte, port uint16) netaddr.IPPort {
  595. i, _ := netaddr.FromStdIP(ip[:])
  596. return netaddr.IPPortFrom(i, port)
  597. }