tracer.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430
  1. package ebpftracer
  2. import (
  3. "bytes"
  4. "encoding/binary"
  5. "errors"
  6. "fmt"
  7. "github.com/cilium/ebpf"
  8. "github.com/cilium/ebpf/link"
  9. "github.com/cilium/ebpf/perf"
  10. "github.com/coroot/coroot-node-agent/common"
  11. "github.com/coroot/coroot-node-agent/ebpftracer/l7"
  12. "github.com/coroot/coroot-node-agent/ebpftracer/tracer"
  13. "github.com/coroot/coroot-node-agent/proc"
  14. "golang.org/x/mod/semver"
  15. "golang.org/x/sys/unix"
  16. "inet.af/netaddr"
  17. "k8s.io/klog/v2"
  18. "os"
  19. "runtime"
  20. "strconv"
  21. "strings"
  22. "time"
  23. )
  24. const MaxPayloadSize = 1024
  25. type EventType uint32
  26. type EventReason uint32
  27. const (
  28. EventTypeProcessStart EventType = 1
  29. EventTypeProcessExit EventType = 2
  30. EventTypeConnectionOpen EventType = 3
  31. EventTypeConnectionClose EventType = 4
  32. EventTypeConnectionError EventType = 5
  33. EventTypeListenOpen EventType = 6
  34. EventTypeListenClose EventType = 7
  35. EventTypeFileOpen EventType = 8
  36. EventTypeTCPRetransmit EventType = 9
  37. EventTypeL7Request EventType = 10
  38. EventReasonNone EventReason = 0
  39. EventReasonOOMKill EventReason = 1
  40. )
  41. type Event struct {
  42. Type EventType
  43. Reason EventReason
  44. Pid uint32
  45. SrcAddr netaddr.IPPort
  46. DstAddr netaddr.IPPort
  47. Fd uint64
  48. Timestamp uint64
  49. L7Request *l7.RequestData
  50. }
  51. type perfMapType uint8
  52. const (
  53. perfMapTypeProcEvents perfMapType = 1
  54. perfMapTypeTCPEvents perfMapType = 2
  55. perfMapTypeFileEvents perfMapType = 3
  56. perfMapTypeL7Events perfMapType = 4
  57. )
  58. type Tracer struct {
  59. kernelVersion string
  60. disableL7Tracing bool
  61. collection *ebpf.Collection
  62. readers map[string]*perf.Reader
  63. links []link.Link
  64. uprobes map[string]*ebpf.Program
  65. }
  66. func NewTracer(kernelVersion string, disableL7Tracing bool) *Tracer {
  67. if disableL7Tracing {
  68. klog.Infoln("L7 tracing is disabled")
  69. }
  70. return &Tracer{
  71. kernelVersion: kernelVersion,
  72. disableL7Tracing: disableL7Tracing,
  73. readers: map[string]*perf.Reader{},
  74. uprobes: map[string]*ebpf.Program{},
  75. }
  76. }
  77. func (t *Tracer) Run(events chan<- Event) error {
  78. if err := t.ebpf(events); err != nil {
  79. return err
  80. }
  81. if err := t.init(events); err != nil {
  82. return err
  83. }
  84. return nil
  85. }
  86. func (t *Tracer) Close() {
  87. for _, p := range t.uprobes {
  88. _ = p.Close()
  89. }
  90. for _, l := range t.links {
  91. _ = l.Close()
  92. }
  93. for _, r := range t.readers {
  94. _ = r.Close()
  95. }
  96. t.collection.Close()
  97. }
  98. func (t *Tracer) init(ch chan<- Event) error {
  99. pids, err := proc.ListPids()
  100. if err != nil {
  101. return fmt.Errorf("failed to list pids: %w", err)
  102. }
  103. for _, pid := range pids {
  104. ch <- Event{Type: EventTypeProcessStart, Pid: pid}
  105. }
  106. fds, sockets := readFds(pids)
  107. for _, fd := range fds {
  108. ch <- Event{Type: EventTypeFileOpen, Pid: fd.pid, Fd: fd.fd}
  109. }
  110. listens := map[uint64]bool{}
  111. for _, s := range sockets {
  112. if s.Listen {
  113. listens[uint64(s.pid)<<32|uint64(s.SAddr.Port())] = true
  114. }
  115. }
  116. for _, s := range sockets {
  117. typ := EventTypeConnectionOpen
  118. if s.Listen {
  119. typ = EventTypeListenOpen
  120. } else if listens[uint64(s.pid)<<32|uint64(s.SAddr.Port())] || s.DAddr.Port() > s.SAddr.Port() { // inbound
  121. continue
  122. }
  123. ch <- Event{
  124. Type: typ,
  125. Pid: s.pid,
  126. Fd: s.fd,
  127. SrcAddr: s.SAddr,
  128. DstAddr: s.DAddr,
  129. }
  130. }
  131. return nil
  132. }
  133. type perfMap struct {
  134. name string
  135. perCPUBufferSizePages int
  136. typ perfMapType
  137. }
  138. func (t *Tracer) ebpf(ch chan<- Event) error {
  139. if _, ok := ebpfProg[runtime.GOARCH]; !ok {
  140. return fmt.Errorf("unsupported architecture: %s", runtime.GOARCH)
  141. }
  142. kv := "v" + common.KernelMajorMinor(t.kernelVersion)
  143. var prg []byte
  144. for _, p := range ebpfProg[runtime.GOARCH] {
  145. if semver.Compare(kv, p.v) >= 0 {
  146. prg = p.p
  147. break
  148. }
  149. }
  150. if len(prg) == 0 {
  151. return fmt.Errorf("unsupported kernel version: %s", t.kernelVersion)
  152. }
  153. if _, err := os.Stat("/sys/kernel/debug/tracing"); err != nil {
  154. return fmt.Errorf("kernel tracing is not available: %w", err)
  155. }
  156. collectionSpec, err := ebpf.LoadCollectionSpecFromReader(bytes.NewReader(prg))
  157. if err != nil {
  158. return fmt.Errorf("failed to load collection spec: %w", err)
  159. }
  160. _ = unix.Setrlimit(unix.RLIMIT_MEMLOCK, &unix.Rlimit{Cur: unix.RLIM_INFINITY, Max: unix.RLIM_INFINITY})
  161. tracer.PidFilter(collectionSpec)
  162. opts := &ebpf.CollectionOptions{MapReplacements: make(map[string]*ebpf.Map)}
  163. //for _, spec := range collectionSpec.Maps {
  164. // fmt.Println("s:", spec.Name)
  165. //}
  166. //os.Exit(1)
  167. tracer.MapInit(collectionSpec, opts)
  168. c, err := ebpf.NewCollectionWithOptions(collectionSpec, *opts)
  169. if err != nil {
  170. var verr *ebpf.VerifierError
  171. if errors.As(err, &verr) {
  172. klog.Errorf("%+v", verr)
  173. }
  174. return fmt.Errorf("failed to load collection: %w", err)
  175. }
  176. tracer.Offset()
  177. t.collection = c
  178. perfMaps := []perfMap{
  179. {name: "proc_events", typ: perfMapTypeProcEvents, perCPUBufferSizePages: 4},
  180. {name: "tcp_listen_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 4},
  181. {name: "tcp_connect_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 8},
  182. {name: "tcp_retransmit_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 4},
  183. {name: "file_events", typ: perfMapTypeFileEvents, perCPUBufferSizePages: 4},
  184. }
  185. fmt.Println(len(collectionSpec.Programs))
  186. fmt.Println(len(c.Programs))
  187. tracer.MapInsert(c)
  188. //os.Exit(1)
  189. if !t.disableL7Tracing {
  190. perfMaps = append(perfMaps, perfMap{name: "l7_events", typ: perfMapTypeL7Events, perCPUBufferSizePages: 32})
  191. }
  192. fmt.Println("perfMaps start --")
  193. for _, pm := range perfMaps {
  194. fmt.Println(pm.name)
  195. r, err := perf.NewReader(t.collection.Maps[pm.name], pm.perCPUBufferSizePages*os.Getpagesize())
  196. if err != nil {
  197. t.Close()
  198. return fmt.Errorf("failed to create ebpf reader: %w", err)
  199. }
  200. t.readers[pm.name] = r
  201. // event监听
  202. go runEventsReader(pm.name, r, ch, pm.typ)
  203. }
  204. fmt.Println("perfMaps end --")
  205. for _, programSpec := range collectionSpec.Programs {
  206. program := t.collection.Programs[programSpec.Name]
  207. fmt.Println(programSpec.Name, programSpec.SectionName, programSpec.Type)
  208. if t.disableL7Tracing {
  209. switch programSpec.Name {
  210. case "sys_enter_writev", "sys_enter_write", "sys_enter_sendto", "sys_enter_sendmsg":
  211. continue
  212. case "sys_enter_read", "sys_enter_readv", "sys_enter_recvfrom", "sys_enter_recvmsg":
  213. continue
  214. case "sys_exit_read", "sys_exit_readv", "sys_exit_recvfrom", "sys_exit_recvmsg":
  215. continue
  216. }
  217. }
  218. var l link.Link
  219. switch programSpec.Type {
  220. case ebpf.TracePoint:
  221. if strings.Contains(programSpec.SectionName, "prog") {
  222. continue
  223. }
  224. parts := strings.SplitN(programSpec.AttachTo, "/", 2)
  225. l, err = link.Tracepoint(parts[0], parts[1], program, nil)
  226. case ebpf.Kprobe:
  227. if strings.HasPrefix(programSpec.SectionName, "uprobe/") {
  228. fmt.Println("==============uprobe s")
  229. fmt.Println(programSpec.Name, programSpec.SectionName, programSpec.Type)
  230. fmt.Println("==============uprobe e")
  231. t.uprobes[programSpec.Name] = program
  232. continue
  233. }
  234. l, err = link.Kprobe(programSpec.AttachTo, program, nil)
  235. }
  236. if err != nil {
  237. t.Close()
  238. return fmt.Errorf("failed to link program: %w", err)
  239. }
  240. t.links = append(t.links, l)
  241. }
  242. return nil
  243. }
  244. func (t EventType) String() string {
  245. switch t {
  246. case EventTypeProcessStart:
  247. return "process-start"
  248. case EventTypeProcessExit:
  249. return "process-exit"
  250. case EventTypeConnectionOpen:
  251. return "connection-open"
  252. case EventTypeConnectionClose:
  253. return "connection-close"
  254. case EventTypeConnectionError:
  255. return "connection-error"
  256. case EventTypeListenOpen:
  257. return "listen-open"
  258. case EventTypeListenClose:
  259. return "listen-close"
  260. case EventTypeFileOpen:
  261. return "file-open"
  262. case EventTypeTCPRetransmit:
  263. return "tcp-retransmit"
  264. case EventTypeL7Request:
  265. return "l7-request"
  266. }
  267. return "unknown: " + strconv.Itoa(int(t))
  268. }
  269. func (t EventReason) String() string {
  270. switch t {
  271. case EventReasonNone:
  272. return "none"
  273. case EventReasonOOMKill:
  274. return "oom-kill"
  275. }
  276. return "unknown: " + strconv.Itoa(int(t))
  277. }
  278. type procEvent struct {
  279. Type EventType
  280. Pid uint32
  281. Reason uint32
  282. }
  283. type tcpEvent struct {
  284. Fd uint64
  285. Timestamp uint64
  286. Type EventType
  287. Pid uint32
  288. SPort uint16
  289. DPort uint16
  290. SAddr [16]byte
  291. DAddr [16]byte
  292. }
  293. type fileEvent struct {
  294. Type EventType
  295. Pid uint32
  296. Fd uint64
  297. }
  298. type l7Event struct {
  299. Fd uint64
  300. ConnectionTimestamp uint64
  301. Pid uint32
  302. Status uint32
  303. Duration uint64
  304. Protocol uint8
  305. Method uint8
  306. Padding uint16
  307. StatementId uint32
  308. PayloadSize uint64
  309. TraceId uint64
  310. TraceStart uint32
  311. TraceEnd uint32
  312. }
  313. func runEventsReader(name string, r *perf.Reader, ch chan<- Event, typ perfMapType) {
  314. for {
  315. rec, err := r.Read()
  316. if err != nil {
  317. if errors.Is(err, perf.ErrClosed) {
  318. break
  319. }
  320. continue
  321. }
  322. if rec.LostSamples > 0 {
  323. klog.Errorln(name, "lost samples:", rec.LostSamples)
  324. continue
  325. }
  326. var event Event
  327. switch typ {
  328. case perfMapTypeL7Events:
  329. v := &l7Event{}
  330. reader := bytes.NewBuffer(rec.RawSample)
  331. if err := binary.Read(reader, binary.LittleEndian, v); err != nil {
  332. klog.Warningln("failed to read msg:", err)
  333. continue
  334. }
  335. payload := reader.Bytes()
  336. req := &l7.RequestData{
  337. Protocol: l7.Protocol(v.Protocol),
  338. Status: l7.Status(v.Status),
  339. Duration: time.Duration(v.Duration),
  340. Method: l7.Method(v.Method),
  341. StatementId: v.StatementId,
  342. TraceId: v.TraceId,
  343. TraceStart: v.TraceStart,
  344. TraceEnd: v.TraceEnd,
  345. }
  346. switch {
  347. case v.PayloadSize == 0:
  348. case v.PayloadSize > MaxPayloadSize:
  349. req.Payload = payload[:MaxPayloadSize]
  350. default:
  351. req.Payload = payload[:v.PayloadSize]
  352. }
  353. event = Event{Type: EventTypeL7Request, Pid: v.Pid, Fd: v.Fd, Timestamp: v.ConnectionTimestamp, L7Request: req}
  354. case perfMapTypeFileEvents:
  355. v := &fileEvent{}
  356. if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
  357. klog.Warningln("failed to read msg:", err)
  358. continue
  359. }
  360. event = Event{Type: v.Type, Pid: v.Pid, Fd: v.Fd}
  361. case perfMapTypeProcEvents:
  362. v := &procEvent{}
  363. if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
  364. klog.Warningln("failed to read msg:", err)
  365. continue
  366. }
  367. event = Event{Type: v.Type, Reason: EventReason(v.Reason), Pid: v.Pid}
  368. case perfMapTypeTCPEvents:
  369. v := &tcpEvent{}
  370. if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
  371. klog.Warningln("failed to read msg:", err)
  372. continue
  373. }
  374. event = Event{
  375. Type: v.Type,
  376. Pid: v.Pid,
  377. SrcAddr: ipPort(v.SAddr, v.SPort),
  378. DstAddr: ipPort(v.DAddr, v.DPort),
  379. Fd: v.Fd,
  380. Timestamp: v.Timestamp,
  381. }
  382. default:
  383. continue
  384. }
  385. ch <- event
  386. }
  387. }
  388. func ipPort(ip [16]byte, port uint16) netaddr.IPPort {
  389. i, _ := netaddr.FromStdIP(ip[:])
  390. return netaddr.IPPortFrom(i, port)
  391. }