tracer.go 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401
  1. package ebpftracer
  2. import (
  3. "bytes"
  4. "encoding/binary"
  5. "errors"
  6. "fmt"
  7. "os"
  8. "runtime"
  9. "strconv"
  10. "strings"
  11. "time"
  12. "github.com/cilium/ebpf"
  13. "github.com/cilium/ebpf/link"
  14. "github.com/cilium/ebpf/perf"
  15. "github.com/coroot/coroot-node-agent/common"
  16. "github.com/coroot/coroot-node-agent/ebpftracer/l7"
  17. "github.com/coroot/coroot-node-agent/proc"
  18. "golang.org/x/mod/semver"
  19. "golang.org/x/sys/unix"
  20. "inet.af/netaddr"
  21. "k8s.io/klog/v2"
  22. )
// MaxPayloadSize is the largest number of payload bytes that runEventsReader
// keeps from a single L7 event; payloads reported as larger are cut to this size.
const MaxPayloadSize = 1024
// EventType identifies the kind of an Event. Its values are decoded directly
// from perf samples (see procEvent, tcpEvent and fileEvent), so they presumably
// have to stay in sync with the eBPF programs — confirm before renumbering.
type EventType uint32

// EventReason carries extra detail for an Event; runEventsReader fills it only
// for events read from the process perf map.
type EventReason uint32

const (
	// Event kinds. Tracer.init also emits ProcessStart, FileOpen, ConnectionOpen
	// and ListenOpen synthetically for state that already exists at start-up.
	EventTypeProcessStart    EventType = 1
	EventTypeProcessExit     EventType = 2
	EventTypeConnectionOpen  EventType = 3
	EventTypeConnectionClose EventType = 4
	EventTypeConnectionError EventType = 5
	EventTypeListenOpen      EventType = 6
	EventTypeListenClose     EventType = 7
	EventTypeFileOpen        EventType = 8
	EventTypeTCPRetransmit   EventType = 9
	// EventTypeL7Request is assigned by runEventsReader to every L7 sample.
	EventTypeL7Request EventType = 10

	// Event reasons. EventReasonNone is the zero value.
	EventReasonNone EventReason = 0
	// NOTE(review): presumably set when the process was OOM-killed; the
	// producer is not visible in this file.
	EventReasonOOMKill EventReason = 1
)
// Event is the value delivered on the channel passed to Tracer.Run. Which
// fields are populated depends on Type; unused fields keep their zero value.
type Event struct {
	Type EventType
	// Reason is copied from process events only.
	Reason EventReason
	Pid    uint32
	// SrcAddr and DstAddr are set for TCP events and for the sockets
	// enumerated by Tracer.init.
	SrcAddr netaddr.IPPort
	DstAddr netaddr.IPPort
	Fd      uint64
	// Timestamp is copied from TCP events; for L7 events it carries the
	// sample's ConnectionTimestamp.
	Timestamp uint64
	// L7Request is non-nil only for EventTypeL7Request.
	L7Request *l7.RequestData
}
// perfMapType tells runEventsReader which wire struct to decode the samples
// of a perf map into.
type perfMapType uint8

const (
	perfMapTypeProcEvents perfMapType = 1 // decoded as procEvent
	perfMapTypeTCPEvents  perfMapType = 2 // decoded as tcpEvent
	perfMapTypeFileEvents perfMapType = 3 // decoded as fileEvent
	perfMapTypeL7Events   perfMapType = 4 // decoded as l7Event followed by the payload
)
// Tracer owns the loaded eBPF collection together with the perf readers and
// program links created from it. Create it with NewTracer, start it with Run
// and release its resources with Close.
type Tracer struct {
	// kernelVersion is used to pick the eBPF program variant to load.
	kernelVersion string
	// disableL7Tracing skips the l7_events perf map and the read/write
	// syscall programs.
	disableL7Tracing bool

	// collection is set once the eBPF collection has been loaded.
	collection *ebpf.Collection
	// readers holds one perf reader per perf map, keyed by map name.
	readers map[string]*perf.Reader
	// links holds the attached tracepoints and kprobes.
	links []link.Link
	// uprobes holds programs from "uprobe/" sections, keyed by program name;
	// they are not attached when the collection is loaded.
	uprobes map[string]*ebpf.Program
}
  65. func NewTracer(kernelVersion string, disableL7Tracing bool) *Tracer {
  66. if disableL7Tracing {
  67. klog.Infoln("L7 tracing is disabled")
  68. }
  69. return &Tracer{
  70. kernelVersion: kernelVersion,
  71. disableL7Tracing: disableL7Tracing,
  72. readers: map[string]*perf.Reader{},
  73. uprobes: map[string]*ebpf.Program{},
  74. }
  75. }
  76. func (t *Tracer) Run(events chan<- Event) error {
  77. if err := t.ebpf(events); err != nil {
  78. return err
  79. }
  80. if err := t.init(events); err != nil {
  81. return err
  82. }
  83. return nil
  84. }
  85. func (t *Tracer) Close() {
  86. for _, p := range t.uprobes {
  87. _ = p.Close()
  88. }
  89. for _, l := range t.links {
  90. _ = l.Close()
  91. }
  92. for _, r := range t.readers {
  93. _ = r.Close()
  94. }
  95. t.collection.Close()
  96. }
  97. func (t *Tracer) init(ch chan<- Event) error {
  98. pids, err := proc.ListPids()
  99. if err != nil {
  100. return fmt.Errorf("failed to list pids: %w", err)
  101. }
  102. for _, pid := range pids {
  103. ch <- Event{Type: EventTypeProcessStart, Pid: pid}
  104. }
  105. fds, sockets := readFds(pids)
  106. for _, fd := range fds {
  107. ch <- Event{Type: EventTypeFileOpen, Pid: fd.pid, Fd: fd.fd}
  108. }
  109. listens := map[uint64]bool{}
  110. for _, s := range sockets {
  111. if s.Listen {
  112. listens[uint64(s.pid)<<32|uint64(s.SAddr.Port())] = true
  113. }
  114. }
  115. for _, s := range sockets {
  116. typ := EventTypeConnectionOpen
  117. if s.Listen {
  118. typ = EventTypeListenOpen
  119. } else if listens[uint64(s.pid)<<32|uint64(s.SAddr.Port())] || s.DAddr.Port() > s.SAddr.Port() { // inbound
  120. continue
  121. }
  122. ch <- Event{
  123. Type: typ,
  124. Pid: s.pid,
  125. Fd: s.fd,
  126. SrcAddr: s.SAddr,
  127. DstAddr: s.DAddr,
  128. }
  129. }
  130. return nil
  131. }
// perfMap describes one perf event map to read from.
type perfMap struct {
	// name is the key of the map in the loaded collection; it is also used
	// as the key in Tracer.readers and as the label in log messages.
	name string
	// perCPUBufferSizePages is multiplied by the OS page size to obtain the
	// per-CPU buffer size handed to perf.NewReader.
	perCPUBufferSizePages int
	// typ selects how runEventsReader decodes the map's samples.
	typ perfMapType
}
  137. func (t *Tracer) ebpf(ch chan<- Event) error {
  138. if _, ok := ebpfProg[runtime.GOARCH]; !ok {
  139. return fmt.Errorf("unsupported architecture: %s", runtime.GOARCH)
  140. }
  141. kv := "v" + common.KernelMajorMinor(t.kernelVersion)
  142. var prg []byte
  143. for _, p := range ebpfProg[runtime.GOARCH] {
  144. if semver.Compare(kv, p.v) >= 0 {
  145. prg = p.p
  146. break
  147. }
  148. }
  149. if len(prg) == 0 {
  150. return fmt.Errorf("unsupported kernel version: %s", t.kernelVersion)
  151. }
  152. if _, err := os.Stat("/sys/kernel/debug/tracing"); err != nil {
  153. return fmt.Errorf("kernel tracing is not available: %w", err)
  154. }
  155. collectionSpec, err := ebpf.LoadCollectionSpecFromReader(bytes.NewReader(prg))
  156. if err != nil {
  157. return fmt.Errorf("failed to load collection spec: %w", err)
  158. }
  159. _ = unix.Setrlimit(unix.RLIMIT_MEMLOCK, &unix.Rlimit{Cur: unix.RLIM_INFINITY, Max: unix.RLIM_INFINITY})
  160. c, err := ebpf.NewCollectionWithOptions(collectionSpec, ebpf.CollectionOptions{
  161. //Programs: ebpf.ProgramOptions{LogLevel: 2, LogSize: 20 * 1024 * 1024},
  162. })
  163. if err != nil {
  164. var verr *ebpf.VerifierError
  165. if errors.As(err, &verr) {
  166. klog.Errorf("%+v", verr)
  167. }
  168. return fmt.Errorf("failed to load collection: %w", err)
  169. }
  170. t.collection = c
  171. perfMaps := []perfMap{
  172. {name: "proc_events", typ: perfMapTypeProcEvents, perCPUBufferSizePages: 4},
  173. {name: "tcp_listen_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 4},
  174. {name: "tcp_connect_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 8},
  175. {name: "tcp_retransmit_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 4},
  176. {name: "file_events", typ: perfMapTypeFileEvents, perCPUBufferSizePages: 4},
  177. }
  178. if !t.disableL7Tracing {
  179. perfMaps = append(perfMaps, perfMap{name: "l7_events", typ: perfMapTypeL7Events, perCPUBufferSizePages: 32})
  180. }
  181. for _, pm := range perfMaps {
  182. r, err := perf.NewReader(t.collection.Maps[pm.name], pm.perCPUBufferSizePages*os.Getpagesize())
  183. if err != nil {
  184. t.Close()
  185. return fmt.Errorf("failed to create ebpf reader: %w", err)
  186. }
  187. t.readers[pm.name] = r
  188. go runEventsReader(pm.name, r, ch, pm.typ)
  189. }
  190. for _, programSpec := range collectionSpec.Programs {
  191. program := t.collection.Programs[programSpec.Name]
  192. if t.disableL7Tracing {
  193. switch programSpec.Name {
  194. case "sys_enter_writev", "sys_enter_write", "sys_enter_sendto", "sys_enter_sendmsg":
  195. continue
  196. case "sys_enter_read", "sys_enter_readv", "sys_enter_recvfrom", "sys_enter_recvmsg":
  197. continue
  198. case "sys_exit_read", "sys_exit_readv", "sys_exit_recvfrom", "sys_exit_recvmsg":
  199. continue
  200. }
  201. }
  202. var l link.Link
  203. switch programSpec.Type {
  204. case ebpf.TracePoint:
  205. parts := strings.SplitN(programSpec.AttachTo, "/", 2)
  206. l, err = link.Tracepoint(parts[0], parts[1], program, nil)
  207. case ebpf.Kprobe:
  208. if strings.HasPrefix(programSpec.SectionName, "uprobe/") {
  209. t.uprobes[programSpec.Name] = program
  210. continue
  211. }
  212. l, err = link.Kprobe(programSpec.AttachTo, program, nil)
  213. }
  214. if err != nil {
  215. t.Close()
  216. return fmt.Errorf("failed to link program: %w", err)
  217. }
  218. t.links = append(t.links, l)
  219. }
  220. return nil
  221. }
  222. func (t EventType) String() string {
  223. switch t {
  224. case EventTypeProcessStart:
  225. return "process-start"
  226. case EventTypeProcessExit:
  227. return "process-exit"
  228. case EventTypeConnectionOpen:
  229. return "connection-open"
  230. case EventTypeConnectionClose:
  231. return "connection-close"
  232. case EventTypeConnectionError:
  233. return "connection-error"
  234. case EventTypeListenOpen:
  235. return "listen-open"
  236. case EventTypeListenClose:
  237. return "listen-close"
  238. case EventTypeFileOpen:
  239. return "file-open"
  240. case EventTypeTCPRetransmit:
  241. return "tcp-retransmit"
  242. case EventTypeL7Request:
  243. return "l7-request"
  244. }
  245. return "unknown: " + strconv.Itoa(int(t))
  246. }
  247. func (t EventReason) String() string {
  248. switch t {
  249. case EventReasonNone:
  250. return "none"
  251. case EventReasonOOMKill:
  252. return "oom-kill"
  253. }
  254. return "unknown: " + strconv.Itoa(int(t))
  255. }
// procEvent is the wire format of a sample from the process perf map.
// runEventsReader decodes it with binary.Read (little-endian), so field order
// and widths define the expected byte layout.
type procEvent struct {
	Type EventType
	Pid  uint32
	// Reason is converted to EventReason when the Event is built.
	Reason uint32
}
// tcpEvent is the wire format of a sample from the TCP perf maps (listen,
// connect and retransmit). runEventsReader decodes it with binary.Read
// (little-endian), so field order and widths define the expected byte layout.
type tcpEvent struct {
	Fd        uint64
	Timestamp uint64
	Type      EventType
	Pid       uint32
	SPort     uint16
	DPort     uint16
	// SAddr and DAddr are 16-byte addresses; ipPort combines them with the
	// ports above into netaddr.IPPort values.
	SAddr [16]byte
	DAddr [16]byte
}
// fileEvent is the wire format of a sample from the file perf map.
// runEventsReader decodes it with binary.Read (little-endian), so field order
// and widths define the expected byte layout.
type fileEvent struct {
	Type EventType
	Pid  uint32
	Fd   uint64
}
// l7Event is the fixed-size header of a sample from the L7 perf map; the bytes
// that follow it in the sample are treated as the request payload.
// runEventsReader decodes it with binary.Read (little-endian), so field order
// and widths define the expected byte layout.
type l7Event struct {
	Fd uint64
	// ConnectionTimestamp is copied into Event.Timestamp.
	ConnectionTimestamp uint64
	Pid                 uint32
	Status              uint32
	// Duration is converted with time.Duration, i.e. read as nanoseconds.
	Duration uint64
	Protocol uint8
	Method   uint8
	// Padding is not read by runEventsReader.
	Padding     uint16
	StatementId uint32
	// PayloadSize is the payload length reported by the producer; at most
	// MaxPayloadSize bytes of payload are kept.
	PayloadSize uint64
}
  288. func runEventsReader(name string, r *perf.Reader, ch chan<- Event, typ perfMapType) {
  289. for {
  290. rec, err := r.Read()
  291. if err != nil {
  292. if errors.Is(err, perf.ErrClosed) {
  293. break
  294. }
  295. continue
  296. }
  297. if rec.LostSamples > 0 {
  298. klog.Errorln(name, "lost samples:", rec.LostSamples)
  299. continue
  300. }
  301. var event Event
  302. switch typ {
  303. case perfMapTypeL7Events:
  304. v := &l7Event{}
  305. reader := bytes.NewBuffer(rec.RawSample)
  306. if err := binary.Read(reader, binary.LittleEndian, v); err != nil {
  307. klog.Warningln("failed to read msg:", err)
  308. continue
  309. }
  310. payload := reader.Bytes()
  311. req := &l7.RequestData{
  312. Protocol: l7.Protocol(v.Protocol),
  313. Status: l7.Status(v.Status),
  314. Duration: time.Duration(v.Duration),
  315. Method: l7.Method(v.Method),
  316. StatementId: v.StatementId,
  317. }
  318. switch {
  319. case v.PayloadSize == 0:
  320. case v.PayloadSize > MaxPayloadSize:
  321. req.Payload = payload[:MaxPayloadSize]
  322. default:
  323. req.Payload = payload[:v.PayloadSize]
  324. }
  325. event = Event{Type: EventTypeL7Request, Pid: v.Pid, Fd: v.Fd, Timestamp: v.ConnectionTimestamp, L7Request: req}
  326. case perfMapTypeFileEvents:
  327. v := &fileEvent{}
  328. if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
  329. klog.Warningln("failed to read msg:", err)
  330. continue
  331. }
  332. event = Event{Type: v.Type, Pid: v.Pid, Fd: v.Fd}
  333. case perfMapTypeProcEvents:
  334. v := &procEvent{}
  335. if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
  336. klog.Warningln("failed to read msg:", err)
  337. continue
  338. }
  339. event = Event{Type: v.Type, Reason: EventReason(v.Reason), Pid: v.Pid}
  340. case perfMapTypeTCPEvents:
  341. v := &tcpEvent{}
  342. if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
  343. klog.Warningln("failed to read msg:", err)
  344. continue
  345. }
  346. event = Event{
  347. Type: v.Type,
  348. Pid: v.Pid,
  349. SrcAddr: ipPort(v.SAddr, v.SPort),
  350. DstAddr: ipPort(v.DAddr, v.DPort),
  351. Fd: v.Fd,
  352. Timestamp: v.Timestamp,
  353. }
  354. default:
  355. continue
  356. }
  357. ch <- event
  358. }
  359. }
  360. func ipPort(ip [16]byte, port uint16) netaddr.IPPort {
  361. i, _ := netaddr.FromStdIP(ip[:])
  362. return netaddr.IPPortFrom(i, port)
  363. }