profiling.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271
  1. package profiling
  2. import (
  3. "bytes"
  4. "fmt"
  5. "hash/fnv"
  6. "io"
  7. "net/http"
  8. "net/url"
  9. "sync"
  10. "time"
  11. "github.com/coroot/coroot-node-agent/common"
  12. "github.com/coroot/coroot-node-agent/containers"
  13. "github.com/coroot/coroot-node-agent/flags"
  14. "github.com/go-kit/log"
  15. ebpfspy "github.com/grafana/pyroscope/ebpf"
  16. "github.com/grafana/pyroscope/ebpf/metrics"
  17. "github.com/grafana/pyroscope/ebpf/pprof"
  18. "github.com/grafana/pyroscope/ebpf/sd"
  19. "github.com/grafana/pyroscope/ebpf/symtab"
  20. "github.com/grafana/pyroscope/ebpf/symtab/elf"
  21. "github.com/prometheus/client_golang/prometheus"
  22. "github.com/prometheus/prometheus/model/labels"
  23. "k8s.io/klog/v2"
  24. )
  25. const (
  26. CollectInterval = time.Minute
  27. SampleRate = 100
  28. UploadTimeout = 10 * time.Second
  29. )
  30. var (
  31. constLabels labels.Labels
  32. httpClient = http.Client{
  33. Timeout: UploadTimeout,
  34. }
  35. endpointUrl *url.URL
  36. session ebpfspy.Session
  37. targetFinder = &TargetFinder{
  38. processes: map[uint32]*processInfo{},
  39. }
  40. )
  41. func Init(hostId, hostName string) chan<- containers.ProcessInfo {
  42. endpointUrl = *flags.ProfilesEndpoint
  43. if endpointUrl == nil {
  44. klog.Infoln("no profiles endpoint configured")
  45. return nil
  46. }
  47. klog.Infoln("profiles endpoint:", endpointUrl.String())
  48. constLabels = labels.Labels{
  49. {Name: "host.name", Value: hostName},
  50. {Name: "host.id", Value: hostId},
  51. {Name: "profile.source", Value: "ebpf"},
  52. }
  53. reg := prometheus.NewRegistry()
  54. so := ebpfspy.SessionOptions{
  55. CollectUser: true,
  56. CollectKernel: false,
  57. UnknownSymbolModuleOffset: true,
  58. UnknownSymbolAddress: false,
  59. PythonEnabled: true,
  60. CacheOptions: symtab.CacheOptions{
  61. PidCacheOptions: symtab.GCacheOptions{
  62. Size: 256,
  63. KeepRounds: 8,
  64. },
  65. BuildIDCacheOptions: symtab.GCacheOptions{
  66. Size: 256,
  67. KeepRounds: 8,
  68. },
  69. SameFileCacheOptions: symtab.GCacheOptions{
  70. Size: 256,
  71. KeepRounds: 8,
  72. },
  73. SymbolOptions: symtab.SymbolOptions{
  74. GoTableFallback: true,
  75. PythonFullFilePath: false,
  76. DemangleOptions: elf.DemangleFull,
  77. },
  78. },
  79. Metrics: &metrics.Metrics{
  80. Symtab: metrics.NewSymtabMetrics(reg),
  81. Python: metrics.NewPythonMetrics(reg),
  82. },
  83. SampleRate: SampleRate,
  84. }
  85. var err error
  86. session, err = ebpfspy.NewSession(log.NewNopLogger(), targetFinder, so)
  87. if err != nil {
  88. klog.Errorln(err)
  89. session = nil
  90. return nil
  91. }
  92. err = session.Start()
  93. if err != nil {
  94. klog.Errorln(err)
  95. session = nil
  96. return nil
  97. }
  98. go collect()
  99. processInfoCh := make(chan containers.ProcessInfo)
  100. targetFinder.start(processInfoCh)
  101. return processInfoCh
  102. }
  103. func Start() {
  104. if session == nil {
  105. return
  106. }
  107. targetFinder.now = time.Now()
  108. session.UpdateTargets(sd.TargetsOptions{})
  109. }
  110. func Stop() {
  111. if session != nil {
  112. session.Stop()
  113. }
  114. }
  115. func collect() {
  116. ticker := time.NewTicker(CollectInterval)
  117. defer ticker.Stop()
  118. for t := range ticker.C {
  119. session.UpdateTargets(sd.TargetsOptions{})
  120. bs := pprof.NewProfileBuilders(SampleRate)
  121. err := session.CollectProfiles(func(target *sd.Target, stack []string, value uint64, pid uint32, aggregation ebpfspy.SampleAggregation) {
  122. pi := targetFinder.get(pid)
  123. if pi == nil {
  124. return
  125. }
  126. b := bs.BuilderForTarget(pi.hash, pi.labels)
  127. if aggregation == ebpfspy.SampleAggregated {
  128. b.CreateSample(stack, value)
  129. } else {
  130. b.CreateSampleOrAddValue(stack, value)
  131. }
  132. })
  133. klog.Infof("collected %d profiles in %s", len(bs.Builders), time.Since(t).Truncate(time.Millisecond))
  134. if err != nil {
  135. klog.Errorln(err)
  136. }
  137. t = time.Now()
  138. var uploaded int
  139. for _, b := range bs.Builders {
  140. err = upload(b)
  141. if err != nil {
  142. klog.Errorln(err)
  143. break
  144. }
  145. uploaded++
  146. }
  147. klog.Infof("uploaded %d profiles in %s", uploaded, time.Since(t).Truncate(time.Millisecond))
  148. }
  149. }
  150. func upload(b *pprof.ProfileBuilder) error {
  151. u := *endpointUrl
  152. q := u.Query()
  153. for _, l := range append(b.Labels, constLabels...) {
  154. q.Set(l.Name, l.Value)
  155. }
  156. u.RawQuery = q.Encode()
  157. b.Profile.DurationNanos = CollectInterval.Nanoseconds()
  158. body := bytes.NewBuffer(nil)
  159. _, err := b.Write(body)
  160. if err != nil {
  161. return err
  162. }
  163. req, err := http.NewRequest("POST", u.String(), body)
  164. if err != nil {
  165. return err
  166. }
  167. for k, v := range common.AuthHeaders() {
  168. req.Header.Set(k, v)
  169. }
  170. resp, err := httpClient.Do(req)
  171. if err != nil {
  172. return err
  173. }
  174. defer resp.Body.Close()
  175. respBody, err := io.ReadAll(resp.Body)
  176. if err != nil {
  177. return err
  178. }
  179. if resp.StatusCode != 200 {
  180. return fmt.Errorf("failed to upload %d: %s", resp.StatusCode, string(respBody))
  181. }
  182. return nil
  183. }
  184. type TargetFinder struct {
  185. processes map[uint32]*processInfo
  186. lock sync.Mutex
  187. now time.Time
  188. }
  189. func (tf *TargetFinder) start(processInfoCh <-chan containers.ProcessInfo) {
  190. go func() {
  191. for pi := range processInfoCh {
  192. tf.lock.Lock()
  193. tf.processes[pi.Pid] = &processInfo{
  194. containerId: string(pi.ContainerId),
  195. startedAt: pi.StartedAt,
  196. }
  197. tf.lock.Unlock()
  198. }
  199. }()
  200. }
  201. func (tf *TargetFinder) get(pid uint32) *processInfo {
  202. tf.lock.Lock()
  203. pi := tf.processes[pid]
  204. tf.lock.Unlock()
  205. if pi == nil {
  206. return nil
  207. }
  208. if tf.now.Sub(pi.startedAt) < CollectInterval {
  209. return nil
  210. }
  211. if pi.hash == 0 {
  212. pi.calcHashAndLabels()
  213. }
  214. return pi
  215. }
  216. func (tf *TargetFinder) FindTarget(pid uint32) *sd.Target {
  217. p := tf.get(pid)
  218. if p == nil {
  219. return nil
  220. }
  221. return &sd.Target{}
  222. }
  223. func (tf *TargetFinder) RemoveDeadPID(pid uint32) {
  224. tf.lock.Lock()
  225. defer tf.lock.Unlock()
  226. delete(tf.processes, pid)
  227. }
  228. func (tf *TargetFinder) DebugInfo() []string {
  229. return nil
  230. }
  231. func (tf *TargetFinder) Update(_ sd.TargetsOptions) {
  232. tf.now = time.Now()
  233. }
  234. type processInfo struct {
  235. containerId string
  236. startedAt time.Time
  237. labels labels.Labels
  238. hash uint64
  239. }
  240. func (pi *processInfo) calcHashAndLabels() {
  241. hash := fnv.New64a()
  242. _, _ = hash.Write([]byte(pi.containerId))
  243. pi.hash = hash.Sum64()
  244. pi.labels = labels.Labels{
  245. {Name: "service.name", Value: common.ContainerIdToOtelServiceName(pi.containerId)},
  246. {Name: "container.id", Value: pi.containerId},
  247. }
  248. }