tracer.go 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403
  1. package ebpftracer
  2. import (
  3. "bytes"
  4. "encoding/binary"
  5. "errors"
  6. "fmt"
  7. "os"
  8. "runtime"
  9. "strconv"
  10. "strings"
  11. "time"
  12. "github.com/cilium/ebpf"
  13. "github.com/cilium/ebpf/link"
  14. "github.com/cilium/ebpf/perf"
  15. "github.com/coroot/coroot-node-agent/common"
  16. "github.com/coroot/coroot-node-agent/ebpftracer/l7"
  17. "github.com/coroot/coroot-node-agent/proc"
  18. "golang.org/x/mod/semver"
  19. "golang.org/x/sys/unix"
  20. "inet.af/netaddr"
  21. "k8s.io/klog/v2"
  22. )
  23. const MaxPayloadSize = 1024
  24. type EventType uint32
  25. type EventReason uint32
  26. const (
  27. EventTypeProcessStart EventType = 1
  28. EventTypeProcessExit EventType = 2
  29. EventTypeConnectionOpen EventType = 3
  30. EventTypeConnectionClose EventType = 4
  31. EventTypeConnectionError EventType = 5
  32. EventTypeListenOpen EventType = 6
  33. EventTypeListenClose EventType = 7
  34. EventTypeFileOpen EventType = 8
  35. EventTypeTCPRetransmit EventType = 9
  36. EventTypeL7Request EventType = 10
  37. EventReasonNone EventReason = 0
  38. EventReasonOOMKill EventReason = 1
  39. )
  40. type Event struct {
  41. Type EventType
  42. Reason EventReason
  43. Pid uint32
  44. SrcAddr netaddr.IPPort
  45. DstAddr netaddr.IPPort
  46. Fd uint64
  47. Timestamp uint64
  48. L7Request *l7.RequestData
  49. }
  50. type perfMapType uint8
  51. const (
  52. perfMapTypeProcEvents perfMapType = 1
  53. perfMapTypeTCPEvents perfMapType = 2
  54. perfMapTypeFileEvents perfMapType = 3
  55. perfMapTypeL7Events perfMapType = 4
  56. )
  57. type Tracer struct {
  58. kernelVersion string
  59. disableL7Tracing bool
  60. collection *ebpf.Collection
  61. readers map[string]*perf.Reader
  62. links []link.Link
  63. uprobes map[string]*ebpf.Program
  64. }
  65. func NewTracer(kernelVersion string, disableL7Tracing bool) *Tracer {
  66. if disableL7Tracing {
  67. klog.Infoln("L7 tracing is disabled")
  68. }
  69. return &Tracer{
  70. kernelVersion: kernelVersion,
  71. disableL7Tracing: disableL7Tracing,
  72. readers: map[string]*perf.Reader{},
  73. uprobes: map[string]*ebpf.Program{},
  74. }
  75. }
  76. func (t *Tracer) Run(events chan<- Event) error {
  77. if err := t.ebpf(events); err != nil {
  78. return err
  79. }
  80. if err := t.init(events); err != nil {
  81. return err
  82. }
  83. return nil
  84. }
  85. func (t *Tracer) Close() {
  86. for _, p := range t.uprobes {
  87. _ = p.Close()
  88. }
  89. for _, l := range t.links {
  90. _ = l.Close()
  91. }
  92. for _, r := range t.readers {
  93. _ = r.Close()
  94. }
  95. t.collection.Close()
  96. }
  97. func (t *Tracer) init(ch chan<- Event) error {
  98. pids, err := proc.ListPids()
  99. if err != nil {
  100. return fmt.Errorf("failed to list pids: %w", err)
  101. }
  102. for _, pid := range pids {
  103. ch <- Event{Type: EventTypeProcessStart, Pid: pid}
  104. }
  105. fds, sockets := readFds(pids)
  106. for _, fd := range fds {
  107. ch <- Event{Type: EventTypeFileOpen, Pid: fd.pid, Fd: fd.fd}
  108. }
  109. listens := map[uint64]bool{}
  110. for _, s := range sockets {
  111. if s.Listen {
  112. listens[uint64(s.pid)<<32|uint64(s.SAddr.Port())] = true
  113. }
  114. }
  115. for _, s := range sockets {
  116. typ := EventTypeConnectionOpen
  117. if s.Listen {
  118. typ = EventTypeListenOpen
  119. } else if listens[uint64(s.pid)<<32|uint64(s.SAddr.Port())] || s.DAddr.Port() > s.SAddr.Port() { // inbound
  120. continue
  121. }
  122. ch <- Event{
  123. Type: typ,
  124. Pid: s.pid,
  125. Fd: s.fd,
  126. SrcAddr: s.SAddr,
  127. DstAddr: s.DAddr,
  128. }
  129. }
  130. return nil
  131. }
  132. type perfMap struct {
  133. name string
  134. perCPUBufferSizePages int
  135. typ perfMapType
  136. }
  137. func (t *Tracer) ebpf(ch chan<- Event) error {
  138. if _, ok := ebpfProg[runtime.GOARCH]; !ok {
  139. return fmt.Errorf("unsupported architecture: %s", runtime.GOARCH)
  140. }
  141. kv := "v" + common.KernelMajorMinor(t.kernelVersion)
  142. var prg []byte
  143. for _, p := range ebpfProg[runtime.GOARCH] {
  144. if semver.Compare(kv, p.v) >= 0 {
  145. prg = p.p
  146. break
  147. }
  148. }
  149. if len(prg) == 0 {
  150. return fmt.Errorf("unsupported kernel version: %s", t.kernelVersion)
  151. }
  152. _, debugFsErr := os.Stat("/sys/kernel/debug/tracing")
  153. _, traceFsErr := os.Stat("/sys/kernel/tracing")
  154. if debugFsErr != nil && traceFsErr != nil {
  155. return fmt.Errorf("kernel tracing is not available: debugfs or tracefs must be mounted")
  156. }
  157. collectionSpec, err := ebpf.LoadCollectionSpecFromReader(bytes.NewReader(prg))
  158. if err != nil {
  159. return fmt.Errorf("failed to load collection spec: %w", err)
  160. }
  161. _ = unix.Setrlimit(unix.RLIMIT_MEMLOCK, &unix.Rlimit{Cur: unix.RLIM_INFINITY, Max: unix.RLIM_INFINITY})
  162. c, err := ebpf.NewCollectionWithOptions(collectionSpec, ebpf.CollectionOptions{
  163. //Programs: ebpf.ProgramOptions{LogLevel: 2, LogSize: 20 * 1024 * 1024},
  164. })
  165. if err != nil {
  166. var verr *ebpf.VerifierError
  167. if errors.As(err, &verr) {
  168. klog.Errorf("%+v", verr)
  169. }
  170. return fmt.Errorf("failed to load collection: %w", err)
  171. }
  172. t.collection = c
  173. perfMaps := []perfMap{
  174. {name: "proc_events", typ: perfMapTypeProcEvents, perCPUBufferSizePages: 4},
  175. {name: "tcp_listen_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 4},
  176. {name: "tcp_connect_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 8},
  177. {name: "tcp_retransmit_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 4},
  178. {name: "file_events", typ: perfMapTypeFileEvents, perCPUBufferSizePages: 4},
  179. }
  180. if !t.disableL7Tracing {
  181. perfMaps = append(perfMaps, perfMap{name: "l7_events", typ: perfMapTypeL7Events, perCPUBufferSizePages: 32})
  182. }
  183. for _, pm := range perfMaps {
  184. r, err := perf.NewReader(t.collection.Maps[pm.name], pm.perCPUBufferSizePages*os.Getpagesize())
  185. if err != nil {
  186. t.Close()
  187. return fmt.Errorf("failed to create ebpf reader: %w", err)
  188. }
  189. t.readers[pm.name] = r
  190. go runEventsReader(pm.name, r, ch, pm.typ)
  191. }
  192. for _, programSpec := range collectionSpec.Programs {
  193. program := t.collection.Programs[programSpec.Name]
  194. if t.disableL7Tracing {
  195. switch programSpec.Name {
  196. case "sys_enter_writev", "sys_enter_write", "sys_enter_sendto", "sys_enter_sendmsg":
  197. continue
  198. case "sys_enter_read", "sys_enter_readv", "sys_enter_recvfrom", "sys_enter_recvmsg":
  199. continue
  200. case "sys_exit_read", "sys_exit_readv", "sys_exit_recvfrom", "sys_exit_recvmsg":
  201. continue
  202. }
  203. }
  204. var l link.Link
  205. switch programSpec.Type {
  206. case ebpf.TracePoint:
  207. parts := strings.SplitN(programSpec.AttachTo, "/", 2)
  208. l, err = link.Tracepoint(parts[0], parts[1], program, nil)
  209. case ebpf.Kprobe:
  210. if strings.HasPrefix(programSpec.SectionName, "uprobe/") {
  211. t.uprobes[programSpec.Name] = program
  212. continue
  213. }
  214. l, err = link.Kprobe(programSpec.AttachTo, program, nil)
  215. }
  216. if err != nil {
  217. t.Close()
  218. return fmt.Errorf("failed to link program: %w", err)
  219. }
  220. t.links = append(t.links, l)
  221. }
  222. return nil
  223. }
  224. func (t EventType) String() string {
  225. switch t {
  226. case EventTypeProcessStart:
  227. return "process-start"
  228. case EventTypeProcessExit:
  229. return "process-exit"
  230. case EventTypeConnectionOpen:
  231. return "connection-open"
  232. case EventTypeConnectionClose:
  233. return "connection-close"
  234. case EventTypeConnectionError:
  235. return "connection-error"
  236. case EventTypeListenOpen:
  237. return "listen-open"
  238. case EventTypeListenClose:
  239. return "listen-close"
  240. case EventTypeFileOpen:
  241. return "file-open"
  242. case EventTypeTCPRetransmit:
  243. return "tcp-retransmit"
  244. case EventTypeL7Request:
  245. return "l7-request"
  246. }
  247. return "unknown: " + strconv.Itoa(int(t))
  248. }
  249. func (t EventReason) String() string {
  250. switch t {
  251. case EventReasonNone:
  252. return "none"
  253. case EventReasonOOMKill:
  254. return "oom-kill"
  255. }
  256. return "unknown: " + strconv.Itoa(int(t))
  257. }
  258. type procEvent struct {
  259. Type EventType
  260. Pid uint32
  261. Reason uint32
  262. }
  263. type tcpEvent struct {
  264. Fd uint64
  265. Timestamp uint64
  266. Type EventType
  267. Pid uint32
  268. SPort uint16
  269. DPort uint16
  270. SAddr [16]byte
  271. DAddr [16]byte
  272. }
  273. type fileEvent struct {
  274. Type EventType
  275. Pid uint32
  276. Fd uint64
  277. }
  278. type l7Event struct {
  279. Fd uint64
  280. ConnectionTimestamp uint64
  281. Pid uint32
  282. Status uint32
  283. Duration uint64
  284. Protocol uint8
  285. Method uint8
  286. Padding uint16
  287. StatementId uint32
  288. PayloadSize uint64
  289. }
  290. func runEventsReader(name string, r *perf.Reader, ch chan<- Event, typ perfMapType) {
  291. for {
  292. rec, err := r.Read()
  293. if err != nil {
  294. if errors.Is(err, perf.ErrClosed) {
  295. break
  296. }
  297. continue
  298. }
  299. if rec.LostSamples > 0 {
  300. klog.Errorln(name, "lost samples:", rec.LostSamples)
  301. continue
  302. }
  303. var event Event
  304. switch typ {
  305. case perfMapTypeL7Events:
  306. v := &l7Event{}
  307. reader := bytes.NewBuffer(rec.RawSample)
  308. if err := binary.Read(reader, binary.LittleEndian, v); err != nil {
  309. klog.Warningln("failed to read msg:", err)
  310. continue
  311. }
  312. payload := reader.Bytes()
  313. req := &l7.RequestData{
  314. Protocol: l7.Protocol(v.Protocol),
  315. Status: l7.Status(v.Status),
  316. Duration: time.Duration(v.Duration),
  317. Method: l7.Method(v.Method),
  318. StatementId: v.StatementId,
  319. }
  320. switch {
  321. case v.PayloadSize == 0:
  322. case v.PayloadSize > MaxPayloadSize:
  323. req.Payload = payload[:MaxPayloadSize]
  324. default:
  325. req.Payload = payload[:v.PayloadSize]
  326. }
  327. event = Event{Type: EventTypeL7Request, Pid: v.Pid, Fd: v.Fd, Timestamp: v.ConnectionTimestamp, L7Request: req}
  328. case perfMapTypeFileEvents:
  329. v := &fileEvent{}
  330. if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
  331. klog.Warningln("failed to read msg:", err)
  332. continue
  333. }
  334. event = Event{Type: v.Type, Pid: v.Pid, Fd: v.Fd}
  335. case perfMapTypeProcEvents:
  336. v := &procEvent{}
  337. if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
  338. klog.Warningln("failed to read msg:", err)
  339. continue
  340. }
  341. event = Event{Type: v.Type, Reason: EventReason(v.Reason), Pid: v.Pid}
  342. case perfMapTypeTCPEvents:
  343. v := &tcpEvent{}
  344. if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
  345. klog.Warningln("failed to read msg:", err)
  346. continue
  347. }
  348. event = Event{
  349. Type: v.Type,
  350. Pid: v.Pid,
  351. SrcAddr: ipPort(v.SAddr, v.SPort),
  352. DstAddr: ipPort(v.DAddr, v.DPort),
  353. Fd: v.Fd,
  354. Timestamp: v.Timestamp,
  355. }
  356. default:
  357. continue
  358. }
  359. ch <- event
  360. }
  361. }
  362. func ipPort(ip [16]byte, port uint16) netaddr.IPPort {
  363. i, _ := netaddr.FromStdIP(ip[:])
  364. return netaddr.IPPortFrom(i, port)
  365. }