client_apm.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166
  1. package otlptracehttp // import "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
  2. import (
  3. "bytes"
  4. "compress/gzip"
  5. "context"
  6. "encoding/json"
  7. "fmt"
  8. "github.com/coroot/coroot-node-agent/utils"
  9. . "github.com/coroot/coroot-node-agent/utils/modelse"
  10. "github.com/klauspost/compress/zstd"
  11. klog "github.com/sirupsen/logrus"
  12. "go.opentelemetry.io/otel"
  13. "go.opentelemetry.io/otel/exporters/otlp/otlptrace"
  14. "io"
  15. "net/http"
  16. "net/url"
  17. "strconv"
  18. )
  19. func (d *client) UploadApmTraces(ctx context.Context, rootData []otlptrace.RootDataT, codeType int) error {
  20. //pbRequest := &coltracepb.ExportTraceServiceRequest{
  21. // ResourceSpans: protoSpans,
  22. //}
  23. //klog.Infoln("enter the UploadApmTraces newApmRequest-----")
  24. rawRequest, err := json.Marshal(rootData)
  25. if err != nil {
  26. return err
  27. }
  28. ctx, cancel := d.contextWithStop(ctx)
  29. defer cancel()
  30. mapLen := len(rootData)
  31. klog.Debugln("enter the UploadApmTraces newApmRequest")
  32. request, err := d.newApmRequest(rawRequest, mapLen, CodeType(codeType))
  33. if err != nil {
  34. return err
  35. }
  36. return d.requestFunc(ctx, func(ctx context.Context) error {
  37. select {
  38. case <-ctx.Done():
  39. return ctx.Err()
  40. default:
  41. }
  42. request.reset(ctx)
  43. resp, err := d.client.Do(request.Request)
  44. if err != nil {
  45. return err
  46. }
  47. if resp != nil && resp.Body != nil {
  48. defer func() {
  49. if err := resp.Body.Close(); err != nil {
  50. otel.Handle(err)
  51. }
  52. }()
  53. }
  54. switch sc := resp.StatusCode; {
  55. case sc >= 200 && sc <= 299:
  56. // Success, do not retry.
  57. // Read the partial success message, if any.
  58. var respData bytes.Buffer
  59. if _, err := io.Copy(&respData, resp.Body); err != nil {
  60. return err
  61. }
  62. if respData.Len() != 0 {
  63. var respJsonData RespDataT
  64. if err := json.Unmarshal(respData.Bytes(), &respJsonData); err != nil {
  65. return err
  66. }
  67. msg := respJsonData.Msg
  68. code := respJsonData.Code
  69. klog.Infof("data/receive url: %s, response body: %s\n", request.Request.URL.String(), respData.String())
  70. if msg != "send ok" || code != 1000 {
  71. return fmt.Errorf("resp error msg:<%s> code:<%d>", msg, code)
  72. }
  73. }
  74. return nil
  75. case sc == http.StatusTooManyRequests, sc == http.StatusServiceUnavailable:
  76. // Retry-able failures. Drain the body to reuse the connection.
  77. if _, err := io.Copy(io.Discard, resp.Body); err != nil {
  78. otel.Handle(err)
  79. }
  80. return newResponseError(resp.Header)
  81. default:
  82. return fmt.Errorf("failed to send to %s: %s", request.URL, resp.Status)
  83. }
  84. })
  85. }
  // RespDataT is the JSON acknowledgement decoded from a non-empty 2xx
  // response body in UploadApmTraces.
  type RespDataT struct {
  	// Code maps to the "code" key. UploadApmTraces accepts only 1000.
  	Code int `json:"code"`

  	// Msg maps to the "message" key. UploadApmTraces accepts only "send ok".
  	Msg string `json:"message"`
  }
  90. func (d *client) newApmRequest(body []byte, mapLen int, codeType CodeType) (request, error) {
  91. u := url.URL{Scheme: d.getScheme(), Host: d.cfg.Endpoint, Path: d.cfg.URLPath}
  92. r, err := http.NewRequest(http.MethodPost, u.String(), nil)
  93. if err != nil {
  94. return request{Request: r}, err
  95. }
  96. userAgent := "OTel OTLP Exporter Go/" + otlptrace.Version()
  97. r.Header.Set("User-Agent", userAgent)
  98. for k, v := range d.cfg.Headers {
  99. r.Header.Set(k, v)
  100. }
  101. //r.Header.Set("Content-Type", contentTypeProto)
  102. r.Header.Set("Content-Type", "text/plain;charset=utf-8")
  103. r.Header.Set("routingKey", codeType.Topic())
  104. klog.Debugf("codeType %d,routingKey:%s", codeType, codeType.Topic())
  105. r.Header.Set("DataCount", strconv.Itoa(mapLen))
  106. // 对接op
  107. r.Header.Set("AccountId", strconv.Itoa(utils.GetAccountID()))
  108. req := request{Request: r}
  109. // icase := ZstdCompression
  110. switch Compression(d.cfg.Compression) {
  111. // switch Compression(icase) {
  112. case NoCompression:
  113. r.ContentLength = (int64)(len(body))
  114. req.bodyReader = bodyReader(body)
  115. klog.Infoln("enter the NoCompression newApmRequest")
  116. case GzipCompression:
  117. klog.Infoln("enter the GzipCompression newApmRequest")
  118. // Ensure the content length is not used.
  119. r.ContentLength = -1
  120. r.Header.Set("Content-Encoding", "gzip")
  121. gz := gzPool.Get().(*gzip.Writer)
  122. defer gzPool.Put(gz)
  123. var b bytes.Buffer
  124. gz.Reset(&b)
  125. if _, err := gz.Write(body); err != nil {
  126. return req, err
  127. }
  128. // Close needs to be called to ensure body if fully written.
  129. if err := gz.Close(); err != nil {
  130. return req, err
  131. }
  132. req.bodyReader = bodyReader(b.Bytes())
  133. case ZstdCompression:
  134. klog.Debugln("enter the ZstdCompression newApmRequest")
  135. r.ContentLength = -1
  136. r.Header.Set("Content-Encoding", "zstd")
  137. encoder, err := zstd.NewWriter(nil)
  138. if err != nil {
  139. return req, err
  140. }
  141. defer encoder.Close()
  142. compressedData := encoder.EncodeAll(body, make([]byte, 0, len(body)))
  143. req.bodyReader = bodyReader(compressedData)
  144. }
  145. return req, nil
  146. }