tracer.go 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379
  1. package ebpftracer
  2. import (
  3. "bytes"
  4. "encoding/binary"
  5. "errors"
  6. "fmt"
  7. "github.com/cilium/ebpf"
  8. "github.com/cilium/ebpf/link"
  9. "github.com/cilium/ebpf/perf"
  10. "github.com/coroot/coroot-node-agent/common"
  11. "github.com/coroot/coroot-node-agent/ebpftracer/l7"
  12. "github.com/coroot/coroot-node-agent/proc"
  13. "golang.org/x/mod/semver"
  14. "golang.org/x/sys/unix"
  15. "inet.af/netaddr"
  16. "k8s.io/klog/v2"
  17. "os"
  18. "runtime"
  19. "strconv"
  20. "strings"
  21. "time"
  22. )
  // MaxPayloadSize is the length, in bytes, of the fixed Payload array carried
  // by every l7Event; it is also the upper bound applied to the payload slice
  // exposed through Event.L7Request.
  const MaxPayloadSize = 1 << 10
  type (
  	// EventType identifies what an Event describes (process start, connection
  	// open, L7 request, ...). See the EventType* constants.
  	EventType uint32

  	// EventReason carries an optional cause attached to an Event. See the
  	// EventReason* constants.
  	EventReason uint32
  )
  26. const (
  27. EventTypeProcessStart EventType = 1
  28. EventTypeProcessExit EventType = 2
  29. EventTypeConnectionOpen EventType = 3
  30. EventTypeConnectionClose EventType = 4
  31. EventTypeConnectionError EventType = 5
  32. EventTypeListenOpen EventType = 6
  33. EventTypeListenClose EventType = 7
  34. EventTypeFileOpen EventType = 8
  35. EventTypeTCPRetransmit EventType = 9
  36. EventTypeL7Request EventType = 10
  37. EventReasonNone EventReason = 0
  38. EventReasonOOMKill EventReason = 1
  39. )
  40. type Event struct {
  41. Type EventType
  42. Reason EventReason
  43. Pid uint32
  44. SrcAddr netaddr.IPPort
  45. DstAddr netaddr.IPPort
  46. Fd uint64
  47. Timestamp uint64
  48. L7Request *l7.RequestData
  49. }
  50. type Tracer struct {
  51. kernelVersion string
  52. disableL7Tracing bool
  53. collection *ebpf.Collection
  54. readers map[string]*perf.Reader
  55. links []link.Link
  56. uprobes map[string]*ebpf.Program
  57. }
  58. func NewTracer(kernelVersion string, disableL7Tracing bool) *Tracer {
  59. if disableL7Tracing {
  60. klog.Infoln("L7 tracing is disabled")
  61. }
  62. return &Tracer{
  63. kernelVersion: kernelVersion,
  64. disableL7Tracing: disableL7Tracing,
  65. readers: map[string]*perf.Reader{},
  66. uprobes: map[string]*ebpf.Program{},
  67. }
  68. }
  69. func (t *Tracer) Run(events chan<- Event) error {
  70. if err := t.ebpf(events); err != nil {
  71. return err
  72. }
  73. if err := t.init(events); err != nil {
  74. return err
  75. }
  76. return nil
  77. }
  78. func (t *Tracer) Close() {
  79. for _, p := range t.uprobes {
  80. _ = p.Close()
  81. }
  82. for _, l := range t.links {
  83. _ = l.Close()
  84. }
  85. for _, r := range t.readers {
  86. _ = r.Close()
  87. }
  88. t.collection.Close()
  89. }
  90. func (t *Tracer) init(ch chan<- Event) error {
  91. pids, err := proc.ListPids()
  92. if err != nil {
  93. return fmt.Errorf("failed to list pids: %w", err)
  94. }
  95. for _, pid := range pids {
  96. ch <- Event{Type: EventTypeProcessStart, Pid: pid}
  97. }
  98. fds, sockets := readFds(pids)
  99. for _, fd := range fds {
  100. ch <- Event{Type: EventTypeFileOpen, Pid: fd.pid, Fd: fd.fd}
  101. }
  102. listens := map[uint64]bool{}
  103. for _, s := range sockets {
  104. if s.Listen {
  105. listens[uint64(s.pid)<<32|uint64(s.SAddr.Port())] = true
  106. }
  107. }
  108. for _, s := range sockets {
  109. typ := EventTypeConnectionOpen
  110. if s.Listen {
  111. typ = EventTypeListenOpen
  112. } else if listens[uint64(s.pid)<<32|uint64(s.SAddr.Port())] || s.DAddr.Port() > s.SAddr.Port() { // inbound
  113. continue
  114. }
  115. ch <- Event{
  116. Type: typ,
  117. Pid: s.pid,
  118. Fd: s.fd,
  119. SrcAddr: s.SAddr,
  120. DstAddr: s.DAddr,
  121. }
  122. }
  123. return nil
  124. }
  125. type perfMap struct {
  126. name string
  127. perCPUBufferSizePages int
  128. event rawEvent
  129. }
  130. func (t *Tracer) ebpf(ch chan<- Event) error {
  131. if _, ok := ebpfProg[runtime.GOARCH]; !ok {
  132. return fmt.Errorf("unsupported architecture: %s", runtime.GOARCH)
  133. }
  134. kv := "v" + common.KernelMajorMinor(t.kernelVersion)
  135. var prg []byte
  136. for _, p := range ebpfProg[runtime.GOARCH] {
  137. if semver.Compare(kv, p.v) >= 0 {
  138. prg = p.p
  139. break
  140. }
  141. }
  142. if len(prg) == 0 {
  143. return fmt.Errorf("unsupported kernel version: %s", t.kernelVersion)
  144. }
  145. if _, err := os.Stat("/sys/kernel/debug/tracing"); err != nil {
  146. return fmt.Errorf("kernel tracing is not available: %w", err)
  147. }
  148. collectionSpec, err := ebpf.LoadCollectionSpecFromReader(bytes.NewReader(prg))
  149. if err != nil {
  150. return fmt.Errorf("failed to load collection spec: %w", err)
  151. }
  152. _ = unix.Setrlimit(unix.RLIMIT_MEMLOCK, &unix.Rlimit{Cur: unix.RLIM_INFINITY, Max: unix.RLIM_INFINITY})
  153. c, err := ebpf.NewCollectionWithOptions(collectionSpec, ebpf.CollectionOptions{
  154. //Programs: ebpf.ProgramOptions{LogLevel: 2, LogSize: 20 * 1024 * 1024},
  155. })
  156. if err != nil {
  157. var verr *ebpf.VerifierError
  158. if errors.As(err, &verr) {
  159. klog.Errorf("%+v", verr)
  160. }
  161. return fmt.Errorf("failed to load collection: %w", err)
  162. }
  163. t.collection = c
  164. perfMaps := []perfMap{
  165. {name: "proc_events", event: &procEvent{}, perCPUBufferSizePages: 4},
  166. {name: "tcp_listen_events", event: &tcpEvent{}, perCPUBufferSizePages: 4},
  167. {name: "tcp_connect_events", event: &tcpEvent{}, perCPUBufferSizePages: 8},
  168. {name: "tcp_retransmit_events", event: &tcpEvent{}, perCPUBufferSizePages: 4},
  169. {name: "file_events", event: &fileEvent{}, perCPUBufferSizePages: 4},
  170. }
  171. if !t.disableL7Tracing {
  172. perfMaps = append(perfMaps, perfMap{name: "l7_events", event: &l7Event{}, perCPUBufferSizePages: 32})
  173. }
  174. for _, pm := range perfMaps {
  175. r, err := perf.NewReader(t.collection.Maps[pm.name], pm.perCPUBufferSizePages*os.Getpagesize())
  176. if err != nil {
  177. t.Close()
  178. return fmt.Errorf("failed to create ebpf reader: %w", err)
  179. }
  180. t.readers[pm.name] = r
  181. go runEventsReader(pm.name, r, ch, pm.event)
  182. }
  183. for _, programSpec := range collectionSpec.Programs {
  184. program := t.collection.Programs[programSpec.Name]
  185. if t.disableL7Tracing {
  186. switch programSpec.Name {
  187. case "sys_enter_writev", "sys_enter_write", "sys_enter_sendto", "sys_enter_sendmsg":
  188. continue
  189. case "sys_enter_read", "sys_enter_readv", "sys_enter_recvfrom", "sys_enter_recvmsg":
  190. continue
  191. case "sys_exit_read", "sys_exit_readv", "sys_exit_recvfrom", "sys_exit_recvmsg":
  192. continue
  193. }
  194. }
  195. var l link.Link
  196. switch programSpec.Type {
  197. case ebpf.TracePoint:
  198. parts := strings.SplitN(programSpec.AttachTo, "/", 2)
  199. l, err = link.Tracepoint(parts[0], parts[1], program, nil)
  200. case ebpf.Kprobe:
  201. if strings.HasPrefix(programSpec.SectionName, "uprobe/") {
  202. t.uprobes[programSpec.Name] = program
  203. continue
  204. }
  205. l, err = link.Kprobe(programSpec.AttachTo, program, nil)
  206. }
  207. if err != nil {
  208. t.Close()
  209. return fmt.Errorf("failed to link program: %w", err)
  210. }
  211. t.links = append(t.links, l)
  212. }
  213. return nil
  214. }
  215. func (t EventType) String() string {
  216. switch t {
  217. case EventTypeProcessStart:
  218. return "process-start"
  219. case EventTypeProcessExit:
  220. return "process-exit"
  221. case EventTypeConnectionOpen:
  222. return "connection-open"
  223. case EventTypeConnectionClose:
  224. return "connection-close"
  225. case EventTypeConnectionError:
  226. return "connection-error"
  227. case EventTypeListenOpen:
  228. return "listen-open"
  229. case EventTypeListenClose:
  230. return "listen-close"
  231. case EventTypeFileOpen:
  232. return "file-open"
  233. case EventTypeTCPRetransmit:
  234. return "tcp-retransmit"
  235. case EventTypeL7Request:
  236. return "l7-request"
  237. }
  238. return "unknown: " + strconv.Itoa(int(t))
  239. }
  240. func (t EventReason) String() string {
  241. switch t {
  242. case EventReasonNone:
  243. return "none"
  244. case EventReasonOOMKill:
  245. return "oom-kill"
  246. }
  247. return "unknown: " + strconv.Itoa(int(t))
  248. }
  249. type rawEvent interface {
  250. Event() Event
  251. }
  // procEvent is the raw record read from the proc_events perf map.
  //
  // The record is decoded field by field with encoding/binary, so the order
  // and width of the fields define the expected byte layout and must not be
  // changed independently of the producer.
  type procEvent struct {
  	Type, Pid, Reason uint32
  }
  257. func (e procEvent) Event() Event {
  258. return Event{Type: EventType(e.Type), Reason: EventReason(e.Reason), Pid: e.Pid}
  259. }
  // tcpEvent is the raw record read from the tcp_listen_events,
  // tcp_connect_events and tcp_retransmit_events perf maps.
  //
  // The record is decoded field by field with encoding/binary, so the order
  // and width of the fields define the expected byte layout and must not be
  // changed independently of the producer.
  type tcpEvent struct {
  	Fd, Timestamp uint64
  	Type, Pid     uint32
  	SPort, DPort  uint16
  	// Addresses are always 16 bytes wide; see ipPort for how they are
  	// converted.
  	SAddr, DAddr [16]byte
  }
  270. func (e tcpEvent) Event() Event {
  271. return Event{
  272. Type: EventType(e.Type),
  273. Pid: e.Pid,
  274. SrcAddr: ipPort(e.SAddr, e.SPort),
  275. DstAddr: ipPort(e.DAddr, e.DPort),
  276. Fd: e.Fd,
  277. Timestamp: e.Timestamp,
  278. }
  279. }
  // fileEvent is the raw record read from the file_events perf map.
  //
  // The record is decoded field by field with encoding/binary, so the order
  // and width of the fields define the expected byte layout and must not be
  // changed independently of the producer.
  type fileEvent struct {
  	Type, Pid uint32
  	Fd        uint64
  }
  285. func (e fileEvent) Event() Event {
  286. return Event{Type: EventType(e.Type), Pid: e.Pid, Fd: e.Fd}
  287. }
  288. type l7Event struct {
  289. Fd uint64
  290. ConnectionTimestamp uint64
  291. Pid uint32
  292. Status uint32
  293. Duration uint64
  294. Protocol uint8
  295. Method uint8
  296. Padding uint16
  297. StatementId uint32
  298. PayloadSize uint64
  299. Payload [MaxPayloadSize]byte
  300. }
  301. func (e l7Event) Event() Event {
  302. r := &l7.RequestData{
  303. Protocol: l7.Protocol(e.Protocol),
  304. Status: l7.Status(e.Status),
  305. Duration: time.Duration(e.Duration),
  306. Method: l7.Method(e.Method),
  307. StatementId: e.StatementId,
  308. }
  309. switch {
  310. case e.PayloadSize == 0:
  311. case e.PayloadSize > MaxPayloadSize:
  312. r.Payload = e.Payload[:MaxPayloadSize]
  313. default:
  314. r.Payload = e.Payload[:e.PayloadSize]
  315. }
  316. return Event{Type: EventTypeL7Request, Pid: e.Pid, Fd: e.Fd, Timestamp: e.ConnectionTimestamp, L7Request: r}
  317. }
  318. func runEventsReader(name string, r *perf.Reader, ch chan<- Event, e rawEvent) {
  319. for {
  320. rec, err := r.Read()
  321. if err != nil {
  322. if errors.Is(err, perf.ErrClosed) {
  323. break
  324. }
  325. continue
  326. }
  327. if rec.LostSamples > 0 {
  328. klog.Errorln(name, "lost samples:", rec.LostSamples)
  329. continue
  330. }
  331. if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, e); err != nil {
  332. klog.Warningln("failed to read msg:", err)
  333. continue
  334. }
  335. ch <- e.Event()
  336. }
  337. }
  338. func ipPort(ip [16]byte, port uint16) netaddr.IPPort {
  339. i, _ := netaddr.FromStdIP(ip[:])
  340. return netaddr.IPPortFrom(i, port)
  341. }