tracer.go 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341
  1. package ebpftracer
  2. import (
  3. "bytes"
  4. "encoding/binary"
  5. "fmt"
  6. "github.com/cilium/ebpf"
  7. "github.com/cilium/ebpf/link"
  8. "github.com/cilium/ebpf/perf"
  9. "github.com/coroot/coroot-node-agent/common"
  10. "github.com/coroot/coroot-node-agent/proc"
  11. "golang.org/x/mod/semver"
  12. "golang.org/x/sys/unix"
  13. "inet.af/netaddr"
  14. "k8s.io/klog/v2"
  15. "os"
  16. "runtime"
  17. "strconv"
  18. "strings"
  19. "time"
  20. )
  // EventType identifies what kind of event an Event describes.
  type EventType uint32

  // EventReason carries additional context for an event. In this file it is
  // only populated for process events (see procEvent.Event).
  type EventReason uint32

  // Event type and reason codes.
  //
  // Raw records decoded from the perf buffers are cast directly to these types
  // (see the Event methods of procEvent, tcpEvent and fileEvent), so the
  // numeric values must stay in sync with whatever produces those records.
  const (
  	// Process lifecycle.
  	EventTypeProcessStart EventType = 1
  	EventTypeProcessExit EventType = 2
  	// TCP connections.
  	EventTypeConnectionOpen EventType = 3
  	EventTypeConnectionClose EventType = 4
  	EventTypeConnectionError EventType = 5
  	// Listening sockets.
  	EventTypeListenOpen EventType = 6
  	EventTypeListenClose EventType = 7
  	// Files, retransmits and L7 requests.
  	EventTypeFileOpen EventType = 8
  	EventTypeTCPRetransmit EventType = 9
  	EventTypeL7Request EventType = 10
  	// Reasons.
  	EventReasonNone EventReason = 0
  	EventReasonOOMKill EventReason = 1
  )
  // L7Protocol identifies the application-level protocol of an L7Request.
  // The value is taken verbatim from the raw l7Event record (see l7Event.Event).
  type L7Protocol uint8

  // Known L7 protocols. Any other value is rendered as "UNKNOWN:<n>" by String.
  const (
  	L7ProtocolHTTP L7Protocol = 1
  	L7ProtocolPostgres L7Protocol = 2
  )
  42. func (p L7Protocol) String() string {
  43. switch p {
  44. case L7ProtocolHTTP:
  45. return "HTTP"
  46. case L7ProtocolPostgres:
  47. return "Postgres"
  48. }
  49. return "UNKNOWN:" + strconv.Itoa(int(p))
  50. }
  // L7Request describes a single application-level request observed by the
  // tracer. It is attached to events of type EventTypeL7Request.
  type L7Request struct {
  	// Protocol is the detected application protocol.
  	Protocol L7Protocol
  	// Status is the status value reported in the raw record
  	// (presumably protocol-specific, e.g. an HTTP status code — confirm).
  	Status int
  	// Duration is the raw duration value converted directly to time.Duration.
  	Duration time.Duration
  }
  // Event is the unified event delivered to consumers of the tracer. Which
  // fields are populated depends on Type; unused fields keep their zero value.
  type Event struct {
  	// Type tells what happened.
  	Type EventType
  	// Reason is set for process events only.
  	Reason EventReason
  	// Pid is the process the event belongs to.
  	Pid uint32
  	// SrcAddr and DstAddr are set for TCP events.
  	SrcAddr netaddr.IPPort
  	DstAddr netaddr.IPPort
  	// Fd is the file descriptor for TCP, file and L7 events.
  	Fd uint64
  	// L7Request is non-nil only for EventTypeL7Request events.
  	L7Request *L7Request
  }
  // Tracer owns the loaded eBPF collection together with the perf readers and
  // program links created for it. Create one with NewTracer and release it
  // with Close.
  type Tracer struct {
  	// collection is the loaded eBPF collection (programs and maps).
  	collection *ebpf.Collection
  	// readers holds one perf reader per event map, keyed by map name.
  	readers map[string]*perf.Reader
  	// links holds the attachments of the eBPF programs.
  	links []link.Link
  }
  70. func NewTracer(events chan<- Event, kernelVersion string, disableL7Tracing bool) (*Tracer, error) {
  71. t := &Tracer{readers: map[string]*perf.Reader{}}
  72. if err := t.ebpf(events, kernelVersion, disableL7Tracing); err != nil {
  73. return nil, err
  74. }
  75. if disableL7Tracing {
  76. klog.Infoln("L7 tracing is disabled")
  77. }
  78. if err := t.init(events); err != nil {
  79. return nil, err
  80. }
  81. return t, nil
  82. }
  83. func (t *Tracer) Close() {
  84. for _, l := range t.links {
  85. l.Close()
  86. }
  87. for _, r := range t.readers {
  88. r.Close()
  89. }
  90. t.collection.Close()
  91. }
  92. func (t *Tracer) init(ch chan<- Event) error {
  93. pids, err := proc.ListPids()
  94. if err != nil {
  95. return fmt.Errorf("failed to list pids: %w", err)
  96. }
  97. for _, pid := range pids {
  98. ch <- Event{Type: EventTypeProcessStart, Pid: pid}
  99. }
  100. fds, sockets := readFds(pids)
  101. for _, fd := range fds {
  102. ch <- Event{Type: EventTypeFileOpen, Pid: fd.pid, Fd: fd.fd}
  103. }
  104. listens := map[uint64]bool{}
  105. for _, s := range sockets {
  106. if s.Listen {
  107. listens[uint64(s.pid)<<32|uint64(s.SAddr.Port())] = true
  108. }
  109. }
  110. for _, s := range sockets {
  111. typ := EventTypeConnectionOpen
  112. if s.Listen {
  113. typ = EventTypeListenOpen
  114. } else if listens[uint64(s.pid)<<32|uint64(s.SAddr.Port())] || s.DAddr.Port() > s.SAddr.Port() { // inbound
  115. continue
  116. }
  117. ch <- Event{
  118. Type: typ,
  119. Pid: s.pid,
  120. Fd: s.fd,
  121. SrcAddr: s.SAddr,
  122. DstAddr: s.DAddr,
  123. }
  124. }
  125. return nil
  126. }
  127. func (t *Tracer) ebpf(ch chan<- Event, kernelVersion string, disableL7Tracing bool) error {
  128. kv := "v" + common.KernelMajorMinor(kernelVersion)
  129. var prg []byte
  130. for _, p := range ebpfProg {
  131. if semver.Compare(kv, p.v) >= 0 {
  132. prg = p.p
  133. break
  134. }
  135. }
  136. if len(prg) == 0 {
  137. return fmt.Errorf("unsupported kernel version: %s", kernelVersion)
  138. }
  139. if _, err := os.Stat("/sys/kernel/debug/tracing"); err != nil {
  140. return fmt.Errorf("kernel tracing is not available: %w", err)
  141. }
  142. spec, err := ebpf.LoadCollectionSpecFromReader(bytes.NewReader(prg))
  143. if err != nil {
  144. return fmt.Errorf("failed to load spec: %w", err)
  145. }
  146. _ = unix.Setrlimit(unix.RLIMIT_MEMLOCK, &unix.Rlimit{Cur: unix.RLIM_INFINITY, Max: unix.RLIM_INFINITY})
  147. c, err := ebpf.NewCollection(spec)
  148. if err != nil {
  149. return fmt.Errorf("failed to load collection: %w", err)
  150. }
  151. t.collection = c
  152. events := map[string]rawEvent{
  153. "proc_events": &procEvent{},
  154. "tcp_listen_events": &tcpEvent{},
  155. "tcp_connect_events": &tcpEvent{},
  156. "tcp_retransmit_events": &tcpEvent{},
  157. "file_events": &fileEvent{},
  158. }
  159. if !disableL7Tracing {
  160. events["l7_events"] = &l7Event{}
  161. }
  162. for name, typ := range events {
  163. r, err := perf.NewReader(t.collection.Maps[name], os.Getpagesize())
  164. if err != nil {
  165. t.Close()
  166. return fmt.Errorf("failed to create ebpf reader: %w", err)
  167. }
  168. t.readers[name] = r
  169. go runEventsReader(name, r, ch, typ)
  170. }
  171. for name, spec := range spec.Programs {
  172. p := t.collection.Programs[name]
  173. if runtime.GOARCH == "arm64" && (spec.Name == "sys_enter_open" || spec.Name == "sys_exit_open") {
  174. continue
  175. }
  176. if disableL7Tracing {
  177. switch spec.Name {
  178. case "sys_enter_writev", "sys_enter_write", "sys_enter_sendto":
  179. continue
  180. case "sys_enter_read", "sys_enter_readv", "sys_enter_recvfrom":
  181. continue
  182. case "sys_exit_read", "sys_exit_readv", "sys_exit_recvfrom":
  183. continue
  184. }
  185. }
  186. var err error
  187. var l link.Link
  188. switch spec.Type {
  189. case ebpf.TracePoint:
  190. parts := strings.SplitN(spec.AttachTo, "/", 2)
  191. l, err = link.Tracepoint(parts[0], parts[1], p)
  192. case ebpf.Kprobe:
  193. l, err = link.Kprobe(spec.AttachTo, p)
  194. }
  195. if err != nil {
  196. t.Close()
  197. return fmt.Errorf("failed to link program: %w", err)
  198. }
  199. t.links = append(t.links, l)
  200. }
  201. return nil
  202. }
  203. func (t EventType) String() string {
  204. switch t {
  205. case EventTypeProcessStart:
  206. return "process-start"
  207. case EventTypeProcessExit:
  208. return "process-exit"
  209. case EventTypeConnectionOpen:
  210. return "connection-open"
  211. case EventTypeConnectionClose:
  212. return "connection-close"
  213. case EventTypeConnectionError:
  214. return "connection-error"
  215. case EventTypeListenOpen:
  216. return "listen-open"
  217. case EventTypeListenClose:
  218. return "listen-close"
  219. case EventTypeFileOpen:
  220. return "file-open"
  221. case EventTypeTCPRetransmit:
  222. return "tcp-retransmit"
  223. case EventTypeL7Request:
  224. return "l7-request"
  225. }
  226. return "unknown: " + strconv.Itoa(int(t))
  227. }
  228. func (t EventReason) String() string {
  229. switch t {
  230. case EventReasonNone:
  231. return "none"
  232. case EventReasonOOMKill:
  233. return "oom-kill"
  234. }
  235. return "unknown: " + strconv.Itoa(int(t))
  236. }
  // rawEvent is implemented by the fixed-layout records decoded from the perf
  // buffers. runEventsReader fills an implementation via binary.Read and then
  // calls Event to obtain the public representation.
  type rawEvent interface {
  	// Event converts the raw record into an Event.
  	Event() Event
  }
  240. type procEvent struct {
  241. Type uint32
  242. Pid uint32
  243. Reason uint32
  244. }
  245. func (e procEvent) Event() Event {
  246. return Event{Type: EventType(e.Type), Reason: EventReason(e.Reason), Pid: e.Pid}
  247. }
  248. type tcpEvent struct {
  249. Fd uint64
  250. Type uint32
  251. Pid uint32
  252. SPort uint16
  253. DPort uint16
  254. SAddr [16]byte
  255. DAddr [16]byte
  256. }
  257. func (e tcpEvent) Event() Event {
  258. return Event{Type: EventType(e.Type), Pid: e.Pid, SrcAddr: ipPort(e.SAddr, e.SPort), DstAddr: ipPort(e.DAddr, e.DPort), Fd: e.Fd}
  259. }
  260. type fileEvent struct {
  261. Type uint32
  262. Pid uint32
  263. Fd uint64
  264. }
  265. func (e fileEvent) Event() Event {
  266. return Event{Type: EventType(e.Type), Pid: e.Pid, Fd: e.Fd}
  267. }
  268. type l7Event struct {
  269. Fd uint64
  270. Pid uint32
  271. Status uint32
  272. Duration uint64
  273. Protocol uint8
  274. }
  275. func (e l7Event) Event() Event {
  276. return Event{Type: EventTypeL7Request, Pid: e.Pid, Fd: e.Fd, L7Request: &L7Request{
  277. Protocol: L7Protocol(e.Protocol),
  278. Status: int(e.Status),
  279. Duration: time.Duration(e.Duration),
  280. }}
  281. }
  282. func runEventsReader(name string, r *perf.Reader, ch chan<- Event, e rawEvent) {
  283. for {
  284. rec, err := r.Read()
  285. if err != nil {
  286. if perf.IsClosed(err) {
  287. break
  288. }
  289. continue
  290. }
  291. if rec.LostSamples > 0 {
  292. klog.Errorln(name, "lost samples:", rec.LostSamples)
  293. continue
  294. }
  295. if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, e); err != nil {
  296. klog.Warningln("failed to read msg:", err)
  297. continue
  298. }
  299. ch <- e.Event()
  300. }
  301. }
  302. func ipPort(ip [16]byte, port uint16) netaddr.IPPort {
  303. i, _ := netaddr.FromStdIP(ip[:])
  304. return netaddr.IPPortFrom(i, port)
  305. }