profiling.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283
  1. package profiling
  2. import (
  3. "bytes"
  4. "fmt"
  5. "hash/fnv"
  6. "io"
  7. "net/http"
  8. "net/url"
  9. "os"
  10. "strconv"
  11. "sync"
  12. "time"
  13. "github.com/coroot/coroot-node-agent/common"
  14. "github.com/coroot/coroot-node-agent/containers"
  15. "github.com/go-kit/log"
  16. ebpfspy "github.com/grafana/pyroscope/ebpf"
  17. "github.com/grafana/pyroscope/ebpf/metrics"
  18. "github.com/grafana/pyroscope/ebpf/pprof"
  19. "github.com/grafana/pyroscope/ebpf/sd"
  20. "github.com/grafana/pyroscope/ebpf/symtab"
  21. "github.com/grafana/pyroscope/ebpf/symtab/elf"
  22. "github.com/prometheus/client_golang/prometheus"
  23. "github.com/prometheus/prometheus/model/labels"
  24. "k8s.io/klog/v2"
  25. )
  26. const (
  27. CollectInterval = time.Minute
  28. SampleRate = 100
  29. UploadTimeout = 10 * time.Second
  30. )
  31. var (
  32. httpClient = http.Client{
  33. Timeout: UploadTimeout,
  34. }
  35. endpointUrl *url.URL
  36. session ebpfspy.Session
  37. targetFinder = &TargetFinder{
  38. processes: map[uint32]*processInfo{},
  39. }
  40. )
  41. func Init() chan<- containers.ProcessInfo {
  42. endpoint := os.Getenv("PROFILES_ENDPOINT")
  43. if endpoint == "" {
  44. klog.Infoln("no profiles endpoint configured")
  45. return nil
  46. }
  47. klog.Infoln("profiles endpoint:", endpoint)
  48. var err error
  49. endpointUrl, err = url.Parse(endpoint)
  50. if err != nil {
  51. klog.Exitln(err)
  52. }
  53. reg := prometheus.NewRegistry()
  54. so := ebpfspy.SessionOptions{
  55. CollectUser: true,
  56. CollectKernel: true,
  57. UnknownSymbolModuleOffset: true,
  58. UnknownSymbolAddress: false,
  59. PythonEnabled: true,
  60. CacheOptions: symtab.CacheOptions{
  61. PidCacheOptions: symtab.GCacheOptions{
  62. Size: 256,
  63. KeepRounds: 8,
  64. },
  65. BuildIDCacheOptions: symtab.GCacheOptions{
  66. Size: 256,
  67. KeepRounds: 8,
  68. },
  69. SameFileCacheOptions: symtab.GCacheOptions{
  70. Size: 256,
  71. KeepRounds: 8,
  72. },
  73. SymbolOptions: symtab.SymbolOptions{
  74. GoTableFallback: true,
  75. PythonFullFilePath: false,
  76. DemangleOptions: elf.DemangleFull,
  77. },
  78. },
  79. Metrics: &metrics.Metrics{
  80. Symtab: metrics.NewSymtabMetrics(reg),
  81. Python: metrics.NewPythonMetrics(reg),
  82. },
  83. SampleRate: SampleRate,
  84. }
  85. session, err = ebpfspy.NewSession(log.NewNopLogger(), targetFinder, so)
  86. if err != nil {
  87. klog.Errorln(err)
  88. session = nil
  89. return nil
  90. }
  91. err = session.Start()
  92. if err != nil {
  93. klog.Errorln(err)
  94. session = nil
  95. return nil
  96. }
  97. go collect()
  98. processInfoCh := make(chan containers.ProcessInfo)
  99. targetFinder.start(processInfoCh)
  100. return processInfoCh
  101. }
  102. func Start() {
  103. if session == nil {
  104. return
  105. }
  106. targetFinder.now = time.Now()
  107. session.UpdateTargets(sd.TargetsOptions{})
  108. }
  109. func Stop() {
  110. if session != nil {
  111. session.Stop()
  112. }
  113. }
  114. func collect() {
  115. ticker := time.NewTicker(CollectInterval)
  116. defer ticker.Stop()
  117. tPrev := time.Now()
  118. for t := range ticker.C {
  119. tCurr := t
  120. session.UpdateTargets(sd.TargetsOptions{})
  121. bs := pprof.NewProfileBuilders(SampleRate)
  122. err := session.CollectProfiles(func(target *sd.Target, stack []string, value uint64, pid uint32) {
  123. p := targetFinder.get(pid)
  124. if p == nil {
  125. return
  126. }
  127. b := bs.BuilderForTarget(p.hash, labels.Labels{{Value: p.labels}})
  128. b.AddSample(stack, value)
  129. })
  130. klog.Infof("collected %d profiles in %s", len(bs.Builders), time.Since(t).Truncate(time.Millisecond))
  131. if err != nil {
  132. klog.Errorln(err)
  133. }
  134. t = time.Now()
  135. var uploaded int
  136. for _, b := range bs.Builders {
  137. err = upload(b, tPrev, tCurr)
  138. if err != nil {
  139. klog.Errorln(err)
  140. break
  141. }
  142. uploaded++
  143. }
  144. klog.Infof("uploaded %d profiles in %s", uploaded, time.Since(t).Truncate(time.Millisecond))
  145. tPrev = tCurr
  146. }
  147. }
  148. func upload(b *pprof.ProfileBuilder, from, until time.Time) error {
  149. u := *endpointUrl
  150. q := u.Query()
  151. q.Set("name", "ebpf"+b.Labels[0].Value)
  152. q.Set("format", "pprof")
  153. q.Set("from", strconv.Itoa(int(from.Unix())))
  154. q.Set("until", strconv.Itoa(int(until.Unix())))
  155. q.Set("spyName", "coroot-node-agent")
  156. u.RawQuery = q.Encode()
  157. b.Profile.SampleType[0].Type = "samples"
  158. b.Profile.SampleType[0].Unit = "samples"
  159. for _, s := range b.Profile.Sample {
  160. s.Value[0] = s.Value[0] / b.Profile.Period
  161. }
  162. body := bytes.NewBuffer(nil)
  163. _, err := b.Write(body)
  164. if err != nil {
  165. return err
  166. }
  167. req, err := http.NewRequest("POST", u.String(), body)
  168. if err != nil {
  169. return err
  170. }
  171. resp, err := httpClient.Do(req)
  172. if err != nil {
  173. return err
  174. }
  175. defer resp.Body.Close()
  176. respBody, err := io.ReadAll(resp.Body)
  177. if err != nil {
  178. return err
  179. }
  180. if resp.StatusCode != 200 {
  181. return fmt.Errorf("failed to upload %d: %s", resp.StatusCode, string(respBody))
  182. }
  183. return nil
  184. }
  185. type TargetFinder struct {
  186. processes map[uint32]*processInfo
  187. lock sync.Mutex
  188. now time.Time
  189. }
  190. func (tf *TargetFinder) start(processInfoCh <-chan containers.ProcessInfo) {
  191. go func() {
  192. for pi := range processInfoCh {
  193. tf.lock.Lock()
  194. tf.processes[pi.Pid] = &processInfo{
  195. containerId: string(pi.ContainerId),
  196. startedAt: pi.StartedAt,
  197. }
  198. tf.lock.Unlock()
  199. }
  200. }()
  201. }
  202. func (tf *TargetFinder) get(pid uint32) *processInfo {
  203. tf.lock.Lock()
  204. pi := tf.processes[pid]
  205. tf.lock.Unlock()
  206. if pi == nil {
  207. return nil
  208. }
  209. if tf.now.Sub(pi.startedAt) < CollectInterval {
  210. return nil
  211. }
  212. if pi.hash == 0 {
  213. pi.calcHashAndLabels()
  214. }
  215. return pi
  216. }
  217. func (tf *TargetFinder) FindTarget(pid uint32) *sd.Target {
  218. p := tf.get(pid)
  219. if p == nil {
  220. return nil
  221. }
  222. return &sd.Target{}
  223. }
  224. func (tf *TargetFinder) RemoveDeadPID(pid uint32) {
  225. tf.lock.Lock()
  226. defer tf.lock.Unlock()
  227. delete(tf.processes, pid)
  228. }
  229. func (tf *TargetFinder) DebugInfo() []string {
  230. return nil
  231. }
  232. func (tf *TargetFinder) Update(_ sd.TargetsOptions) {
  233. tf.now = time.Now()
  234. }
  235. type processInfo struct {
  236. containerId string
  237. startedAt time.Time
  238. labels string
  239. hash uint64
  240. }
  241. func (pi *processInfo) calcHashAndLabels() {
  242. hash := fnv.New64a()
  243. _, _ = hash.Write([]byte(pi.containerId))
  244. pi.hash = hash.Sum64()
  245. var buf bytes.Buffer
  246. buf.WriteByte('{')
  247. buf.WriteString("container_id")
  248. buf.WriteByte('=')
  249. buf.WriteString(pi.containerId)
  250. buf.WriteByte(',')
  251. buf.WriteString("service_name")
  252. buf.WriteByte('=')
  253. buf.WriteString(common.ContainerIdToOtelServiceName(pi.containerId))
  254. buf.WriteByte('}')
  255. pi.labels = buf.String()
  256. }