profiling.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271
  1. package profiling
  2. import (
  3. "bytes"
  4. "fmt"
  5. "hash/fnv"
  6. "io"
  7. "net/http"
  8. "net/url"
  9. "sync"
  10. "time"
  11. "github.com/coroot/coroot-node-agent/common"
  12. "github.com/coroot/coroot-node-agent/containers"
  13. "github.com/coroot/coroot-node-agent/flags"
  14. "github.com/go-kit/log"
  15. ebpfspy "github.com/grafana/pyroscope/ebpf"
  16. "github.com/grafana/pyroscope/ebpf/metrics"
  17. "github.com/grafana/pyroscope/ebpf/pprof"
  18. "github.com/grafana/pyroscope/ebpf/sd"
  19. "github.com/grafana/pyroscope/ebpf/symtab"
  20. "github.com/grafana/pyroscope/ebpf/symtab/elf"
  21. "github.com/prometheus/client_golang/prometheus"
  22. "github.com/prometheus/prometheus/model/labels"
  23. "k8s.io/klog/v2"
  24. )
  25. const (
  26. CollectInterval = time.Minute
  27. SampleRate = 100
  28. UploadTimeout = 10 * time.Second
  29. )
  30. var (
  31. constLabels labels.Labels
  32. httpClient = http.Client{
  33. Timeout: UploadTimeout,
  34. }
  35. endpointUrl *url.URL
  36. session ebpfspy.Session
  37. targetFinder = &TargetFinder{
  38. processes: map[uint32]*processInfo{},
  39. }
  40. )
  41. func Init(hostId, hostName string) chan<- containers.ProcessInfo {
  42. endpointUrl = *flags.ProfilesEndpoint
  43. if endpointUrl == nil {
  44. klog.Infoln("no profiles endpoint configured")
  45. return nil
  46. }
  47. klog.Infoln("profiles endpoint:", endpointUrl.String())
  48. constLabels = labels.Labels{
  49. {Name: "host.name", Value: hostName},
  50. {Name: "host.id", Value: hostId},
  51. }
  52. reg := prometheus.NewRegistry()
  53. so := ebpfspy.SessionOptions{
  54. CollectUser: true,
  55. CollectKernel: false,
  56. UnknownSymbolModuleOffset: true,
  57. UnknownSymbolAddress: false,
  58. PythonEnabled: true,
  59. CacheOptions: symtab.CacheOptions{
  60. PidCacheOptions: symtab.GCacheOptions{
  61. Size: 256,
  62. KeepRounds: 8,
  63. },
  64. BuildIDCacheOptions: symtab.GCacheOptions{
  65. Size: 256,
  66. KeepRounds: 8,
  67. },
  68. SameFileCacheOptions: symtab.GCacheOptions{
  69. Size: 256,
  70. KeepRounds: 8,
  71. },
  72. SymbolOptions: symtab.SymbolOptions{
  73. GoTableFallback: true,
  74. PythonFullFilePath: false,
  75. DemangleOptions: elf.DemangleFull,
  76. },
  77. },
  78. Metrics: &metrics.Metrics{
  79. Symtab: metrics.NewSymtabMetrics(reg),
  80. Python: metrics.NewPythonMetrics(reg),
  81. },
  82. SampleRate: SampleRate,
  83. }
  84. var err error
  85. session, err = ebpfspy.NewSession(log.NewNopLogger(), targetFinder, so)
  86. if err != nil {
  87. klog.Errorln(err)
  88. session = nil
  89. return nil
  90. }
  91. err = session.Start()
  92. if err != nil {
  93. klog.Errorln(err)
  94. session = nil
  95. return nil
  96. }
  97. go collect()
  98. processInfoCh := make(chan containers.ProcessInfo)
  99. targetFinder.start(processInfoCh)
  100. return processInfoCh
  101. }
  102. func Start() {
  103. if session == nil {
  104. return
  105. }
  106. targetFinder.now = time.Now()
  107. session.UpdateTargets(sd.TargetsOptions{})
  108. }
  109. func Stop() {
  110. if session != nil {
  111. session.Stop()
  112. }
  113. }
  114. func collect() {
  115. ticker := time.NewTicker(CollectInterval)
  116. defer ticker.Stop()
  117. for t := range ticker.C {
  118. session.UpdateTargets(sd.TargetsOptions{})
  119. bs := pprof.NewProfileBuilders(SampleRate)
  120. err := session.CollectProfiles(func(target *sd.Target, stack []string, value uint64, pid uint32, aggregation ebpfspy.SampleAggregation) {
  121. pi := targetFinder.get(pid)
  122. if pi == nil {
  123. return
  124. }
  125. b := bs.BuilderForTarget(pi.hash, pi.labels)
  126. if aggregation == ebpfspy.SampleAggregated {
  127. b.CreateSample(stack, value)
  128. } else {
  129. b.CreateSampleOrAddValue(stack, value)
  130. }
  131. })
  132. klog.Infof("collected %d profiles in %s", len(bs.Builders), time.Since(t).Truncate(time.Millisecond))
  133. if err != nil {
  134. klog.Errorln(err)
  135. }
  136. t = time.Now()
  137. var uploaded int
  138. for _, b := range bs.Builders {
  139. err = upload(b)
  140. if err != nil {
  141. klog.Errorln(err)
  142. break
  143. }
  144. uploaded++
  145. }
  146. klog.Infof("uploaded %d profiles in %s", uploaded, time.Since(t).Truncate(time.Millisecond))
  147. }
  148. }
  149. func upload(b *pprof.ProfileBuilder) error {
  150. u := *endpointUrl
  151. q := u.Query()
  152. for _, l := range append(b.Labels, constLabels...) {
  153. q.Set(l.Name, l.Value)
  154. }
  155. u.RawQuery = q.Encode()
  156. b.Profile.SampleType[0].Type = "ebpf:cpu:nanoseconds"
  157. b.Profile.DurationNanos = CollectInterval.Nanoseconds()
  158. body := bytes.NewBuffer(nil)
  159. _, err := b.Write(body)
  160. if err != nil {
  161. return err
  162. }
  163. req, err := http.NewRequest(http.MethodPost, u.String(), body)
  164. if err != nil {
  165. return err
  166. }
  167. for k, v := range common.AuthHeaders() {
  168. req.Header.Set(k, v)
  169. }
  170. resp, err := httpClient.Do(req)
  171. if err != nil {
  172. return err
  173. }
  174. defer resp.Body.Close()
  175. respBody, err := io.ReadAll(resp.Body)
  176. if err != nil {
  177. return err
  178. }
  179. if resp.StatusCode != 200 {
  180. return fmt.Errorf("failed to upload %d: %s", resp.StatusCode, string(respBody))
  181. }
  182. return nil
  183. }
  184. type TargetFinder struct {
  185. processes map[uint32]*processInfo
  186. lock sync.Mutex
  187. now time.Time
  188. }
  189. func (tf *TargetFinder) start(processInfoCh <-chan containers.ProcessInfo) {
  190. go func() {
  191. for pi := range processInfoCh {
  192. tf.lock.Lock()
  193. tf.processes[pi.Pid] = &processInfo{
  194. containerId: string(pi.ContainerId),
  195. startedAt: pi.StartedAt,
  196. }
  197. tf.lock.Unlock()
  198. }
  199. }()
  200. }
  201. func (tf *TargetFinder) get(pid uint32) *processInfo {
  202. tf.lock.Lock()
  203. pi := tf.processes[pid]
  204. tf.lock.Unlock()
  205. if pi == nil {
  206. return nil
  207. }
  208. if tf.now.Sub(pi.startedAt) < CollectInterval {
  209. return nil
  210. }
  211. if pi.hash == 0 {
  212. pi.calcHashAndLabels()
  213. }
  214. return pi
  215. }
  216. func (tf *TargetFinder) FindTarget(pid uint32) *sd.Target {
  217. p := tf.get(pid)
  218. if p == nil {
  219. return nil
  220. }
  221. return &sd.Target{}
  222. }
  223. func (tf *TargetFinder) RemoveDeadPID(pid uint32) {
  224. tf.lock.Lock()
  225. defer tf.lock.Unlock()
  226. delete(tf.processes, pid)
  227. }
  228. func (tf *TargetFinder) DebugInfo() []string {
  229. return nil
  230. }
  231. func (tf *TargetFinder) Update(_ sd.TargetsOptions) {
  232. tf.now = time.Now()
  233. }
  234. type processInfo struct {
  235. containerId string
  236. startedAt time.Time
  237. labels labels.Labels
  238. hash uint64
  239. }
  240. func (pi *processInfo) calcHashAndLabels() {
  241. hash := fnv.New64a()
  242. _, _ = hash.Write([]byte(pi.containerId))
  243. pi.hash = hash.Sum64()
  244. pi.labels = labels.Labels{
  245. {Name: "service.name", Value: common.ContainerIdToOtelServiceName(pi.containerId)},
  246. {Name: "container.id", Value: pi.containerId},
  247. }
  248. }