profiling.go 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275
  1. package profiling
  2. import (
  3. "bytes"
  4. "crypto/tls"
  5. "fmt"
  6. "hash/fnv"
  7. "io"
  8. "net/http"
  9. "net/url"
  10. "sync"
  11. "time"
  12. "github.com/coroot/coroot-node-agent/common"
  13. "github.com/coroot/coroot-node-agent/containers"
  14. "github.com/coroot/coroot-node-agent/flags"
  15. "github.com/go-kit/log"
  16. ebpfspy "github.com/grafana/pyroscope/ebpf"
  17. "github.com/grafana/pyroscope/ebpf/metrics"
  18. "github.com/grafana/pyroscope/ebpf/pprof"
  19. "github.com/grafana/pyroscope/ebpf/sd"
  20. "github.com/grafana/pyroscope/ebpf/symtab"
  21. "github.com/grafana/pyroscope/ebpf/symtab/elf"
  22. "github.com/prometheus/client_golang/prometheus"
  23. "github.com/prometheus/prometheus/model/labels"
  24. klog "github.com/sirupsen/logrus"
  25. )
  26. const (
  27. CollectInterval = time.Minute
  28. SampleRate = 100
  29. UploadTimeout = 10 * time.Second
  30. )
  31. var (
  32. constLabels labels.Labels
  33. httpClient = http.Client{
  34. Timeout: UploadTimeout,
  35. Transport: &http.Transport{
  36. TLSClientConfig: &tls.Config{InsecureSkipVerify: *flags.InsecureSkipVerify},
  37. },
  38. }
  39. endpointUrl *url.URL
  40. session ebpfspy.Session
  41. targetFinder = &TargetFinder{
  42. processes: map[uint32]*processInfo{},
  43. }
  44. )
  45. func Init(hostId, hostName string) chan<- containers.ProcessInfo {
  46. endpointUrl = *flags.ProfilesEndpoint
  47. if endpointUrl == nil {
  48. klog.Infoln("no profiles endpoint configured")
  49. return nil
  50. }
  51. klog.Infoln("profiles endpoint:", endpointUrl.String())
  52. constLabels = labels.Labels{
  53. {Name: "host.name", Value: hostName},
  54. {Name: "host.id", Value: hostId},
  55. }
  56. reg := prometheus.NewRegistry()
  57. so := ebpfspy.SessionOptions{
  58. CollectUser: true,
  59. CollectKernel: false,
  60. UnknownSymbolModuleOffset: true,
  61. UnknownSymbolAddress: false,
  62. PythonEnabled: true,
  63. CacheOptions: symtab.CacheOptions{
  64. PidCacheOptions: symtab.GCacheOptions{
  65. Size: 256,
  66. KeepRounds: 8,
  67. },
  68. BuildIDCacheOptions: symtab.GCacheOptions{
  69. Size: 256,
  70. KeepRounds: 8,
  71. },
  72. SameFileCacheOptions: symtab.GCacheOptions{
  73. Size: 256,
  74. KeepRounds: 8,
  75. },
  76. SymbolOptions: symtab.SymbolOptions{
  77. GoTableFallback: true,
  78. PythonFullFilePath: false,
  79. DemangleOptions: elf.DemangleFull,
  80. },
  81. },
  82. Metrics: &metrics.Metrics{
  83. Symtab: metrics.NewSymtabMetrics(reg),
  84. Python: metrics.NewPythonMetrics(reg),
  85. },
  86. SampleRate: SampleRate,
  87. }
  88. var err error
  89. session, err = ebpfspy.NewSession(log.NewNopLogger(), targetFinder, so)
  90. if err != nil {
  91. klog.Errorln(err)
  92. session = nil
  93. return nil
  94. }
  95. err = session.Start()
  96. if err != nil {
  97. klog.Errorln(err)
  98. session = nil
  99. return nil
  100. }
  101. go collect()
  102. processInfoCh := make(chan containers.ProcessInfo)
  103. targetFinder.start(processInfoCh)
  104. return processInfoCh
  105. }
  106. func Start() {
  107. if session == nil {
  108. return
  109. }
  110. targetFinder.now = time.Now()
  111. session.UpdateTargets(sd.TargetsOptions{})
  112. }
  113. func Stop() {
  114. if session != nil {
  115. session.Stop()
  116. }
  117. }
  118. func collect() {
  119. ticker := time.NewTicker(CollectInterval)
  120. defer ticker.Stop()
  121. for t := range ticker.C {
  122. session.UpdateTargets(sd.TargetsOptions{})
  123. bs := pprof.NewProfileBuilders(SampleRate)
  124. err := session.CollectProfiles(func(target *sd.Target, stack []string, value uint64, pid uint32, aggregation ebpfspy.SampleAggregation) {
  125. pi := targetFinder.get(pid)
  126. if pi == nil {
  127. return
  128. }
  129. b := bs.BuilderForTarget(pi.hash, pi.labels)
  130. if aggregation == ebpfspy.SampleAggregated {
  131. b.CreateSample(stack, value)
  132. } else {
  133. b.CreateSampleOrAddValue(stack, value)
  134. }
  135. })
  136. klog.Infof("collected %d profiles in %s", len(bs.Builders), time.Since(t).Truncate(time.Millisecond))
  137. if err != nil {
  138. klog.Errorln(err)
  139. }
  140. t = time.Now()
  141. var uploaded int
  142. for _, b := range bs.Builders {
  143. err = upload(b)
  144. if err != nil {
  145. klog.Errorln(err)
  146. break
  147. }
  148. uploaded++
  149. }
  150. klog.Infof("uploaded %d profiles in %s", uploaded, time.Since(t).Truncate(time.Millisecond))
  151. }
  152. }
  153. func upload(b *pprof.ProfileBuilder) error {
  154. u := *endpointUrl
  155. q := u.Query()
  156. for _, l := range append(b.Labels, constLabels...) {
  157. q.Set(l.Name, l.Value)
  158. }
  159. u.RawQuery = q.Encode()
  160. b.Profile.SampleType[0].Type = "ebpf:cpu:nanoseconds"
  161. b.Profile.DurationNanos = CollectInterval.Nanoseconds()
  162. body := bytes.NewBuffer(nil)
  163. _, err := b.Write(body)
  164. if err != nil {
  165. return err
  166. }
  167. req, err := http.NewRequest(http.MethodPost, u.String(), body)
  168. if err != nil {
  169. return err
  170. }
  171. for k, v := range common.AuthHeaders() {
  172. req.Header.Set(k, v)
  173. }
  174. resp, err := httpClient.Do(req)
  175. if err != nil {
  176. return err
  177. }
  178. defer resp.Body.Close()
  179. respBody, err := io.ReadAll(resp.Body)
  180. if err != nil {
  181. return err
  182. }
  183. if resp.StatusCode != 200 {
  184. return fmt.Errorf("failed to upload %d: %s", resp.StatusCode, string(respBody))
  185. }
  186. return nil
  187. }
  188. type TargetFinder struct {
  189. processes map[uint32]*processInfo
  190. lock sync.Mutex
  191. now time.Time
  192. }
  193. func (tf *TargetFinder) start(processInfoCh <-chan containers.ProcessInfo) {
  194. go func() {
  195. for pi := range processInfoCh {
  196. tf.lock.Lock()
  197. tf.processes[pi.Pid] = &processInfo{
  198. containerId: string(pi.ContainerId),
  199. startedAt: pi.StartedAt,
  200. }
  201. tf.lock.Unlock()
  202. }
  203. }()
  204. }
  205. func (tf *TargetFinder) get(pid uint32) *processInfo {
  206. tf.lock.Lock()
  207. pi := tf.processes[pid]
  208. tf.lock.Unlock()
  209. if pi == nil {
  210. return nil
  211. }
  212. if tf.now.Sub(pi.startedAt) < CollectInterval {
  213. return nil
  214. }
  215. if pi.hash == 0 {
  216. pi.calcHashAndLabels()
  217. }
  218. return pi
  219. }
  220. func (tf *TargetFinder) FindTarget(pid uint32) *sd.Target {
  221. p := tf.get(pid)
  222. if p == nil {
  223. return nil
  224. }
  225. return &sd.Target{}
  226. }
  227. func (tf *TargetFinder) RemoveDeadPID(pid uint32) {
  228. tf.lock.Lock()
  229. defer tf.lock.Unlock()
  230. delete(tf.processes, pid)
  231. }
  232. func (tf *TargetFinder) DebugInfo() []string {
  233. return nil
  234. }
  235. func (tf *TargetFinder) Update(_ sd.TargetsOptions) {
  236. tf.now = time.Now()
  237. }
  238. type processInfo struct {
  239. containerId string
  240. startedAt time.Time
  241. labels labels.Labels
  242. hash uint64
  243. }
  244. func (pi *processInfo) calcHashAndLabels() {
  245. hash := fnv.New64a()
  246. _, _ = hash.Write([]byte(pi.containerId))
  247. pi.hash = hash.Sum64()
  248. pi.labels = labels.Labels{
  249. {Name: "service.name", Value: common.ContainerIdToOtelServiceName(pi.containerId)},
  250. {Name: "container.id", Value: pi.containerId},
  251. }
  252. }