profiling.go 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278
  1. package profiling
  2. import (
  3. "bytes"
  4. "crypto/tls"
  5. "fmt"
  6. "github.com/coroot/coroot-node-agent/utils"
  7. "github.com/coroot/coroot-node-agent/utils/try"
  8. "hash/fnv"
  9. "io"
  10. "net/http"
  11. "net/url"
  12. "sync"
  13. "time"
  14. "github.com/coroot/coroot-node-agent/common"
  15. "github.com/coroot/coroot-node-agent/containers"
  16. "github.com/coroot/coroot-node-agent/flags"
  17. "github.com/go-kit/log"
  18. ebpfspy "github.com/grafana/pyroscope/ebpf"
  19. "github.com/grafana/pyroscope/ebpf/metrics"
  20. "github.com/grafana/pyroscope/ebpf/pprof"
  21. "github.com/grafana/pyroscope/ebpf/sd"
  22. "github.com/grafana/pyroscope/ebpf/symtab"
  23. "github.com/grafana/pyroscope/ebpf/symtab/elf"
  24. "github.com/prometheus/client_golang/prometheus"
  25. "github.com/prometheus/prometheus/model/labels"
  26. klog "github.com/sirupsen/logrus"
  27. )
  28. const (
  29. CollectInterval = time.Minute
  30. SampleRate = 100
  31. UploadTimeout = 10 * time.Second
  32. )
  33. var (
  34. constLabels labels.Labels
  35. httpClient = http.Client{
  36. Timeout: UploadTimeout,
  37. Transport: &http.Transport{
  38. TLSClientConfig: &tls.Config{InsecureSkipVerify: *flags.InsecureSkipVerify},
  39. },
  40. }
  41. endpointUrl *url.URL
  42. session ebpfspy.Session
  43. targetFinder = &TargetFinder{
  44. processes: map[uint32]*processInfo{},
  45. }
  46. )
  47. func Init(hostId, hostName string) chan<- containers.ProcessInfo {
  48. endpointUrl = *flags.ProfilesEndpoint
  49. if endpointUrl == nil {
  50. klog.Infoln("no profiles endpoint configured")
  51. return nil
  52. }
  53. klog.Infoln("profiles endpoint:", endpointUrl.String())
  54. constLabels = labels.Labels{
  55. {Name: "host.name", Value: hostName},
  56. {Name: "host.id", Value: hostId},
  57. }
  58. reg := prometheus.NewRegistry()
  59. so := ebpfspy.SessionOptions{
  60. CollectUser: true,
  61. CollectKernel: false,
  62. UnknownSymbolModuleOffset: true,
  63. UnknownSymbolAddress: false,
  64. PythonEnabled: true,
  65. CacheOptions: symtab.CacheOptions{
  66. PidCacheOptions: symtab.GCacheOptions{
  67. Size: 256,
  68. KeepRounds: 8,
  69. },
  70. BuildIDCacheOptions: symtab.GCacheOptions{
  71. Size: 256,
  72. KeepRounds: 8,
  73. },
  74. SameFileCacheOptions: symtab.GCacheOptions{
  75. Size: 256,
  76. KeepRounds: 8,
  77. },
  78. SymbolOptions: symtab.SymbolOptions{
  79. GoTableFallback: true,
  80. PythonFullFilePath: false,
  81. DemangleOptions: elf.DemangleFull,
  82. },
  83. },
  84. Metrics: &metrics.Metrics{
  85. Symtab: metrics.NewSymtabMetrics(reg),
  86. Python: metrics.NewPythonMetrics(reg),
  87. },
  88. SampleRate: SampleRate,
  89. }
  90. var err error
  91. session, err = ebpfspy.NewSession(log.NewNopLogger(), targetFinder, so)
  92. if err != nil {
  93. klog.Errorln(err)
  94. session = nil
  95. return nil
  96. }
  97. err = session.Start()
  98. if err != nil {
  99. klog.Errorln(err)
  100. session = nil
  101. return nil
  102. }
  103. //go collect()
  104. try.Go(collect, utils.CatchFn)
  105. processInfoCh := make(chan containers.ProcessInfo)
  106. targetFinder.start(processInfoCh)
  107. return processInfoCh
  108. }
  109. func Start() {
  110. if session == nil {
  111. return
  112. }
  113. targetFinder.now = time.Now()
  114. session.UpdateTargets(sd.TargetsOptions{})
  115. }
  116. func Stop() {
  117. if session != nil {
  118. session.Stop()
  119. }
  120. }
  121. func collect() {
  122. ticker := time.NewTicker(CollectInterval)
  123. defer ticker.Stop()
  124. for t := range ticker.C {
  125. session.UpdateTargets(sd.TargetsOptions{})
  126. bs := pprof.NewProfileBuilders(SampleRate)
  127. err := session.CollectProfiles(func(target *sd.Target, stack []string, value uint64, pid uint32, aggregation ebpfspy.SampleAggregation) {
  128. pi := targetFinder.get(pid)
  129. if pi == nil {
  130. return
  131. }
  132. b := bs.BuilderForTarget(pi.hash, pi.labels)
  133. if aggregation == ebpfspy.SampleAggregated {
  134. b.CreateSample(stack, value)
  135. } else {
  136. b.CreateSampleOrAddValue(stack, value)
  137. }
  138. })
  139. klog.Infof("collected %d profiles in %s", len(bs.Builders), time.Since(t).Truncate(time.Millisecond))
  140. if err != nil {
  141. klog.Errorln(err)
  142. }
  143. t = time.Now()
  144. var uploaded int
  145. for _, b := range bs.Builders {
  146. err = upload(b)
  147. if err != nil {
  148. klog.Errorln(err)
  149. break
  150. }
  151. uploaded++
  152. }
  153. klog.Infof("uploaded %d profiles in %s", uploaded, time.Since(t).Truncate(time.Millisecond))
  154. }
  155. }
  156. func upload(b *pprof.ProfileBuilder) error {
  157. u := *endpointUrl
  158. q := u.Query()
  159. for _, l := range append(b.Labels, constLabels...) {
  160. q.Set(l.Name, l.Value)
  161. }
  162. u.RawQuery = q.Encode()
  163. b.Profile.SampleType[0].Type = "ebpf:cpu:nanoseconds"
  164. b.Profile.DurationNanos = CollectInterval.Nanoseconds()
  165. body := bytes.NewBuffer(nil)
  166. _, err := b.Write(body)
  167. if err != nil {
  168. return err
  169. }
  170. req, err := http.NewRequest(http.MethodPost, u.String(), body)
  171. if err != nil {
  172. return err
  173. }
  174. for k, v := range common.AuthHeaders() {
  175. req.Header.Set(k, v)
  176. }
  177. resp, err := httpClient.Do(req)
  178. if err != nil {
  179. return err
  180. }
  181. defer resp.Body.Close()
  182. respBody, err := io.ReadAll(resp.Body)
  183. if err != nil {
  184. return err
  185. }
  186. if resp.StatusCode != 200 {
  187. return fmt.Errorf("failed to upload %d: %s", resp.StatusCode, string(respBody))
  188. }
  189. return nil
  190. }
  191. type TargetFinder struct {
  192. processes map[uint32]*processInfo
  193. lock sync.Mutex
  194. now time.Time
  195. }
  196. func (tf *TargetFinder) start(processInfoCh <-chan containers.ProcessInfo) {
  197. try.Go(func() {
  198. for pi := range processInfoCh {
  199. tf.lock.Lock()
  200. tf.processes[pi.Pid] = &processInfo{
  201. containerId: string(pi.ContainerId),
  202. startedAt: pi.StartedAt,
  203. }
  204. tf.lock.Unlock()
  205. }
  206. }, utils.CatchFn)
  207. }
  208. func (tf *TargetFinder) get(pid uint32) *processInfo {
  209. tf.lock.Lock()
  210. pi := tf.processes[pid]
  211. tf.lock.Unlock()
  212. if pi == nil {
  213. return nil
  214. }
  215. if tf.now.Sub(pi.startedAt) < CollectInterval {
  216. return nil
  217. }
  218. if pi.hash == 0 {
  219. pi.calcHashAndLabels()
  220. }
  221. return pi
  222. }
  223. func (tf *TargetFinder) FindTarget(pid uint32) *sd.Target {
  224. p := tf.get(pid)
  225. if p == nil {
  226. return nil
  227. }
  228. return &sd.Target{}
  229. }
  230. func (tf *TargetFinder) RemoveDeadPID(pid uint32) {
  231. tf.lock.Lock()
  232. defer tf.lock.Unlock()
  233. delete(tf.processes, pid)
  234. }
  235. func (tf *TargetFinder) DebugInfo() []string {
  236. return nil
  237. }
  238. func (tf *TargetFinder) Update(_ sd.TargetsOptions) {
  239. tf.now = time.Now()
  240. }
  241. type processInfo struct {
  242. containerId string
  243. startedAt time.Time
  244. labels labels.Labels
  245. hash uint64
  246. }
  247. func (pi *processInfo) calcHashAndLabels() {
  248. hash := fnv.New64a()
  249. _, _ = hash.Write([]byte(pi.containerId))
  250. pi.hash = hash.Sum64()
  251. pi.labels = labels.Labels{
  252. {Name: "service.name", Value: common.ContainerIdToOtelServiceName(pi.containerId)},
  253. {Name: "container.id", Value: pi.containerId},
  254. }
  255. }