tracer.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409
  1. package ebpftracer
  2. import (
  3. "bytes"
  4. "encoding/binary"
  5. "errors"
  6. "fmt"
  7. "os"
  8. "runtime"
  9. "strconv"
  10. "strings"
  11. "time"
  12. "github.com/cilium/ebpf"
  13. "github.com/cilium/ebpf/link"
  14. "github.com/cilium/ebpf/perf"
  15. "github.com/coroot/coroot-node-agent/common"
  16. "github.com/coroot/coroot-node-agent/ebpftracer/l7"
  17. "github.com/coroot/coroot-node-agent/proc"
  18. "golang.org/x/mod/semver"
  19. "golang.org/x/sys/unix"
  20. "inet.af/netaddr"
  21. "k8s.io/klog/v2"
  22. )
  23. const MaxPayloadSize = 1024
  24. type EventType uint32
  25. type EventReason uint32
  26. const (
  27. EventTypeProcessStart EventType = 1
  28. EventTypeProcessExit EventType = 2
  29. EventTypeConnectionOpen EventType = 3
  30. EventTypeConnectionClose EventType = 4
  31. EventTypeConnectionError EventType = 5
  32. EventTypeListenOpen EventType = 6
  33. EventTypeListenClose EventType = 7
  34. EventTypeFileOpen EventType = 8
  35. EventTypeTCPRetransmit EventType = 9
  36. EventTypeL7Request EventType = 10
  37. EventReasonNone EventReason = 0
  38. EventReasonOOMKill EventReason = 1
  39. )
  40. type Event struct {
  41. Type EventType
  42. Reason EventReason
  43. Pid uint32
  44. SrcAddr netaddr.IPPort
  45. DstAddr netaddr.IPPort
  46. Fd uint64
  47. Timestamp uint64
  48. L7Request *l7.RequestData
  49. }
  50. type perfMapType uint8
  51. const (
  52. perfMapTypeProcEvents perfMapType = 1
  53. perfMapTypeTCPEvents perfMapType = 2
  54. perfMapTypeFileEvents perfMapType = 3
  55. perfMapTypeL7Events perfMapType = 4
  56. )
  57. type Tracer struct {
  58. kernelVersion string
  59. disableL7Tracing bool
  60. collection *ebpf.Collection
  61. readers map[string]*perf.Reader
  62. links []link.Link
  63. uprobes map[string]*ebpf.Program
  64. }
  65. func NewTracer(kernelVersion string, disableL7Tracing bool) *Tracer {
  66. if disableL7Tracing {
  67. klog.Infoln("L7 tracing is disabled")
  68. }
  69. return &Tracer{
  70. kernelVersion: kernelVersion,
  71. disableL7Tracing: disableL7Tracing,
  72. readers: map[string]*perf.Reader{},
  73. uprobes: map[string]*ebpf.Program{},
  74. }
  75. }
  76. func (t *Tracer) Run(events chan<- Event) error {
  77. if err := t.ebpf(events); err != nil {
  78. return err
  79. }
  80. if err := t.init(events); err != nil {
  81. return err
  82. }
  83. return nil
  84. }
  85. func (t *Tracer) Close() {
  86. for _, p := range t.uprobes {
  87. _ = p.Close()
  88. }
  89. for _, l := range t.links {
  90. _ = l.Close()
  91. }
  92. for _, r := range t.readers {
  93. _ = r.Close()
  94. }
  95. t.collection.Close()
  96. }
  97. func (t *Tracer) init(ch chan<- Event) error {
  98. pids, err := proc.ListPids()
  99. if err != nil {
  100. return fmt.Errorf("failed to list pids: %w", err)
  101. }
  102. for _, pid := range pids {
  103. ch <- Event{Type: EventTypeProcessStart, Pid: pid}
  104. }
  105. fds, sockets := readFds(pids)
  106. for _, fd := range fds {
  107. ch <- Event{Type: EventTypeFileOpen, Pid: fd.pid, Fd: fd.fd}
  108. }
  109. listens := map[uint64]bool{}
  110. for _, s := range sockets {
  111. if s.Listen {
  112. listens[uint64(s.pid)<<32|uint64(s.SAddr.Port())] = true
  113. }
  114. }
  115. for _, s := range sockets {
  116. typ := EventTypeConnectionOpen
  117. if s.Listen {
  118. typ = EventTypeListenOpen
  119. } else if listens[uint64(s.pid)<<32|uint64(s.SAddr.Port())] || s.DAddr.Port() > s.SAddr.Port() { // inbound
  120. continue
  121. }
  122. ch <- Event{
  123. Type: typ,
  124. Pid: s.pid,
  125. Fd: s.fd,
  126. SrcAddr: s.SAddr,
  127. DstAddr: s.DAddr,
  128. }
  129. }
  130. return nil
  131. }
  132. type perfMap struct {
  133. name string
  134. perCPUBufferSizePages int
  135. typ perfMapType
  136. }
  137. func (t *Tracer) ebpf(ch chan<- Event) error {
  138. if _, ok := ebpfProg[runtime.GOARCH]; !ok {
  139. return fmt.Errorf("unsupported architecture: %s", runtime.GOARCH)
  140. }
  141. kv := "v" + common.KernelMajorMinor(t.kernelVersion)
  142. var prg []byte
  143. for _, p := range ebpfProg[runtime.GOARCH] {
  144. if semver.Compare(kv, p.v) >= 0 {
  145. prg = p.p
  146. break
  147. }
  148. }
  149. if len(prg) == 0 {
  150. return fmt.Errorf("unsupported kernel version: %s", t.kernelVersion)
  151. }
  152. if _, err := os.Stat("/sys/kernel/debug/tracing"); err != nil {
  153. return fmt.Errorf("kernel tracing is not available: %w", err)
  154. }
  155. collectionSpec, err := ebpf.LoadCollectionSpecFromReader(bytes.NewReader(prg))
  156. if err != nil {
  157. return fmt.Errorf("failed to load collection spec: %w", err)
  158. }
  159. _ = unix.Setrlimit(unix.RLIMIT_MEMLOCK, &unix.Rlimit{Cur: unix.RLIM_INFINITY, Max: unix.RLIM_INFINITY})
  160. c, err := ebpf.NewCollectionWithOptions(collectionSpec, ebpf.CollectionOptions{
  161. //Programs: ebpf.ProgramOptions{LogLevel: 2, LogSize: 20 * 1024 * 1024},
  162. })
  163. if err != nil {
  164. var verr *ebpf.VerifierError
  165. if errors.As(err, &verr) {
  166. klog.Errorf("%+v", verr)
  167. }
  168. return fmt.Errorf("failed to load collection: %w", err)
  169. }
  170. t.collection = c
  171. perfMaps := []perfMap{
  172. {name: "proc_events", typ: perfMapTypeProcEvents, perCPUBufferSizePages: 4},
  173. {name: "tcp_listen_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 4},
  174. {name: "tcp_connect_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 8},
  175. {name: "tcp_retransmit_events", typ: perfMapTypeTCPEvents, perCPUBufferSizePages: 4},
  176. {name: "file_events", typ: perfMapTypeFileEvents, perCPUBufferSizePages: 4},
  177. }
  178. if !t.disableL7Tracing {
  179. perfMaps = append(perfMaps, perfMap{name: "l7_events", typ: perfMapTypeL7Events, perCPUBufferSizePages: 32})
  180. }
  181. for _, pm := range perfMaps {
  182. r, err := perf.NewReader(t.collection.Maps[pm.name], pm.perCPUBufferSizePages*os.Getpagesize())
  183. if err != nil {
  184. t.Close()
  185. return fmt.Errorf("failed to create ebpf reader: %w", err)
  186. }
  187. t.readers[pm.name] = r
  188. // event监听
  189. go runEventsReader(pm.name, r, ch, pm.typ)
  190. }
  191. for _, programSpec := range collectionSpec.Programs {
  192. program := t.collection.Programs[programSpec.Name]
  193. fmt.Println(programSpec.Name, programSpec.SectionName, programSpec.Type)
  194. if t.disableL7Tracing {
  195. switch programSpec.Name {
  196. case "sys_enter_writev", "sys_enter_write", "sys_enter_sendto", "sys_enter_sendmsg":
  197. continue
  198. case "sys_enter_read", "sys_enter_readv", "sys_enter_recvfrom", "sys_enter_recvmsg":
  199. continue
  200. case "sys_exit_read", "sys_exit_readv", "sys_exit_recvfrom", "sys_exit_recvmsg":
  201. continue
  202. }
  203. }
  204. var l link.Link
  205. switch programSpec.Type {
  206. case ebpf.TracePoint:
  207. parts := strings.SplitN(programSpec.AttachTo, "/", 2)
  208. l, err = link.Tracepoint(parts[0], parts[1], program, nil)
  209. case ebpf.Kprobe:
  210. if strings.HasPrefix(programSpec.SectionName, "uprobe/") {
  211. t.uprobes[programSpec.Name] = program
  212. continue
  213. }
  214. l, err = link.Kprobe(programSpec.AttachTo, program, nil)
  215. }
  216. if err != nil {
  217. t.Close()
  218. return fmt.Errorf("failed to link program: %w", err)
  219. }
  220. t.links = append(t.links, l)
  221. }
  222. return nil
  223. }
  224. func (t EventType) String() string {
  225. switch t {
  226. case EventTypeProcessStart:
  227. return "process-start"
  228. case EventTypeProcessExit:
  229. return "process-exit"
  230. case EventTypeConnectionOpen:
  231. return "connection-open"
  232. case EventTypeConnectionClose:
  233. return "connection-close"
  234. case EventTypeConnectionError:
  235. return "connection-error"
  236. case EventTypeListenOpen:
  237. return "listen-open"
  238. case EventTypeListenClose:
  239. return "listen-close"
  240. case EventTypeFileOpen:
  241. return "file-open"
  242. case EventTypeTCPRetransmit:
  243. return "tcp-retransmit"
  244. case EventTypeL7Request:
  245. return "l7-request"
  246. }
  247. return "unknown: " + strconv.Itoa(int(t))
  248. }
  249. func (t EventReason) String() string {
  250. switch t {
  251. case EventReasonNone:
  252. return "none"
  253. case EventReasonOOMKill:
  254. return "oom-kill"
  255. }
  256. return "unknown: " + strconv.Itoa(int(t))
  257. }
  258. type procEvent struct {
  259. Type EventType
  260. Pid uint32
  261. Reason uint32
  262. }
  263. type tcpEvent struct {
  264. Fd uint64
  265. Timestamp uint64
  266. Type EventType
  267. Pid uint32
  268. SPort uint16
  269. DPort uint16
  270. SAddr [16]byte
  271. DAddr [16]byte
  272. }
  273. type fileEvent struct {
  274. Type EventType
  275. Pid uint32
  276. Fd uint64
  277. }
  278. type l7Event struct {
  279. Fd uint64
  280. ConnectionTimestamp uint64
  281. Pid uint32
  282. Status uint32
  283. Duration uint64
  284. Protocol uint8
  285. Method uint8
  286. Padding uint16
  287. StatementId uint32
  288. PayloadSize uint64
  289. TraceId uint64
  290. TraceStart uint32
  291. TraceEnd uint32
  292. }
  293. func runEventsReader(name string, r *perf.Reader, ch chan<- Event, typ perfMapType) {
  294. for {
  295. rec, err := r.Read()
  296. if err != nil {
  297. if errors.Is(err, perf.ErrClosed) {
  298. break
  299. }
  300. continue
  301. }
  302. if rec.LostSamples > 0 {
  303. klog.Errorln(name, "lost samples:", rec.LostSamples)
  304. continue
  305. }
  306. var event Event
  307. switch typ {
  308. case perfMapTypeL7Events:
  309. v := &l7Event{}
  310. reader := bytes.NewBuffer(rec.RawSample)
  311. if err := binary.Read(reader, binary.LittleEndian, v); err != nil {
  312. klog.Warningln("failed to read msg:", err)
  313. continue
  314. }
  315. payload := reader.Bytes()
  316. req := &l7.RequestData{
  317. Protocol: l7.Protocol(v.Protocol),
  318. Status: l7.Status(v.Status),
  319. Duration: time.Duration(v.Duration),
  320. Method: l7.Method(v.Method),
  321. StatementId: v.StatementId,
  322. TraceId: v.TraceId,
  323. TraceStart: v.TraceStart,
  324. TraceEnd: v.TraceEnd,
  325. }
  326. switch {
  327. case v.PayloadSize == 0:
  328. case v.PayloadSize > MaxPayloadSize:
  329. req.Payload = payload[:MaxPayloadSize]
  330. default:
  331. req.Payload = payload[:v.PayloadSize]
  332. }
  333. event = Event{Type: EventTypeL7Request, Pid: v.Pid, Fd: v.Fd, Timestamp: v.ConnectionTimestamp, L7Request: req}
  334. case perfMapTypeFileEvents:
  335. v := &fileEvent{}
  336. if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
  337. klog.Warningln("failed to read msg:", err)
  338. continue
  339. }
  340. event = Event{Type: v.Type, Pid: v.Pid, Fd: v.Fd}
  341. case perfMapTypeProcEvents:
  342. v := &procEvent{}
  343. if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
  344. klog.Warningln("failed to read msg:", err)
  345. continue
  346. }
  347. event = Event{Type: v.Type, Reason: EventReason(v.Reason), Pid: v.Pid}
  348. case perfMapTypeTCPEvents:
  349. v := &tcpEvent{}
  350. if err := binary.Read(bytes.NewBuffer(rec.RawSample), binary.LittleEndian, v); err != nil {
  351. klog.Warningln("failed to read msg:", err)
  352. continue
  353. }
  354. event = Event{
  355. Type: v.Type,
  356. Pid: v.Pid,
  357. SrcAddr: ipPort(v.SAddr, v.SPort),
  358. DstAddr: ipPort(v.DAddr, v.DPort),
  359. Fd: v.Fd,
  360. Timestamp: v.Timestamp,
  361. }
  362. default:
  363. continue
  364. }
  365. ch <- event
  366. }
  367. }
  368. func ipPort(ip [16]byte, port uint16) netaddr.IPPort {
  369. i, _ := netaddr.FromStdIP(ip[:])
  370. return netaddr.IPPortFrom(i, port)
  371. }